Skip to content

Commit

Permalink
fix: update naming and simplify logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Jennifer Tran committed May 17, 2024
1 parent c5a0031 commit b3d2597
Showing 1 changed file with 33 additions and 41 deletions.
74 changes: 33 additions & 41 deletions transformation-scripts/collection-and-item-workflows-ingest.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
" \"combined_CMIP6_daily_GISS-E2-1-G_tas_kerchunk_DEMO\",\n",
"]\n",
"\n",
"json_file_paths = glob.glob(\"../ingestion-data/collections/*.json\")\n",
"collection_json_file_paths = glob.glob(\"../ingestion-data/collections/*.json\")\n",
"filtered_list = [\n",
" item\n",
" for item in json_file_paths\n",
Expand Down Expand Up @@ -110,26 +110,14 @@
"metadata": {},
"outputs": [],
"source": [
"def load_json_files_from_directory(directory):\n",
" json_files = []\n",
" for filename in os.listdir(directory):\n",
" if filename.endswith(\".json\"):\n",
" json_files.append(filename)\n",
" return json_files\n",
"\n",
"\n",
"def find_matching_file_names(collections_list, discovery_items_list):\n",
" matching_file_names = []\n",
" for collection_filename in collections_list:\n",
" collection_json = load_json_file(\n",
" os.path.join(collections_files, collection_filename)\n",
" )\n",
" collection_json = load_json_file(collection_filename)\n",
" id1 = collection_json.get(\"id\")\n",
" if id1 is not None:\n",
" for discovery_items_filename in discovery_items_list:\n",
" item_json = load_json_file(\n",
" os.path.join(discovery_items_files, discovery_items_filename)\n",
" )\n",
" item_json = load_json_file(discovery_items_filename)\n",
" if isinstance(item_json, list):\n",
" if len(item_json) > 0:\n",
" collection2 = item_json[0].get(\"collection\")\n",
Expand All @@ -140,7 +128,6 @@
" if collection2 == id1:\n",
" # Found a match\n",
" matching_file_names.append(discovery_items_filename)\n",
" # Further processing or comparison can be done here\n",
" break\n",
" return matching_file_names\n",
"\n",
Expand All @@ -157,12 +144,15 @@
" else \"../ingestion-data/production/discovery-items/\"\n",
")\n",
"\n",
"# Load JSON files from directories\n",
"json_files_dir1 = load_json_files_from_directory(collections_files)\n",
"json_files_dir2 = load_json_files_from_directory(discovery_items_files)\n",
"\n",
"discovery_items_json_file_paths = (\n",
" glob.glob(\"../ingestion-data/staging/discovery-items//*.json\")\n",
" if testing_mode\n",
" else glob.glob(\"../ingestion-data/production/discovery-items//*.json\")\n",
")\n",
"# Find matching file names\n",
"matching_file_names = find_matching_file_names(json_files_dir1, json_files_dir2)\n",
"matching_file_names = find_matching_file_names(\n",
" collections_json_file_paths, discovery_items_json_file_paths\n",
")\n",
"\n",
"# for file_pair in matching_file_names:\n",
"# print(\"Match found:\")\n",
Expand Down Expand Up @@ -195,6 +185,7 @@
"mcp_prod_user_pool_id = \"CHANGE ME\"\n",
"mcp_prod_identity_pool_id = \"CHANGE ME\"\n",
"\n",
"print(f\"TESTING MODE? {testing_mode}\")\n",
"if testing_mode:\n",
" STAC_INGESTOR_API = f\"{test_endpoint}/api/ingest/\"\n",
" VEDA_STAC_API = f\"{test_endpoint}/api/stac/\"\n",
Expand All @@ -219,6 +210,7 @@
"outputs": [],
"source": [
"TOKEN = \"REPLACE ME\"\n",
"\n",
"authorization_header = f\"Bearer {TOKEN}\"\n",
"headers = {\n",
" \"Authorization\": authorization_header,\n",
Expand Down Expand Up @@ -265,20 +257,22 @@
" )\n",
"\n",
"\n",
"def ingest_item(item):\n",
"def ingest_discovery_item(discovery_item):\n",
" discovery_url = f\"{WORKFLOWS_API}/discovery\"\n",
" try:\n",
" response = requests.post(discovery_url, json=item, headers=headers)\n",
" response = requests.post(\n",
" discovery_url, json=ingest_discovery_item, headers=headers\n",
" )\n",
" response.raise_for_status()\n",
" if response.status_code == 201:\n",
" print(f\"Request was successful. \")\n",
" else:\n",
" print(\n",
" f\"Kicking off discovery for {item} failed. Request failed with status code: {response.status_code}\"\n",
" f\"Kicking off discovery for {ingest_discovery_item} failed. Request failed with status code: {response.status_code}\"\n",
" )\n",
" except requests.RequestException as e:\n",
" print(\n",
" f\"Kicking off discovery for {item} failed. An error occurred during the request: {e}\"\n",
" f\"Kicking off discovery for {ingest_discovery_item} failed. An error occurred during the request: {e}\"\n",
" )\n",
" except Exception as e:\n",
" print(\n",
Expand All @@ -300,7 +294,7 @@
"outputs": [],
"source": [
"test_file_paths_and_collection_ids = [file_paths_and_collection_ids[0]]\n",
"test_discovery_item = [f\"{file_paths_and_collection_ids[0].get(\"collectionId\")}.json\"]\n",
"test_discovery_item = [f\"../ingestion-data/staging/discovery-items/{file_paths_and_collection_ids[0].get(\"collectionId\")}.json\"]\n",
"\n",
"print(test_discovery_item)\n",
"print(test_file_paths_and_collection_ids)\n",
Expand All @@ -315,7 +309,10 @@
" test_discovery_item\n",
" if testing_mode\n",
" else discovery_items_to_process\n",
")"
")\n",
"\n",
"print(file_paths_and_collection_ids)\n",
"print(discovery_items_to_process)"
]
},
{
Expand Down Expand Up @@ -354,27 +351,22 @@
"metadata": {},
"outputs": [],
"source": [
"for item in discovery_items_to_process:\n",
" if testing_mode:\n",
" file_path = f\"../ingestion-data/staging/discovery_items/{item}\"\n",
" else:\n",
" file_path = f\"../ingestion-data/production/discovery_items/{item}\"\n",
"\n",
"for discovery_item in discovery_items_to_process:\n",
" try:\n",
" with open(file_path, \"r\", encoding=\"utf-8\") as file:\n",
" item = json.load(file)\n",
" with open(discovery_item, \"r\", encoding=\"utf-8\") as file:\n",
" discovery_item_json = json.load(file)\n",
"\n",
" # Publish the updated collection to the target ingestion `api/collections` endpoint\n",
" if isinstance(item_json, list):\n",
" for single_item in item_json:\n",
" ingest_item(single_item)\n",
" if isinstance(discovery_item_json, list):\n",
" for single_discovery_item in discovery_item_json:\n",
" ingest_discovery_item(single_discovery_item)\n",
" else:\n",
" ingest_item(item)\n",
" ingest_discovery_item(discovery_item_json)\n",
"\n",
" except requests.RequestException as e:\n",
" print(f\"An error occurred for collectionId {collection_id}: {e}\")\n",
" print(f\"An error occurred for discovery item {discovery_item}: {e}\")\n",
" except Exception as e:\n",
" print(f\"An unexpected error occurred for collectionId {collection_id}: {e}\")"
" print(f\"An unexpected error occurred for discovery item {discovery_item}: {e}\")"
]
}
],
Expand Down

0 comments on commit b3d2597

Please sign in to comment.