-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfrequent_pattern_tasks.py
97 lines (73 loc) · 2.29 KB
/
frequent_pattern_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from pyTasks.tasks import Task, Parameter
from pyTasks.utils import containerHash
from .graph_tasks import GraphPruningTask
from .mongo_tasks import MongoResourceTarget
from sklearn.model_selection import KFold
import numpy as np
from bson.code import Code
def non_filter(label):
    """Return ``False`` for every label.

    The ``label`` argument is accepted but never inspected; the result is
    always ``False``.

    Args:
        label: Any value; ignored.

    Returns:
        bool: Always ``False``.
    """
    return False
def identity(obj):
    """Return ``obj`` unchanged.

    Args:
        obj: Any value.

    Returns:
        The very same object that was passed in (no copy is made).
    """
    return obj
class MongoGraphNodesTask(Task):
    """Store the distinct node labels of one pruned graph in MongoDB.

    The task reads the result of ``GraphPruningTask(graph, D)``, collects the
    set of ``'label'`` attributes found on its nodes and inserts one
    ``{'graph_id': graph, 'node': label}`` document per distinct label into
    the collection named by the ``collection`` parameter.
    """

    # Name of the MongoDB collection that receives the label documents.
    collection = Parameter("graph_nodes")

    def __init__(self, graph, D):
        """Remember which graph to process.

        Args:
            graph: Identifier of the graph; stored as ``graph_id`` in every
                inserted document and embedded in the task id.
            D: Second argument forwarded to ``GraphPruningTask``; it is
                formatted with ``%d`` in the task id, so it must be numeric.
        """
        self.graph = graph
        self.D = D

    def require(self):
        """Return the pruning task whose output this task consumes."""
        return GraphPruningTask(self.graph, self.D)

    def __taskid__(self):
        """Return the identifier string built from ``graph`` and ``D``."""
        # Exactly one placeholder per value: a third ``%d`` with only two
        # values raises "TypeError: not enough arguments for format string".
        return "GraphNodesTask_%s_%d" % (self.graph, self.D)

    def output(self):
        """Return the Mongo target built from the collection name and graph id."""
        return MongoResourceTarget(
            self.collection.value, '_id', self.graph
        )

    def run(self):
        """Collect the distinct node labels and insert them into MongoDB."""
        with self.input()[0] as i:
            G = i.query()
            # NOTE(review): G is presumably a networkx graph (iteration yields
            # node keys, ``G.node`` maps a key to its attribute dict) - confirm.
            labels = {G.node[node]['label'] for node in G}

        with self.output() as o:
            # PyMongo's insert_many rejects an empty document list, so a graph
            # without nodes must not reach the call.
            if labels:
                o.collection.insert_many([
                    {'graph_id': self.graph, 'node': label}
                    for label in labels
                ])
class MongoFrequencyTask(Task):
    """Compute relative node-label frequencies with a MongoDB map-reduce.

    Every required ``MongoGraphNodesTask`` contributes one document per
    distinct label of its graph.  This task counts the documents per label
    with a map-reduce job, writes the counts to the collection named by the
    ``collection`` parameter and then scales each count by
    ``1 / len(graphs)``.
    """

    # Name of the MongoDB collection that receives the map-reduce result.
    collection = Parameter("node_frequency")

    def __init__(self, graphs, it, D):
        """Remember the graph set and the iteration number.

        Args:
            graphs: Sized iterable of graph identifiers; one
                ``MongoGraphNodesTask`` is required per entry and its length
                is the normalisation constant.
            it: Number formatted with ``%d`` into the output target key.
            D: Value forwarded to every ``MongoGraphNodesTask``.
        """
        self.graphs = graphs
        self.it = it
        self.D = D

    def require(self):
        """Return one ``MongoGraphNodesTask`` per graph."""
        return [
            MongoGraphNodesTask(g, self.D)
            for g in self.graphs
        ]

    def output(self):
        """Return the Mongo target keyed by ``'frequency_<it>'``."""
        return MongoResourceTarget(
            self.collection.value, '_id', 'frequency_%d' % self.it
        )

    def run(self):
        """Run the map-reduce job and normalise the resulting counts."""
        # Only the first input is opened.
        # NOTE(review): presumably all inputs expose the same Mongo collection,
        # so the job also counts documents of graphs outside ``self.graphs``
        # if the collection holds any - confirm that this is intended.
        with self.input()[0] as i:
            coll = i.collection

            map_fn = Code("""
                function(){
                    emit(this.node, 1);
                }
            """)

            reduce_fn = Code("""
                function(key, values){
                    var total = 0;
                    for(var i = 0; i < values.length; i++){
                        total += values[i];
                    }
                    return total;
                }
            """)

            result = coll.map_reduce(map_fn, reduce_fn, self.collection.value)

            graph_count = len(self.graphs)
            # update_many touches every result document; the legacy
            # Collection.update defaults to multi=False and would normalise
            # only a single label.  The float literal keeps the factor from
            # collapsing to 0 under Python 2 integer division.
            result.update_many(
                {}, {'$mul': {'value': 1.0 / graph_count}}
            )