diff --git a/src/containers/docker/dagster/alternative2/scrape.py b/src/containers/docker/dagster/alternative2/scrape.py index e485319..f87fe89 100644 --- a/src/containers/docker/dagster/alternative2/scrape.py +++ b/src/containers/docker/dagster/alternative2/scrape.py @@ -34,21 +34,31 @@ def a1(): return [1,2] @op -def a2(): - return [3] +def a2( + context: OpExecutionContext, + inp +): + context.add_output_metadata({"abc": 11}) + return [each+1 for each in inp] + +@graph_asset +def a1a2(): + return a2(a1()) + +job_graph = define_asset_job("a1a2_job", selection=[a1a2]) @run_status_sensor( run_status=DagsterRunStatus.SUCCESS, - monitored_jobs=[job_dummy], + monitored_jobs=[job_dummy, job_graph], request_job=job_dummy2, minimum_interval_seconds=5, default_status=DefaultSensorStatus.RUNNING ) def run_job_2_sensor(context: SensorEvaluationContext): inst = context.instance - mat = inst.get_latest_materialization_event(AssetKey(["dummy_asset"])).asset_materialization - # inst.get_job_snapshot() + assetkey_ = list(context.dagster_run.asset_selection)[0] + mat = inst.get_latest_materialization_event(assetkey_).asset_materialization context.log.info(mat) run_config = { "ops": { @@ -57,4 +67,4 @@ def run_job_2_sensor(context: SensorEvaluationContext): } return RunRequest(run_key=f"{datetime.now().timestamp()}", run_config=run_config) -defs = Definitions(jobs=[job_dummy, job_dummy2], assets=[dummy_asset, dummy_asset2], sensors=[run_job_2_sensor]) +defs = Definitions(jobs=[job_dummy, job_dummy2, job_graph], assets=[dummy_asset, dummy_asset2, a1a2], sensors=[run_job_2_sensor])