# Databricks notebook source
# Mount an Azure Blob storage container
dbutils.fs.mount(
    source = "wasbs://finance@finstorage6ef5xpkr7mo3s.blob.core.windows.net",
    mount_point = "/mnt/finance",
    extra_configs = {"fs.azure.account.key.finstorage6ef5xpkr7mo3s.blob.core.windows.net":"n1cT5j8fFP+qHHI6ve/K2rWAIT/xf/yrTA19WmMZSneFYKYvHt3ux2KRcvIfqZ365meXDXzAOqMX+AStJdrpEA=="})
# COMMAND ----------
# Unmount the mount point (uncomment to run)
#dbutils.fs.unmount("/mnt/finance")
# COMMAND ----------
# MAGIC %sql
# MAGIC CREATE DATABASE IF NOT EXISTS deltabase
# COMMAND ----------
# MAGIC %sql
# MAGIC USE deltabase
# COMMAND ----------
# MAGIC %sql
# MAGIC -- Enable auto optimization defaults for new Delta tables
# MAGIC SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
# MAGIC SET spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
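# COMMAND ----------
# Equivalent Python form of the SQL settings above (a sketch, not in the original notebook).
# These session defaults make every Delta table created later in this notebook inherit the
# delta.autoOptimize.optimizeWrite and delta.autoOptimize.autoCompact table properties.
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")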
# COMMAND ----------
import os
import numpy as np
import pandas as pd
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import * #import avg, col, udf
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql.types import *
import json
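# (Sketch, not in the original code) Optionally confirm the mount is readable before the
# ETL loop below, e.g. by listing a few of the 1-minute CSV files:
#print(os.listdir('/dbfs/mnt/finance/FirstRate1min')[:5])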
# List every 1-minute CSV file, derive a ticker name from each file name,
# and save each file as a Delta table automatically
path_1min = '/dbfs/mnt/finance/FirstRate1min'
filename_lists_1min = os.listdir(path_1min)
df_1min_ = {}
delta_1min = {}
for filename_1min in os.listdir(path_1min):
    # Split the file name to extract the ticker symbol
    rawname_1min = filename_1min.split('_')[0]
    name_1min = rawname_1min.split('-')[0]
    # Create column header names prefixed with the ticker
    temp_1min = StructType([
        StructField(name_1min + "_dateTime", StringType(), True),
        StructField(name_1min + "_adjOpen", FloatType(), True),
        StructField(name_1min + "_adjHigh", FloatType(), True),
        StructField(name_1min + "_adjLow", FloatType(), True),
        StructField(name_1min + "_adjClose", FloatType(), True),
        StructField(name_1min + "_adjVolume", IntegerType(), True)])
    # Read the CSV file into a DataFrame with the generated schema and add a Ticker column
    temp_df_1min = spark.read.format("csv") \
        .option("header", "false") \
        .schema(temp_1min) \
        .load("/mnt/finance/FirstRate1min/" + filename_1min) \
        .withColumn("Ticker", lit(name_1min))
    # Name each DataFrame
    df_1min_[name_1min] = temp_df_1min
    # Name each table
    table_name_1min = name_1min + '_1min_delta'
    print(table_name_1min)
    # Create a Delta table for each DataFrame
    df_1min_[name_1min].write.format("delta").mode("overwrite").option("overwriteSchema", "True").saveAsTable(table_name_1min)
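# COMMAND ----------
# A minimal verification sketch (not part of the original notebook): list the Delta tables
# registered by the loop above and preview one of them. The table name "aapl_1min_delta"
# is a hypothetical example; substitute any ticker printed by the loop.
display(spark.sql("SHOW TABLES IN deltabase"))
#display(spark.table("deltabase.aapl_1min_delta").limit(5))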