Add KeyedModelHandler content to the multi-model ML page (apache#28987)
rszper authored Oct 13, 2023
1 parent 155f850 commit d636dd3
Showing 2 changed files with 82 additions and 18 deletions.
@@ -95,3 +95,61 @@ captions. The solution consists of two open-source models:
2. **A caption ranking model ([CLIP](https://github.com/openai/CLIP))** that uses the image and
candidate captions to rank the captions in the order in which they best describe the image.

## Use multiple differently-trained models

You can use a `KeyedModelHandler` to load several different models into the `RunInference` transform.
Use the associated key to determine which model to use with which data.
The following example loads a model by using `config1`. That model is used for inference for all examples associated
with `key1`. It loads a second model by using `config2`. That model is used for all examples associated with `key2` and `key3`.

```
import apache_beam as beam
import torch
from apache_beam.ml.inference.base import KeyedModelHandler, KeyModelMapping, RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

# Map each key to the model handler that serves it.
# <config1> and <config2> are placeholders for your PytorchModelHandlerTensor arguments.
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
])
# pipeline is an existing beam.Pipeline object.
with pipeline as p:
  data = p | beam.Create([
    ('key1', torch.tensor([[1,2,3],[4,5,6],...])),
    ('key2', torch.tensor([[1,2,3],[4,5,6],...])),
    ('key3', torch.tensor([[1,2,3],[4,5,6],...])),
  ])
  predictions = data | RunInference(keyed_model_handler)
```
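
Because the inputs are keyed, the predictions come back keyed as well, so you can inspect or route results per key downstream. The following is a minimal sketch that assumes the `predictions` collection from the snippet above; each output element is a `(key, PredictionResult)` tuple.

```
# A hypothetical follow-up step: print each key with its inference result.
# PredictionResult also exposes the original example and the model ID.
predictions | beam.Map(lambda kv: print(kv[0], kv[1].inference))
```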

For a more detailed example, see the notebook
[Run ML inference with multiple differently-trained models](https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/per_key_models.ipynb).

Loading multiple models at the same time increases the risk of out-of-memory (OOM) errors. By default, `KeyedModelHandler` doesn't
limit the number of models loaded into memory at the same time. If the models don't all fit into memory,
your pipeline might fail with an out of memory error. To avoid this issue, use the `max_models_per_worker_hint` parameter
to set the maximum number of models that can be loaded into memory at the same time.

The following example loads at most two models per SDK worker process at a time. It unloads models that aren't
currently in use.

```
mhs = [
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
  KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
  KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)
```

Runners that have multiple SDK worker processes on a given machine load at most
`max_models_per_worker_hint*<num worker processes>` models onto the machine.
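
For example, if a runner starts four SDK worker processes on a machine (the exact number depends on the runner and the machine), a hint of `max_models_per_worker_hint=2` allows up to `2 * 4 = 8` models to be loaded onto that machine at the same time.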

Leave enough space for the models and any additional memory needs from other transforms.
Because the memory might not be released immediately after a model is offloaded,
leaving an additional buffer is recommended.

**Note**: Having many models but a small `max_models_per_worker_hint` can cause _memory thrashing_, where
a large amount of execution time is used to swap models in and out of memory. To reduce the likelihood and impact
of memory thrashing, if you're using a distributed runner, insert a
[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/) transform before your
inference step. The `GroupByKey` transform reduces thrashing by ensuring that elements with the same key and model are
collocated on the same worker.
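
As a rough sketch of that pattern, the following snippet builds on the keyed `data` and `keyed_model_handler` from the earlier example. The `FlatMap` step is one possible way to restore individual `(key, example)` pairs after grouping, because `GroupByKey` emits one element per key.

```
# Group elements so that all examples sharing a key (and therefore a model)
# land on the same worker, then ungroup before running inference.
grouped = data | beam.GroupByKey()
ungrouped = grouped | beam.FlatMap(
    lambda kv: [(kv[0], example) for example in kv[1]])
predictions = ungrouped | RunInference(keyed_model_handler)
```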

For more information, see [`KeyedModelHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.KeyedModelHandler).
@@ -197,9 +197,9 @@ For more information on resource hints, see [Resource hints](/documentation/runt
This section suggests patterns and best practices that you can use to make your inference pipelines simpler,
more robust, and more efficient.

### Use a keyed ModelHandler
### Use a keyed ModelHandler object

If a key is attached to the examples, wrap the `KeyedModelHandler` around the `ModelHandler` object:
If a key is attached to the examples, wrap `KeyedModelHandler` around the `ModelHandler` object:

```
from apache_beam.ml.inference.base import KeyedModelHandler
@@ -213,9 +213,11 @@ with pipeline as p:
predictions = data | RunInference(keyed_model_handler)
```

If you are unsure if your data is keyed, you can also use `MaybeKeyedModelHandler`.
If you are unsure if your data is keyed, you can use `MaybeKeyedModelHandler`.

You can also use a `KeyedModelHandler` to load several different models based on their associated key:
You can also use a `KeyedModelHandler` to load several different models based on their associated key.
The following example loads a model by using `config1`. That model is used for inference for all examples associated
with `key1`. It loads a second model by using `config2`. That model is used for all examples associated with `key2` and `key3`.

```
from apache_beam.ml.inference.base import KeyedModelHandler
@@ -232,13 +234,16 @@ with pipeline as p:
predictions = data | RunInference(keyed_model_handler)
```

The previous example loads a model by using `config1`. That model is then used for inference for all examples associated
with `key1`. It also loads a model by using `config2`. That model is used for all examples associated with `key2` and `key3`.
For a more detailed example, see the notebook
[Run ML inference with multiple differently-trained models](https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/per_key_models.ipynb).

Loading multiple models at the same times increases the risk of out of memory (OOM) errors. By default, `KeyedModelHandler` doesn't
Loading multiple models at the same time increases the risk of out-of-memory (OOM) errors. By default, `KeyedModelHandler` doesn't
limit the number of models loaded into memory at the same time. If the models don't all fit into memory,
your pipeline will likely fail with an out of memory error. To avoid this issue, provide a hint about the
maximum number of models that can be loaded at the same time.
your pipeline might fail with an out of memory error. To avoid this issue, use the `max_models_per_worker_hint` parameter
to set the maximum number of models that can be loaded into memory at the same time.

The following example loads at most two models per SDK worker process at a time. It unloads models that aren't
currently in use.

```
mhs = [
@@ -250,17 +255,18 @@ mhs = [
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)
```

The previous example loads at most two models per SDK worker process at any given time. It unloads models that aren't
currently being used. Runners that have multiple SDK worker processes on a given machine load at most
`max_models_per_worker_hint*<num worker processes>` models onto the machine. Leave enough space for the models
and any additional memory needs from other transforms. Because there might be a delay between when a model is offloaded and when the
memory is released, it is recommended that you leave additional buffer.
Runners that have multiple SDK worker processes on a given machine load at most
`max_models_per_worker_hint*<num worker processes>` models onto the machine.

Leave enough space for the models and any additional memory needs from other transforms.
Because the memory might not be released immediately after a model is offloaded,
leaving an additional buffer is recommended.

**Note**: Having many models but a small `max_models_per_worker_hint` can lead to _memory thrashing_, where
a large amount of execution time is wasted swapping models in and out of memory. To reduce the likelihood and impact
**Note**: Having many models but a small `max_models_per_worker_hint` can cause _memory thrashing_, where
a large amount of execution time is used to swap models in and out of memory. To reduce the likelihood and impact
of memory thrashing, if you're using a distributed runner, insert a
[GroupByKey](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/) transform before your
inference step. This step reduces thrashing by ensuring that elements with the same key and model are
[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/) transform before your
inference step. The `GroupByKey` transform reduces thrashing by ensuring that elements with the same key and model are
collocated on the same worker.

For more information, see [`KeyedModelHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.KeyedModelHandler).