Cooler not compatible with multiprocessing #441
Can you provide more details on your OS and Python version? You should be able to just use … I suspect the issue here is a conflict between fork-based multiprocessing and threads. Oddly, the … We are investigating explicit multiprocessing contexts in #447.
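As a generic illustration of what an explicit multiprocessing context looks like (a minimal sketch, not specific to cooler; the worker function and pool size are placeholders):

```python
import multiprocessing as mp

def work(x):
    return x * x

if __name__ == "__main__":
    # Request a specific start method instead of relying on the platform default.
    ctx = mp.get_context("spawn")
    with ctx.Pool(2) as pool:
        print(pool.map(work, range(4)))
```

Passing a context like this makes the start method ("fork", "spawn", or "forkserver") explicit per pool.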
Are you sure it happens when you "manipulate" the outputs, rather than when you simply return them? Have you tried returning numpy arrays instead? If you can make a reproducible example with a test or public dataset, that would be very helpful.
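A minimal sketch of that suggestion, assuming the same mcool layout used in the reproduction code later in this thread; the helper name is illustrative. The worker returns plain numpy arrays and the conversion to torch tensors happens only in the parent:

```python
import cooler
import numpy as np
import torch

def load_targets_numpy(args):
    file_path, sampled_regions = args
    clr = cooler.Cooler(f"{file_path}::resolutions/1000")
    # Keep everything as numpy inside the worker; no torch objects cross the
    # process boundary.
    mats = [clr.matrix(balance=False).fetch(region) for region in sampled_regions]
    return np.stack(mats)

# In the parent, after results = pool.map(load_targets_numpy, jobs):
#     targets = torch.as_tensor(np.concatenate(results))
```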
Yes, I'm sure: it hangs when I manipulate the outputs, and the loop keeps going if I just return them.
Could you share the coolers, or can you reproduce with small test files? e.g. https://osf.io/3h9js
I'll post a test file later; I'm not available at the moment. Thanks!
This is super weird. I can reproduce this entirely with random torch tensors and no reading from any file, so this has nothing to do with cooler. The deadlock only seems to occur when launching more than one processing pool. If you reuse the same pool each epoch, it's fine. This would also explain why the …

```python
import cooler
import torch
import multiprocessing as mp


# def load_targets(args):
#     file_path, sampled_regions = args
#     cool_path = f"{file_path}::resolutions/1000"
#     targets = []
#     cooler_file = cooler.Cooler(cool_path)
#     print(cooler_file)
#     for region in sampled_regions:
#         matrix = cooler_file.matrix(balance=False).fetch(region)
#         targets.append(torch.tensor(matrix))
#     targets = torch.stack(targets)
#     print("fetched targets:", targets.shape)
#     return targets


def load_targets_synthetic(args):
    file_path, sampled_regions = args
    targets = []
    for region in sampled_regions:
        targets.append(torch.rand((2000, 2000)))
    targets = torch.stack(targets)
    print("fetched targets:", targets.shape)
    return targets


def run_reuse_pool(ctx, num_processes=1):
    with ctx.Pool(num_processes) as pool:
        for i in range(3):
            print(f"epoch {i + 1}")
            targets = pool.map(load_targets_synthetic, [(file_path, test_regions) for file_path in test_samples])
            targets = torch.cat(targets)


def run_multi_pool(ctx, num_processes=1):
    for i in range(3):
        with ctx.Pool(num_processes) as pool:
            print(f"epoch {i + 1}")
            targets = pool.map(load_targets_synthetic, [(file_path, test_regions) for file_path in test_samples])
            targets = torch.cat(targets)


if __name__ == "__main__":
    test_samples = ["test.mcool", "test2.mcool", "test3.mcool", "test4.mcool"]
    test_regions = [('chr17', 33236000, 35236000), ('chr2', 219504000, 221504000)]

    ctx = mp.get_context('fork')

    print("\n\nReuse pool")
    run_reuse_pool(ctx)

    print("\n\nMultiple pools")
    run_multi_pool(ctx)
```
And in my hands, the example above works in a "spawn" context: both `run_reuse_pool` and `run_multi_pool` run to completion. If this is right, I suspect that multiple pools in a "fork" context end up contending for resources shared with torch's threads, but those threads aren't actually copied into the child processes, so the locks end up in inconsistent states. Classic case of: don't mix forking and threads.
Wow, that's interesting. I don't really know much about threads, though, and I'm not quite sure why they didn't join successfully. I thought everything would be joined at the end of the `with` block, so the previous epoch shouldn't influence the next one.
It's not about the threads not joining. It's that when a process spawns a child process using `fork`, the child gets a copy of the parent's memory, including any locks the parent's threads happen to be holding at that moment, but not the threads themselves, so a lock that was held at fork time can never be released inside the child.
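A self-contained illustration of that failure mode, with no cooler or torch involved (POSIX-only, since it requests the "fork" start method explicitly): a lock held by a parent thread at fork time stays locked forever in the child.

```python
import multiprocessing as mp
import threading
import time

lock = threading.Lock()

def hold_lock_briefly():
    # A parent thread holds the lock for a while.
    with lock:
        time.sleep(5)

def child():
    # The forked child inherited a *copy* of the lock in its locked state,
    # but not the thread that would eventually release it.
    lock.acquire()          # blocks forever in the child
    print("never reached")

if __name__ == "__main__":
    t = threading.Thread(target=hold_lock_briefly)
    t.start()
    time.sleep(0.5)                            # make sure the lock is held before forking
    p = mp.get_context("fork").Process(target=child)
    p.start()
    p.join(timeout=8)                          # the parent's thread released its copy long ago...
    print("child still stuck:", p.is_alive())  # ...but the child's copy stays locked
    p.terminate()
    t.join()
```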
To close this issue: in summary, I believe that when you do something like `torch.cat` on the results in the main process, torch spins up internal worker threads; if you then fork a fresh pool while those threads exist and may be holding locks, the children inherit a locked state that nothing will ever release, and they hang. Reusing a single pool created before any tensor manipulation, or using a "spawn" context, avoids this.
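A minimal sketch of the two workarounds discussed above, with a stand-in worker in place of the real cooler-reading code: use a "spawn" context and/or create the pool once, before any tensor manipulation happens in the parent, and reuse it across epochs.

```python
import multiprocessing as mp
import torch

def make_targets(n):
    # Stand-in for the real worker that reads matrices from a cool file.
    return torch.rand((n, 8, 8))

if __name__ == "__main__":
    # "spawn" children start fresh and inherit no (possibly locked) thread state.
    ctx = mp.get_context("spawn")
    # One pool, created before any torch ops run in the parent, reused every epoch.
    with ctx.Pool(2) as pool:
        for epoch in range(3):
            results = pool.map(make_targets, [2, 2, 2, 2])
            targets = torch.cat(results)          # manipulating results is now safe
            print(f"epoch {epoch + 1}: {tuple(targets.shape)}")
```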
Thanks for your explanation, much appreciated!
I'm using the latest version of cooler (0.9.2). For clarity: the problem is the same with num_processes set to 1.

And the output:

You'll find the child process stuck: the loop doesn't move forward and no error is raised. The child process just waits; if you stop the program manually, you'll see:

Interestingly, this only happens if you manipulate the results returned from the pool. If you don't use these lines:

then the loop keeps going forward and finishes successfully.
I've tried several ways to figure it out, including deleting the variables after using them, closing the file handle of the cool file (although it is actually closed automatically), putting the code into an `if __name__ == "__main__":` block, changing the multiprocessing start method from fork to spawn, using a multiprocessing lock, and cloning the tensors in `targets` before manipulating them... but none of those are the point. The only thing I know is that manipulating the results blocks (at least one) new child process from fetching data from the cool file. Since pdb can't be used in a child process, I don't know how to debug it.

Luckily, I found a workaround in the end. Instead of using `multiprocessing` or `torch.multiprocessing`, using `from pathos.multiprocessing import ProcessingPool as Pool` prevents the problem. However, exploring the cause of this phenomenon is beyond my ability. I just hope this will be helpful for those trying to process cool files with multiprocessing.
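For completeness, a rough sketch of the pathos workaround described above (the worker mirrors the synthetic example from earlier in the thread; pool size and file names are placeholders):

```python
import torch
from pathos.multiprocessing import ProcessingPool as Pool

def load_targets_synthetic(args):
    file_path, sampled_regions = args
    # Stand-in for reading matrices from a cool file.
    return torch.stack([torch.rand((2000, 2000)) for _ in sampled_regions])

if __name__ == "__main__":
    test_samples = ["test.mcool", "test2.mcool", "test3.mcool", "test4.mcool"]
    test_regions = [("chr17", 33236000, 35236000), ("chr2", 219504000, 221504000)]

    pool = Pool(nodes=2)
    for epoch in range(3):
        results = pool.map(load_targets_synthetic,
                           [(path, test_regions) for path in test_samples])
        targets = torch.cat(results)       # manipulating the results is fine here
        print(f"epoch {epoch + 1}: {targets.shape}")
    pool.close()
    pool.join()
```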