From 5397347718ca77d87a653fcee0d37a2724adb57a Mon Sep 17 00:00:00 2001
From: Huy Do
Date: Fri, 15 Nov 2024 11:30:50 -0800
Subject: [PATCH] Populate the benchmark metadata (#5918)

To ease the process of gathering the benchmark metadata before uploading to
the database, I'm adding a script `.github/scripts/benchmarks/gather_metadata.py`
to gather this information and pass it to the upload script.

From https://github.com/pytorch/test-infra/pull/5839, the benchmark metadata
includes the following required fields:

```
-- Metadata
`timestamp` UInt64,
`schema_version` String DEFAULT 'v3',
`name` String,
-- About the change
`repo` String DEFAULT 'pytorch/pytorch',
`head_branch` String,
`head_sha` String,
`workflow_id` UInt64,
`run_attempt` UInt32,
`job_id` UInt64,
-- The raw records on S3
`s3_path` String,
```
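For reference, here is a sketch of the metadata record that `gather_metadata.py` emits and hands to the upload script (the values below are illustrative, not taken from a real run):

```
{
  "timestamp": 1731699050,
  "schema_version": "v3",
  "name": "upload-benchmark-results / upload",
  "repo": "pytorch/test-infra",
  "head_branch": "main",
  "head_sha": "5397347718ca77d87a653fcee0d37a2724adb57a",
  "workflow_id": 11831746632,
  "run_attempt": 1,
  "job_id": 32967412160
}
```

The upload script then merges this metadata into every benchmark record it reads, fills in `s3_path` once the S3 location is known, and writes the combined records to S3 in JSONEachRow format.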
I'm going to test this out with the PT2 compiler instruction count benchmark at https://github.com/pytorch/pytorch/pull/140493

### Testing

https://github.com/pytorch/test-infra/actions/runs/11831746632/job/32967412160?pr=5918#step:5:105 gathers the metadata and uploads the benchmark results correctly.

Also, an actual upload at https://github.com/pytorch/pytorch/actions/runs/11831781500/job/33006545698#step:24:138
---
 .../upload-benchmark-results/action.yml       |  78 ++++++-
 .../v3/add_loop_eager.json                    |   1 +
 .../v3/add_loop_eager_dynamic.json            |   1 +
 .../v3/add_loop_inductor_dynamic_gpu.json     |   1 +
 .github/scripts/benchmarks/gather_metadata.py |  98 +++++++++
 .github/scripts/upload_benchmark_results.py   | 208 ++++++++++++++----
 .../test_upload_benchmark_results.yml         |   1 +
 7 files changed, 342 insertions(+), 46 deletions(-)
 create mode 100644 .github/scripts/benchmark-results-dir-for-testing/v3/add_loop_eager.json
 create mode 100644 .github/scripts/benchmark-results-dir-for-testing/v3/add_loop_eager_dynamic.json
 create mode 100644 .github/scripts/benchmark-results-dir-for-testing/v3/add_loop_inductor_dynamic_gpu.json
 create mode 100755 .github/scripts/benchmarks/gather_metadata.py

diff --git a/.github/actions/upload-benchmark-results/action.yml b/.github/actions/upload-benchmark-results/action.yml
index c5a6ae8a6d..1f202fe423 100644
--- a/.github/actions/upload-benchmark-results/action.yml
+++ b/.github/actions/upload-benchmark-results/action.yml
@@ -9,6 +9,8 @@ inputs:
   # TODO (huydhn): Use this to gate the migration to oss_ci_benchmark_v3 on S3
   schema-version:
     default: 'v2'
+  github-token:
+    default: ''

 runs:
   using: composite
@@ -19,22 +21,92 @@ runs:
         set -eux
         python3 -mpip install boto3==1.35.33

+    - name: Check that GITHUB_TOKEN is defined
+      if: ${{ inputs.schema-version != 'v2' }}
+      env:
+        GITHUB_TOKEN: ${{ inputs.github-token }}
+      shell: bash
+      run: |
+        set -eux
+
+        if [[ -z "${GITHUB_TOKEN}" ]]; then
+          echo "Missing github-token input"
+          exit 1
+        fi
+
+    - name: Get workflow job id
+      if: ${{ inputs.github-token != '' }}
+      id: get-job-id
+      uses: pytorch/test-infra/.github/actions/get-workflow-job-id@main
+      with:
+        github-token: ${{ inputs.github-token }}
+
+    - name: Gather the metadata
+      id: gather-metadata
+      shell: bash
+      env:
+        SCHEMA_VERSION: ${{ inputs.schema-version }}
+        REPO: ${{ github.repository }}
+        HEAD_BRANCH: ${{ github.head_ref }}
+        HEAD_SHA: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.sha || github.sha }}
+        WORKFLOW_RUN_ID: ${{ github.run_id }}
+        RUN_ATTEMPT: ${{ github.run_attempt }}
+        JOB_ID: ${{ inputs.github-token != '' && steps.get-job-id.outputs.job-id || '0' }}
+        JOB_NAME: ${{ inputs.github-token != '' && steps.get-job-id.outputs.job-name || '' }}
+      run: |
+        set -eux
+
+        python3 "${GITHUB_ACTION_PATH}/../../scripts/benchmarks/gather_metadata.py" \
+          --schema-version "${SCHEMA_VERSION}" \
+          --repo "${REPO}" \
+          --head-branch "${HEAD_BRANCH}" \
+          --head-sha "${HEAD_SHA}" \
+          --workflow-id "${WORKFLOW_RUN_ID}" \
+          --run-attempt "${RUN_ATTEMPT}" \
+          --job-id "${JOB_ID}" \
+          --job-name "${JOB_NAME}"
+
+    - name: Gather the runner information
+      id: gather-runner-info
+      shell: bash
+      run: |
+        set -eux
+
+        # TODO (huydhn): Implement this part
+        echo "runners=[]" >> "${GITHUB_OUTPUT}"
+
+    - name: Gather the dependencies information
+      id: gather-dependencies
+      shell: bash
+      run: |
+        set -eux
+
+        # TODO (huydhn): Implement this part
+        echo "dependencies={}" >> "${GITHUB_OUTPUT}"
+
     - name: Upload benchmark results
       shell: bash
       env:
         BENCHMARK_RESULTS_DIR: ${{ inputs.benchmark-results-dir }}
         DRY_RUN: ${{ inputs.dry-run }}
-        SCHEMA_VERSION: ${{ inputs.schema-version }}
+        # Additional information about the benchmarks
+        BENCHMARK_METADATA: ${{ steps.gather-metadata.outputs.metadata }}
+        RUNNER_INFO: ${{ steps.gather-runner-info.outputs.runners }}
+        DEPENDENCIES: ${{ steps.gather-dependencies.outputs.dependencies }}
       run: |
         set -eux

         if [[ "${DRY_RUN}" == "true" ]]; then
           python3 "${GITHUB_ACTION_PATH}/../../scripts/upload_benchmark_results.py" \
             --benchmark-results-dir "${BENCHMARK_RESULTS_DIR}" \
-            --schema-version "${SCHEMA_VERSION}" \
+            --metadata "${BENCHMARK_METADATA}" \
+            --runners "${RUNNER_INFO}" \
+            --dependencies "${DEPENDENCIES}" \
             --dry-run
         else
           python3 "${GITHUB_ACTION_PATH}/../../scripts/upload_benchmark_results.py" \
             --benchmark-results-dir "${BENCHMARK_RESULTS_DIR}" \
-            --schema-version "${SCHEMA_VERSION}"
+            --metadata "${BENCHMARK_METADATA}" \
+            --runners "${RUNNER_INFO}" \
+            --dependencies "${DEPENDENCIES}"
         fi
diff --git a/.github/scripts/benchmark-results-dir-for-testing/v3/add_loop_eager.json b/.github/scripts/benchmark-results-dir-for-testing/v3/add_loop_eager.json
new file mode 100644
index 0000000000..23d556005c
--- /dev/null
+++ b/.github/scripts/benchmark-results-dir-for-testing/v3/add_loop_eager.json
@@ -0,0 +1 @@
+[{"benchmark": {"name": "pr_time_benchmarks", "extra_info": {"is_dynamic": false, "device": "cpu", "description": "a loop over 100 add node"}}, "model": {"name": "add_loop_eager", "type": "add_loop", "backend": "eager"}, "metric": {"name": "compile_time_instruction_count", "benchmark_values": [3086359081]}}]
\ No newline at end of file
diff --git a/.github/scripts/benchmark-results-dir-for-testing/v3/add_loop_eager_dynamic.json b/.github/scripts/benchmark-results-dir-for-testing/v3/add_loop_eager_dynamic.json
new file mode 100644
index 0000000000..9297ab315f
--- /dev/null
+++ b/.github/scripts/benchmark-results-dir-for-testing/v3/add_loop_eager_dynamic.json
@@ -0,0 +1 @@
+[{"benchmark": {"name": "pr_time_benchmarks", "extra_info": {"is_dynamic": true, "device": "cpu", "description": "a loop over 100 add node"}}, "model": {"name": "add_loop_eager_dynamic", "type": "add_loop", "backend": "eager"}, "metric": {"name": "compile_time_instruction_count", "benchmark_values": [5712213247]}}]
\ No newline at end of file
diff --git a/.github/scripts/benchmark-results-dir-for-testing/v3/add_loop_inductor_dynamic_gpu.json b/.github/scripts/benchmark-results-dir-for-testing/v3/add_loop_inductor_dynamic_gpu.json
new file mode 100644
index 0000000000..9bc09c8c57
--- /dev/null
+++ b/.github/scripts/benchmark-results-dir-for-testing/v3/add_loop_inductor_dynamic_gpu.json
@@ -0,0 +1 @@
+[{"benchmark": {"name": "pr_time_benchmarks", "extra_info": {"is_dynamic": true, "device": "cuda", "description": "a loop over 100 add node"}}, "model": {"name": "add_loop_inductor_dynamic_gpu", "type": "add_loop", "backend": "inductor"}, "metric": {"name": "compile_time_instruction_count", "benchmark_values": [40859830085]}}]
\ No newline at end of file
diff --git a/.github/scripts/benchmarks/gather_metadata.py b/.github/scripts/benchmarks/gather_metadata.py
new file mode 100755
index 0000000000..50011fb3f7
--- /dev/null
+++ b/.github/scripts/benchmarks/gather_metadata.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python3
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+import os
+import json
+import time
+from typing import Any
+
+
+def parse_args() -> Any:
+    from argparse import ArgumentParser
+
+    parser = ArgumentParser("gather some metadata about the benchmark")
+    # v3 is defined at torchci/clickhouse_queries/oss_ci_benchmark_v3/query.sql
+    parser.add_argument(
+        "--schema-version",
+        choices=["v2", "v3"],
+        required=True,
+        help="the database schema to use",
+    )
+    parser.add_argument(
+        "--repo",
+        type=str,
+        required=True,
+        help="the name of repository where the benchmark is run",
+    )
+    parser.add_argument(
+        "--head-branch",
+        type=str,
+        required=True,
+        help="the name of branch where the benchmark is run",
+    )
+    parser.add_argument(
+        "--head-sha",
+        type=str,
+        required=True,
+        help="the commit that the benchmark uses",
+    )
+    parser.add_argument(
+        "--workflow-id",
+        type=int,
+        required=True,
+        help="the benchmark workflow id",
+    )
+    parser.add_argument(
+        "--run-attempt",
+        type=int,
+        default=1,
+        help="the workflow run attempt",
+    )
+    parser.add_argument(
+        "--job-id",
+        type=int,
+        required=True,
+        help="the benchmark job id",
+    )
+    parser.add_argument(
+        "--job-name",
+        type=str,
+        required=True,
+        help="the benchmark job name",
+    )
+
+    return parser.parse_args()
+
+
+def set_output(name: str, val: Any) -> None:
+    if os.getenv("GITHUB_OUTPUT"):
+        with open(str(os.getenv("GITHUB_OUTPUT")), "a") as env:
+            print(f"{name}={val}", file=env)
+    else:
+        print(f"::set-output name={name}::{val}")
+
+
+def main() -> None:
+    args = parse_args()
+
+    # From https://github.com/pytorch/test-infra/pull/5839
+    metadata = {
+        "timestamp": int(time.time()),
+        "schema_version": args.schema_version,
+        "name": args.job_name,
+        "repo": args.repo,
+        "head_branch": args.head_branch,
+        "head_sha": args.head_sha,
+        "workflow_id": args.workflow_id,
+        "run_attempt": args.run_attempt,
+        "job_id": args.job_id,
+    }
+    set_output("metadata", json.dumps(metadata))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/.github/scripts/upload_benchmark_results.py b/.github/scripts/upload_benchmark_results.py
index 3c0ce212bd..1725091538 100755
--- a/.github/scripts/upload_benchmark_results.py
+++ b/.github/scripts/upload_benchmark_results.py
@@ -13,9 +13,11 @@ import time
 from argparse import Action, ArgumentParser, Namespace
 from decimal import Decimal
+from json.decoder import JSONDecodeError
 from logging import info
 from typing import Any, Callable, Dict, List, Optional
+from warnings import warn

 import boto3

@@ -40,6 +42,57 @@ def __call__(
             parser.error(f"{values} is not a valid directory")

+
+class ValidateMetadata(Action):
+    def __call__(
+        self,
+        parser: ArgumentParser,
+        namespace: Namespace,
+        values: Any,
+        option_string: Optional[str] = None,
+    ) -> None:
+        try:
+            decoded_values = json.loads(values)
+        except JSONDecodeError:
+            parser.error(f"{values} is not a valid JSON")
+            return
+
+        if all(
+            k in decoded_values
+            for k in (
+                "timestamp",
+                "schema_version",
+                "name",
+                "repo",
+                "head_branch",
+                "head_sha",
+                "workflow_id",
+                "run_attempt",
+                "job_id",
+            )
+        ):
+            setattr(namespace, self.dest, decoded_values)
+            return
+
+        parser.error(f"{values} is not a valid benchmark metadata")
+
+
+class ValidateJSON(Action):
+    def __call__(
+        self,
+        parser: ArgumentParser,
+        namespace: Namespace,
+        values: Any,
+        option_string: Optional[str] = None,
+    ) -> None:
+        try:
+            decoded_values = json.loads(values)
+        except JSONDecodeError:
+            parser.error(f"{values} is not a valid JSON")
+            return
+
+        setattr(namespace, self.dest, decoded_values)
+

 def parse_args() -> Any:
     from argparse import ArgumentParser

@@ -61,12 +114,26 @@ def parse_args() -> Any:
         default="torchci-oss-ci-benchmark",
         help="the name of the DynamoDB table to upload to",
     )
-    # v3 is defined at torchci/clickhouse_queries/oss_ci_benchmark_v3/query.sql
     parser.add_argument(
-        "--schema-version",
-        choices=["v2", "v3"],
+        "--metadata",
+        type=str,
         required=True,
-        help="the database schema to use",
+        action=ValidateMetadata,
+        help="the metadata to use in JSON format",
+    )
+    parser.add_argument(
+        "--runners",
+        type=str,
+        default=json.dumps([]),
+        action=ValidateJSON,
+        help="the information about the benchmark runners in JSON format",
+    )
+    parser.add_argument(
+        "--dependencies",
+        type=str,
+        default=json.dumps({}),
+        action=ValidateJSON,
+        help="the information about the benchmark dependencies in JSON format",
     )

     return parser.parse_args()
@@ -116,34 +183,73 @@ def upload_to_dynamodb(
         batch.put_item(Item=doc)


-def generate_s3_path(filepath: str, schema_version: str) -> Optional[str]:
+def process_benchmark_results(
+    filepath: str,
+    metadata: Dict[str, Any],
+    runners: List[Any],
+    dependencies: Dict[str, Any],
+) -> List[Dict[str, Any]]:
     with open(filepath) as f:
-        docs = json.load(f)
-
-    if not docs:
-        info(f"{filepath} is empty")
-        return ""
-
-    for doc in docs:
-        repo = doc.get("repo", "")
-        workflow_id = doc.get("workflow_id", 0)
-        job_id = doc.get("job_id", 0)
-        servicelab_experiment_id = doc.get("servicelab_experiment_id", 0)
-        servicelab_trial_id = doc.get("servicelab_trial_id", 0)
-
-        # Also handle service lab records here
-        workflow_id = workflow_id if workflow_id else servicelab_experiment_id
-        job_id = job_id if job_id else servicelab_trial_id
-
-        # We just need one record here to get some metadata to generate the s3 path
-        if repo and workflow_id and job_id:
-            break
-
-    if not repo or not workflow_id or not job_id:
-        info(
-            f"{filepath} is without any information about the repo, workflow, or job id"
-        )
-        return ""
+        try:
+            benchmark_results = json.load(f)
+        except JSONDecodeError:
+            warn(f"Invalid JSON file {filepath}, skipping")
+            return []
+
+    if not isinstance(benchmark_results, (list, tuple)):
+        return []
+
+    processed_benchmark_results: List[Dict[str, Any]] = []
+    for result in benchmark_results:
+        # This is a required field
+        if "metric" not in result:
+            warn(f"{result} is not a benchmark record, skipping")
+            continue
+
+        record: Dict[str, Any] = {**metadata, **result}
+        # Gather all the information about the benchmark
+        if "runners" not in record:
+            record["runners"] = runners
+        if "dependencies" not in record:
+            record["dependencies"] = dependencies
+
+        processed_benchmark_results.append(record)
+    return processed_benchmark_results
+
+
+def generate_s3_path(
+    benchmark_results: List[Dict[str, Any]], filepath: str, schema_version: str
+) -> Optional[str]:
+    if not benchmark_results:
+        return None
+
+    repo = ""
+    workflow_id = 0
+    job_id = 0
+
+    for result in benchmark_results:
+        repo = result.get("repo", "")
+        if not repo:
+            continue
+
+        workflow_id = result.get("workflow_id", 0)
+        job_id = result.get("job_id", 0)
+        servicelab_experiment_id = result.get("servicelab_experiment_id", 0)
+        servicelab_trial_id = result.get("servicelab_trial_id", 0)
+
+        # Also handle service lab records here
+        workflow_id = workflow_id if workflow_id else servicelab_experiment_id
+        job_id = job_id if job_id else servicelab_trial_id
+
+        # We just need one record here to get some metadata to generate the s3 path
+        if workflow_id and job_id:
+            break
+
+    if not repo or not workflow_id or not job_id:
+        info(
+            "The result is without any information about the repo, workflow, or job id"
+        )
+        return None

     filename = os.path.basename(filepath)
     return f"{schema_version}/{repo}/{workflow_id}/{job_id}/{filename}"
@@ -153,39 +259,44 @@ def upload_to_s3(
     s3_bucket: str,
     filepath: str,
     schema_version: str,
+    benchmark_results: List[Dict[str, Any]],
     dry_run: bool = True,
 ) -> None:
     """
     Upload the benchmark results to S3
     """
-    s3_path = generate_s3_path(filepath, schema_version)
+    s3_path = generate_s3_path(benchmark_results, filepath, schema_version)
     if not s3_path:
         info(f"Could not generate an S3 path for {filepath}, skipping...")
         return

+    # Populate the path to S3
+    for result in benchmark_results:
+        result["s3_path"] = s3_path
+
     info(f"Upload {filepath} to s3://{s3_bucket}/{s3_path}")
     if not dry_run:
-        # Copied from upload stats script
-        with open(filepath) as f:
-            boto3.resource("s3").Object(
-                f"{s3_bucket}",
-                f"{s3_path}",
-            ).put(
-                Body=gzip.compress(f.read().encode()),
-                ContentEncoding="gzip",
-                ContentType="application/json",
-            )
+        # Write in JSONEachRow format
+        data = "\n".join([json.dumps(result) for result in benchmark_results])
+        boto3.resource("s3").Object(
+            f"{s3_bucket}",
+            f"{s3_path}",
+        ).put(
+            Body=gzip.compress(data.encode()),
+            ContentEncoding="gzip",
+            ContentType="application/json",
+        )


 def main() -> None:
     args = parse_args()
-    schema_version = args.schema_version

     for file in os.listdir(args.benchmark_results_dir):
         if not file.endswith(".json"):
             continue

         filepath = os.path.join(args.benchmark_results_dir, file)
+        schema_version = args.metadata["schema_version"]

         # NB: This is for backward compatibility before we move to schema v3
         if schema_version == "v2":
@@ -199,10 +310,21 @@ def main() -> None:
                 dry_run=args.dry_run,
             )

+        benchmark_results = process_benchmark_results(
+            filepath=filepath,
+            metadata=args.metadata,
+            runners=args.runners,
+            dependencies=args.dependencies,
+        )
+
+        if not benchmark_results:
+            return
+
         upload_to_s3(
             s3_bucket=OSSCI_BENCHMARKS_BUCKET,
             filepath=filepath,
             schema_version=schema_version,
+            benchmark_results=benchmark_results,
             dry_run=args.dry_run,
         )

diff --git a/.github/workflows/test_upload_benchmark_results.yml b/.github/workflows/test_upload_benchmark_results.yml
index f53d0e9995..15be9b16d7 100644
--- a/.github/workflows/test_upload_benchmark_results.yml
+++ b/.github/workflows/test_upload_benchmark_results.yml
@@ -26,3 +26,4 @@ jobs:
           benchmark-results-dir: .github/scripts/benchmark-results-dir-for-testing/v3
           schema-version: v3
           dry-run: true
+          github-token: ${{ secrets.GITHUB_TOKEN }}