Skip to content

Commit

Permalink
Support injecting a info: dict to Sampler.broadcast_all() calls
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Jun 6, 2023
1 parent 23b46df commit 4fdac6a
Showing 1 changed file with 30 additions and 9 deletions.
39 changes: 30 additions & 9 deletions piker/data/_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ async def broadcast(
self,
period_s: float,
time_stamp: float | None = None,
info: dict | None = None,

) -> None:
'''
Expand Down Expand Up @@ -258,10 +259,14 @@ async def broadcast(
try:
for stream in (subs - sent):
try:
await stream.send({
msg = {
'index': time_stamp or last_ts,
'period': period_s,
})
}
if info:
msg.update(info)

await stream.send(msg)
sent.add(stream)

except (
Expand All @@ -287,9 +292,15 @@ async def broadcast(
)

@classmethod
async def broadcast_all(self) -> None:
async def broadcast_all(
self,
info: dict | None = None,
) -> None:
for period_s in self.subscribers:
await self.broadcast(period_s)
await self.broadcast(
period_s,
info=info,
)


@tractor.context
Expand Down Expand Up @@ -359,8 +370,10 @@ async def register_with_sampler(

# except broadcast requests from the subscriber
async for msg in stream:
if msg == 'broadcast_all':
await Sampler.broadcast_all()
if 'broadcast_all' in msg:
await Sampler.broadcast_all(
info=msg['broadcast_all'],
)
finally:
if (
sub_for_broadcasts
Expand Down Expand Up @@ -468,6 +481,8 @@ async def open_sample_stream(
cache_key: str | None = None,
allow_new_sampler: bool = True,

ensure_is_active: bool = False,

) -> AsyncIterator[dict[str, float]]:
'''
Subscribe to OHLC sampling "step" events: when the time aggregation
Expand Down Expand Up @@ -510,12 +525,18 @@ async def open_sample_stream(
},
) as (ctx, first)
):
assert len(first) > 1
if ensure_is_active:
assert len(first) > 1

async with (
ctx.open_stream() as istream,

# TODO: we don't need this task-bcasting right?
# istream.subscribe() as istream,
# TODO: we DO need this task-bcasting so that
# for eg. the history chart update loop eventually
# receceives all backfilling event msgs such that
# the underlying graphics format arrays are
# re-allocated until all history is loaded!
istream.subscribe() as istream,
):
yield istream

Expand Down

0 comments on commit 4fdac6a

Please sign in to comment.