Cooler not compatible with multiprocessing #441

Closed
pghzeng opened this issue Nov 30, 2024 · 11 comments

pghzeng commented Nov 30, 2024

I'm using the latest version of cooler (0.9.2). For clarity, num_processes is set to 1; the problem is the same.

# a basic example
import cooler
import torch
from torch.multiprocessing import Pool
# from multiprocessing import Pool
# from pathos.multiprocessing import ProcessingPool as Pool
import gc

def load_targets(args):
    sample_name, cool_dir, sampled_regions = args
    cool_path = f"{cool_dir}/{sample_name}.sumnorm.mcool::/"
    targets = []
    cooler_file = cooler.Cooler(cool_path)
    print(cooler_file)
    for region in sampled_regions:
        # pdb.set_trace()
        matrix = cooler_file.matrix(balance=False).fetch(region)
        targets.append(torch.tensor(matrix))
    targets = torch.stack(targets)
    print("fetched targets", targets.shape)
    # del cooler_file
    # gc.collect()
    return targets

cool_dir = '/mnt/d/cools/'
test_samples = ["K562_MboI", "endoC", "AoTCPCs", "Liver"]
test_regions = [('chr12', 93236000, 95236000), ('chr2', 219504000, 221504000)]

num_processes = 1
for i in range(3):
    print("epoch%s!!!" % i)
    with Pool(num_processes) as pool:
        targets = pool.map(load_targets, [(sample_name, cool_dir, test_regions) for sample_name in test_samples])
    targets = torch.cat(targets)
    print("Shape!!!", targets.shape)
    # del targets
    # gc.collect()

And the output:

epoch0!!!
<Cooler "K562_MboI.sumnorm.mcool::/">
fetched targets torch.Size([2, 500, 500])
<Cooler "endoC.sumnorm.mcool::/">
fetched targets torch.Size([2, 500, 500])
<Cooler "AoTCPCs.sumnorm.mcool::/">
fetched targets torch.Size([2, 500, 500])
<Cooler "Liver.sumnorm.mcool::/">
fetched targets torch.Size([2, 500, 500])
Shape!!! torch.Size([8, 500, 500])
epoch1!!!
<Cooler "K562_MboI.sumnorm.mcool::/">
<Cooler "endoC.sumnorm.mcool::/">
fetched targets torch.Size([2, 500, 500])
<Cooler "AoTCPCs.sumnorm.mcool::/">
fetched targets torch.Size([2, 500, 500])
<Cooler "Liver.sumnorm.mcool::/">
fetched targets torch.Size([2, 500, 500])

You'll find the child process gets stuck: the loop doesn't move forward and no error is raised. The child process just waits; if you stop the program manually, you'll see:

KeyboardInterrupt                         Traceback (most recent call last)
Cell In[1], line 39
     36 print("epoch%s!!!" % i)
     37 with Pool(num_processes) as pool:
     38     # targets = pool.starmap(load_targets, [(sample_name, cool_dir, test_regions) for sample_name in test_samples])
---> 39     targets = pool.map(load_targets, [(sample_name, cool_dir, test_regions) for sample_name in test_samples])
     40     # print(targets)
     41 targets = torch.cat(targets)

File ~/miniconda3/envs/env/lib/python3.9/multiprocessing/pool.py:364, in Pool.map(self, func, iterable, chunksize)
    359 def map(self, func, iterable, chunksize=None):
    360     '''
    361     Apply `func` to each element in `iterable`, collecting the results
    362     in a list that is returned.
    363     '''
--> 364     return self._map_async(func, iterable, mapstar, chunksize).get()

File ~/miniconda3/envs/env/lib/python3.9/multiprocessing/pool.py:765, in ApplyResult.get(self, timeout)
    764 def get(self, timeout=None):
--> 765     self.wait(timeout)
    766     if not self.ready():
    767         raise TimeoutError

File ~/miniconda3/envs/env/lib/python3.9/multiprocessing/pool.py:762, in ApplyResult.wait(self, timeout)
    761 def wait(self, timeout=None):
--> 762     self._event.wait(timeout)

File ~/miniconda3/envs/env/lib/python3.9/threading.py:581, in Event.wait(self, timeout)
    579 signaled = self._flag
    580 if not signaled:
--> 581     signaled = self._cond.wait(timeout)
    582 return signaled

File ~/miniconda3/envs/env/lib/python3.9/threading.py:312, in Condition.wait(self, timeout)
    310 try:    # restore state no matter what (e.g., KeyboardInterrupt)
    311     if timeout is None:
--> 312         waiter.acquire()
    313         gotit = True
    314     else:

KeyboardInterrupt: 

Interestingly, this only happens if you want to manipulate the results returned from the pool. If you don't use lines like these:

targets = torch.cat(targets)
# or 
targets = torch.stack(targets)
# or some other manipulation I haven't tested

Then the loop keeps moving forward and finishes successfully.

I've tried several ways to figure it out, including deleting the variables after using them, trying to close the file handle of the cool file (although it is closed automatically anyway), putting the code into an if __name__ == "__main__": block, changing the multiprocessing start method from fork to spawn, using a multiprocessing lock, and cloning the tensors in targets before manipulating them... but none of these are the point. The only thing I know is that manipulating the results blocks (at least one) new child process from fetching data from the cool file. Since pdb cannot be used in a child process, I don't know how to debug this.

Luckily, I found a solution at last. Instead of using multiprocessing or torch.multiprocessing, from pathos.multiprocessing import ProcessingPool as Pool avoids the problem. However, exploring the cause of this phenomenon is beyond my ability. I just hope this will be helpful for those trying to process cool files using multiprocessing.
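For reference, a minimal sketch of that drop-in swap, reusing the load_targets, cool_dir, test_samples and test_regions defined above (and assuming a pathos version whose pools support the context-manager protocol):

# Workaround sketch: swap the pool implementation for pathos' ProcessingPool
# and keep the rest of the loop unchanged.
from pathos.multiprocessing import ProcessingPool as Pool
import torch

num_processes = 1
for i in range(3):
    print("epoch%s!!!" % i)
    with Pool(num_processes) as pool:
        targets = pool.map(
            load_targets,
            [(sample_name, cool_dir, test_regions) for sample_name in test_samples],
        )
    targets = torch.cat(targets)
    print("Shape!!!", targets.shape)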

nvictus (Member) commented Dec 19, 2024

Can you provide more details on your OS and Python version?

You should be able to just use multiprocess.Pool instead of pathos.multiprocessing.ProcessPool, which just aliases the former. multiprocess is a dependency of cooler.

I suspect the issue here is a conflict between fork-based multiprocessing and threads. Oddly, the multiprocess library seems to preserve fork-based multiprocessing on all POSIX platforms by default, which is problematic, while the standard library has already switched to the "spawn" method on macOS.

We are investigating explicit multiprocessing contexts in #447.
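For illustration, explicitly requesting a "spawn" context with the standard library looks roughly like the sketch below, applied to the example from the report (assumes load_targets, cool_dir, test_samples and test_regions are defined at module level; with spawn the worker function must be importable, hence the __main__ guard):

# Sketch only: force the "spawn" start method instead of the platform
# default ("fork" on Linux), so each worker starts a fresh interpreter.
import multiprocessing as mp
import torch

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    for i in range(3):
        with ctx.Pool(1) as pool:
            results = pool.map(
                load_targets,
                [(name, cool_dir, test_regions) for name in test_samples],
            )
        targets = torch.cat(results)
        print("epoch", i, targets.shape)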

nvictus (Member) commented Dec 19, 2024

Interestingly, this only happens if you want to manipulate the results returned from pool.

Are you sure it's when you "manipulate" the outputs, or simply return them? Have you tried returning numpy arrays (np.dstack) instead of torch tensors?

If you can make a reproducible example with a test or public dataset, that would be very helpful.
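For example, a numpy-returning variant of load_targets (as suggested above) might look like this sketch; the .mcool path pattern is copied from the original report and is an assumption here:

# Sketch: keep torch out of the workers and only convert to tensors in the parent.
import numpy as np
import cooler

def load_targets_np(args):
    sample_name, cool_dir, sampled_regions = args
    clr = cooler.Cooler(f"{cool_dir}/{sample_name}.sumnorm.mcool::/")
    mats = [clr.matrix(balance=False).fetch(region) for region in sampled_regions]
    return np.stack(mats)  # (n_regions, n_bins, n_bins); np.dstack would stack on the last axis instead

# In the parent, after results = pool.map(load_targets_np, ...):
#     targets = torch.as_tensor(np.concatenate(results))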

pghzeng (Author) commented Dec 19, 2024

Interestingly, this only happens if you want to manipulate the results returned from pool.

Are you sure it's when you "manipulate" the outputs, or simply return them? Have you tried returning numpy arrays (np.dstack) instead of torch tensors?

If you can make a reproducible example with a test or public dataset, that would be very helpful.

Yes, I'm sure it happens when I manipulate the outputs, and the loop keeps going if I just return them.
My OS is Ubuntu 22.04.2 and I'm using Python 3.9.17. The code provided at the very beginning is reproducible for me.
Thanks for your reply!

nvictus (Member) commented Dec 19, 2024

Could you share the coolers, or can you reproduce with small test files? e.g. https://osf.io/3h9js

pghzeng (Author) commented Dec 19, 2024

Could you share the coolers, or can you reproduce with small test files? e.g. https://osf.io/3h9js

I'll post a test file later; I'm not available at the moment. Thanks!

nvictus (Member) commented Dec 19, 2024

This is super weird. I can reproduce this entirely with random torch tensors and no reading from any file, so this has nothing to do with cooler.

The deadlock only seems to occur when launching more than one processing pool. If you reuse the same pool each epoch, it's fine. This would also explain why the pathos implementation works: I was wrong about the ProcessPool object in my last post. It seems like pathos implements its own shim object that reuses the same Pool.

import cooler
import torch
import multiprocessing as mp


# def load_targets(args):
#     file_path, sampled_regions = args
#     cool_path = f"{file_path}::resolutions/1000"
#     targets = []
#     cooler_file = cooler.Cooler(cool_path)
#     print(cooler_file)
#     for region in sampled_regions:
#         matrix = cooler_file.matrix(balance=False).fetch(region)
#         targets.append(torch.tensor(matrix))
#     targets = torch.stack(targets)
#     print("fetched targets:", targets.shape)
#     return targets


def load_targets_synthetic(args):
    file_path, sampled_regions = args
    targets = []
    for region in sampled_regions:
        targets.append(torch.rand((2000, 2000)))
    targets = torch.stack(targets)
    print("fetched targets:", targets.shape)
    return targets


def run_reuse_pool(ctx, num_processes=1):
    with ctx.Pool(num_processes) as pool:
        for i in range(3):
            print(f"epoch {i + 1}")
            targets = pool.map(load_targets_synthetic, [(file_path, test_regions) for file_path in test_samples])
            targets = torch.cat(targets)


def run_multi_pool(ctx, num_processes=1):
    for i in range(3):
        with ctx.Pool(num_processes) as pool:
            print(f"epoch {i + 1}")
            targets = pool.map(load_targets_synthetic, [(file_path, test_regions) for file_path in test_samples])
            targets = torch.cat(targets)


if __name__ == "__main__":
    test_samples = ["test.mcool", "test2.mcool", "test3.mcool", "test4.mcool"]
    test_regions = [('chr17', 33236000, 35236000), ('chr2', 219504000, 221504000)]
    ctx = mp.get_context('fork')

    print("\n\nReuse pool")
    run_reuse_pool(ctx)

    print("\n\nMultiple pools")
    run_multi_pool(ctx)

nvictus (Member) commented Dec 19, 2024

And in my hands, the example above works in a "spawn" context: both run_reuse_pool and run_multi_pool complete, both on synthetic data and loading data from files.

If this is right, I suspect that multiple pools in a "fork" context are trying to contend for resources shared by torch threads, but those threads aren't actually copied into the child processes, so the locks end up in inconsistent states. A classic case of "don't mix forking and threads".

pghzeng (Author) commented Dec 20, 2024

run_reuse_pool

Wow, that's interesting, although I don't know much about threads and I'm not quite sure why the threads didn't successfully join. I thought they would join inside a 'with' block after the code is executed, and therefore the previous epoch shouldn't influence the next one.
I once tested 'spawn' but something else went wrong. Anyway, I'll test it with pathos in the future. Thanks for your help and time!

nvictus (Member) commented Dec 20, 2024

I'm not quite sure why the threads didn't successfully join

It's not about the threads not joining. It's that when a process spawns a child process using fork(), the child process inherits a copy of the locks and synchronization primitives from the parent in an undefined state (as they are at the time of forking) but it does not inherit the threads themselves that may hold those locks. So when a thread in the parent changes the state of those primitives, they are not reflected in the child. The resulting state inconsistency between child and parent can cause deadlock. This is why fork in a multithreaded context is considered dangerous.
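A toy illustration of that failure mode (hypothetical example, not the exact mechanism inside torch): a lock held by a parent thread at fork time is copied into the child in its locked state, but the thread that would release it is not copied, so the child blocks forever.

import multiprocessing as mp
import threading
import time

lock = threading.Lock()

def hold_lock_for_a_while():
    with lock:
        time.sleep(5)        # a parent thread owns the lock across the fork

def child_work():
    print("child: waiting for the lock...")
    with lock:               # the child's copy of the lock stays locked forever
        print("child: got the lock")   # never reached

if __name__ == "__main__":
    threading.Thread(target=hold_lock_for_a_while, daemon=True).start()
    time.sleep(0.5)          # ensure the thread holds the lock before forking
    ctx = mp.get_context("fork")
    p = ctx.Process(target=child_work)
    p.start()
    p.join(timeout=10)
    print("child still blocked after 10s:", p.is_alive())
    p.terminate()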

nvictus (Member) commented Dec 20, 2024

To close this issue: in summary, I believe that when you do something like torch.cat before launching a process pool via fork, some threading primitives are created before the fork occurs and end up in inconsistent states between the parent and the workers, causing deadlock. If the primitives are created after forking (e.g. when you reuse the same pool), you're "safe".
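Applied to the original example, that "safe" pattern would look roughly like this sketch (assuming the load_targets, cool_dir, test_samples and test_regions from the first post):

# Sketch: fork the workers once, before any torch.cat/torch.stack runs in
# the parent, and reuse the same pool across epochs.
import multiprocessing as mp
import torch

if __name__ == "__main__":
    ctx = mp.get_context("fork")
    with ctx.Pool(1) as pool:            # workers are forked here, up front
        for i in range(3):
            results = pool.map(
                load_targets,
                [(name, cool_dir, test_regions) for name in test_samples],
            )
            targets = torch.cat(results)  # threading state created here no longer leaks into workers
            print("epoch", i, targets.shape)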

nvictus closed this as completed Dec 20, 2024
pghzeng (Author) commented Dec 21, 2024

To close this issue: in summary, I believe that when you do something like torch.cat before launching a process pool via fork, some threading primitives are created before the fork occurs and end up in inconsistent states between the parent and the workers, causing deadlock. If the primitives are created after forking (e.g. when you reuse the same pool), you're "safe".

Thanks for your explanation, much appreciated!
