Commit c118349

Add 'date format' parameter for import_csv (#237)

dmsuehir authored and blbarker committed Oct 15, 2016
1 parent 521b163 commit c118349
Showing 5 changed files with 39 additions and 7 deletions.
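In short, the new parameter lets a caller describe date/time layouts that differ from the previously hardcoded ISO-style default. A minimal usage sketch (not part of the commit; assumes a running sparktk TkContext `tc` and the datetimes.csv fixture added below):

    # Parse column "a" (values like 2015-01-03Z) as datetime using a
    # SimpleDateFormat pattern; column "b" is left as a plain string.
    from sparktk import dtypes

    frame = tc.frame.import_csv("integration-tests/datasets/datetimes.csv",
                                schema=[("a", dtypes.datetime), ("b", str)],
                                datetime_format="yyyy-MM-ddX")
    print(frame.take(frame.count()))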
2 changes: 2 additions & 0 deletions integration-tests/datasets/datetimes.csv
@@ -0,0 +1,2 @@
2015-01-03Z,01-02-2015 11:30 Z
2015-04-12Z,04-12-2015 04:25 Z
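The two columns deliberately use different layouts, so each load in the test below can parse only one of them as a datetime. For orientation, rough Python strptime equivalents of the two SimpleDateFormat patterns (a sketch only; `%z` accepts a bare "Z" on Python 3.7+, whereas the codebase itself is Python 2; SimpleDateFormat's `kk` is a 1-24 hour, approximated here with `%H`):

    from datetime import datetime

    # "yyyy-MM-ddX"        ~ "%Y-%m-%d%z"        (column a)
    # "MM-dd-yyyy kk:mm X" ~ "%m-%d-%Y %H:%M %z" (column b)
    a = datetime.strptime("2015-01-03Z", "%Y-%m-%d%z")
    b = datetime.strptime("01-02-2015 11:30 Z", "%m-%d-%Y %H:%M %z")
    print(a.isoformat(), b.isoformat())  # 2015-01-03T00:00:00+00:00 2015-01-02T11:30:00+00:00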
27 changes: 26 additions & 1 deletion integration-tests/tests/test_frame_import_csv.py
@@ -174,4 +174,29 @@ def test_import_csv_with_duplicate_coluns(tc):
        # Try to create a frame from csv, using a schema that has duplicate column names
        tc.frame.import_csv(path, schema=schema, header=True, infer_schema=False)
    except Exception as e:
        assert("schema has duplicate column names: ['numeric']" in str(e))

def test_import_csv_datetime_format(tc):
    path = "../datasets/datetimes.csv"

    # Load with the date format that matches column a
    f = tc.frame.import_csv(path, schema=[("a", dtypes.datetime), ("b", str)], datetime_format="yyyy-MM-ddX")

    expected = ["2015-01-03T00:00:00.000000Z", "2015-04-12T00:00:00.000000Z"]
    actual_data = f.take(f.count())

    for row, expected_str in zip(actual_data, expected):
        assert(isinstance(row[0], long))  # 'a' datetime column should be a long (number of ms since epoch)
        assert(dtypes.ms_to_datetime_str(row[0]) == expected_str)
        assert(isinstance(row[1], basestring))  # column 'b' should be a str

    # Load with the date format that matches column b
    f = tc.frame.import_csv(path, schema=[("a", str), ("b", dtypes.datetime)], datetime_format="MM-dd-yyyy kk:mm X")

    expected = ["2015-01-02T11:30:00.000000Z", "2015-04-12T04:25:00.000000Z"]
    actual_data = f.take(f.count())

    for row, expected_str in zip(actual_data, expected):
        assert(isinstance(row[0], basestring))  # column 'a' should be a str
        assert(isinstance(row[1], long))  # column 'b' should be a long (number of ms since epoch)
        assert(dtypes.ms_to_datetime_str(row[1]) == expected_str)
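As a sanity check on the expected strings above (a sketch, not part of the commit): 2015-01-03T00:00:00Z is 1,420,243,200,000 ms after the epoch, which is the kind of long value the assertions expect in the datetime column:

    import calendar
    from datetime import datetime

    # Seconds since the epoch for 2015-01-03 00:00:00 UTC, then milliseconds
    ms = calendar.timegm(datetime(2015, 1, 3).timetuple()) * 1000
    print(ms)  # 1420243200000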
2 changes: 1 addition & 1 deletion python/sparktk/dtypes.py
@@ -219,7 +219,7 @@ def datetime_to_ms(date_time):
    if isinstance(date_time, datetime):
        ms = long(date_time.strftime("%s")) * 1000.0
        ms += date_time.microsecond // 1000
-        return ms
+        return long(ms)
    else:
        raise TypeError("Unable to calculate the number of milliseconds since epoch for type: %s" % type(date_time))
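The cast matters because `ms` is a float at that point (a long multiplied by the float literal 1000.0); a quick illustration of why (sketch, Python 2 semantics to match this codebase):

    secs = long("1420243200")  # what long(date_time.strftime("%s")) produces
    ms = secs * 1000.0         # multiplying by 1000.0 promotes to float
    ms += 500                  # adding microseconds // 1000 leaves it a float
    assert isinstance(ms, float)
    assert isinstance(long(ms), long) and long(ms) == 1420243200500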

Expand Down
8 changes: 5 additions & 3 deletions python/sparktk/frame/constructors/import_csv.py
@@ -22,7 +22,7 @@
from datetime import datetime
from sparktk.frame import schema as sparktk_schema

-def import_csv(path, delimiter=",", header=False, infer_schema=True, schema=None, tc=TkContext.implicit):
+def import_csv(path, delimiter=",", header=False, infer_schema=True, schema=None, datetime_format="yyyy-MM-dd'T'HH:mm:ss.SSSX", tc=TkContext.implicit):
"""
Creates a frame with data from a csv file.
Expand All @@ -37,11 +37,13 @@ def import_csv(path, delimiter=",", header=False, infer_schema=True, schema=None
and not be included in the data. The default value is false.
:param infer_schema:(Optional[bool]) Boolean value indicating if the column types will be automatically inferred.
It requires one extra pass over the data and is false by default.
:param: schema: (Optional[List[tuple(str, type)]]) Optionally specify the schema for the dataset. Number of
:param schema: (Optional[List[tuple(str, type)]]) Optionally specify the schema for the dataset. Number of
columns specified in the schema must match the number of columns in the csv file provided. If the
value from the csv file cannot be converted to the data type specified by the schema (for example,
if the csv file has a string, and the schema specifies an int), the value will show up as missing
(None) in the frame.
:param datetime_format: (str) String specifying how date/time columns are formatted, using the java.text.SimpleDateFormat
specified at https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html
:return: (Frame) Frame that contains the data from the csv file
Examples
@@ -115,7 +117,7 @@ def import_csv(path, delimiter=",", header=False, infer_schema=True, schema=None
"com.databricks.spark.csv.org.trustedanalytics.sparktk").options(
delimiter=delimiter,
header=header_str,
dateformat="yyyy-MM-dd'T'HH:mm:ss.SSSX",
dateformat=datetime_format,
inferschema=infer_schema_str).load(path, schema=pyspark_schema)

    df_schema = []
@@ -35,14 +35,17 @@ object Import {
   *                    and not be included in the data. The default value is false.
   * @param inferSchema Boolean value indicating if the column types will be automatically inferred. It
   *                    requires one extra pass over the data and is false by default.
+  * @param dateTimeFormat String specifying how date/time columns are formatted, using the java.text.SimpleDateFormat
+  *                       specified at https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html
   * @return Frame with data from the csv file
   */
  def importCsv(sc: SparkContext,
                path: String,
                delimiter: String = ",",
                header: Boolean = false,
                inferSchema: Boolean = false,
-               schema: Option[Schema] = None): Frame = {
+               schema: Option[Schema] = None,
+               dateTimeFormat: String = "yyyy-MM-dd'T'HH:mm:ss.SSSX"): Frame = {

    // If a custom schema is provided there's no reason to infer the schema during the load
    val loadWithInferSchema = if (schema.isDefined) false else inferSchema
@@ -57,7 +60,7 @@
.option("header", headerStr)
.option("inferSchema", inferSchemaStr)
.option("delimiter", delimiter)
.option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSX")
.option("dateFormat", dateTimeFormat)

if (!inferSchema && schema.isDefined) {
dfr = dfr.schema(StructType(schema.get.columns.map(column =>
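Both the Python and Scala paths ultimately forward the format string to the spark-csv reader's "dateFormat" option. A rough PySpark equivalent of what the loader does under the hood (a sketch, not part of the commit; assumes the spark-csv package is on the classpath and a SQLContext named `sql_context`):

    df = sql_context.read.format("com.databricks.spark.csv") \
        .option("header", "false") \
        .option("inferSchema", "false") \
        .option("dateFormat", "MM-dd-yyyy kk:mm X") \
        .load("integration-tests/datasets/datetimes.csv")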
