diff --git a/raco/algebra.py b/raco/algebra.py index 5f7aacd2..facb1865 100644 --- a/raco/algebra.py +++ b/raco/algebra.py @@ -1930,6 +1930,26 @@ def __repr__(self): return "{op}({pl!r})".format(op=self.opname(), pl=self.input) +class Stream(UnaryOperator): + + """Stream query results back to the client.""" + + def __init__(self, input=None): + UnaryOperator.__init__(self, input) + + def num_tuples(self): + return self.input.num_tuples() + + def partitioning(self): + return self.input.partitioning() + + def shortStr(self): + return "{op}".format(op=self.opname()) + + def __repr__(self): + return "{op}({pl!r})".format(op=self.opname(), pl=self.input) + + class Parallel(NaryOperator): """Execute a set of independent plans in parallel.""" diff --git a/raco/backends/myria/myria.py b/raco/backends/myria/myria.py index 53db7a55..4f61731f 100644 --- a/raco/backends/myria/myria.py +++ b/raco/backends/myria/myria.py @@ -389,6 +389,33 @@ def compileme(self, inputid): } +class MyriaStreamingSink(algebra.Stream, MyriaOperator): + + """A Myria StreamingSink""" + + def __init__(self, input): + algebra.UnaryOperator.__init__(self, input) + + def num_tuples(self): + return self.input.num_tuples() + + def partitioning(self): + # TODO: have a way to say it is on a specific worker + return RepresentationProperties() + + def shortStr(self): + return "%s" % self.opname() + + def compileme(self, inputid): + return { + "opType": "StreamingSink", + "argChild": inputid, + } + + def __repr__(self): + return "{op}({inp!r})".format(op=self.opname(), inp=self.input) + + class MyriaAppendTemp(algebra.AppendTemp, MyriaOperator): def compileme(self, inputid): @@ -623,6 +650,14 @@ def compileme(self, inputid): raise NotImplementedError('shouldn''t ever get here, should be turned into CP-CC pair') # noqa +class MyriaStream(algebra.Stream, MyriaOperator): + + """Represents a streaming sink operator""" + + def compileme(self, inputid): + raise NotImplementedError('shouldn''t ever get here, should be turned into CP-CC-StreamingSink triple') # noqa + + class MyriaDupElim(algebra.Distinct, MyriaOperator): """Represents duplicate elimination""" @@ -1837,6 +1872,18 @@ def fire(self, expr): return expr +class ExpandStreamSink(rules.Rule): + + def fire(self, expr): + if not isinstance(expr, MyriaStream): + return expr + + producer = MyriaCollectProducer(expr.input, None) + consumer = MyriaCollectConsumer(producer) + sink = MyriaStreamingSink(consumer) + return sink + + # 6. shuffle logics, hyper_cube_shuffle_logic is only used in HCAlgebra left_deep_tree_shuffle_logic = [ ShuffleBeforeSetop(), @@ -1873,6 +1920,7 @@ def fire(self, expr): rules.OneToOne(algebra.Difference, MyriaDifference), rules.OneToOne(algebra.OrderBy, MyriaInMemoryOrderBy), rules.OneToOne(algebra.Sink, MyriaSink), + rules.OneToOne(algebra.Stream, MyriaStream), rules.OneToOne(algebra.IDBController, MyriaIDBController), ] @@ -1902,7 +1950,7 @@ class MyriaAlgebra(Algebra): MyriaScanTemp, MyriaFileScan, MyriaEmptyRelation, - MyriaSingleton + MyriaSingleton, ) @@ -2187,6 +2235,7 @@ def opt_rules(self, **kwargs): [AddAppendTemp()], break_communication, idb_until_convergence(kwargs.get('async_ft')), + [ExpandStreamSink()], ] if kwargs.get('add_splits', True): @@ -2241,7 +2290,7 @@ def opt_rules(self, **kwargs): rules.push_select, distributed_group_by(MyriaGroupBy), [rules.DeDupBroadcastInputs()], - hyper_cube_shuffle_logic + hyper_cube_shuffle_logic, ] if kwargs.get('push_sql', False): @@ -2250,7 +2299,8 @@ def opt_rules(self, **kwargs): compile_grps_sequence = [ myriafy, [AddAppendTemp()], - break_communication + break_communication, + [ExpandStreamSink()], ] if kwargs.get('add_splits', True): @@ -2428,7 +2478,7 @@ def compile_to_json(raw_query, logical_plan, physical_plan, string and passed along unchanged.""" # Store/StoreTemp is a reasonable physical plan... for now. - root_ops = (algebra.Store, algebra.StoreTemp, algebra.Sink) + root_ops = (algebra.Store, algebra.StoreTemp, algebra.Sink, algebra.Stream) if isinstance(physical_plan, root_ops): physical_plan = algebra.Parallel([physical_plan]) diff --git a/raco/myrial/interpreter.py b/raco/myrial/interpreter.py index 97bccce4..6f7bac37 100644 --- a/raco/myrial/interpreter.py +++ b/raco/myrial/interpreter.py @@ -416,6 +416,15 @@ def sink(self, _id): uses_set = self.ep.get_and_clear_uses_set() self.cfg.add_op(op, None, uses_set) + def stream(self, _id): + alias_expr = ("ALIAS", _id) + child_op = self.ep.evaluate(alias_expr) + + op = raco.algebra.Stream(child_op) + + uses_set = self.ep.get_and_clear_uses_set() + self.cfg.add_op(op, None, uses_set) + def dump(self, _id): alias_expr = ("ALIAS", _id) child_op = self.ep.evaluate(alias_expr) diff --git a/raco/myrial/parser.py b/raco/myrial/parser.py index 9dd59420..a2d0cb40 100644 --- a/raco/myrial/parser.py +++ b/raco/myrial/parser.py @@ -564,6 +564,11 @@ def p_statement_sink(p): 'statement : SINK LPAREN unreserved_id RPAREN SEMI' # noqa p[0] = ('SINK', p[3]) + @staticmethod + def p_statement_stream(p): + 'statement : STREAM LPAREN unreserved_id RPAREN SEMI' # noqa + p[0] = ('STREAM', p[3]) + @staticmethod def p_statement_dump(p): 'statement : DUMP LPAREN unreserved_id RPAREN SEMI' diff --git a/raco/myrial/scanner.py b/raco/myrial/scanner.py index 643b5bf0..b27b7be4 100644 --- a/raco/myrial/scanner.py +++ b/raco/myrial/scanner.py @@ -19,7 +19,7 @@ builtins = ['EMPTY', 'WORKER_ID', 'SCAN', 'COUNTALL', 'COUNT', 'STORE', 'DIFF', 'CROSS', 'JOIN', 'UNION', 'UNIONALL', 'INTERSECT', - 'DISTINCT', 'LIMIT', 'SINK', 'SAMPLESCAN', 'LIKE'] + 'DISTINCT', 'LIMIT', 'SINK', 'STREAM', 'SAMPLESCAN', 'LIKE'] # identifiers with special meaning; case-insensitive