Skip to content

Commit

Permalink
benchmark server pipeline (#1600)
Browse files Browse the repository at this point in the history
* benchmark server pipeline

* pass tests

* comments

* raise watning using middleware + cont.batching.sched
  • Loading branch information
horheynm authored Mar 6, 2024
1 parent 14b59a8 commit acf190c
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 45 deletions.
137 changes: 93 additions & 44 deletions src/deepsparse/benchmark/benchmark_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,50 +308,16 @@ def benchmark_pipeline(
num_streams=num_streams,
**kwargs,
)
inputs = create_input_schema(pipeline, input_type, batch_size, config)

def _clear_measurements():
# Helper method to handle variations between v1 and v2 timers
if hasattr(pipeline.timer_manager, "clear"):
pipeline.timer_manager.clear()
else:
pipeline.timer_manager.measurements.clear()

if scenario == "singlestream":
singlestream_benchmark(pipeline, inputs, warmup_time)
_clear_measurements()
start_time = time.perf_counter()
singlestream_benchmark(pipeline, inputs, seconds_to_run)
elif scenario == "multistream":
multistream_benchmark(pipeline, inputs, warmup_time, num_streams)
_clear_measurements()
start_time = time.perf_counter()
multistream_benchmark(pipeline, inputs, seconds_to_run, num_streams)
elif scenario == "elastic":
multistream_benchmark(pipeline, inputs, warmup_time, num_streams)
_clear_measurements()
start_time = time.perf_counter()
multistream_benchmark(pipeline, inputs, seconds_to_run, num_streams)
else:
raise Exception(f"Unknown scenario '{scenario}'")

end_time = time.perf_counter()
total_run_time = end_time - start_time
if hasattr(pipeline.timer_manager, "all_times"):
batch_times = pipeline.timer_manager.all_times
else:
batch_times = pipeline.timer_manager.measurements
if len(batch_times) == 0:
raise Exception(
"Generated no batch timings, try extending benchmark time with '--time'"
)

if SupportedTasks.is_text_generation(task) or SupportedTasks.is_code_generation(
task
):
kwargs.pop("middleware_manager")

return batch_times, total_run_time, num_streams
return run(
pipeline,
input_type,
batch_size,
config,
warmup_time,
seconds_to_run,
num_streams,
scenario,
)


def calculate_statistics(
Expand Down Expand Up @@ -406,6 +372,89 @@ def _get_statistics(batch_times):
return sections, all_sections


def benchmark_from_pipeline(
pipeline: Pipeline,
batch_size: int = 1,
seconds_to_run: int = 10,
warmup_time: int = 2,
thread_pinning: str = "core",
scenario: str = "sync",
num_streams: int = 1,
data_type: str = "dummy",
**kwargs,
):
decide_thread_pinning(thread_pinning)
scenario = parse_scenario(scenario.lower())

input_type = data_type

config = PipelineBenchmarkConfig(
data_type=data_type,
**kwargs,
)
return run(
pipeline,
input_type,
batch_size,
config,
warmup_time,
seconds_to_run,
num_streams,
scenario,
)


def run(
pipeline: Pipeline,
input_type: str,
batch_size: int,
config: PipelineBenchmarkConfig,
warmup_time: int,
seconds_to_run: int,
num_streams: int,
scenario: str,
):
inputs = create_input_schema(pipeline, input_type, batch_size, config)

def _clear_measurements():
# Helper method to handle variations between v1 and v2 timers
if hasattr(pipeline.timer_manager, "clear"):
pipeline.timer_manager.clear()
else:
pipeline.timer_manager.measurements.clear()

if scenario == "singlestream":
singlestream_benchmark(pipeline, inputs, warmup_time)
_clear_measurements()
start_time = time.perf_counter()
singlestream_benchmark(pipeline, inputs, seconds_to_run)
elif scenario == "multistream":
multistream_benchmark(pipeline, inputs, warmup_time, num_streams)
_clear_measurements()
start_time = time.perf_counter()
multistream_benchmark(pipeline, inputs, seconds_to_run, num_streams)
elif scenario == "elastic":
multistream_benchmark(pipeline, inputs, warmup_time, num_streams)
_clear_measurements()
start_time = time.perf_counter()
multistream_benchmark(pipeline, inputs, seconds_to_run, num_streams)
else:
raise Exception(f"Unknown scenario '{scenario}'")

end_time = time.perf_counter()
total_run_time = end_time - start_time
if hasattr(pipeline.timer_manager, "all_times"):
batch_times = pipeline.timer_manager.all_times
else:
batch_times = pipeline.timer_manager.measurements
if len(batch_times) == 0:
raise Exception(
"Generated no batch timings, try extending benchmark time with '--time'"
)

return batch_times, total_run_time, num_streams


@click.command()
@click.argument("task_name", type=str)
@click.argument("model_path", type=str)
Expand Down
34 changes: 34 additions & 0 deletions src/deepsparse/benchmark/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,37 @@ class PipelineBenchmarkConfig(BaseModel):
default={},
description=("Additional arguments passed to input schema creations "),
)


class PipelineBenchmarkServerConfig(PipelineBenchmarkConfig):
batch_size: int = Field(
default=1,
description="The batch size of the inputs to be used with the engine",
)
seconds_to_run: int = Field(
default=10,
description="The number of seconds to run benchmark for",
)
warmup_time: int = Field(
default=2,
description="The length to run pipeline before beginning benchmark",
)
thread_pinning: str = Field(
default="core",
description="To enable binding threads to cores",
)
scenario: str = Field(
default="sync",
description=(
"`BenchmarkScenario` object with specification for running "
"benchmark on an onnx model"
),
)
num_streams: int = Field(
default=1,
description=(
" The max number of requests the model can handle "
"concurrently. None or 0 implies a scheduler-defined default value; "
"default None"
),
)
8 changes: 7 additions & 1 deletion src/deepsparse/middlewares/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ class MiddlewareManager:
:param _lock: lock for the state
"""

def __init__(self, middleware: Optional[Sequence[MiddlewareSpec]], *args, **kwargs):
def __init__(
self, middleware: Optional[Sequence[MiddlewareSpec]] = None, *args, **kwargs
):

self.middleware: Optional[
Sequence[MiddlewareSpec]
Expand Down Expand Up @@ -172,3 +174,7 @@ def _update_middleware_spec_send(
next_middleware.send = self.recieve

self.middleware.append(MiddlewareSpec(next_middleware, **init_args))

@property
def middlewares(self):
return [middleware.cls for middleware in self.middleware]
10 changes: 10 additions & 0 deletions src/deepsparse/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,16 @@ def validate(self):
elif isinstance(router_validation, str):
raise ValueError(f"Invalid Router for operators: {router_validation}")

if (
self.middleware_manager is not None
and self._continuous_batching_scheduler is not None
):
_LOGGER.warning(
"Middleware is yet to be supported using continous batching scheduler. "
"Either remove middleware or remove continous batching scheduler "
"in the instantiation of the Pipeline class"
)

def run_func(
self,
*args,
Expand Down
4 changes: 4 additions & 0 deletions src/deepsparse/pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ class PipelineConfig(BaseModel):
"with multiple models. Default is None"
),
)
middlewares: Optional[List[str]] = Field(
default=None,
description="Middlewares to use",
)
kwargs: Optional[Dict[str, Any]] = Field(
default={},
description=(
Expand Down
4 changes: 4 additions & 0 deletions src/deepsparse/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ class EndpointConfig(BaseModel):
"```\n"
),
)
middlewares: Optional[List[str]] = Field(
default=None, description=("Middleware to use")
)

kwargs: Dict[str, Any] = Field(
default={}, description="Additional arguments to pass to the Pipeline"
Expand All @@ -147,6 +150,7 @@ def to_pipeline_config(self) -> PipelineConfig:
num_cores=None, # this will be set from Context
alias=self.name,
input_shapes=input_shapes,
middlewares=self.middlewares,
kwargs=kwargs,
)

Expand Down
69 changes: 69 additions & 0 deletions src/deepsparse/server/deepsparse_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from functools import partial

from deepsparse import Pipeline
from deepsparse.middlewares import MiddlewareManager, MiddlewareSpec, TimerMiddleware
from deepsparse.server.config import EndpointConfig
from deepsparse.server.server import CheckReady, ModelMetaData, ProxyPipeline, Server
from deepsparse.tasks import SupportedTasks
Expand Down Expand Up @@ -105,6 +106,11 @@ def _add_endpoint(
endpoint_config,
pipeline,
)
self._add_benchmark_endpoints(
app,
endpoint_config,
pipeline,
)
self._add_status_and_metadata_endpoints(app, endpoint_config, pipeline)

def _add_status_and_metadata_endpoints(
Expand Down Expand Up @@ -199,3 +205,66 @@ def _add_inference_endpoints(
methods=["POST"],
tags=["model", "inference"],
)

def _add_benchmark_endpoints(
self,
app: FastAPI,
endpoint_config: EndpointConfig,
pipeline: Pipeline,
):
if not hasattr(pipeline, "middleware_mamanger"):
pipeline.middleware_manager = MiddlewareManager()
if TimerMiddleware not in pipeline.middleware_manager.middlewares:
pipeline.middleware_manager.add_middleware(
[MiddlewareSpec(TimerMiddleware)]
)

routes_and_fns = []
if endpoint_config.route:
endpoint_config.route = self.clean_up_route(endpoint_config.route)
route = f"/v2/models{endpoint_config.route}/benchmark"
else:
route = f"/v2/models/{endpoint_config.name}/benchmark"

routes_and_fns.append(
(
route,
partial(
Server.benchmark,
ProxyPipeline(pipeline),
self.server_config.system_logging,
),
)
)

legacy_pipeline = not isinstance(pipeline, Pipeline) and hasattr(
pipeline.input_schema, "from_files"
)
# New pipelines do not have to have an input_schema. Just checking task
# names for now but can keep a list of supported from_files tasks in
# SupportedTasks as more pipelines are migrated as well as output schemas.
new_pipeline = SupportedTasks.is_image_classification(endpoint_config.task)

if legacy_pipeline or new_pipeline:
routes_and_fns.append(
(
route + "/from_files",
partial(
Server.predict_from_files,
ProxyPipeline(pipeline),
self.server_config.system_logging,
),
)
)
if isinstance(pipeline, Pipeline):
response_model = None
else:
response_model = pipeline.output_schema

self._update_routes(
app=app,
routes_and_fns=routes_and_fns,
response_model=response_model,
methods=["POST"],
tags=["model", "pipeline"],
)
16 changes: 16 additions & 0 deletions src/deepsparse/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from pydantic import BaseModel

import uvicorn
from deepsparse.benchmark.benchmark_pipeline import benchmark_from_pipeline
from deepsparse.benchmark.config import PipelineBenchmarkConfig
from deepsparse.engine import Context
from deepsparse.pipeline import Pipeline
from deepsparse.server.config import ServerConfig, SystemLoggingConfig
Expand Down Expand Up @@ -268,6 +270,20 @@ async def format_response():

return prep_for_serialization(pipeline_outputs)

@staticmethod
async def benchmark(
proxy_pipeline: ProxyPipeline,
system_logging_config: SystemLoggingConfig,
raw_request: Request,
):
json_params = await raw_request.json()
benchmark_config = PipelineBenchmarkConfig(**json_params)
results = benchmark_from_pipeline(
pipeline=proxy_pipeline.pipeline, **benchmark_config.dict()
)

return results

@staticmethod
async def predict_from_files(
proxy_pipeline: ProxyPipeline,
Expand Down
Loading

0 comments on commit acf190c

Please sign in to comment.