From 5dd5978f7dea039c448c6b7d276cb54a2a1a8747 Mon Sep 17 00:00:00 2001 From: SumanthRH Date: Thu, 27 Feb 2025 11:59:37 -0800 Subject: [PATCH] switch to actor Signed-off-by: SumanthRH --- recipes/sky-t1-preview/postprocess.py | 8 +- recipes/sky-t1-preview/recipe.py | 107 +++++++++++++++++++------- 2 files changed, 83 insertions(+), 32 deletions(-) diff --git a/recipes/sky-t1-preview/postprocess.py b/recipes/sky-t1-preview/postprocess.py index a62986e..0a5d205 100644 --- a/recipes/sky-t1-preview/postprocess.py +++ b/recipes/sky-t1-preview/postprocess.py @@ -21,15 +21,15 @@ Now, try to solve the following question through the above guidelines:" -def convert_to_sharegpt_format(row: Dict[str, Any]): - prompt = row["user_input"] +def convert_to_sharegpt_format(row: Dict[str, Any], prompt_column, response_column): + prompt = row[prompt_column] # accept # Create the conversation format conversations = [ {"from": "user", "value": prompt}, { "from": "assistant", - "value": row["formatted_response"], + "value": row[response_column], }, ] @@ -37,6 +37,8 @@ def convert_to_sharegpt_format(row: Dict[str, Any]): cur_data = { "system": STILL2_SYSTEM_PROMPT, "conversations": conversations, + # TODO: remove this + **row, } return cur_data diff --git a/recipes/sky-t1-preview/recipe.py b/recipes/sky-t1-preview/recipe.py index 3ece0b8..47eb17a 100644 --- a/recipes/sky-t1-preview/recipe.py +++ b/recipes/sky-t1-preview/recipe.py @@ -33,12 +33,13 @@ SYSTEM_PROMPT = "You are a helpful and harmless assistant. You are Qwen developed by Alibaba. You should think step-by-step." # noqa: E501 MAX_TOKENS = 16384 # 1. Load datasets -apps_ds = datasets.load_dataset( - "codeparrot/apps", - split="test", +apps_ds = datasets.load_dataset("codeparrot/apps", split="test", trust_remote_code=True) +taco_ds_medium = datasets.load_dataset( + "BAAI/TACO", split="test", name="MEDIUM", trust_remote_code=True +) +numina_ds = datasets.load_dataset( + "AI-MO/NuminaMath-CoT", split="train", trust_remote_code=True ) -taco_ds_medium = datasets.load_dataset("BAAI/TACO", split="test", name="MEDIUM") -numina_ds = datasets.load_dataset("AI-MO/NuminaMath-CoT", split="train") # convert all to ray dataset apps_ds = ray.data.from_huggingface(apps_ds) @@ -75,38 +76,65 @@ ] # these are user-defined simple preprocessing functions to go from entry -> prompt -preprocess_fns = [ - APPSPreprocessor(), - TACOPreprocessor(), - NUMINAPreprocessor(), - NUMINAPreprocessor(), - NUMINAPreprocessor(), +preprocessors = [ + APPSPreprocessor, + TACOPreprocessor, + NUMINAPreprocessor, + NUMINAPreprocessor, + NUMINAPreprocessor, ] -numina_scorer = MathEqualScorer( - response_column="formatted_response", answer_column="solution" -) -scorers = [ - APPSScorer(response_column="formatted_response"), - TACOScorer(response_column="formatted_response", backend="ray"), - numina_scorer, - numina_scorer, - numina_scorer, -] dataset_names = ["apps", "taco", "numina_amc_aime", "numina_math", "numina_olympiads"] +scorer_configs = [ + dict( + cls=APPSScorer, fn_constructor_kwargs=dict(response_column="formatted_response") + ), + dict( + cls=TACOScorer, + fn_constructor_kwargs=dict(response_column="formatted_response", backend="ray"), + ), + dict( + cls=MathEqualScorer, + fn_constructor_kwargs=dict( + response_column="formatted_response", answer_column="solution" + ), + ), + dict( + cls=MathEqualScorer, + fn_constructor_kwargs=dict( + response_column="formatted_response", answer_column="solution" + ), + ), + dict( + cls=MathEqualScorer, + fn_constructor_kwargs=dict( + response_column="formatted_response", answer_column="solution" + ), + ), +] + for i, ds in enumerate(datasets): - datasets[i] = ds.map(preprocess_fns[i]) + if i < 1: + continue + # 1. Preprocess and get model prompts + preprocess_cls = preprocessors[i] + datasets[i] = ds.map( + preprocess_cls, + concurrency=5, + ) + + # 2. Get model responses config = vLLMEngineProcessorConfig( - model="Qwen/QwQ-32B-Preview", - # model="Qwen/Qwen2-0.5B-Instruct", + # model="Qwen/QwQ-32B-Preview", + model="Qwen/Qwen2-0.5B-Instruct", engine_kwargs=dict( enable_prefix_caching=True, enable_chunked_prefill=True, max_num_batched_tokens=16384, ), concurrency=2, - batch_size=64, + batch_size=20, ) processor = build_llm_processor( @@ -118,7 +146,7 @@ ], sampling_params=dict( temperature=0.3, - max_tokens=20, + max_tokens=MAX_TOKENS, detokenize=False, ), ), @@ -130,6 +158,7 @@ datasets[i] = processor(datasets[i]) # 3. Reformat the examples into a structured format + # define a configuration for the reformatter config = HttpRequestProcessorConfig( url="https://api.openai.com/v1/chat/completions", @@ -137,6 +166,7 @@ # number of processors to run in parallel # Each handles a batch of requests concurrency=1, + batch_size=64, ) # define the reformatter reformatter = build_llm_processor( @@ -170,14 +200,33 @@ datasets[i] = reformatter(datasets[i]) # 4. Rejection Sampling based on scoring - datasets[i] = datasets[i].map(scorers[i]) - score_column = scorers[i].SCORE_COLUMN + scorer_cls, fn_constructor_kwargs = ( + scorer_configs[i]["cls"], + scorer_configs[i]["fn_constructor_kwargs"], + ) + datasets[i] = datasets[i].map( + scorer_cls, concurrency=4, fn_constructor_kwargs=fn_constructor_kwargs + ) + score_column = scorer_cls.SCORE_COLUMN datasets[i] = datasets[i].filter(lambda x, sc=score_column: x[sc]) # 5. Convert to ShareGPT format - datasets[i] = datasets[i].map(convert_to_sharegpt_format) + datasets[i] = datasets[i].map( + convert_to_sharegpt_format, + fn_kwargs=dict( + prompt_column="user_input", response_column="formatted_response" + ), + ) # 6. Save datasets dir_name = f"data/sky-t1-preview-{dataset_names[i]}" datasets[i] = datasets[i].materialize() datasets[i].write_json(os.path.abspath(dir_name)) + + +# 7. Union + +# final_dataset = datasets[0].union(*datasets[1:]) +# dir_name = f"data/sky-t1-preview-full" +# # save in folder as a single JSON file +# final_dataset.repartition(1).write_json(os.path.abspath(dir_name))