-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathreplace.py
64 lines (47 loc) · 1.58 KB
/
replace.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""A demonstration of how to use dynamic workflows with replace."""
from jobflow import Flow, job, run_locally
@job
def read_websites():
"""Read list of websites from a file."""
from pathlib import Path
file_contents = Path("websites.txt").read_text()
return file_contents.split()
@job
def time_website(website: str):
"""Time how long it takes to load a website."""
import urllib.request
from time import perf_counter
stream = urllib.request.urlopen(website)
start_time = perf_counter()
stream.read()
end_time = perf_counter()
stream.close()
return end_time - start_time
@job
def start_timing_jobs(websites: list[str]):
"""Time a list of websites."""
from jobflow.core.job import Response
jobs = []
for website in websites:
time_job = time_website(website)
time_job.name = f"time {website}"
jobs.append(time_job)
output = [j.output for j in jobs]
return Response(replace=Flow(jobs, output))
@job
def sum_times(times: list[float]):
"""Sum a list of loading times."""
return sum(times)
# create a flow that will:
# 1. load a list of websites from a file
# 2. generate one new job for each website to time the website loading
# 3. sum all the times together
read_websites_job = read_websites()
timings_job = start_timing_jobs(read_websites_job.output)
sum_job = sum_times(timings_job.output)
flow = Flow([read_websites_job, timings_job, sum_job])
# draw the flow graph
flow.draw_graph().show()
# run the flow, "responses" contains the output of all jobs
responses = run_locally(flow)
print(responses)