feat: save multiple json file to respect labelstudio format
polomarcus committed Feb 21, 2025
1 parent 10fb38f commit 6021da4
Showing 7 changed files with 87 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -15,6 +15,7 @@ jobs/label_misinformation/jobs
 **secrets/
 **venv/
 **/s3/
+**/.parquet
 # Node artifact files
 node_modules/
 dist/
6 changes: 4 additions & 2 deletions jobs/label_misinformation/app/main.py
@@ -1,6 +1,7 @@
 import openai
 import logging
 import os
+import ray
 import sys
 from date_utils import *
 from s3_utils import *
@@ -70,6 +71,7 @@ def detect_misinformation(df_news, model_name, min_misinformation_score = 10) ->
 @monitor(monitor_slug='label-misinformation')
 def main():
     logger = getLogger()
+    ray.init(log_to_driver=True)
     pd.set_option('display.max_columns', None)
     sentry_init()
     model_name = get_secret_docker("MODEL_NAME")
@@ -100,12 +102,12 @@ def main():
             logging.info(f"Misinformation detected {len(misinformation_only_news)} rows")
             logging.info(f"Examples : {misinformation_only_news.head(10)}")

-            # save TSV format (CSV)
+            # save JSON format
             save_to_s3(misinformation_only_news, channel=channel,date=date, s3_client=s3_client, \
                 bucket=bucket_output, folder_inside_bucket=app_name)

             # TODO save using LabelStudio's API
-            # (or use CSV import from S3)
+            # (or use JSON import from S3)
         else:
             logging.info(f"No misinformation detected for channel {channel} on {date}")
     else:
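A note on the ray.init change: the job's DataFrames are modin.pandas (see the test file below), and modin executes on Ray, so log_to_driver=True forwards the Ray workers' stdout/stderr to the driver process and hence into the container logs. A minimal sketch of the pattern, assuming nothing else initializes Ray first (the column name is illustrative):

import ray
import modin.pandas as pd  # modin reuses the Ray instance initialized below

ray.init(log_to_driver=True)  # worker log output is forwarded to this process
df = pd.DataFrame({"misinformation_score": [10, 3, 10]})
print(df[df["misinformation_score"] >= 10])  # runs on Ray workers; their logs stay visible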
Empty file.
Empty file.
80 changes: 66 additions & 14 deletions jobs/label_misinformation/app/s3_utils.py
@@ -5,6 +5,7 @@
 from sentry_sdk.crons import monitor
 import shutil
 import sys
+import json

 def get_secret_docker(secret_name):
     secret_value = os.environ.get(secret_name, "")
@@ -42,8 +43,6 @@ def get_bucket_key_folder(date, channel, root_folder = None):
         return f"{root_folder}/{key}"

     return key
-
-

 def read_folder_from_s3(date, channel: str, bucket: str):
     s3_path: str = get_bucket_key_folder(date=date, channel=channel)
@@ -60,8 +59,6 @@ def read_folder_from_s3(date, channel: str, bucket: str):
     logging.info(f"read {len(df)} rows from S3")
     return df
-
-

 def check_if_object_exists_in_s3(day, channel, s3_client, bucket: str, root_folder = None) -> bool:
     folder_prefix = get_bucket_key_folder(day, channel, root_folder=root_folder)
@@ -78,8 +75,8 @@ def check_if_object_exists_in_s3(day, channel, s3_client, bucket: str, root_fold
         logging.error(f"Error while checking folder in S3: {folder_prefix}\n{e}")
         return False

-def upload_file_to_s3(local_file, bucket_name, base_s3_path, s3_client):
-    s3_key = f"{base_s3_path}misinformation.tsv"
+def upload_file_to_s3(local_file, bucket_name, base_s3_path, s3_client, format = "json"):
+    s3_key = f"{base_s3_path}misinformation.{format}"
     logging.info(f"Reading {local_file} to upload it to {s3_key}")
     s3_client.upload_file(local_file, bucket_name, s3_key)

@@ -96,29 +93,83 @@ def upload_folder_to_s3(local_folder, bucket_name, base_s3_path, s3_client):
             logging.info(f"Uploading to bucket {bucket_name} key : {s3_key}")
             s3_client.upload_file(local_file_path, bucket_name, s3_key)
             logging.info(f"Uploaded: {s3_key}")
-            # Delete the local folder after successful upload
-            shutil.rmtree(local_folder)
-            logging.info(f"Deleted local folder: {local_folder}")
+    # Delete the local folder after successful upload
+    shutil.rmtree(local_folder)
+    logging.info(f"Deleted local folder: {local_folder}")

 def save_csv(df: pd.DataFrame, channel: str, date: pd.Timestamp, s3_path, folder_inside_bucket = None):
-    csv_based_path = "s3"
+    based_path = "s3"
     local_csv = "s3/misinformation.tsv"
     if folder_inside_bucket is not None:
-        local_csv = f"{csv_based_path}/{folder_inside_bucket}.tsv"
+        local_csv = f"{based_path}/{folder_inside_bucket}.tsv"

     os.makedirs(os.path.dirname(local_csv), exist_ok=True)
     df.to_csv(local_csv, sep ='\t') # tab separated

-    # local_csv_folder = f"{csv_based_path}/{s3_path}"
     logging.info(f"CSV saved locally {local_csv}")
     return local_csv

+# https://labelstud.io/guide/storage#One-Task-One-JSON-File
+def reformat_and_save(df, output_folder="output_json_files"):
+    # Ensure the output folder exists
+    os.makedirs(os.path.dirname(output_folder), exist_ok=True)
+
+    for idx, row in df.iterrows():
+        start_time = row["start"].isoformat() if isinstance(row["start"], pd.Timestamp) else row["start"]
+        task_data = {
+            "data": {
+                "item": {
+                    "plaintext": row["plaintext"], # replace with your actual column name
+                    "start": start_time,
+                    "channel_title": row["channel_title"],
+                    "channel_name": row["channel_name"],
+                    "channel_program": row["channel_program"],
+                    "channel_program_type": row["channel_program_type"],
+                    "model_name": row["model_name"],
+                    "model_result": row["model_result"],
+                    "year": row["year"],
+                    "month": row["month"],
+                    "day": row["day"],
+                    "channel": row["channel"]
+                }
+            },
+            "annotations": [],
+            "predictions": []
+        }
+
+        # Define the file path for each row
+        file_path = os.path.join(output_folder, f"task_{idx + 1}.json")
+        os.makedirs(os.path.dirname(file_path), exist_ok=True)
+        # Write the formatted data to a file
+        with open(file_path, "w", encoding="utf-8") as f:
+            json.dump(task_data, f, ensure_ascii=False, indent=4)
+
+    return output_folder
+
+# new line delimited json
+def save_json(df: pd.DataFrame, channel: str, date: pd.Timestamp, s3_path, folder_inside_bucket = None):
+    based_path = "s3"
+
+    local_json = "s3/json"
+    if folder_inside_bucket is not None:
+        local_json = f"{based_path}/{folder_inside_bucket}"
+    os.makedirs(os.path.dirname(local_json), exist_ok=True)
+    # formatted_df = reformat_for_labelstudio(df)
+    # formatted_df.to_json(local_json, orient="records", lines=True)
+
+    local_json = reformat_and_save(df, output_folder=local_json)
+    logging.info(f"JSON saved locally {local_json}")
+    return local_json
+
 def save_parquet(df: pd.DataFrame, channel: str, date: pd.Timestamp, s3_path, folder_inside_bucket = None):
     based_path = "s3/parquet"
     os.makedirs(os.path.dirname(based_path), exist_ok=True)
     local_parquet = based_path
     if folder_inside_bucket is not None:
         local_parquet = f"{based_path}/{folder_inside_bucket}"

     os.makedirs(os.path.dirname(local_parquet), exist_ok=True)
     df.to_parquet(local_parquet,
         compression='gzip'
         ,partition_cols=['year', 'month', 'day', 'channel'])
@@ -146,9 +197,10 @@ def save_to_s3(df: pd.DataFrame, channel: str, date: pd.Timestamp, s3_client, bu
         s3_path: str = f"{get_bucket_key_folder(date, channel, root_folder=folder_inside_bucket)}"

         # local_folder_parquet = save_parquet(df, channel, date, s3_path, folder_inside_bucket)
-        local_csv_file = save_csv(df, channel, date, s3_path, folder_inside_bucket)
         # upload_folder_to_s3(local_folder_parquet, bucket, s3_path, s3_client=s3_client)
-        upload_file_to_s3(local_csv_file, bucket, s3_path, s3_client=s3_client)
+        # local_csv_file = save_csv(df, channel, date, s3_path, folder_inside_bucket)
+        local_json_folder = save_json(df, channel, date, s3_path, folder_inside_bucket)
+        upload_folder_to_s3(local_json_folder, bucket, s3_path, s3_client=s3_client)
+        # upload_file_to_s3(local_json_file, bucket, s3_path, s3_client=s3_client, format="json")

     except Exception as err:
         logging.fatal("save_to_s3 (%s) %s" % (type(err).__name__, err))
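To make the new save path concrete, here is a minimal sketch of calling save_json with a hand-built row. Plain pandas stands in for the job's modin.pandas; the row values and model name are illustrative, while the column names are exactly the ones reformat_and_save reads:

import pandas as pd
from datetime import datetime
from s3_utils import save_json  # assuming app/ is on sys.path, as in the tests

df = pd.DataFrame([{
    "plaintext": "extrait de transcription",
    "start": pd.Timestamp("2025-02-19 08:00:00"),
    "channel_title": "CNews",
    "channel_name": "itele",
    "channel_program": "La Matinale",
    "channel_program_type": "news",
    "model_name": "some-model",
    "model_result": 10,
    "year": 2025,
    "month": 2,
    "day": 19,
    "channel": "itele",
}])

output = save_json(df, channel="itele", date=datetime.now(), s3_path="test")
print(output)  # "s3/json" -- one Label Studio task file per row
# s3/json/task_1.json then contains a single task:
# {"data": {"item": {"plaintext": "...", "start": "2025-02-19T08:00:00", ...}},
#  "annotations": [], "predictions": []}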
6 changes: 3 additions & 3 deletions jobs/label_misinformation/docker-compose.yml
@@ -2,7 +2,7 @@ services:
   app:
     build: .
     environment:
-      LOGLEVEL: DEBUG
+      LOGLEVEL: INFO
       ENV: docker
       APP_NAME: label-misinformation
       OPENAI_API_KEY: /run/secrets/openai_key
@@ -13,9 +13,9 @@
       BUCKET_SECRET: /run/secrets/bucket_secret
       AWS_DEFAULT_REGION: fr-par
       MIN_MISINFORMATION_SCORE: 10 # the minimum score to have to be kept (10 out of 10)
-      DATE: 2025-02-01 # YYYY-MM-DD
+      DATE: 2025-02-19 # YYYY-MM-DD
       # SENTRY_DSN: prod_only
-      CHANNEL: itele # mediatree former channel name, not title (cnews)
+      CHANNEL: europe1 # mediatree former channel name, not title (cnews)
     secrets:
       - model_name
       - openai_key
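Note that values such as OPENAI_API_KEY: /run/secrets/openai_key point at Docker secrets rather than holding the secrets themselves; get_secret_docker in s3_utils.py reads the env var and presumably resolves the file behind it. A sketch of that common pattern (the helper name and the file-read fallback are assumptions here, not the repo's exact code):

import os

def read_docker_secret(env_name: str) -> str:
    # The env var holds a path like /run/secrets/openai_key;
    # the secret's value is the content of that file.
    value = os.environ.get(env_name, "")
    if os.path.isfile(value):
        with open(value) as f:
            return f.read().strip()
    return value  # fall back: treat the env var itself as the value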
14 changes: 13 additions & 1 deletion jobs/label_misinformation/tests/test_s3.py
@@ -3,7 +3,7 @@
 import os
 sys.path.append(os.path.abspath('/app/app'))
 import modin.pandas as pd
-from app.s3_utils import save_to_s3, save_csv, save_parquet, get_s3_client, get_bucket_key_folder
+from app.s3_utils import save_to_s3, save_csv, save_parquet, get_s3_client, get_bucket_key_folder, save_json
 from datetime import datetime, timedelta
 import logging

@@ -20,6 +20,18 @@ def test_save_csv():
     output = save_csv(df, channel="itele", date=date, s3_path="test")
     assert output == "s3/misinformation.tsv"

+def test_save_json():
+    df: pd.DataFrame = pd.read_parquet("tests/label_misinformation/data/misinformation.parquet")
+    date: datetime = datetime.now()
+    channel="itele"
+    df['year'] = date.year
+    df['month'] = date.month
+    df['day'] = date.day
+    df['channel'] = channel # channel_name from mediatree's api
+
+    output = save_json(df, channel=channel, date=date, s3_path="test")
+    assert output == "s3/json"
+
 def test_save_parquet():
     df: pd.DataFrame = pd.read_parquet("tests/label_misinformation/data/misinformation.parquet")
     channel="itele"
