Skip to content

Commit

Permalink
Merge pull request #6 from sharedstreets/dev
Browse files Browse the repository at this point in the history
1.1 merge
  • Loading branch information
kpwebb authored Aug 3, 2018
2 parents 175f20f + f8f342f commit 436e956
Show file tree
Hide file tree
Showing 18 changed files with 537 additions and 154 deletions.
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group 'io.sharedstreets'
version '1.0'
version '1.1'

buildscript {
repositories { jcenter()
Expand All @@ -22,7 +22,7 @@ repositories {
mavenCentral()
}

def flinkversion = "1.4.1"
def flinkversion = "1.4.2"
def protobufversion = "3.4.0"

dependencies {
Expand All @@ -37,8 +37,8 @@ dependencies {
compile "com.jsoniter:jsoniter:0.9.15"
compile "javassist:javassist:3.12.1.GA"
compile "net.sf.geographiclib:GeographicLib-Java:1.49"
compile 'org.slf4j:slf4j-api:1.7.10'
compile 'org.slf4j:slf4j-log4j12:1.7.10'
compile 'org.slf4j:slf4j-api:1.7.7'
compile 'org.slf4j:slf4j-log4j12:1.7.7'
compile 'commons-cli:commons-cli:1.4'
compile 'org.apache.commons:commons-lang3:3.7'
compile 'com.github.davidmoten:rtree:0.8-RC10'
Expand Down
52 changes: 48 additions & 4 deletions ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.sharedstreets.matcher.ingest;

import io.sharedstreets.matcher.ingest.input.CsvEventExtractor;
import io.sharedstreets.matcher.ingest.input.DcfhvEventExtractor;
import io.sharedstreets.matcher.ingest.input.JsonEventExtractor;
import io.sharedstreets.matcher.ingest.input.gpx.GpxInputFormat;
import io.sharedstreets.matcher.ingest.model.Ingest;
Expand Down Expand Up @@ -50,14 +51,33 @@ public static void main(String[] args) throws Exception {
.withArgName("OUTPUT-DIR")
.create() );

options.addOption( OptionBuilder.withLongOpt( "tileUrl" )
.withDescription( "url for map tiles " )
.hasArg()
.withArgName("TILE-URL")
.create() );

options.addOption( OptionBuilder.withLongOpt( "tileLevel" )
.withDescription( "" )
.hasArg()
.withArgName("TILE-LEVEL")
.create() );

options.addOption("speeds", "track GPS speed when available");
options.addOption("verbose", "verbose error output" );

String inputPath = "";

String outputPath = "";

String inputType = "";

String tileLevel = "6";

boolean verbose = false;

String tileUrl = "https://tiles.sharedstreets.io/osm/planet-180312/";

boolean gpsSpeeds = false;

try {
Expand All @@ -81,6 +101,20 @@ public static void main(String[] args) throws Exception {
gpsSpeeds = true;
}

if( line.hasOption( "verbose" ) ) {
verbose = true;
}

if( line.hasOption( "tileUrl" ) ) {
// print the value of block-size
tileUrl = line.getOptionValue( "tileUrl" ).trim();;
}

if( line.hasOption( "tileLevel" ) ) {
// print the value of block-size
tileLevel = line.getOptionValue( "tileLevel" ).trim();;
}

if( line.hasOption( "type" ) ) {
// print the value of block-size
inputType = line.getOptionValue( "type" ).trim().toUpperCase();
Expand All @@ -93,6 +127,8 @@ else if(fileParts[fileParts.length-1].toLowerCase().equals("json"))
inputType = "JSON";
else if(fileParts[fileParts.length-1].toLowerCase().equals("gpx"))
inputType = "GPX";
else if(fileParts[fileParts.length-1].toLowerCase().equals("dcfhv"))
inputType = "DCFHV";
}

}
Expand All @@ -105,6 +141,9 @@ else if(fileParts[fileParts.length-1].toLowerCase().equals("gpx"))
}

final String finalInputType = inputType;
final String finalTileUrl = tileUrl;
final String finalTileLevel = tileLevel;
final boolean finalVerbose = verbose;

// let's go...

Expand All @@ -124,7 +163,7 @@ else if(fileParts[fileParts.length-1].toLowerCase().equals("gpx"))
DataSet<InputEvent> inputEvents = null;

if (finalInputType.equals("GPX")) {
inputEvents = env.createInput(new GpxInputFormat(inputPath, gpsSpeeds));
inputEvents = env.createInput(new GpxInputFormat(inputPath, gpsSpeeds, finalVerbose));
}
else {
DataSet<String> inputStream = env.readTextFile(inputPath);
Expand All @@ -136,13 +175,18 @@ else if(fileParts[fileParts.length-1].toLowerCase().equals("gpx"))
public void flatMap(String value, Collector<InputEvent> out) throws Exception {

if (finalInputType.equals("CSV")) {
List<InputEvent> inputEvents = CsvEventExtractor.extractEvents(value);
List<InputEvent> inputEvents = CsvEventExtractor.extractEvents(value, finalVerbose);

for (InputEvent inputEvent : inputEvents)
out.collect(inputEvent);

} else if (finalInputType.equals("JSON")) {
List<InputEvent> inputEvents = JsonEventExtractor.extractEvents(value);
List<InputEvent> inputEvents = JsonEventExtractor.extractEvents(value, finalVerbose);

for (InputEvent inputEvent : inputEvents)
out.collect(inputEvent);
} else if (finalInputType.equals("DCFHV")) {
List<InputEvent> inputEvents = DcfhvEventExtractor.extractEvents(value, finalVerbose);

for (InputEvent inputEvent : inputEvents)
out.collect(inputEvent);
Expand All @@ -166,7 +210,7 @@ public TileId map(InputEvent value) throws Exception {
@Override
public String map(TileId value) throws Exception {
// TODO allow selection of tile build -- defaulting to current build for moment
return "https://tiles.sharedstreets.io/planet-180312/" + value.toString() + ".geometry.pbf";
return finalTileUrl + value.toString() + ".geometry." + finalTileLevel + ".pbf";
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io.sharedstreets.matcher.ingest.input;


import io.sharedstreets.matcher.ingest.Ingester;
import io.sharedstreets.matcher.ingest.model.Point;
import io.sharedstreets.matcher.ingest.model.InputEvent;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.nio.ch.IOStatus;

import java.io.IOException;
Expand All @@ -12,7 +15,9 @@
import java.util.List;

public class CsvEventExtractor {
public static List<InputEvent> extractEvents(String value) throws IOException {

static Logger logger = LoggerFactory.getLogger(CsvEventExtractor.class);
public static List<InputEvent> extractEvents(String value, boolean verbose) throws IOException {

ArrayList list = new ArrayList<InputEvent>();

Expand All @@ -28,17 +33,20 @@ public static List<InputEvent> extractEvents(String value) throws IOException {

try {
event.time = Long.parseLong(splitValue[1]);
} catch (Exception e) {
} catch (Exception e1) {
// not a number fallback to date string
}
if(event.time == null) {
try {
event.time = new DateTime(splitValue[1]).getMillis();
} catch (Exception e) {
// not a date
throw new IOException("Unable to parse data value");
if(event.time == null) {
try {
event.time = new DateTime(splitValue[1]).getMillis();
} catch (Exception e2) {
// not a date
if(verbose)
logger.error("Unable to parse date value" + splitValue[1]);
throw new IOException("Unable to parse date value" + splitValue[1]);
}
}
}

event.point.lat = Double.parseDouble(splitValue[2]);
event.point.lon = Double.parseDouble(splitValue[3]);

Expand All @@ -56,6 +64,11 @@ public static List<InputEvent> extractEvents(String value) throws IOException {
// single event per line
list.add(event);
}
else {
if(verbose) {
logger.error("Unable to parse line, only " + splitValue.length + " fields: " + value);
}
}



Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package io.sharedstreets.matcher.ingest.input;

import com.jsoniter.JsonIterator;
import com.jsoniter.ValueType;
import io.sharedstreets.matcher.ingest.model.InputEvent;
import io.sharedstreets.matcher.ingest.model.Point;
import org.apache.commons.lang3.time.FastDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.ParseException;
import java.util.*;

public class DcfhvEventExtractor {


static Logger logger = LoggerFactory.getLogger(DcfhvEventExtractor.class);

static FastDateFormat formatter = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("UTC"));

static InputEvent parseJsonLocation(String value, boolean verbose) {

try {
InputEvent event = new InputEvent();
event.point = new Point();

JsonIterator iter = JsonIterator.parse(value);

for (String field = iter.readObject(); field != null; field = iter.readObject()) {

if(field.toLowerCase().equals("longitude")) {
if (iter.whatIsNext() == ValueType.NULL)
return null;
else
event.point.lon = iter.readDouble();
}
else if(field.toLowerCase().equals("latitude")) {
if(iter.whatIsNext() == ValueType.NULL)
return null;
else
event.point.lat = iter.readDouble();
}

else
iter.read();
}

return event;
} catch (Exception e) {
if(verbose)
logger.error("Unable to parse line: " + value);
return null;
}

}

public static List<InputEvent> extractEvents(String value, boolean verbose) throws IOException, ParseException {

// replace Excel quote escaping
String cleanString = value.replace("\"{", "{").replace("}\"", "}").replace("\"\"", "\"");

// split tab delimited lines
String[] parts = cleanString.split("\t");

// line parts key

// 0 = "ID"
// 1 = "AssignmentID"
// 2 = "DateCreated"
// 3 = "Milage"
// 4 = "PassengerNum"
// 5 = "StartTime"
// 6 = "DestinationID"
// 7 = "OriginID"
// 8 = "EndTime"
// 9 = "TripFareAmount"
// 10 = "ExtraFareAmount"
// 11 = "GratuityAmount"
// 12 = "SurchargeAmount"
// 13 = "TollAmount"
// 14 = "TotalAmount"
// 15 = "ClientID"
// 16 = "VehicleID"
// 17 = "DriverID"
// 18 = "Origin"
// 19 = "Destination"
// 20 = "FacecardID"
// 21 = "PVIN"
// 22 = "Status"
// 23 = "DiscountAmount"
// 24 = "FareType"
// 25 = "PaymentType"

ArrayList list = new ArrayList<InputEvent>();

if(parts.length > 21 && !parts[0].equals("ID")) {

InputEvent pickupEvent = parseJsonLocation(parts[18], verbose);

if(pickupEvent != null) {
Date date = formatter.parse(parts[5].split("\\.")[0]);

if (date.getDate() == 10 || date.getDate() == 11) {
pickupEvent.eventData = new HashMap<>();
pickupEvent.vehicleId = parts[21];
pickupEvent.time = date.getTime();
pickupEvent.eventData.put("pickup", null);
list.add(pickupEvent);
}
else {
if(verbose)
logger.error("Unable to parse line: " + value);

}
}

InputEvent dropoffEvent = parseJsonLocation(parts[19], verbose);

if(dropoffEvent != null) {
Date date = formatter.parse(parts[8].split("\\.")[0]);

if (date.getDate() == 10 || date.getDate() == 11) {
dropoffEvent.eventData = new HashMap<>();
dropoffEvent.vehicleId = parts[21];
dropoffEvent.time = date.getTime();
dropoffEvent.eventData.put("dropoff", null);
list.add(dropoffEvent);
}
else {
if(verbose)
logger.error("Unable to parse line: " + value);

}

}
}
else {
if(verbose)
logger.error("Unable to parse line: " + value);

}


return list;
}
}
Loading

0 comments on commit 436e956

Please sign in to comment.