
Spacy + Dask? #5111

Open · AlJohri opened this issue Mar 5, 2020 · 9 comments
Labels: bug (Bugs and behaviour differing from documentation) · scaling (Scaling, serving and parallelizing spaCy) · v3.0 (Related to v3.0)

Comments

AlJohri (Contributor) commented Mar 5, 2020

I'm unable to get a simple example using spaCy + dask.distributed up and running. In the context of a Jupyter notebook, I receive this error:

OSError: [E050] Can't find model 'en_core_web_lg.vectors'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.

In the context of a script, it simply hangs.

How to reproduce the behavior

Here's my attempt at creating a reproducible script, although I get slightly different behavior in a notebook setting.

"""
python3 -m venv .venv
.venv/bin/pip install -e git+https://github.com/explosion/spaCy#egg=spaCy
# explicitly download spacy model because otherwise "No compatible models found for v2.2.4 of spaCy"
.venv/bin/pip install https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-2.2.5/en_core_web_lg-2.2.5.tar.gz
.venv/bin/pip install 'dask[bag]'
.venv/bin/pip install distributed
.venv/bin/python debug.py
"""

import os
import pathlib
import thinc
import spacy
import json
import dask.bag as db
from dask.distributed import Client, progress

print(spacy.__version__)
print(thinc.__version__)

nlp = spacy.load('en_core_web_lg')

def process(text):
    print(thinc.extra.load_nlp.VECTORS)
    return [str(x) for x in nlp(text).ents]

if __name__ == '__main__':
    client = Client(n_workers=2, threads_per_worker=1)
    docs = ["A quick brown fox jumps over a lazy dog."] * 10
    entities = db.from_sequence(docs).map(process).compute()
    print(entities)
Full Traceback:
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<timed exec> in <module>

/usr/local/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 

/usr/local/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    435     keys = [x.__dask_keys__() for x in collections]
    436     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437     results = schedule(dsk, keys, **kwargs)
    438     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    439 

/usr/local/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2593                     should_rejoin = False
   2594             try:
-> 2595                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2596             finally:
   2597                 for f in futures.values():

/usr/local/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1885             else:
   1886                 local_worker = None
-> 1887             return self.sync(
   1888                 self._gather,
   1889                 futures,

/usr/local/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    777             return future
    778         else:
--> 779             return sync(
    780                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    781             )

/usr/local/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

/usr/local/lib/python3.8/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

/usr/local/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/usr/local/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1750                             exc = CancelledError(key)
   1751                         else:
-> 1752                             raise exception.with_traceback(traceback)
   1753                         raise exc
   1754                     if errors == "skip":

/usr/local/lib/python3.8/site-packages/dask/bag/core.py in reify()
   1833 def reify(seq):
   1834     if isinstance(seq, Iterator):
-> 1835         seq = list(seq)
   1836     if len(seq) and isinstance(seq[0], Iterator):
   1837         seq = list(map(list, seq))

/usr/local/lib/python3.8/site-packages/dask/bag/core.py in __next__()
   2020             kwargs = dict(zip(self.kwarg_keys, vals[-self.nkws :]))
   2021             return self.f(*args, **kwargs)
-> 2022         return self.f(*vals)
   2023 
   2024     def check_all_iterators_consumed(self):

<ipython-input-7-8c491d0e578a> in process()
      3     doc = json.loads(row)
      4     text = "\n".join(x["content"] for x in doc["content"])
----> 5     return [str(x) for x in nlp(text).ents]

/usr/local/lib/python3.8/site-packages/spacy/language.py in __call__()
    437             if not hasattr(proc, "__call__"):
    438                 raise ValueError(Errors.E003.format(component=type(proc), name=name))
--> 439             doc = proc(doc, **component_cfg.get(name, {}))
    440             if doc is None:
    441                 raise ValueError(Errors.E005.format(name=name))

pipes.pyx in spacy.pipeline.pipes.Tagger.__call__()

pipes.pyx in spacy.pipeline.pipes.Tagger.predict()

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
    165             Must match expected shape
    166         """
--> 167         return self.predict(x)
    168 
    169     def pipe(self, stream, batch_size=128):

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/feed_forward.py in predict()
     38     def predict(self, X):
     39         for layer in self._layers:
---> 40             X = layer(X)
     41         return X
     42 

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
    165             Must match expected shape
    166         """
--> 167         return self.predict(x)
    168 
    169     def pipe(self, stream, batch_size=128):

/usr/local/lib/python3.8/site-packages/thinc/api.py in predict()
    308     def predict(seqs_in):
    309         lengths = layer.ops.asarray([len(seq) for seq in seqs_in])
--> 310         X = layer(layer.ops.flatten(seqs_in, pad=pad))
    311         return layer.ops.unflatten(X, lengths, pad=pad)
    312 

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
    165             Must match expected shape
    166         """
--> 167         return self.predict(x)
    168 
    169     def pipe(self, stream, batch_size=128):

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/feed_forward.py in predict()
     38     def predict(self, X):
     39         for layer in self._layers:
---> 40             X = layer(X)
     41         return X
     42 

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
    165             Must match expected shape
    166         """
--> 167         return self.predict(x)
    168 
    169     def pipe(self, stream, batch_size=128):

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in predict()
    129 
    130     def predict(self, X):
--> 131         y, _ = self.begin_update(X, drop=None)
    132         return y
    133 

/usr/local/lib/python3.8/site-packages/thinc/api.py in uniqued_fwd()
    377         )
    378         X_uniq = layer.ops.xp.ascontiguousarray(X[ind])
--> 379         Y_uniq, bp_Y_uniq = layer.begin_update(X_uniq, drop=drop)
    380         Y = Y_uniq[inv].reshape((X.shape[0],) + Y_uniq.shape[1:])
    381 

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/feed_forward.py in begin_update()
     44         callbacks = []
     45         for layer in self._layers:
---> 46             X, inc_layer_grad = layer.begin_update(X, drop=drop)
     47             callbacks.append(inc_layer_grad)
     48 

/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
    254 
    255     def wrap(*args, **kwargs):
--> 256         output = func(*args, **kwargs)
    257         if splitter is None:
    258             to_keep, to_sink = output

/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
    254 
    255     def wrap(*args, **kwargs):
--> 256         output = func(*args, **kwargs)
    257         if splitter is None:
    258             to_keep, to_sink = output

/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
    254 
    255     def wrap(*args, **kwargs):
--> 256         output = func(*args, **kwargs)
    257         if splitter is None:
    258             to_keep, to_sink = output

/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
    254 
    255     def wrap(*args, **kwargs):
--> 256         output = func(*args, **kwargs)
    257         if splitter is None:
    258             to_keep, to_sink = output

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/static_vectors.py in begin_update()
     58         if ids.ndim >= 2:
     59             ids = self.ops.xp.ascontiguousarray(ids[:, self.column])
---> 60         vector_table = self.get_vectors()
     61         vectors = vector_table[ids * (ids < vector_table.shape[0])]
     62         vectors = self.ops.xp.ascontiguousarray(vectors)

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/static_vectors.py in get_vectors()
     53 
     54     def get_vectors(self):
---> 55         return get_vectors(self.ops, self.lang)
     56 
     57     def begin_update(self, ids, drop=0.0):

/usr/local/lib/python3.8/site-packages/thinc/extra/load_nlp.py in get_vectors()
     24     key = (ops.device, lang)
     25     if key not in VECTORS:
---> 26         nlp = get_spacy(lang)
     27         VECTORS[key] = nlp.vocab.vectors.data
     28     return VECTORS[key]

/usr/local/lib/python3.8/site-packages/thinc/extra/load_nlp.py in get_spacy()
     12 
     13     if lang not in SPACY_MODELS:
---> 14         SPACY_MODELS[lang] = spacy.load(lang, **kwargs)
     15     return SPACY_MODELS[lang]
     16 

/usr/local/lib/python3.8/site-packages/spacy/__init__.py in load()
     28     if depr_path not in (True, False, None):
     29         deprecation_warning(Warnings.W001.format(path=depr_path))
---> 30     return util.load_model(name, **overrides)
     31 
     32 

/usr/local/lib/python3.8/site-packages/spacy/util.py in load_model()
    167     elif hasattr(name, "exists"):  # Path or Path-like to model data
    168         return load_model_from_path(name, **overrides)
--> 169     raise IOError(Errors.E050.format(name=name))
    170 
    171 

OSError: [E050] Can't find model 'en_core_web_lg.vectors'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.

Info about spaCy

  • spaCy version: 2.2.4.dev0
  • Platform: macOS-10.15.3-x86_64-i386-64bit
  • Python version: 3.8.1
AlJohri (Contributor, Author) commented Mar 5, 2020

Related: #3552

svlandeg added the scaling label Mar 5, 2020
svlandeg (Member) commented Mar 5, 2020

Thanks for the detailed report!

I wonder whether this may actually be solved by the very recent PR #5081 that was merged into master two days ago. It is (hopefully) the same issue we encountered when running nlp.pipe(..., n_process=2): child processes (created by spawn on Windows and macOS) don't have access to the global state of the parent process, and the vector data, such as en_core_web_lg.vectors, is stored in that global state. This will change from spaCy v3 onwards, but we're not quite there yet. For now, that PR makes sure the global state is properly transferred to the children.
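
A minimal sketch of the underlying Python behaviour (plain multiprocessing rather than spaCy code; the names are illustrative):

import multiprocessing as mp

CACHE = {}  # stands in for a global like thinc.extra.load_nlp.VECTORS

def show_cache():
    # under "spawn" the child re-imports this module, so it sees the fresh,
    # empty module-level CACHE, not the entry the parent added below
    print(CACHE)  # prints {}

if __name__ == "__main__":
    CACHE[("cpu", "en_core_web_lg")] = "vector data"
    mp.set_start_method("spawn")  # the default on Windows and macOS
    p = mp.Process(target=show_cache)
    p.start()
    p.join()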

Is there any chance you could try building the current master branch from source, and see whether that would fix your issue?

svlandeg added the more-info-needed label Mar 5, 2020
AlJohri (Contributor, Author) commented Mar 5, 2020

@svlandeg I think by running pip install -e git+https://github.com/explosion/spaCy#egg=spaCy, I'm already using the latest code from the master branch. Can you confirm?

no-response bot removed the more-info-needed label Mar 5, 2020
svlandeg (Member) commented Mar 6, 2020

Ah, yes, that should do it. You can double-check in the src folder whether spacy/spacy/language.py has a function _apply_pipes with a vectors parameter; if so, you have that latest PR.
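
For example, a quick programmatic version of that check (this assumes _apply_pipes is importable at module level, which may differ between versions):

import inspect
from spacy.language import _apply_pipes

# True if the multiprocessing helper accepts the vector data explicitly
print("vectors" in inspect.signature(_apply_pipes).parameters)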

svlandeg (Member) commented Mar 6, 2020

What's annoying is that I can't really reproduce this: I actually run into a MemoryError when running your code snippet, which seems kind of ridiculous. The vectors are definitely the culprit, though, because everything works just fine when you move the nlp = spacy.load('en_core_web_lg') statement inside the process function.

svlandeg added the bug label Mar 6, 2020
AlJohri (Contributor, Author) commented Mar 6, 2020

@svlandeg let me attach a Jupyter notebook where I can more reliably reproduce the error early next week.

I think moving nlp = spacy.load('en_core_web_lg') into the function sidesteps the error because spacy.load runs directly on the workers, so no globals need to be copied from the parent process.

Based on that, this pattern has been working for me so far:

import spacy
from dask.distributed import get_worker

MODEL = "en_core_web_lg"

def process(doc):
    # cache the model on the worker so it is only loaded once per worker
    worker = get_worker()
    try:
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load(MODEL)
        worker.nlp = nlp

    return [str(x) for x in nlp(doc).ents]

ents = bag.map(process).compute()

# the same approach works if you want to process one partition at a time,
# which allows using `nlp.pipe` with some batch size, which is even faster
ents = bag.map_partitions(process_partition).compute()
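
For reference, a possible sketch of the process_partition variant (it isn't defined above; the batch size is arbitrary):

def process_partition(docs):
    # same per-worker model cache as process(), but batch the whole
    # partition through nlp.pipe for better throughput
    worker = get_worker()
    try:
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load(MODEL)
        worker.nlp = nlp
    return [[str(x) for x in doc.ents] for doc in nlp.pipe(docs, batch_size=64)]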

svlandeg added the v3.0 label Mar 19, 2020
svlandeg (Member) commented

Considering there's a workaround for now (though it might not be ideal), I suggest revisiting this issue once we're closer to releasing v3. Hopefully the refactored vectors will make this a non-issue then.

rtbs-dev commented

Just wanted to add that the workaround seems to have limitations, though I have not yet had time to fully investigate the root cause: usnistgov/cv-py#1

honnibal (Member) commented

I think you'll have an easier time if you parallelise over larger units of work, e.g. a few dozen megabytes of compressed text per process. I would recommend chunking the text beforehand in a preprocessing step and just passing the input and output paths to dask. You'd then have something like this as your remote task:

def process_job(model_name, input_path, output_path):
    nlp = load_or_get_model(model_name)
    outputs = []
    texts = read_texts(input_path)
    for doc in nlp.pipe(texts):
        outputs.append(convert_output(doc))
    write_outputs(outputs, output_path)

I advise against trying to handle the parallelism too transparently. Really fine-grained task distribution is a method of last resort, for when the scheduling is difficult enough that you can't really do it manually. When you have a simple loop, you're much better off cutting it up into chunks yourself, and only distributing units of work that take at least a few minutes to complete. It's much more efficient and far more reliable, because you can manage failure by just repeating the jobs where the output files aren't there.
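
For concreteness, the helpers above could be filled in along these lines (these names are placeholders from the sketch, not spaCy API):

import json
import spacy

MODELS = {}

def load_or_get_model(model_name):
    # load each model at most once per process and reuse it
    if model_name not in MODELS:
        MODELS[model_name] = spacy.load(model_name)
    return MODELS[model_name]

def read_texts(input_path):
    # assumes one document per line
    with open(input_path, encoding="utf8") as f:
        return [line.strip() for line in f]

def convert_output(doc):
    return {"text": doc.text, "ents": [str(ent) for ent in doc.ents]}

def write_outputs(outputs, output_path):
    # one JSON record per line
    with open(output_path, "w", encoding="utf8") as f:
        for record in outputs:
            f.write(json.dumps(record) + "\n")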
