diff --git a/website/www/site/content/en/documentation/ml/multi-model-pipelines.md b/website/www/site/content/en/documentation/ml/multi-model-pipelines.md
index 569a51b8db55..c42c8b8ae661
--- a/website/www/site/content/en/documentation/ml/multi-model-pipelines.md
+++ b/website/www/site/content/en/documentation/ml/multi-model-pipelines.md
@@ -95,3 +95,61 @@ captions. The solution consists of two open-source models:
 
 2. **A caption ranking model ([CLIP](https://github.com/openai/CLIP))** that uses the image and candidate captions to
    rank the captions in the order in which they best describe the image.
+
+## Use multiple differently-trained models
+
+You can use a `KeyedModelHandler` to load several different models into the `RunInference` transform.
+Use the associated key to determine which model to use with which data.
+The following example loads a model by using `config1`. That model is used for inference for all examples associated
+with `key1`. It loads a second model by using `config2`. That model is used for all examples associated with `key2` and `key3`.
+
+```
+from apache_beam.ml.inference.base import KeyedModelHandler, KeyModelMapping
+keyed_model_handler = KeyedModelHandler([
+  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
+  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
+])
+with pipeline as p:
+   data = p | beam.Create([
+      ('key1', torch.tensor([[1,2,3],[4,5,6],...])),
+      ('key2', torch.tensor([[1,2,3],[4,5,6],...])),
+      ('key3', torch.tensor([[1,2,3],[4,5,6],...])),
+   ])
+   predictions = data | RunInference(keyed_model_handler)
+```
+
+For a more detailed example, see the notebook
+[Run ML inference with multiple differently-trained models](https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/per_key_models.ipynb).
+
+Loading multiple models at the same time increases the risk of out of memory errors (OOMs). By default, `KeyedModelHandler` doesn't
+limit the number of models loaded into memory at the same time. If the models don't all fit into memory,
+your pipeline might fail with an out of memory error. To avoid this issue, use the `max_models_per_worker_hint` parameter
+to set the maximum number of models that can be loaded into memory at the same time.
+
+The following example loads at most two models per SDK worker process at a time. It unloads models that aren't
+currently in use.
+
+```
+mhs = [
+  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
+  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
+  KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
+  KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
+]
+keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)
+```
+
+Runners that have multiple SDK worker processes on a given machine load at most
+`max_models_per_worker_hint * <num worker processes>` models onto the machine.
+
+Leave enough space for the models and any additional memory needs from other transforms.
+Because the memory might not be released immediately after a model is offloaded,
+leave an additional buffer.
+
+**Note**: Having many models but a small `max_models_per_worker_hint` can cause _memory thrashing_, where
+a large amount of execution time is spent swapping models in and out of memory. To reduce the likelihood and impact
+of memory thrashing, if you're using a distributed runner, insert a
+[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/) transform before your
+inference step. The `GroupByKey` transform reduces thrashing by ensuring that elements with the same key and model are
+collocated on the same worker.
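+
+For example, assuming the `pipeline`, `torch` tensors, and `keyed_model_handler` objects from the previous examples,
+one possible sketch is to group by key and then restore the `(key, example)` pairs before running inference. The
+`beam.FlatMap` step shown here is only illustrative; adjust it to your data.
+
+```
+import apache_beam as beam
+
+with pipeline as p:
+   data = p | beam.Create([
+      ('key1', torch.tensor([[1,2,3],[4,5,6]])),
+      ('key2', torch.tensor([[1,2,3],[4,5,6]])),
+      ('key3', torch.tensor([[1,2,3],[4,5,6]])),
+   ])
+   predictions = (
+      data
+      | beam.GroupByKey()  # Collocate elements that share a key, and therefore a model.
+      | beam.FlatMap(lambda kv: [(kv[0], example) for example in kv[1]])  # Restore (key, example) pairs.
+      | RunInference(keyed_model_handler))
+```
+
+The `GroupByKey` shuffle is what provides the collocation; the `FlatMap` step only unpacks the grouped elements
+back into the `(key, example)` form that `KeyedModelHandler` expects.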
+
+For more information, see [`KeyedModelHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.KeyedModelHandler).
diff --git a/website/www/site/content/en/documentation/sdks/python-machine-learning.md b/website/www/site/content/en/documentation/sdks/python-machine-learning.md
index 0076fa370b0f..a700806f14c6
--- a/website/www/site/content/en/documentation/sdks/python-machine-learning.md
+++ b/website/www/site/content/en/documentation/sdks/python-machine-learning.md
@@ -197,9 +197,9 @@ For more information on resource hints, see [Resource hints](/documentation/runt
 This section suggests patterns and best practices that you can use to make your
 inference pipelines simpler, more robust, and more efficient.
 
-### Use a keyed ModelHandler
+### Use a keyed ModelHandler object
 
-If a key is attached to the examples, wrap the `KeyedModelHandler` around the `ModelHandler` object:
+If a key is attached to the examples, wrap `KeyedModelHandler` around the `ModelHandler` object:
 
 ```
 from apache_beam.ml.inference.base import KeyedModelHandler
@@ -213,9 +213,11 @@ with pipeline as p:
    predictions = data | RunInference(keyed_model_handler)
 ```
 
-If you are unsure if your data is keyed, you can also use `MaybeKeyedModelHandler`.
+If you are unsure if your data is keyed, you can use `MaybeKeyedModelHandler`.
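+
+For example, a minimal sketch, assuming the `data` PCollection and the `PytorchModelHandlerTensor(<config>)` handler
+from the previous example:
+
+```
+from apache_beam.ml.inference.base import MaybeKeyedModelHandler
+
+# MaybeKeyedModelHandler accepts either plain examples or (key, example) tuples.
+maybe_keyed_model_handler = MaybeKeyedModelHandler(PytorchModelHandlerTensor(<config>))
+
+predictions = data | RunInference(maybe_keyed_model_handler)
+```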
 
-You can also use a `KeyedModelHandler` to load several different models based on their associated key:
+You can also use a `KeyedModelHandler` to load several different models based on their associated key.
+The following example loads a model by using `config1`. That model is used for inference for all examples associated
+with `key1`. It loads a second model by using `config2`. That model is used for all examples associated with `key2` and `key3`.
 
 ```
 from apache_beam.ml.inference.base import KeyedModelHandler
@@ -232,13 +234,16 @@ with pipeline as p:
    predictions = data | RunInference(keyed_model_handler)
 ```
 
-The previous example loads a model by using `config1`. That model is then used for inference for all examples associated
-with `key1`. It also loads a model by using `config2`. That model is used for all examples associated with `key2` and `key3`.
+For a more detailed example, see the notebook
+[Run ML inference with multiple differently-trained models](https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/per_key_models.ipynb).
 
-Loading multiple models at the same times increases the risk of out of memory (OOM) errors. By default, `KeyedModelHandler` doesn't
+Loading multiple models at the same time increases the risk of out of memory errors (OOMs). By default, `KeyedModelHandler` doesn't
 limit the number of models loaded into memory at the same time. If the models don't all fit into memory,
-your pipeline will likely fail with an out of memory error. To avoid this issue, provide a hint about the
-maximum number of models that can be loaded at the same time.
+your pipeline might fail with an out of memory error. To avoid this issue, use the `max_models_per_worker_hint` parameter
+to set the maximum number of models that can be loaded into memory at the same time.
+
+The following example loads at most two models per SDK worker process at a time. It unloads models that aren't
+currently in use.
 
 ```
 mhs = [
@@ -250,17 +255,18 @@ mhs = [
 keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)
 ```
 
-The previous example loads at most two models per SDK worker process at any given time. It unloads models that aren't
-currently being used. Runners that have multiple SDK worker processes on a given machine load at most
-`max_models_per_worker_hint*` models onto the machine. Leave enough space for the models
-and any additional memory needs from other transforms. Because there might be a delay between when a model is offloaded and when the
-memory is released, it is recommended that you leave additional buffer.
+Runners that have multiple SDK worker processes on a given machine load at most
+`max_models_per_worker_hint * <num worker processes>` models onto the machine.
+
+Leave enough space for the models and any additional memory needs from other transforms.
+Because the memory might not be released immediately after a model is offloaded,
+leave an additional buffer.
 
-**Note**: Having many models but a small `max_models_per_worker_hint` can lead to _memory thrashing_, where
-a large amount of execution time is wasted swapping models in and out of memory. To reduce the likelihood and impact
+**Note**: Having many models but a small `max_models_per_worker_hint` can cause _memory thrashing_, where
+a large amount of execution time is spent swapping models in and out of memory. To reduce the likelihood and impact
 of memory thrashing, if you're using a distributed runner, insert a
-[GroupByKey](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/) transform before your
-inference step. This step reduces thrashing by ensuring that elements with the same key and model are
+[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/) transform before your
+inference step. The `GroupByKey` transform reduces thrashing by ensuring that elements with the same key and model are
 collocated on the same worker.
 
 For more information, see [`KeyedModelHander`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.KeyedModelHandler).
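+
+For example, assuming the keyed `data` PCollection and the `keyed_model_handler` object from the previous examples,
+one possible sketch of this pattern groups by key and then restores the `(key, example)` pairs before running
+inference. The `beam.FlatMap` step is only illustrative; adjust it to your data.
+
+```
+import apache_beam as beam
+
+predictions = (
+   data
+   | beam.GroupByKey()  # Collocate elements that share a key, and therefore a model.
+   | beam.FlatMap(lambda kv: [(kv[0], example) for example in kv[1]])  # Restore (key, example) pairs.
+   | RunInference(keyed_model_handler))
+```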