diff --git a/integration-tests/datasets/datetimes.csv b/integration-tests/datasets/datetimes.csv
new file mode 100644
index 00000000..ab03e3d8
--- /dev/null
+++ b/integration-tests/datasets/datetimes.csv
@@ -0,0 +1,2 @@
+2015-01-03Z,01-02-2015 11:30 Z
+2015-04-12Z,04-12-2015 04:25 Z
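The two columns in this fixture deliberately use different layouts: the first is an ISO-style date with a zone designator (`2015-01-03Z`), the second is `MM-dd-yyyy` with a 24-hour time and a trailing zone (`01-02-2015 11:30 Z`), so each requires its own parse pattern. As a sanity check on the values asserted in the tests below, here is a minimal Python sketch (the `utc_ms` helper is hypothetical, not part of this change) computing the epoch-millisecond values these cells should parse to:

```python
import calendar
from datetime import datetime

def utc_ms(dt):
    """Milliseconds since the Unix epoch for a naive datetime taken as UTC."""
    return calendar.timegm(dt.timetuple()) * 1000 + dt.microsecond // 1000

print(utc_ms(datetime(2015, 1, 3)))          # 1420243200000 -> "2015-01-03T00:00:00.000000Z"
print(utc_ms(datetime(2015, 1, 2, 11, 30)))  # 1420198200000 -> "2015-01-02T11:30:00.000000Z"
```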
diff --git a/integration-tests/tests/test_frame_import_csv.py b/integration-tests/tests/test_frame_import_csv.py
index 6b06452e..b98a786f 100644
--- a/integration-tests/tests/test_frame_import_csv.py
+++ b/integration-tests/tests/test_frame_import_csv.py
@@ -174,4 +174,29 @@ def test_import_csv_with_duplicate_coluns(tc):
         # Try to create a frame from csv, using a schema that has duplicate column names
         tc.frame.import_csv(path, schema=schema, header=True, infer_schema=False)
     except Exception as e:
-        assert("schema has duplicate column names: ['numeric']" in str(e))
\ No newline at end of file
+        assert("schema has duplicate column names: ['numeric']" in str(e))
+
+def test_import_csv_datetime_format(tc):
+    path = "../datasets/datetimes.csv"
+
+    # Load with the date format that matches column a
+    f = tc.frame.import_csv(path, schema=[("a", dtypes.datetime), ("b", str)], datetime_format="yyyy-MM-ddX")
+
+    expected = ["2015-01-03T00:00:00.000000Z","2015-04-12T00:00:00.000000Z"]
+    actual_data = f.take(f.count())
+
+    for row, expected_str in zip(actual_data, expected):
+        assert(isinstance(row[0], long))    # 'a' datetime column should be a long (number of ms since epoch)
+        assert(dtypes.ms_to_datetime_str(row[0]) == expected_str)
+        assert(isinstance(row[1], basestring))     # column 'b' should be a str
+
+    # Load with the date format that matches column b
+    f = tc.frame.import_csv(path, schema=[("a", str), ("b", dtypes.datetime)], datetime_format="MM-dd-yyyy kk:mm X")
+
+    expected = ["2015-01-02T11:30:00.000000Z","2015-04-12T04:25:00.000000Z"]
+    actual_data = f.take(f.count())
+
+    for row, expected_str in zip(actual_data, expected):
+        assert(isinstance(row[0], basestring))     # column 'a' should be a str
+        assert(isinstance(row[1], long))    # column 'b' should be a long (number of ms since epoch)
+        assert(dtypes.ms_to_datetime_str(row[1]) == expected_str)
\ No newline at end of file
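A caveat worth calling out, and the reason the test loads the file twice: `datetime_format` is a single pattern applied to every datetime column in the schema, so a file mixing two datetime layouts cannot be parsed into two datetime columns in one pass. A hedged sketch of the implication (the None outcome is assumed from the schema-mismatch rule in import_csv's docstring, not verified here):

```python
# Assumption: cells that don't parse under the supplied pattern follow the
# docstring's schema-mismatch rule and surface as missing (None) values.
f = tc.frame.import_csv("../datasets/datetimes.csv",
                        schema=[("a", dtypes.datetime), ("b", dtypes.datetime)],
                        datetime_format="yyyy-MM-ddX")
print(f.take(f.count()))  # column 'a' parses; column 'b' is expected to be None
```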
diff --git a/python/sparktk/dtypes.py b/python/sparktk/dtypes.py
index 946980ae..65964cff 100644
--- a/python/sparktk/dtypes.py
+++ b/python/sparktk/dtypes.py
@@ -219,7 +219,7 @@ def datetime_to_ms(date_time):
     if isinstance(date_time, datetime):
         ms = long(date_time.strftime("%s")) * 1000.0
         ms += date_time.microsecond // 1000
-        return ms
+        return long(ms)
     else:
         raise TypeError("Unable to calculate the number of milliseconds since epoch for type: %s" % type(date_time))
 
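For context on this one-line fix: in Python 2, multiplying a `long` by the float literal `1000.0` promotes the result to `float`, so `datetime_to_ms` returned a float even though its name promises milliseconds as an integral value, and the new `isinstance(row[0], long)` assertions in the test above would fail. Wrapping the result in `long()` restores the expected type. A minimal illustration:

```python
seconds = long("1420243200")  # what long(date_time.strftime("%s")) yields
ms = seconds * 1000.0         # float multiplication promotes long -> float
print(type(ms))               # <type 'float'>
print(type(long(ms)))         # <type 'long'> -- the type callers expect
```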
diff --git a/python/sparktk/frame/constructors/import_csv.py b/python/sparktk/frame/constructors/import_csv.py
index f43666eb..a4929236 100644
--- a/python/sparktk/frame/constructors/import_csv.py
+++ b/python/sparktk/frame/constructors/import_csv.py
@@ -22,7 +22,7 @@
 from datetime import datetime
 from sparktk.frame import schema as sparktk_schema
 
-def import_csv(path, delimiter=",", header=False, infer_schema=True, schema=None, tc=TkContext.implicit):
+def import_csv(path, delimiter=",", header=False, infer_schema=True, schema=None, datetime_format="yyyy-MM-dd'T'HH:mm:ss.SSSX", tc=TkContext.implicit):
     """
     Creates a frame with data from a csv file.
 
@@ -37,11 +37,13 @@ def import_csv(path, delimiter=",", header=False, infer_schema=True, schema=None
                    and not be included in the data.  The default value is false.
     :param infer_schema: (Optional[bool]) Boolean value indicating if the column types will be automatically inferred.
                        It requires one extra pass over the data and is true by default.
-    :param: schema: (Optional[List[tuple(str, type)]]) Optionally specify the schema for the dataset.  Number of
+    :param schema: (Optional[List[tuple(str, type)]]) Optionally specify the schema for the dataset.  Number of
                     columns specified in the schema must match the number of columns in the csv file provided.  If the
                     value from the csv file cannot be converted to the data type specified by the schema (for example,
                     if the csv file has a string, and the schema specifies an int), the value will show up as missing
                     (None) in the frame.
+    :param datetime_format: (str) String specifying how date/time columns are formatted, using the pattern
+                        syntax of java.text.SimpleDateFormat, as described at https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html
     :return: (Frame) Frame that contains the data from the csv file
 
     Examples
@@ -115,7 +117,7 @@ def import_csv(path, delimiter=",", header=False, infer_schema=True, schema=None
         "com.databricks.spark.csv.org.trustedanalytics.sparktk").options(
             delimiter=delimiter,
             header=header_str,
-            dateformat="yyyy-MM-dd'T'HH:mm:ss.SSSX",
+            dateformat=datetime_format,
             inferschema=infer_schema_str).load(path, schema=pyspark_schema)
 
     df_schema = []
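With the keyword threaded through to the spark-csv `dateformat` option, callers can override the default ISO 8601 pattern per load. A short usage sketch mirroring `test_import_csv_datetime_format` above (`tc` is an existing TkContext):

```python
from sparktk import dtypes

f = tc.frame.import_csv("../datasets/datetimes.csv",
                        schema=[("a", str), ("b", dtypes.datetime)],
                        datetime_format="MM-dd-yyyy kk:mm X")
row = f.take(f.count())[0]
print(dtypes.ms_to_datetime_str(row[1]))  # 2015-01-02T11:30:00.000000Z
```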
diff --git a/sparktk-core/src/main/scala/org/trustedanalytics/sparktk/frame/internal/constructors/Import.scala b/sparktk-core/src/main/scala/org/trustedanalytics/sparktk/frame/internal/constructors/Import.scala
index 9f42ffc8..4d36a3cf 100644
--- a/sparktk-core/src/main/scala/org/trustedanalytics/sparktk/frame/internal/constructors/Import.scala
+++ b/sparktk-core/src/main/scala/org/trustedanalytics/sparktk/frame/internal/constructors/Import.scala
@@ -35,6 +35,8 @@ object Import {
    *               and not be included in the data.  The default value is false.
    * @param inferSchema Boolean value indicating if the column types will be automatically inferred.  It
    *                    requires one extra pass over the data and is false by default.
+   * @param dateTimeFormat String specifying how date/time columns are formatted, using the pattern syntax of
+   *                       java.text.SimpleDateFormat, as described at https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html
    * @return Frame with data from the csv file
    */
   def importCsv(sc: SparkContext,
@@ -42,7 +44,8 @@ object Import {
                 delimiter: String = ",",
                 header: Boolean = false,
                 inferSchema: Boolean = false,
-                schema: Option[Schema] = None): Frame = {
+                schema: Option[Schema] = None,
+                dateTimeFormat: String = "yyyy-MM-dd'T'HH:mm:ss.SSSX"): Frame = {
 
     // If a custom schema is provided there's no reason to infer the schema during the load
     val loadWithInferSchema = if (schema.isDefined) false else inferSchema
@@ -57,7 +60,7 @@ object Import {
       .option("header", headerStr)
       .option("inferSchema", inferSchemaStr)
       .option("delimiter", delimiter)
-      .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSX")
+      .option("dateFormat", dateTimeFormat)
 
     if (!inferSchema && schema.isDefined) {
       dfr = dfr.schema(StructType(schema.get.columns.map(column =>