-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfreq_item_sets.py
94 lines (78 loc) · 3.09 KB
/
freq_item_sets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import apache_beam as beam
from beamio.bq import makeBigQuerySink
# Break the first field of a record into one (item, 1) pair per member.
def normalize(order):
    """Yield ``(item, 1)`` for each element found in ``order[0]``.

    Only the first field of ``order`` is read; any remaining fields are
    ignored. The pairs are produced lazily, in iteration order.
    """
    members = order[0]
    for member in members:
        yield member, 1
# trim orders to items in a specified list
def trimOrders(order, items, min_length=0):
    """Yield ``order`` restricted to members of ``items``, if long enough.

    Args:
        order: iterable holding the items of one order.
        items: collection of items that are allowed to remain.
        min_length: minimum number of surviving items required for the
            trimmed order to be emitted (default 0).

    Yields:
        At most one list: the elements of ``order`` that occur in ``items``
        (original sequence and duplicates preserved). Nothing is yielded
        when fewer than ``min_length`` elements survive.
    """
    # Materialize once so the fallback below re-reads the same elements even
    # if ``order`` was handed in as a one-shot iterator.
    candidates = list(order)
    try:
        # Hash the allowed items once: each membership test becomes O(1)
        # instead of a linear scan of ``items`` per order element.
        allowed = frozenset(items)
        order_trimmed = [o for o in candidates if o in allowed]
    except TypeError:
        # Unhashable values on either side: keep the equality-based scan.
        order_trimmed = [o for o in candidates if o in items]
    if len(order_trimmed) >= min_length:
        yield order_trimmed
# Emit every size-`size` combination of an order, each paired with a count of 1.
def extractCombos(order, size):
    """Yield ``(combination, 1)`` for each ``size``-element subset of ``order``.

    Combinations are tuples produced by ``itertools.combinations``, so they
    follow the element sequence of ``order``. Nothing is yielded when
    ``order`` has fewer than ``size`` elements.
    """
    # Imported inside the function, as in the original implementation.
    from itertools import combinations

    for combo in combinations(order, size):
        yield combo, 1
# Assemble the frequent-item-set pipeline and start it.
def run(config):
    """Build the Beam pipeline for item sets of size 1, 2 and 3 and run it.

    ``config`` is indexed with the keys ``project``, ``staging_location``,
    ``temp_location`` and ``supp_cutoff``. Item sets whose count reaches
    ``config['supp_cutoff']`` are handed to the sink created by
    ``makeBigQuerySink("instacart.freq_item_sets", schema)``.

    ``pipeline.run()` is called without waiting on its result.
    """
    # Swap in "DirectRunner" to execute the pipeline locally.
    runner = "DataflowRunner"
    pipeline_args = [
        "--job_name", "instacart-freq-item-sets",
        "--project", config['project'],
        "--staging_location", config['staging_location'],
        "--temp_location", config['temp_location'],
    ]
    pipeline = beam.Pipeline(runner=runner, argv=pipeline_args)

    def is_frequent(counted):
        # counted is a (key, count) pair; keep it when the count reaches the cutoff.
        return counted[1] >= config['supp_cutoff']

    # One row per order, carrying the order's product ids as an array.
    query = (
        "\n"
        "SELECT order_id, ARRAY_AGG(product_id ORDER BY product_id) AS products\n"
        "FROM instacart.order_products__prior AS orders\n"
        "GROUP BY 1\n"
    )
    source = beam.io.BigQuerySource(query=query, use_standard_sql=True)
    order_rows = pipeline | 'GetOrders' >> beam.io.Read(source)
    orders = order_rows | 'CleanOrders' >> beam.Map(lambda row: row['products'])

    # Size-1 item sets.
    combos1 = orders | 'Get1' >> beam.FlatMap(extractCombos, 1)
    counts1 = combos1 | 'Count1' >> beam.CombinePerKey(sum)
    freq1 = counts1 | 'Filter1' >> beam.Filter(is_frequent)
    # Keys are 1-tuples; unwrap them to bare items.
    freq1_items = freq1 | 'SelectFreq1' >> beam.Map(lambda kv: kv[0][0])

    # Size-2 item sets, built only from items that survived the size-1 pass.
    trimmed2 = orders | 'Trim2' >> beam.FlatMap(
        trimOrders, beam.pvalue.AsList(freq1_items), 2)
    combos2 = trimmed2 | 'Get2' >> beam.FlatMap(extractCombos, 2)
    counts2 = combos2 | 'Count2' >> beam.CombinePerKey(sum)
    freq2 = counts2 | 'Filter2' >> beam.Filter(is_frequent)

    # Items that occur in at least two of the retained pairs.
    pair_members = freq2 | 'NormalizeFreq2' >> beam.FlatMap(normalize)
    member_counts = pair_members | 'CountFreq2' >> beam.CombinePerKey(sum)
    repeated_members = member_counts | 'FilterFreq2' >> beam.Filter(
        lambda kv: kv[1] >= 2)
    freq2_items = repeated_members | 'SelectFreq2' >> beam.Map(lambda kv: kv[0])

    # Size-3 item sets, built only from the items selected above.
    trimmed3 = orders | 'Trim3' >> beam.FlatMap(
        trimOrders, beam.pvalue.AsList(freq2_items), 3)
    combos3 = trimmed3 | 'Get3' >> beam.FlatMap(extractCombos, 3)
    counts3 = combos3 | 'Count3' >> beam.CombinePerKey(sum)
    freq3 = counts3 | 'Filter3' >> beam.Filter(is_frequent)

    # Merge every size and hand the rows to the sink.
    schema = {
        'products': {'name': 'products', 'type': 'integer', 'mode': 'repeated'},
        'frequency': {'name': 'frequency', 'type': 'integer', 'mode': 'required'},
    }
    merged = (freq1, freq2, freq3) | 'Flatten' >> beam.Flatten()
    rows = merged | 'ExportPrep' >> beam.Map(
        lambda kv: {'products': kv[0], 'frequency': kv[1]})
    rows | 'Export' >> beam.io.Write(
        makeBigQuerySink("instacart.freq_item_sets", schema))

    pipeline.run()
# Script entry point: read settings from config.yaml and launch the pipeline.
if __name__ == '__main__':
    import yaml

    # safe_load replaces yaml.load(f): calling load() without an explicit
    # Loader raises TypeError on PyYAML >= 6 and, on older releases, may
    # construct arbitrary Python objects from the file's contents.
    with open("config.yaml", "r") as f:
        config = yaml.safe_load(f)
    run(config)