{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import abc\n",
"import asyncio\n",
"from collections import defaultdict\n",
"import json\n",
"import os\n",
"import re\n",
"from typing import Union\n",
"\n",
"import aiohttp\n",
"import datasets\n",
"from dotenv import load_dotenv\n",
"import numpy as np\n",
"from sentence_transformers import SentenceTransformer\n",
"from tenacity import (\n",
" AsyncRetrying,\n",
" retry_if_exception_type,\n",
" stop_after_attempt,\n",
" wait_exponential,\n",
")\n",
"import torch\n",
"from tqdm.notebook import tqdm\n",
"from e2b_code_interpreter import Sandbox\n",
"from e2b import TimeoutException\n",
"\n",
"load_dotenv()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dataset = datasets.load_dataset(\"hkust-nlp/CodeIO-PyEdu-Reasoning\")['train']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Extract the relevant parts of the prompt"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pattern = re.compile(\n",
" r'(?s)' # DOTALL so . matches newlines\n",
" r'You are given a question that requires some input and output variables as follows:\\s*(.*?)'\n",
" r'\\s*The input and output requirements are as follows:\\s*(.*?)'\n",
" r'\\s*Given the following.*?Tip: Here is a reference code snippet for this question\\. '\n",
" r'You can refer to this code to guide your reasoning but not copy spans of code directly\\.\\s*(.*)'\n",
")\n",
"\n",
"seen = set()\n",
"duplicate = 0\n",
"\n",
"with open(\"data/codeio-pyedu-extracted.jsonl\", \"w+\") as f:\n",
" for i, item in tqdm(enumerate(dataset), total=len(dataset)):\n",
" match = pattern.search(item[\"prompt\"])\n",
" if match:\n",
" # Extract relevant info\n",
" task_description = match.group(1).strip()\n",
" input_output_spec = match.group(2).strip()\n",
" code_sample = match.group(3).strip()\n",
"\n",
" # Check if code sample is unique\n",
" hash_entry = f\"{hash(task_description)}-{hash(input_output_spec)}-{hash(code_sample)}\"\n",
" if hash_entry in seen:\n",
" duplicate += 1\n",
" continue\n",
" seen.add(hash_entry)\n",
"\n",
" # Save to disk\n",
" json.dump({\n",
" \"task_description\": task_description,\n",
" \"input_output_spec\": input_output_spec,\n",
" \"code_sample\": code_sample\n",
" }, f)\n",
" f.write(\"\\n\")\n",
" else:\n",
" print(f\"No match found for item {i}\")\n",
"\n",
"print(f\"There were {duplicate} out of {len(dataset)} duplicate entries\")"
]
},
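{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check, the extraction pattern can be exercised on a fabricated prompt that mirrors the dataset's structure. The toy prompt below is made up for illustration and is not a real dataset entry:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sanity check of `pattern` on a fabricated prompt (illustrative only).\n",
"toy_prompt = (\n",
"    \"You are given a question that requires some input and output variables as follows:\\n\"\n",
"    \"Given two integers, what is their sum?\\n\"\n",
"    \"The input and output requirements are as follows:\\n\"\n",
"    \"Input: a (int), b (int). Output: sum (int).\\n\"\n",
"    \"Given the following input, produce the output. \"\n",
"    \"Tip: Here is a reference code snippet for this question. \"\n",
"    \"You can refer to this code to guide your reasoning but not copy spans of code directly.\\n\"\n",
"    \"def main_solution(a, b):\\n\"\n",
"    \"    return {'sum': a + b}\\n\"\n",
")\n",
"\n",
"m = pattern.search(toy_prompt)\n",
"assert m is not None\n",
"print(m.group(1).strip())  # task description\n",
"print(m.group(2).strip())  # input/output spec\n",
"print(m.group(3).strip())  # reference code"
]
},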
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Subsample the data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class IdentitySampler:\n",
" def run(\n",
" self, features: Union[torch.Tensor, np.ndarray]\n",
" ) -> Union[torch.Tensor, np.ndarray]:\n",
" return features\n",
"\n",
"\n",
"class BaseSampler(abc.ABC):\n",
" def __init__(self, percentage: float):\n",
" if not 0 < percentage < 1:\n",
" raise ValueError(\"Percentage value not in (0, 1).\")\n",
" self.percentage = percentage\n",
"\n",
" @abc.abstractmethod\n",
" def run(\n",
" self, features: Union[torch.Tensor, np.ndarray]\n",
" ) -> Union[torch.Tensor, np.ndarray]:\n",
" pass\n",
"\n",
" def _store_type(self, features: Union[torch.Tensor, np.ndarray]) -> None:\n",
" self.features_is_numpy = isinstance(features, np.ndarray)\n",
" if not self.features_is_numpy:\n",
" self.features_device = features.device\n",
"\n",
" def _restore_type(self, features: torch.Tensor) -> Union[torch.Tensor, np.ndarray]:\n",
" if self.features_is_numpy:\n",
" return features.cpu().numpy()\n",
" return features.to(self.features_device)\n",
"\n",
"\n",
"class GreedyCoresetSampler(BaseSampler):\n",
" def __init__(\n",
" self,\n",
" percentage: float,\n",
" device: torch.device,\n",
" dtype: torch.dtype = torch.float32,\n",
" dimension_to_project_features_to=128,\n",
" ):\n",
" \"\"\"Greedy Coreset sampling base class.\"\"\"\n",
" super().__init__(percentage)\n",
"\n",
" self.device = device\n",
" self.dtype = dtype\n",
" self.dimension_to_project_features_to = dimension_to_project_features_to\n",
"\n",
" def _reduce_features(self, features):\n",
" if features.shape[1] == self.dimension_to_project_features_to:\n",
" return features\n",
" mapper = torch.nn.Linear(\n",
" features.shape[1], self.dimension_to_project_features_to, bias=False, dtype=self.dtype,\n",
" )\n",
" _ = mapper.to(self.device)\n",
" features = features.to(self.device)\n",
" return mapper(features)\n",
"\n",
" def run(\n",
" self, features: Union[torch.Tensor, np.ndarray]\n",
" ) -> Union[torch.Tensor, np.ndarray]:\n",
" \"\"\"Subsamples features using Greedy Coreset.\n",
"\n",
" Args:\n",
" features: [N x D]\n",
" \"\"\"\n",
" if self.percentage == 1:\n",
" return features\n",
" self._store_type(features)\n",
" if isinstance(features, np.ndarray):\n",
" features = torch.from_numpy(features)\n",
" reduced_features = self._reduce_features(features)\n",
" sample_indices = self._compute_greedy_coreset_indices(reduced_features)\n",
" return sample_indices\n",
"\n",
" @staticmethod\n",
" def _compute_batchwise_differences(\n",
" matrix_a: torch.Tensor, matrix_b: torch.Tensor\n",
" ) -> torch.Tensor:\n",
" \"\"\"Computes batchwise Euclidean distances using PyTorch.\"\"\"\n",
" a_times_a = matrix_a.unsqueeze(1).bmm(matrix_a.unsqueeze(2)).reshape(-1, 1)\n",
" b_times_b = matrix_b.unsqueeze(1).bmm(matrix_b.unsqueeze(2)).reshape(1, -1)\n",
" a_times_b = matrix_a.mm(matrix_b.T)\n",
"\n",
" return (-2 * a_times_b + a_times_a + b_times_b).clamp(0, None).sqrt()\n",
"\n",
" def _compute_greedy_coreset_indices(self, features: torch.Tensor) -> np.ndarray:\n",
" \"\"\"Runs iterative greedy coreset selection.\n",
"\n",
" Args:\n",
" features: [NxD] input feature bank to sample.\n",
" \"\"\"\n",
" distance_matrix = self._compute_batchwise_differences(features, features)\n",
" coreset_anchor_distances = torch.norm(distance_matrix, dim=1)\n",
"\n",
" coreset_indices = []\n",
" num_coreset_samples = int(len(features) * self.percentage)\n",
"\n",
" for _ in range(num_coreset_samples):\n",
" select_idx = torch.argmax(coreset_anchor_distances).item()\n",
" coreset_indices.append(select_idx)\n",
"\n",
" coreset_select_distance = distance_matrix[\n",
" :, select_idx : select_idx + 1 # noqa E203\n",
" ]\n",
" coreset_anchor_distances = torch.cat(\n",
" [coreset_anchor_distances.unsqueeze(-1), coreset_select_distance], dim=1\n",
" )\n",
" coreset_anchor_distances = torch.min(coreset_anchor_distances, dim=1).values\n",
"\n",
" return torch.tensor(coreset_indices, device=features.device, dtype=torch.int64)\n",
"\n",
"\n",
"class ApproximateGreedyCoresetSampler(GreedyCoresetSampler):\n",
" def __init__(\n",
" self,\n",
" percentage: float,\n",
" device: torch.device,\n",
" dtype: torch.dtype = torch.float32,\n",
" number_of_starting_points: int = 10,\n",
" dimension_to_project_features_to: int = 128,\n",
" ):\n",
" \"\"\"Approximate Greedy Coreset sampling base class.\"\"\"\n",
" self.number_of_starting_points = number_of_starting_points\n",
" super().__init__(percentage, device, dtype, dimension_to_project_features_to)\n",
"\n",
" def _compute_greedy_coreset_indices(self, features: torch.Tensor) -> np.ndarray:\n",
" \"\"\"Runs approximate iterative greedy coreset selection.\n",
"\n",
" This greedy coreset implementation does not require computation of the\n",
" full N x N distance matrix and thus requires a lot less memory, however\n",
" at the cost of increased sampling times.\n",
"\n",
" Args:\n",
" features: [NxD] input feature bank to sample.\n",
" \"\"\"\n",
" number_of_starting_points = np.clip(\n",
" self.number_of_starting_points, None, len(features)\n",
" )\n",
" start_points = np.random.choice(\n",
" len(features), number_of_starting_points, replace=False\n",
" ).tolist()\n",
"\n",
" approximate_distance_matrix = self._compute_batchwise_differences(\n",
" features, features[start_points]\n",
" )\n",
" approximate_coreset_anchor_distances = torch.mean(\n",
" approximate_distance_matrix, axis=-1\n",
" ).reshape(-1, 1)\n",
" coreset_indices = []\n",
" num_coreset_samples = int(len(features) * self.percentage)\n",
"\n",
" with torch.no_grad():\n",
" for _ in tqdm.tqdm(range(num_coreset_samples), desc=\"Subsampling...\"):\n",
" select_idx = torch.argmax(approximate_coreset_anchor_distances).item()\n",
" coreset_indices.append(select_idx)\n",
" coreset_select_distance = self._compute_batchwise_differences(\n",
" features, features[select_idx : select_idx + 1] # noqa: E203\n",
" )\n",
" approximate_coreset_anchor_distances = torch.cat(\n",
" [approximate_coreset_anchor_distances, coreset_select_distance],\n",
" dim=-1,\n",
" )\n",
" approximate_coreset_anchor_distances = torch.min(\n",
" approximate_coreset_anchor_distances, dim=1\n",
" ).values.reshape(-1, 1)\n",
"\n",
" return torch.tensor(coreset_indices, device=features.device, dtype=torch.int64)\n",
"\n",
"\n",
"class RandomSampler(BaseSampler):\n",
" def __init__(self, percentage: float):\n",
" super().__init__(percentage)\n",
"\n",
" def run(\n",
" self, features: Union[torch.Tensor, np.ndarray]\n",
" ) -> Union[torch.Tensor, np.ndarray]:\n",
" \"\"\"Randomly samples input feature collection.\n",
"\n",
" Args:\n",
" features: [N x D]\n",
" \"\"\"\n",
" num_random_samples = int(len(features) * self.percentage)\n",
" subset_indices = np.random.choice(\n",
" len(features), num_random_samples, replace=False\n",
" )\n",
" return torch.tensor(subset_indices, device=features.device, dtype=torch.int64)"
]
},
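{
"cell_type": "markdown",
"metadata": {},
"source": [
"Both greedy coreset samplers implement a k-center-style rule: after initialization, each step adds the point whose distance to the current coreset $S$ is largest,\n",
"\n",
"$$i^* = \\arg\\max_i \\min_{j \\in S} \\lVert \\phi(x_i) - \\phi(x_j) \\rVert_2,$$\n",
"\n",
"where $\\phi$ is the random linear projection from `_reduce_features`. The approximate variant never materializes the full $N \\times N$ distance matrix: it seeds the anchor distances from a few random starting points and then only computes distances to each newly selected point."
]
},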
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# I ran this cell on Google Colab because I don't have a GPU on my local machine,\n",
"# hence why you see the Google Drive paths\n",
"\n",
"device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n",
"model = SentenceTransformer(\"nomic-ai/modernbert-embed-base\")\n",
"print(model)\n",
"\n",
"def get_entry_info(entry) -> str:\n",
" return entry['task_description']\n",
"\n",
"def get_embeddings(text) -> torch.Tensor:\n",
" return torch.from_numpy(model.encode(text)).to(torch.bfloat16)\n",
"\n",
"embeddings = []\n",
"\n",
"with open(\"./drive/MyDrive/reasoning-gym/codeio-pyedu-extracted.jsonl\") as f:\n",
" for line in tqdm(f):\n",
" entry = json.loads(line)\n",
" entry_info = get_entry_info(entry)\n",
" embeddings.append(get_embeddings(entry_info))\n",
"\n",
"embeddings = torch.stack(embeddings).to(torch.bfloat16).to(device)\n",
"print(embeddings.shape)\n",
"\n",
"sampler = ApproximateGreedyCoresetSampler(\n",
" percentage=0.05, \n",
" device=device, \n",
" dtype=torch.bfloat16,\n",
" dimension_to_project_features_to=768,\n",
")\n",
"subsampled = sampler.run(embeddings)\n",
"\n",
"indices = set(subsampled.cpu().tolist())\n",
"with open(\"./drive/MyDrive/reasoning-gym/codeio-pyedu-extracted.jsonl\", \"r\") as f_in, \\\n",
" open(\"./drive/MyDrive/reasoning-gym/codeio-pyedu-best-coverage.jsonl\", \"w+\") as f_out:\n",
" for i, line in enumerate(f_in):\n",
" if i in indices:\n",
" f_out.write(line)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create input generators for each problem separately"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"SYSTEM_PROMPT = \"\"\"You are a helpful assistant that generates valid Python functions that act as input generators for a given code snippet.\n",
"\n",
"You have access to `random.Random`, therefore you SHOULD NOT import it again. You should use this random number generator to make the input generation process stochastic on each call.\n",
"\n",
"When the user asks you to generate an input for a code snippet, you should strictly respond in the following format:\n",
"<function>\n",
"def generate_input(rng: Random) -> dict:\n",
" # Your code here\n",
" pass\n",
"</function>\n",
"\n",
"The output of the function should be a dictionary where the keys are the variable names and the values are the generated values.\n",
"\n",
"It must contain all the variables that listed in the user's input specification, or more precisely in the `main_solution` function signature. \n",
"\"\"\"\n",
"\n",
"USER_PROMPT = \"\"\"Following are a task description, input/output specification, and relevant code snippet for a Python programming task.\n",
"\n",
"<task_description>\n",
"{task_description}\n",
"</task_description>\n",
"\n",
"<input_output_spec>\n",
"{input_output_spec}\n",
"</input_output_spec>\n",
"\n",
"<code_sample>\n",
"{code_sample}\n",
"</code_sample>\n",
"\n",
"Your task is to write a Python function `def generate_input(rng: Random) -> dict:` that generates valid inputs for the given code snippet, based on the provided information.\n",
"\"\"\"\n",
"\n",
"# We'll control concurrency with a semaphore\n",
"CONCURRENCY_LIMIT = 10\n",
"sem = asyncio.Semaphore(CONCURRENCY_LIMIT)\n",
"\n",
"async def fetch_input_generator(session: aiohttp.ClientSession, entry: dict) -> dict:\n",
" \"\"\"\n",
" Sends a POST request to OpenRouter with the system & user prompts,\n",
" extracts the function from the response, and returns the updated entry.\n",
" \"\"\"\n",
" url = \"https://openrouter.ai/api/v1/chat/completions\"\n",
" headers = {\n",
" \"Authorization\": f\"Bearer {os.getenv('OPENROUTER_API_KEY')}\",\n",
" \"Content-Type\": \"application/json\",\n",
" }\n",
"\n",
" payload = {\n",
" \"model\": \"deepseek/deepseek-chat\",\n",
" \"messages\": [\n",
" {\"role\": \"system\", \"content\": SYSTEM_PROMPT},\n",
" {\n",
" \"role\": \"user\",\n",
" \"content\": USER_PROMPT.format(**entry)\n",
" },\n",
" ],\n",
" }\n",
"\n",
" async with sem:\n",
" async for attempt in AsyncRetrying(\n",
" stop=stop_after_attempt(5),\n",
" wait=wait_exponential(multiplier=1, min=1, max=60),\n",
" retry=retry_if_exception_type(\n",
" (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError, ValueError)\n",
" ),\n",
" ):\n",
" with attempt:\n",
" async with session.post(url, headers=headers, json=payload) as response:\n",
" data = await response.json()\n",
"\n",
" # Basic checks for valid response\n",
" if \"choices\" not in data or not data[\"choices\"]:\n",
" print(\"No choices found in response\")\n",
" return entry\n",
"\n",
" content = data[\"choices\"][0][\"message\"][\"content\"]\n",
" match = re.search(r\"<function>(.*?)</function>\", content, re.DOTALL)\n",
" if not match:\n",
" print(\"Could not find <function>...</function> block in response\")\n",
" return entry\n",
"\n",
" input_generator = match.group(1).strip()\n",
" entry[\"input_generator\"] = input_generator\n",
" return entry\n",
"\n",
" # If we exit the loop without returning, raise Exception\n",
" raise Exception(\"Failed to get valid input generator after retries\")\n",
"\n",
"async def process_file(input_file: str, output_file: str):\n",
" \"\"\"\n",
" Reads each line from `input_file`, processes each entry concurrently,\n",
" and writes augmented entries to `output_file`.\n",
" \"\"\"\n",
" # Read all lines first (synchronously)\n",
" with open(input_file, \"r\") as f_in:\n",
" lines = f_in.readlines()\n",
"\n",
" tasks = []\n",
" async with aiohttp.ClientSession() as session:\n",
" # Create a task for each line/entry\n",
" for line in lines:\n",
" entry = json.loads(line)\n",
" tasks.append(asyncio.create_task(fetch_input_generator(session, entry)))\n",
"\n",
" # We'll gather results while showing progress\n",
" results = []\n",
" for t in tqdm(asyncio.as_completed(tasks), total=len(tasks)):\n",
" result = await t\n",
" results.append(result)\n",
"\n",
" # Write all results out\n",
" with open(output_file, \"w\") as f_out:\n",
" for res in results:\n",
" f_out.write(json.dumps(res))\n",
" f_out.write(\"\\n\")\n",
"\n",
"# Finally, run the entire pipeline\n",
"await process_file(\n",
" input_file=\"data/codeio-pyedu-best-coverage.jsonl\",\n",
" output_file=\"data/codeio-pyedu-with-input-generator.jsonl\"\n",
")"
]
},
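{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, a conforming model reply wraps a function like the one below in `<function>` tags. This is a hand-written sketch for a hypothetical task whose `main_solution` takes integers `a` and `b`; it is not actual model output:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hand-written sketch of a conforming generator for a hypothetical task\n",
"# with inputs `a` and `b`. In the sandbox pipeline `from random import Random`\n",
"# is prepended, so the generated function itself must not import it; we\n",
"# import it here only to run the sketch standalone.\n",
"from random import Random\n",
"\n",
"def generate_input(rng: Random) -> dict:\n",
"    # Keys must match the parameters of the task's `main_solution`.\n",
"    return {\n",
"        \"a\": rng.randint(-100, 100),\n",
"        \"b\": rng.randint(-100, 100),\n",
"    }\n",
"\n",
"print(generate_input(Random(42)))"
]
},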
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Filter out invalid input generators"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you want to install a template with custom package\n",
"\n",
"https://e2b.dev/docs/quickstart/install-custom-packages\n",
"\n",
"An example e2b.Dockerfile looks like this:\n",
"\n",
"```Dockerfile\n",
"FROM e2bdev/code-interpreter:latest\n",
"\n",
"RUN pip install numpy matplotlib scipy pandas scikit-learn sympy networkx requests pillow bs4 cryptography spacy numba pyyaml regex\n",
"```\n",
"\n",
"However, I am going with the default installed libraries: https://e2b.dev/docs/code-interpreting/analyze-data-with-ai/pre-installed-libraries "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example usage of the Sandbox class\n",
"with Sandbox() as sandbox:\n",
"\n",
" # First initialize the sandbox\n",
" execution = sandbox.run_code(\"\"\"\n",
"from random import Random # <----- ALWAYS PREPEND THIS LINE TO YOUR CODE SNIPPET\n",
"\n",
"def hello_world():\n",
" return {\"a\": 5, \"b\": 10}\n",
"\n",
"def multiple_hello_worlds(rng: Random):\n",
" return [\n",
" {\"a\": rng.randint(1, 10), \"b\": rng.randint(10, 20)},\n",
" {\"a\": 10, \"b\": 20},\n",
" ]\n",
"\"\"\"\n",
" )\n",
" try:\n",
" # Run the code snippet\n",
" execution = sandbox.run_code(\"rng = Random(53);multiple_hello_worlds(rng)\", timeout=5)\n",
" print(execution)\n",
" if execution.error:\n",
" print(\"[!! FOUND ERROR !!]\")\n",
" else:\n",
" print(type(execution.text))\n",
" print(execution.text)\n",
" except TimeoutException as e:\n",
" print(e)\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "371d38e1fe9e41d587b2cfa64ca9ef91",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
" 0%| | 0/7053 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Response 404\n",
"Response 404\n",
"Response 404\n",
"Response 404\n",
"Response 404\n",
"Response 404\n",
"Response 404\n",
"Response 404\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"full_sampling_fails: 913\n",
"warmup_fails: 528\n",
"missing_input_generator: 36\n",
"cannot_initialize_code: 98\n",
"Total errors: 1575\n"
]
}
],
"source": [
"CODE_TEMPLATE = \"\"\"from random import Random\n",
"{code_sample}\n",
"\n",
"{input_generator}\n",
"\n",
"def multiple_eval(num_generations: int, seed: int = 42) -> tuple:\n",
" rng = Random(seed)\n",
" inputs = [generate_input(rng) for _ in range(num_generations)]\n",
" outputs = [main_solution(**inp) for inp in inputs]\n",
" return inputs, outputs\n",
"\"\"\"\n",
"\n",
"SAMPLING_TEMPLATE = \"multiple_eval({num_generations})\"\n",
"\n",
"WARMUP_GENERATIONS = 5\n",
"TOTAL_GENERATIONS = 1_000\n",
"TIMEOUT_CODE_INIT = 10\n",
"TIMEOUT_PER_SAMPLE = 0.01\n",
"\n",
"errors = defaultdict(int)\n",
"total_entries = sum(1 for _ in open(\"data/codeio-pyedu-with-input-generator.jsonl\", \"r\"))\n",
"\n",
"with open(\"data/codeio-pyedu-with-input-generator.jsonl\", \"r\") as f_in, \\\n",
" open(\"data/codeio-pyedu-with-input-generator-filtered.jsonl\", \"w+\") as f_out:\n",
"\n",
" iterator = tqdm(enumerate(f_in), total=total_entries)\n",
"\n",
" for i, line in iterator:\n",
" iterator.set_description(f\"Failures: \" + \" | \".join(f\"{k}: {v}\" for k, v in errors.items()) + f\" | total: {sum(errors.values())}\")\n",
" entry = json.loads(line)\n",
"\n",
" if not \"input_generator\" in entry:\n",
" errors[\"missing_input_generator\"] += 1\n",
" continue\n",
" \n",
" with Sandbox() as sandbox:\n",
" # 1. Initialize the sandbox\n",
" try: \n",
" execution = sandbox.run_code(\n",
" code=CODE_TEMPLATE.format(**entry), \n",
" timeout=TIMEOUT_CODE_INIT\n",
" )\n",
" assert not execution.error, \"Error in code snippet\"\n",
" except Exception as e:\n",
" errors[\"cannot_initialize_code\"] += 1\n",
" continue\n",
" \n",
" # 2. Warmup the sampling\n",
" try:\n",
" execution = sandbox.run_code(\n",
" code=SAMPLING_TEMPLATE.format(num_generations=WARMUP_GENERATIONS),\n",
" timeout=TIMEOUT_CODE_INIT,\n",
" )\n",
" assert not execution.error, \"Error in input generator (warmup)\"\n",
" assert execution.text, \"Empty input generator output (warmup)\"\n",
" inputs, outputs = eval(execution.text)\n",
" except Exception as e:\n",
" errors[\"warmup_fails\"] += 1\n",
" continue\n",
"\n",
" # 3. Run the full sampling\n",
" try:\n",
" execution = sandbox.run_code(\n",
" code=SAMPLING_TEMPLATE.format(num_generations=TOTAL_GENERATIONS),\n",
" timeout=int(TIMEOUT_PER_SAMPLE * TOTAL_GENERATIONS),\n",
" )\n",
" assert not execution.error, \"Error in input generator (full)\"\n",
" assert execution.text, \"Empty input generator output (full)\"\n",
" inputs, outputs = eval(execution.text)\n",
" assert len(inputs) == TOTAL_GENERATIONS, \"Mismatch in input generations\"\n",
" assert len(outputs) == TOTAL_GENERATIONS, \"Mismatch in output generations\"\n",
" unique_inputs = len(set(hash(json.dumps(inp, sort_keys=True)) for inp in inputs))\n",
" unique_outputs = len(set(hash(json.dumps(out, sort_keys=True)) for out in outputs))\n",
" except:\n",
" errors[\"full_sampling_fails\"] += 1\n",
" continue\n",
" \n",
" # 4. Save the entry\n",
" entry = entry | {\n",
" \"unique_inputs\": unique_inputs,\n",
" \"unique_outputs\": unique_outputs,\n",
" \"total_generations\": TOTAL_GENERATIONS,\n",
" }\n",
" f_out.write(json.dumps(entry))\n",
" f_out.write(\"\\n\")\n",
"\n",
"for k, v in errors.items():\n",
" print(f\"{k}: {v}\")\n",
"print(f\"Total errors: {sum(errors.values())}\")"
]
},
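{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `unique_inputs`/`unique_outputs` counts saved above could drive a further diversity filter. A minimal sketch, assuming an arbitrary 0.5 uniqueness threshold and a hypothetical output filename:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: drop entries whose input generators are nearly deterministic.\n",
"# MIN_UNIQUE_RATIO = 0.5 is an arbitrary assumption, not a tuned value,\n",
"# and the output filename is hypothetical.\n",
"MIN_UNIQUE_RATIO = 0.5\n",
"\n",
"kept = 0\n",
"with open(\"data/codeio-pyedu-with-input-generator-filtered.jsonl\") as f_in, \\\n",
"     open(\"data/codeio-pyedu-diverse.jsonl\", \"w\") as f_out:\n",
"    for line in f_in:\n",
"        entry = json.loads(line)\n",
"        if entry[\"unique_inputs\"] / entry[\"total_generations\"] >= MIN_UNIQUE_RATIO:\n",
"            f_out.write(line)\n",
"            kept += 1\n",
"print(f\"Kept {kept} entries\")"
]
},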
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "reasoning_gym",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.11"
}
},
"nbformat": 4,
"nbformat_minor": 2
}