Skip to content

Commit

Permalink
first draft
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobin Baker committed Aug 28, 2017
1 parent c6fdede commit ad97764
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 5 deletions.
20 changes: 20 additions & 0 deletions raco/algebra.py
Original file line number Diff line number Diff line change
Expand Up @@ -1941,6 +1941,26 @@ def __repr__(self):
return "{op}({pl!r})".format(op=self.opname(), pl=self.input)


class Stream(UnaryOperator):

"""Stream query results back to the client."""

def __init__(self, input=None):
UnaryOperator.__init__(self, input)

def num_tuples(self):
return self.input.num_tuples()

def partitioning(self):
return self.input.partitioning()

def shortStr(self):
return "{op}".format(op=self.opname())

def __repr__(self):
return "{op}({pl!r})".format(op=self.opname(), pl=self.input)


class Parallel(NaryOperator):

"""Execute a set of independent plans in parallel."""
Expand Down
58 changes: 54 additions & 4 deletions raco/backends/myria/myria.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,33 @@ def compileme(self, inputid):
}


class MyriaStreamingSink(algebra.Stream, MyriaOperator):

"""A Myria StreamingSink"""

def __init__(self, input):
algebra.UnaryOperator.__init__(self, input)

def num_tuples(self):
return self.input.num_tuples()

def partitioning(self):
# TODO: have a way to say it is on a specific worker
return RepresentationProperties()

def shortStr(self):
return "%s" % self.opname()

def compileme(self, inputid):
return {
"opType": "StreamingSink",
"argChild": inputid,
}

def __repr__(self):
return "{op}({inp!r})".format(op=self.opname(), inp=self.input)


class MyriaAppendTemp(algebra.AppendTemp, MyriaOperator):

def compileme(self, inputid):
Expand Down Expand Up @@ -622,6 +649,14 @@ def compileme(self, inputid):
raise NotImplementedError('shouldn''t ever get here, should be turned into CP-CC pair') # noqa


class MyriaStream(algebra.Stream, MyriaOperator):

"""Represents a streaming sink operator"""

def compileme(self, inputid):
raise NotImplementedError('shouldn''t ever get here, should be turned into CP-CC-StreamingSink triple') # noqa


class MyriaDupElim(algebra.Distinct, MyriaOperator):

"""Represents duplicate elimination"""
Expand Down Expand Up @@ -1838,6 +1873,18 @@ def fire(self, expr):
return expr


class ExpandStreamSink(rules.Rule):

def fire(self, expr):
if not isinstance(expr, MyriaStream):
return expr

producer = MyriaCollectProducer(expr.input, None)
consumer = MyriaCollectConsumer(producer)
sink = MyriaStreamingSink(consumer)
return sink


# 6. shuffle logics, hyper_cube_shuffle_logic is only used in HCAlgebra
left_deep_tree_shuffle_logic = [
ShuffleBeforeSetop(),
Expand Down Expand Up @@ -1875,6 +1922,7 @@ def fire(self, expr):
rules.OneToOne(algebra.OrderBy, MyriaInMemoryOrderBy),
rules.OneToOne(algebra.Limit, MyriaLimit),
rules.OneToOne(algebra.Sink, MyriaSink),
rules.OneToOne(algebra.Stream, MyriaStream),
rules.OneToOne(algebra.IDBController, MyriaIDBController),
]

Expand Down Expand Up @@ -1904,7 +1952,7 @@ class MyriaAlgebra(Algebra):
MyriaScanTemp,
MyriaFileScan,
MyriaEmptyRelation,
MyriaSingleton
MyriaSingleton,
)


Expand Down Expand Up @@ -2189,6 +2237,7 @@ def opt_rules(self, **kwargs):
[AddAppendTemp()],
break_communication,
idb_until_convergence(kwargs.get('async_ft')),
[ExpandStreamSink()],
]

if kwargs.get('add_splits', True):
Expand Down Expand Up @@ -2243,7 +2292,7 @@ def opt_rules(self, **kwargs):
rules.push_select,
distributed_group_by(MyriaGroupBy),
[rules.DeDupBroadcastInputs()],
hyper_cube_shuffle_logic
hyper_cube_shuffle_logic,
]

if kwargs.get('push_sql', False):
Expand All @@ -2252,7 +2301,8 @@ def opt_rules(self, **kwargs):
compile_grps_sequence = [
myriafy,
[AddAppendTemp()],
break_communication
break_communication,
[ExpandStreamSink()],
]

if kwargs.get('add_splits', True):
Expand Down Expand Up @@ -2430,7 +2480,7 @@ def compile_to_json(raw_query, logical_plan, physical_plan,
string and passed along unchanged."""

# Store/StoreTemp is a reasonable physical plan... for now.
root_ops = (algebra.Store, algebra.StoreTemp, algebra.Sink)
root_ops = (algebra.Store, algebra.StoreTemp, algebra.Sink, algebra.Stream)
if isinstance(physical_plan, root_ops):
physical_plan = algebra.Parallel([physical_plan])

Expand Down
9 changes: 9 additions & 0 deletions raco/myrial/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,15 @@ def sink(self, _id):
uses_set = self.ep.get_and_clear_uses_set()
self.cfg.add_op(op, None, uses_set)

def stream(self, _id):
alias_expr = ("ALIAS", _id)
child_op = self.ep.evaluate(alias_expr)

op = raco.algebra.Stream(child_op)

uses_set = self.ep.get_and_clear_uses_set()
self.cfg.add_op(op, None, uses_set)

def dump(self, _id):
alias_expr = ("ALIAS", _id)
child_op = self.ep.evaluate(alias_expr)
Expand Down
5 changes: 5 additions & 0 deletions raco/myrial/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,11 @@ def p_statement_sink(p):
'statement : SINK LPAREN unreserved_id RPAREN SEMI' # noqa
p[0] = ('SINK', p[3])

@staticmethod
def p_statement_stream(p):
'statement : STREAM LPAREN unreserved_id RPAREN SEMI' # noqa
p[0] = ('STREAM', p[3])

@staticmethod
def p_statement_dump(p):
'statement : DUMP LPAREN unreserved_id RPAREN SEMI'
Expand Down
2 changes: 1 addition & 1 deletion raco/myrial/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

builtins = ['EMPTY', 'WORKER_ID', 'SCAN', 'COUNTALL', 'COUNT', 'STORE',
'DIFF', 'CROSS', 'JOIN', 'UNION', 'UNIONALL', 'INTERSECT',
'DISTINCT', 'LIMIT', 'SINK', 'SAMPLESCAN', 'LIKE']
'DISTINCT', 'LIMIT', 'SINK', 'STREAM', 'SAMPLESCAN', 'LIKE']


# identifiers with special meaning; case-insensitive
Expand Down

0 comments on commit ad97764

Please sign in to comment.