We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
The following flow may
import os import time os.environ['PREFECT__FLOWS__CHECKPOINTING'] = 'true' from funnel import CacheStore from funnel.prefect.result import FunnelResult from prefect import task, Flow from prefect.executors import DaskExecutor import xarray as xr executor = DaskExecutor("tcp://127.0.0.1:41321") cache_store = CacheStore('/home/jovyan/shared-readwrite/prefect-sprint/funnel-cache-store-8') @task(result=FunnelResult(cache_store, serializer='xarray.zarr'), target='tranformed_data') def get_transformed_data(): time.sleep(5) ds = xr.tutorial.open_dataset('air_temperature').chunk({'lat': 50, 'lon': 50}) ds['air'] = ds['air'] - 273.13 ds['air'].attrs['units'] = 'degC' # compute a dask array to confirm that dask tasks are executed on the executor's client dask.array.zeros((10000, 10000), chunks=(100, 100)).mean().compute() return ds @task(result=FunnelResult(cache_store, serializer='xarray.zarr'), target='mean_data-2') def calc_ds_mean(ds): time.sleep(4) ds_mean = ds.mean('time').load() return ds_mean @task def print_ds(ds): time.sleep(2) print(ds.air) with Flow('sprint-flow', executor=executor) as flow: ds = get_transformed_data() ds_mean = calc_ds_mean(ds) print_ds(ds_mean) flow.run()
The main TODO item here is to replace the local filesystem usage in CacheStore with an azure blob file path.
CacheStore
Once that is sorted, @norlandrhagen can probably expand on this to show how it can be deployed on Prefect-Cloud.
The text was updated successfully, but these errors were encountered:
No branches or pull requests
The following flow may
The main TODO item here is to replace the local filesystem usage in
CacheStore
with an azure blob file path.Once that is sorted, @norlandrhagen can probably expand on this to show how it can be deployed on Prefect-Cloud.
The text was updated successfully, but these errors were encountered: