-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmandelbrot_dask.py
110 lines (80 loc) · 3.85 KB
/
mandelbrot_dask.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from matplotlib import pyplot as plt
import time
from dask.distributed import Client
import dask.array as da
import mandelbrot_vectorized
import numpy as np
threshold = 2
iterations = 100
def mandelbrot(c):
"""
Generate a Mandelbrot set using vectorized numpy operations.
:param c:
:return mandelbrot:
"""
# Generate a 2D array of ones, which is then converted to a boolean data type array
mandelbrot_mask = da.ones_like(c, dtype=bool)
# Generate a 2D array of zeros, which is then converted to a complex data type array
z = da.zeros_like(c, dtype=np.complex128)
# z is iteratively updated with the Mandelbrot formula: z = z^2 + c
divergence_time = da.zeros(c.shape, dtype=np.float64)
# Iterate over the complex plane
for i in range(iterations):
# Apply the Mandelbrot formula
z = z * z + c
# Check each element of the array for divergence
diverged = mandelbrot_mask & (np.abs(z) > threshold)
# Update the divergence time
divergence_time[diverged] = i
# Stops early if the absolute value of z is greater than the threshold (point diverged)
mandelbrot_mask[np.abs(z) > threshold] = False
return divergence_time
def dask_local_distribution(pRE, pIM, chunk_size, show_figure=True):
start_time = time.time()
# Generates linear spaces with pRE and pIM elements respectively around the plane of the Mandelbrot set
x_space = da.linspace(-2.3, 0.8, pRE, dtype=np.float64).reshape((1, pRE))
y_space = da.linspace(-1.2, 1.2, pIM, dtype=np.float64).reshape((pIM, 1))
# Generate a 2D array for each dimension of the complex plane
complete_space = da.rechunk(x_space + y_space * 1j, chunks=chunk_size)
solution_return = complete_space.map_blocks(mandelbrot).compute()
end_time = time.time()
print("Computation time:", round(end_time - start_time, 3), "s")
if show_figure:
plt.imshow(solution_return, cmap='magma')
plt.show()
def dask_distributed_execution(client, pRE, pIM, chunk_size, show_figure=False):
start_time = time.time()
# Generates linear spaces with pRE and pIM elements respectively around the plane of the Mandelbrot set
x_space = da.linspace(-2.3, 0.8, pRE, dtype=np.float16).reshape((1, pRE))
y_space = da.linspace(-1.2, 1.2, pIM, dtype=np.float16).reshape((pIM, 1))
# Generate a 2D array for each dimension of the complex plane
complete_space = da.rechunk(x_space + y_space * 1j, chunks=chunk_size)
solution_return = client.compute(complete_space.map_blocks(mandelbrot)).result()
end_time = time.time()
print("Chunk size:", chunk_size, "Computation time:", round(end_time-start_time, 3), "s")
if show_figure:
plt.imshow(solution_return, cmap='magma')
plt.show()
def main():
plot_size = 1000
fig_show = False
chunk_sizes = [(500, 500), (200, 200), (100, 100), (50, 50)]
# print("\nComparing performance of numpy and dask:")
# print("Numpy:")
# mandelbrot_vectorized.main(8000, 8000, show_figure=fig_show)
print("DASK local execution:")
dask_local_distribution(8000, 8000, (8000, 8000), show_figure=fig_show)
print("\nComparing local DASK with different chunk sizes:")
for s_chunk in chunk_sizes:
dask_local_distribution(plot_size, plot_size, s_chunk, show_figure=fig_show)
client_distribute = Client()
print("Creating a client for distributed DASK execution:", client_distribute)
print("Dashboard for statistics:",client_distribute.dashboard_link)
print("\nComparing distributed DASK with different chunk sizes:")
for s_chunk in chunk_sizes:
print("\nChunk size:", s_chunk)
dask_distributed_execution(client_distribute, plot_size, plot_size, s_chunk, show_figure=fig_show)
print("Results:", client_distribute.dashboard_link)
client_distribute.close()
if __name__ == "__main__":
main()