Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zcemycl committed Jun 26, 2024
1 parent 0e65495 commit 23619c3
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions src/containers/docker/dagster/alternative2/scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,31 @@ def a1():
return [1,2]

@op
def a2():
return [3]
def a2(
context: OpExecutionContext,
inp
):
context.add_output_metadata({"abc": 11})
return [each+1 for each in inp]

@graph_asset
def a1a2():
return a2(a1())

job_graph = define_asset_job("a1a2_job", selection=[a1a2])


@run_status_sensor(
run_status=DagsterRunStatus.SUCCESS,
monitored_jobs=[job_dummy],
monitored_jobs=[job_dummy, job_graph],
request_job=job_dummy2,
minimum_interval_seconds=5,
default_status=DefaultSensorStatus.RUNNING
)
def run_job_2_sensor(context: SensorEvaluationContext):
inst = context.instance
mat = inst.get_latest_materialization_event(AssetKey(["dummy_asset"])).asset_materialization
# inst.get_job_snapshot()
assetkey_ = list(context.dagster_run.asset_selection)[0]
mat = inst.get_latest_materialization_event(assetkey_).asset_materialization
context.log.info(mat)
run_config = {
"ops": {
Expand All @@ -57,4 +67,4 @@ def run_job_2_sensor(context: SensorEvaluationContext):
}
return RunRequest(run_key=f"{datetime.now().timestamp()}", run_config=run_config)

defs = Definitions(jobs=[job_dummy, job_dummy2], assets=[dummy_asset, dummy_asset2], sensors=[run_job_2_sensor])
defs = Definitions(jobs=[job_dummy, job_dummy2, job_graph], assets=[dummy_asset, dummy_asset2, a1a2], sensors=[run_job_2_sensor])

0 comments on commit 23619c3

Please sign in to comment.