benchmark server pipeline #1600

Merged
merged 12 commits into from
Mar 6, 2024
benchmark server pipeline
horheynm committed Feb 12, 2024
commit 9ccf39c66f670d14c2a91861372e3c3d0ffe88ae
61 changes: 61 additions & 0 deletions src/deepsparse/benchmark/benchmark_pipeline.py
@@ -401,6 +401,67 @@ def _get_statistics(batch_times):
return sections, all_sections


def benchmark_from_pipeline(
pipeline: Pipeline,
batch_size: int = 1,
seconds_to_run: int = 10,
warmup_time: int = 2,
thread_pinning: str = "core",
scenario: str = "sync",
num_streams: int = 1,
data_type: str = "dummy",
**kwargs,
):
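"""
Benchmark an already-constructed Pipeline and return raw timing data.

Runs the pipeline for `warmup_time` seconds as a warmup, clears any
measurements, then runs for `seconds_to_run` seconds under the requested
scenario ("sync", "async", or "elastic"); extra kwargs are forwarded to
PipelineBenchmarkConfig for input creation.

:return: tuple of (batch_times, total_run_time, num_streams)
"""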
decide_thread_pinning(thread_pinning)
scenario = parse_scenario(scenario.lower())

input_type = data_type

config = PipelineBenchmarkConfig(
data_type=data_type,
**kwargs,
)
inputs = create_input_schema(pipeline, input_type, batch_size, config)

def _clear_measurements():
# Helper method to handle variations between v1 and v2 timers
if hasattr(pipeline.timer_manager, "clear"):
pipeline.timer_manager.clear()
else:
pipeline.timer_manager.measurements.clear()

if scenario == "singlestream":
singlestream_benchmark(pipeline, inputs, warmup_time)
_clear_measurements()
start_time = time.perf_counter()
singlestream_benchmark(pipeline, inputs, seconds_to_run)
elif scenario == "multistream":
multistream_benchmark(pipeline, inputs, warmup_time, num_streams)
_clear_measurements()
start_time = time.perf_counter()
multistream_benchmark(pipeline, inputs, seconds_to_run, num_streams)
elif scenario == "elastic":
multistream_benchmark(pipeline, inputs, warmup_time, num_streams)
_clear_measurements()
start_time = time.perf_counter()
multistream_benchmark(pipeline, inputs, seconds_to_run, num_streams)
else:
raise Exception(f"Unknown scenario '{scenario}'")

end_time = time.perf_counter()
total_run_time = end_time - start_time
if hasattr(pipeline.timer_manager, "all_times"):
batch_times = pipeline.timer_manager.all_times
else:
batch_times = pipeline.timer_manager.measurements
if len(batch_times) == 0:
raise Exception(
"Generated no batch timings, try extending benchmark time with '--time'"
)

return batch_times, total_run_time, num_streams


@click.command()
@click.argument("task_name", type=str)
@click.argument("model_path", type=str)
34 changes: 34 additions & 0 deletions src/deepsparse/benchmark/config.py
@@ -83,3 +83,37 @@ class PipelineBenchmarkConfig(BaseModel):
default={},
description=("Additional arguments passed to input schema creations "),
)


class PipelineBenchmarkServerConfig(PipelineBenchmarkConfig):
batch_size: int = Field(
default=1,
description="The batch size of the inputs to be used with the engine",
)
seconds_to_run: int = Field(
default=10,
description="The number of seconds to run benchmark for",
)
warmup_time: int = Field(
default=2,
description="The length to run pipeline before beginning benchmark",
)
thread_pinning: str = Field(
default="core",
description="To enable binding threads to cores",
)
scenario: str = Field(
default="sync",
description=(
"The benchmark scenario to run: 'sync', 'async', or 'elastic'"
),
)
num_streams: int = Field(
default=1,
description=(
" The max number of requests the model can handle "
"concurrently. None or 0 implies a scheduler-defined default value; "
"default None"
),
)
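
A short sketch of building the new PipelineBenchmarkServerConfig directly; the values shown are simply the field defaults above, and the resulting dict can be forwarded to benchmark_from_pipeline:

from deepsparse.benchmark.config import PipelineBenchmarkServerConfig

config = PipelineBenchmarkServerConfig(
    batch_size=1,
    seconds_to_run=10,
    warmup_time=2,
    thread_pinning="core",
    scenario="sync",
    num_streams=1,
    data_type="dummy",
)
print(config.dict())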
8 changes: 7 additions & 1 deletion src/deepsparse/middlewares/middleware.py
@@ -112,7 +112,9 @@ class MiddlewareManager:
:param _lock: lock for the state
"""

def __init__(self, middleware: Optional[Sequence[MiddlewareSpec]], *args, **kwargs):
def __init__(
self, middleware: Optional[Sequence[MiddlewareSpec]] = None, *args, **kwargs
):

self.middleware: Optional[
Sequence[MiddlewareSpec]
@@ -172,3 +174,7 @@ def _update_middleware_spec_send(
next_middleware.send = self.recieve

self.middleware.append(MiddlewareSpec(next_middleware, **init_args))

@property
def middlewares(self):
return [middleware.cls for middleware in self.middleware]
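
A usage sketch of the now-optional constructor argument and the new middlewares property, assuming MiddlewareManager is exported from deepsparse.middlewares alongside MiddlewareSpec and TimerMiddleware:

from deepsparse.middlewares import MiddlewareManager, MiddlewareSpec, TimerMiddleware

manager = MiddlewareManager()  # `middleware` argument now defaults to None
if TimerMiddleware not in manager.middlewares:
    manager.add_middleware([MiddlewareSpec(TimerMiddleware)])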
2 changes: 1 addition & 1 deletion src/deepsparse/pipeline.py
@@ -91,7 +91,7 @@ def __init__(
self.schedulers = schedulers
self.pipeline_state = pipeline_state
self._continuous_batching_scheduler = continuous_batching_scheduler
self.middleware_manager = middleware_manager
self.middleware_manager = middleware_manager or MiddlewareManager()
self.timer_manager = timer_manager or TimerManager()
self.validate()

4 changes: 4 additions & 0 deletions src/deepsparse/pipeline_config.py
@@ -73,6 +73,10 @@ class PipelineConfig(BaseModel):
"with multiple models. Default is None"
),
)
middlewares: Optional[List[str]] = Field(
default=None,
description="Middlewares to use",
)
kwargs: Optional[Dict[str, Any]] = Field(
default={},
description=(
4 changes: 4 additions & 0 deletions src/deepsparse/server/config.py
@@ -127,6 +127,9 @@ class EndpointConfig(BaseModel):
"```\n"
),
)
middlewares: Optional[List[str]] = Field(
default=None, description=("Middleware to use")
)

kwargs: Dict[str, Any] = Field(
default={}, description="Additional arguments to pass to the Pipeline"
@@ -147,6 +150,7 @@ def to_pipeline_config(self) -> PipelineConfig:
num_cores=None, # this will be set from Context
alias=self.name,
input_shapes=input_shapes,
middlewares=self.middlewares,
kwargs=kwargs,
)
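
For illustration, a hedged sketch of an endpoint opting into middlewares and carrying them through to the pipeline config; the middleware string format and the task/model values are assumptions, not defined by this diff:

from deepsparse.server.config import EndpointConfig

endpoint = EndpointConfig(
    task="image_classification",      # assumed task name
    model="/path/to/model.onnx",      # placeholder model path
    middlewares=["TimerMiddleware"],  # string format here is an assumption
)
pipeline_config = endpoint.to_pipeline_config()  # now forwards `middlewares`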

67 changes: 67 additions & 0 deletions src/deepsparse/server/deepsparse_server.py
@@ -16,6 +16,7 @@
from functools import partial

from deepsparse import Pipeline
from deepsparse.middlewares import MiddlewareSpec, TimerMiddleware
from deepsparse.server.config import EndpointConfig
from deepsparse.server.server import CheckReady, ModelMetaData, ProxyPipeline, Server
from deepsparse.tasks import SupportedTasks
@@ -86,6 +87,11 @@ def _add_endpoint(
endpoint_config,
pipeline,
)
self._add_benchmark_endpoints(
app,
endpoint_config,
pipeline,
)
self._add_status_and_metadata_endpoints(app, endpoint_config, pipeline)

def _add_status_and_metadata_endpoints(
@@ -180,3 +186,64 @@ def _add_inference_endpoints(
methods=["POST"],
tags=["model", "inference"],
)

def _add_benchmark_endpoints(
self,
app: FastAPI,
endpoint_config: EndpointConfig,
pipeline: Pipeline,
):
if TimerMiddleware not in pipeline.middleware_manager.middlewares:
pipeline.middleware_manager.add_middleware(
[MiddlewareSpec(TimerMiddleware)]
)

routes_and_fns = []
if endpoint_config.route:
endpoint_config.route = self.clean_up_route(endpoint_config.route)
route = f"/v2/models{endpoint_config.route}/benchmark"
else:
route = f"/v2/models/{endpoint_config.name}/benchmark"

routes_and_fns.append(
(
route,
partial(
Server.benchmark,
ProxyPipeline(pipeline),
self.server_config.system_logging,
),
)
)

legacy_pipeline = not isinstance(pipeline, Pipeline) and hasattr(
pipeline.input_schema, "from_files"
)
# New pipelines are not required to have an input_schema. For now we just
# check task names, but a list of supported from_files tasks (and output
# schemas) could be kept in SupportedTasks as more pipelines are migrated.
new_pipeline = SupportedTasks.is_image_classification(endpoint_config.task)

if legacy_pipeline or new_pipeline:
routes_and_fns.append(
(
route + "/from_files",
partial(
Server.predict_from_files,
ProxyPipeline(pipeline),
self.server_config.system_logging,
),
)
)
if isinstance(pipeline, Pipeline):
response_model = None
else:
response_model = pipeline.output_schema

self._update_routes(
app=app,
routes_and_fns=routes_and_fns,
response_model=response_model,
methods=["POST"],
tags=["model", "pipeline"],
)
16 changes: 16 additions & 0 deletions src/deepsparse/server/server.py
@@ -24,6 +24,8 @@
from pydantic import BaseModel

import uvicorn
from deepsparse.benchmark.benchmark_pipeline import benchmark_from_pipeline
from deepsparse.benchmark.config import PipelineBenchmarkConfig
from deepsparse.engine import Context
from deepsparse.pipeline import Pipeline
from deepsparse.server.config import ServerConfig, SystemLoggingConfig
@@ -274,6 +276,20 @@ async def format_response():

return prep_for_serialization(pipeline_outputs)

@staticmethod
async def benchmark(
proxy_pipeline: ProxyPipeline,
system_logging_config: SystemLoggingConfig,
raw_request: Request,
):
json_params = await raw_request.json()
benchmark_config = PipelineBenchmarkConfig(**json_params)
results = benchmark_from_pipeline(
pipeline=proxy_pipeline.pipeline, **benchmark_config.dict()
)

return results

@staticmethod
async def predict_from_files(
proxy_pipeline: ProxyPipeline,
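
Finally, a hedged sketch of hitting the new benchmark route from a client once the server is running; the host, default port, and endpoint name are placeholders, and the JSON body mirrors PipelineBenchmarkConfig fields:

import requests

ENDPOINT_NAME = "my_endpoint"  # placeholder; matches EndpointConfig.name
url = f"http://localhost:5543/v2/models/{ENDPOINT_NAME}/benchmark"

# only data_type is shown; any other PipelineBenchmarkConfig field can be included
response = requests.post(url, json={"data_type": "dummy"})
print(response.json())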