Skip to content

Commit

Permalink
switch to actor
Browse files Browse the repository at this point in the history
Signed-off-by: SumanthRH <sumanthrh@anyscale.com>
  • Loading branch information
SumanthRH committed Feb 27, 2025
1 parent f55909f commit 5dd5978
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 32 deletions.
8 changes: 5 additions & 3 deletions recipes/sky-t1-preview/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@
Now, try to solve the following question through the above guidelines:"


def convert_to_sharegpt_format(
    row: Dict[str, Any],
    prompt_column: str = "user_input",
    response_column: str = "formatted_response",
) -> Dict[str, Any]:
    """Convert one dataset row into a ShareGPT-style record.

    Args:
        row: A single record; must contain ``prompt_column`` and
            ``response_column``.
        prompt_column: Key in ``row`` holding the user prompt. Defaults to
            the column name that was previously hard-coded.
        response_column: Key in ``row`` holding the assistant response.
            Defaults to the column name that was previously hard-coded.

    Returns:
        A dict with ``system`` set to ``STILL2_SYSTEM_PROMPT``,
        ``conversations`` holding the user/assistant turns, and every
        original column of ``row`` carried along.

    Raises:
        KeyError: If ``prompt_column`` or ``response_column`` is not in
            ``row``.
    """
    # Create the conversation format: one user turn, one assistant turn.
    conversations = [
        {"from": "user", "value": row[prompt_column]},
        {"from": "assistant", "value": row[response_column]},
    ]

    # Prepare the final structure.
    return {
        # TODO: remove this passthrough of the raw row columns.
        # Spread first so a row column that happens to be named "system" or
        # "conversations" cannot overwrite the fields generated below.
        **row,
        "system": STILL2_SYSTEM_PROMPT,
        "conversations": conversations,
    }
107 changes: 78 additions & 29 deletions recipes/sky-t1-preview/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
SYSTEM_PROMPT = "You are a helpful and harmless assistant. You are Qwen developed by Alibaba. You should think step-by-step." # noqa: E501
MAX_TOKENS = 16384
# 1. Load datasets
apps_ds = datasets.load_dataset(
"codeparrot/apps",
split="test",
apps_ds = datasets.load_dataset("codeparrot/apps", split="test", trust_remote_code=True)
taco_ds_medium = datasets.load_dataset(
"BAAI/TACO", split="test", name="MEDIUM", trust_remote_code=True
)
numina_ds = datasets.load_dataset(
"AI-MO/NuminaMath-CoT", split="train", trust_remote_code=True
)
taco_ds_medium = datasets.load_dataset("BAAI/TACO", split="test", name="MEDIUM")
numina_ds = datasets.load_dataset("AI-MO/NuminaMath-CoT", split="train")

# convert all to ray dataset
apps_ds = ray.data.from_huggingface(apps_ds)
Expand Down Expand Up @@ -75,38 +76,65 @@
]

# these are user-defined simple preprocessing functions to go from entry -> prompt
preprocess_fns = [
APPSPreprocessor(),
TACOPreprocessor(),
NUMINAPreprocessor(),
NUMINAPreprocessor(),
NUMINAPreprocessor(),
# Preprocessor classes (not instances), one per dataset. Each class is passed
# to `ds.map(...)` by index in the processing loop.
preprocessors = [
    APPSPreprocessor,
    TACOPreprocessor,
    # The remaining three datasets all use the NuminaMath preprocessor.
    *([NUMINAPreprocessor] * 3),
]

numina_scorer = MathEqualScorer(
response_column="formatted_response", answer_column="solution"
)
scorers = [
APPSScorer(response_column="formatted_response"),
TACOScorer(response_column="formatted_response", backend="ray"),
numina_scorer,
numina_scorer,
numina_scorer,
]
dataset_names = ["apps", "taco", "numina_amc_aime", "numina_math", "numina_olympiads"]
# One scorer spec per dataset, indexed in step with `dataset_names`. Each spec
# carries the scorer class ("cls") and the keyword arguments used to construct
# it ("fn_constructor_kwargs").
scorer_configs = [
    {
        "cls": APPSScorer,
        "fn_constructor_kwargs": {"response_column": "formatted_response"},
    },
    {
        "cls": TACOScorer,
        "fn_constructor_kwargs": {
            "response_column": "formatted_response",
            "backend": "ray",
        },
    },
    # The last three datasets share the same scorer settings; a separate dict
    # is built for each entry so no spec aliases another.
    *(
        {
            "cls": MathEqualScorer,
            "fn_constructor_kwargs": {
                "response_column": "formatted_response",
                "answer_column": "solution",
            },
        }
        for _ in range(3)
    ),
]

for i, ds in enumerate(datasets):
datasets[i] = ds.map(preprocess_fns[i])
if i < 1:
continue
# 1. Preprocess and get model prompts
preprocess_cls = preprocessors[i]
datasets[i] = ds.map(
preprocess_cls,
concurrency=5,
)

# 2. Get model responses

config = vLLMEngineProcessorConfig(
model="Qwen/QwQ-32B-Preview",
# model="Qwen/Qwen2-0.5B-Instruct",
# model="Qwen/QwQ-32B-Preview",
model="Qwen/Qwen2-0.5B-Instruct",
engine_kwargs=dict(
enable_prefix_caching=True,
enable_chunked_prefill=True,
max_num_batched_tokens=16384,
),
concurrency=2,
batch_size=64,
batch_size=20,
)

processor = build_llm_processor(
Expand All @@ -118,7 +146,7 @@
],
sampling_params=dict(
temperature=0.3,
max_tokens=20,
max_tokens=MAX_TOKENS,
detokenize=False,
),
),
Expand All @@ -130,13 +158,15 @@
datasets[i] = processor(datasets[i])

# 3. Reformat the examples into a structured format

# define a configuration for the reformatter
config = HttpRequestProcessorConfig(
url="https://api.openai.com/v1/chat/completions",
headers={"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"},
# number of processors to run in parallel
# Each handles a batch of requests
concurrency=1,
batch_size=64,
)
# define the reformatter
reformatter = build_llm_processor(
Expand Down Expand Up @@ -170,14 +200,33 @@
datasets[i] = reformatter(datasets[i])

# 4. Rejection Sampling based on scoring
datasets[i] = datasets[i].map(scorers[i])
score_column = scorers[i].SCORE_COLUMN
scorer_cls, fn_constructor_kwargs = (
scorer_configs[i]["cls"],
scorer_configs[i]["fn_constructor_kwargs"],
)
datasets[i] = datasets[i].map(
scorer_cls, concurrency=4, fn_constructor_kwargs=fn_constructor_kwargs
)
score_column = scorer_cls.SCORE_COLUMN
datasets[i] = datasets[i].filter(lambda x, sc=score_column: x[sc])

# 5. Convert to ShareGPT format
datasets[i] = datasets[i].map(convert_to_sharegpt_format)
datasets[i] = datasets[i].map(
convert_to_sharegpt_format,
fn_kwargs=dict(
prompt_column="user_input", response_column="formatted_response"
),
)

# 6. Save datasets
dir_name = f"data/sky-t1-preview-{dataset_names[i]}"
datasets[i] = datasets[i].materialize()
datasets[i].write_json(os.path.abspath(dir_name))


# 7. Union

# final_dataset = datasets[0].union(*datasets[1:])
# dir_name = f"data/sky-t1-preview-full"
# # save in folder as a single JSON file
# final_dataset.repartition(1).write_json(os.path.abspath(dir_name))

0 comments on commit 5dd5978

Please sign in to comment.