Skip to content

Commit

Permalink
Bah, fix nursery indents for maybe tsdb backloading
Browse files Browse the repository at this point in the history
Can't ref `dt_eps` and `tsdb_entry` if they don't exist.. like for 1s
sampling from `binance` (which dne). So make sure to add better logic
guard and only open the finaly backload nursery if we actually need to
fill the gap between latest history and where tsdb history ends.

TO CHERRY #486
  • Loading branch information
goodboy committed Dec 19, 2023
1 parent f7cc43e commit 659649e
Showing 1 changed file with 37 additions and 30 deletions.
67 changes: 37 additions & 30 deletions piker/tsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,44 +891,49 @@ async def tsdb_backfill(
config,
)

# tell parent task to continue
# TODO: really we'd want this the other way with the
# tsdb load happening asap and the since the latest
# frame query will normally be the main source of
# latency?
task_status.started()

tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
timeframe,
)

# tell parent task to continue
# TODO: really we'd want this the other way with the
# tsdb load happening asap and the since the latest
# frame query will normally be the main source of
# latency?
task_status.started()

# NOTE: iabs to start backfilling from, reverse chronological,
# ONLY AFTER the first history frame has been pushed to
# mem!
backfill_gap_from_shm_index: int = shm._first.value + 1

(
mr_start_dt,
mr_end_dt,
) = dt_eps

async with trio.open_nursery() as tn:
# Prepend any tsdb history to the shm buffer which should
# now be full of the most recent history pulled from the
# backend's last frame.
if (
dt_eps
and tsdb_entry
):
# unpack both the latest (gap) backfilled frame dts
(
mr_start_dt,
mr_end_dt,
) = dt_eps

# AND the tsdb history from (offline) storage)
(
tsdb_history,
first_tsdb_dt,
last_tsdb_dt,
) = tsdb_entry

# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
async with trio.open_nursery() as tn:

# Prepend any tsdb history to the shm buffer which should
# now be full of the most recent history pulled from the
# backend's last frame.
if tsdb_entry:
(
tsdb_history,
first_tsdb_dt,
last_tsdb_dt,
) = tsdb_entry

# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
Expand All @@ -941,8 +946,10 @@ async def tsdb_backfill(

backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,

sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,

storage=storage,
write_tsdb=True,
)
Expand Down Expand Up @@ -1032,11 +1039,11 @@ async def tsdb_backfill(
mkt=mkt,
))

# 2nd nursery END

# TODO: who would want to?
await nulls_detected.wait()

await bf_done.wait()
# TODO: who would want to?
await nulls_detected.wait()
await bf_done.wait()
# TODO: maybe start history anal and load missing "history
# gaps" via backend..

Expand Down

0 comments on commit 659649e

Please sign in to comment.