Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(duckdb): query s3 file #22611

Merged
merged 1 commit into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion api-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ dependencies = [
"fastapi==0.115.6",
"uvicorn==0.34.0",
"confluent-kafka==2.8.0",
"python-dotenv==1.0.1",
"sentry-sdk[fastapi]==2.19.2",
"pydantic-settings==2.7.1",
]
Expand Down
2 changes: 0 additions & 2 deletions api-python/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
HTTP_AUTH_TOKEN=xxx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
HTTP_AUTH_TOKEN=xxx
13 changes: 13 additions & 0 deletions data-storage/hm-duckdb/query-amazon-s3/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Install the Python interpreter through uv.
uv-install-python:
	uv python install
# Re-resolve dependencies and rewrite the lock file.
uv-update-lock-file:
	uv lock
# Sync the environment, including the dev dependency group.
uv-install-dependencies:
	uv sync --dev

# Run the poe tasks declared in pyproject.toml.
uv-run-dev:
	uv run poe dev
uv-run-test:
	uv run poe test
uv-run-test-coverage:
	uv run poe test-coverage
Empty file.
25 changes: 25 additions & 0 deletions data-storage/hm-duckdb/query-amazon-s3/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Project metadata and pinned runtime dependencies for the DuckDB query example.
[project]
name = "query-amazon-s3"
version = "1.0.0"
# Compatible-release specifier: any 3.13.x interpreter.
requires-python = "~=3.13.0"
dependencies = [
"duckdb==1.1.3",
"polars[pyarrow]==1.19.0",
"pydantic-settings==2.7.1",
"xxhash==3.5.0",
]

# Development-only tooling (the Makefile installs it with `uv sync --dev`).
[dependency-groups]
dev = [
"poethepoet==0.32.1",
"pytest==8.3.4",
"pytest-cov==6.0.0",
]

[tool.uv]
# NOTE(review): presumably tells uv not to build/install this project as a
# package — confirm against the uv documentation.
package = false

# Task aliases; the Makefile invokes them as `uv run poe <task>`.
[tool.poe.tasks]
dev = "python src/main.py"
# NOTE(review): `--verbose` is passed twice, presumably on purpose to raise
# pytest's verbosity one more level — confirm.
test = "pytest --verbose --verbose"
test-coverage = "pytest --cov=. --cov-report=xml"
4 changes: 4 additions & 0 deletions data-storage/hm-duckdb/query-amazon-s3/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# https://docs.pytest.org/en/stable/reference/customize.html#pytest-ini

[pytest]
pythonpath = src
16 changes: 16 additions & 0 deletions data-storage/hm-duckdb/query-amazon-s3/src/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import os

from pydantic_settings import BaseSettings


def get_env_file() -> str:
    """Return the dotenv file name selected by the ``ENV`` environment variable.

    Returns:
        ``".env.production.local"`` when ``ENV`` is exactly ``"production"``;
        ``".env.development.local"`` for any other value or when ``ENV`` is
        unset.
    """
    if os.getenv("ENV") == "production":
        return ".env.production.local"
    return ".env.development.local"


class Config(BaseSettings):
    """Application settings resolved by pydantic-settings.

    The dotenv file to read is chosen once, when this class body is
    evaluated, by calling ``get_env_file()``.
    """

    # Required setting; no default is provided.
    HTTP_AUTH_TOKEN: str

    model_config = {"env_file": get_env_file()}
65 changes: 65 additions & 0 deletions data-storage/hm-duckdb/query-amazon-s3/src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import logging
from pathlib import Path

import duckdb
import xxhash
from config import Config
from utils.clean_table_name import clean_table_name
from utils.get_file_true_stem import get_file_true_stem

logger = logging.getLogger(__name__)


def get_cache_table_name(parquet_url: str) -> str:
    """Derive the DuckDB cache table name for a Parquet URL.

    The name is ``<cleaned stem>_<hash>``: the suffix-free, cleaned name of
    the URL's last ``/``-separated segment, followed by the xxh128 hex digest
    of the whole URL so that different URLs sharing a file name get distinct
    tables.

    Args:
        parquet_url: URL of the Parquet file.

    Returns:
        The table name to use for the cached copy of the file.
    """
    digest = xxhash.xxh128(parquet_url.encode()).hexdigest()
    last_segment = parquet_url.split("/")[-1]
    stem = get_file_true_stem(Path(last_segment))
    return "_".join((clean_table_name(stem), digest))


def _sql_string_literal(value: str) -> str:
    """Render *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled so the value cannot end the literal
    early.
    """
    return "'" + value.replace("'", "''") + "'"


def _sql_identifier(name: str) -> str:
    """Render *name* as a double-quoted SQL identifier.

    Quoting keeps names that begin with a digit valid; embedded double quotes
    are doubled.
    """
    return '"' + name.replace('"', '""') + '"'


def main(parquet_url: str) -> None:
    """Cache a remote Parquet file in a local DuckDB database and log its rows.

    Opens ``data/cache.duckdb``, registers an HTTP bearer-token secret from
    ``Config.HTTP_AUTH_TOKEN``, materialises the Parquet file into a table
    named by ``get_cache_table_name`` (only if that table does not exist yet),
    then selects every row into a Polars DataFrame and logs it.

    Any exception raised while talking to DuckDB is logged with its traceback
    and not re-raised.

    Args:
        parquet_url: URL of the Parquet file to load.
    """
    config = Config()
    duckdb_cache_db_path = Path("data/cache.duckdb")
    logger.info("Using DuckDB cache file: %s", duckdb_cache_db_path)

    with duckdb.connect(duckdb_cache_db_path) as conn:
        try:
            # Configure DuckDB settings
            conn.execute("set enable_progress_bar=true")
            # Values are spliced into the SQL text, so escape them as literals
            # instead of trusting them to contain no quote characters.
            bearer_token = _sql_string_literal(config.HTTP_AUTH_TOKEN)
            conn.execute(
                f"""
                create secret if not exists http_auth (
                    type http,
                    bearer_token {bearer_token}
                )
                """,
            )

            # Create DuckDB cache
            logger.info("Loading data...")
            # The generated name may start with a digit (e.g. "2024.parquet"),
            # which is not a valid bare identifier, so always quote it.
            table = _sql_identifier(get_cache_table_name(parquet_url))
            source = _sql_string_literal(parquet_url)
            conn.execute(f"""
                create table if not exists {table} as
                select * from read_parquet({source})
            """)  # noqa: S608

            # Query
            query = f"""
                select *
                from {table}
            """  # noqa: S608

            df = conn.execute(query).pl()
            logger.info(df)
        except Exception:
            logger.exception("An error occurred")


if __name__ == "__main__":
    # Example Parquet file queried when the module is run as a script.
    parquet_url = (
        "https://data-browser.internal.hongbomiao.com/experiments/experiment1.parquet"
    )
    # Configure logging before main() runs so its INFO records are emitted.
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )
    main(parquet_url)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import re


def clean_table_name(s: str) -> str:
    """Normalise *s* into a lowercase name made of ``a-z``, ``0-9`` and ``_``.

    The input is lowercased, then every character outside ``a-z`` and ``0-9``
    is replaced by an underscore (one underscore per replaced character).

    Args:
        s: Raw name, e.g. a file stem.

    Returns:
        The cleaned name; an empty input yields an empty string.
    """
    lowered = s.lower()
    return re.sub(pattern=r"[^a-z0-9]", repl="_", string=lowered)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from utils.clean_table_name import clean_table_name


class TestCleanTableName:
    """Unit tests for ``clean_table_name``."""

    def test_input_string_with_only_alphanumeric_characters(self) -> None:
        # Lowercase letters and digits pass through unchanged.
        cleaned = clean_table_name("abc123")
        assert cleaned == "abc123"

    def test_input_string_with_only_uppercase_alphanumeric_characters(self) -> None:
        # Uppercase letters are lowercased.
        cleaned = clean_table_name("ABC123")
        assert cleaned == "abc123"

    def test_input_string_with_mix_of_alphanumeric_characters_and_underscores(
        self,
    ) -> None:
        # An underscore is replaced by an underscore, so the name is unchanged.
        cleaned = clean_table_name("abc_123")
        assert cleaned == "abc_123"

    def test_input_string_with_only_non_alphanumeric_characters(self) -> None:
        # The input mixes alphanumerics with one "/" that becomes "_".
        cleaned = clean_table_name("abc/123")
        assert cleaned == "abc_123"
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from pathlib import Path


def get_file_true_stem(file_path: Path) -> str:
    """Return the file name of *file_path* with every suffix removed.

    ``Path.stem`` strips only the last suffix, so it is applied repeatedly
    until the name stops changing: ``my.zstd.parquet`` -> ``my``. A leading
    dot is not a suffix, so ``.my.txt`` -> ``.my``.

    Args:
        file_path: Path or bare file name; directory components are ignored.

    Returns:
        The suffix-free name. Paths without a name component (``Path("")``,
        ``Path("/")``) yield an empty string.
    """
    # Compare against the name component rather than the whole path string:
    # for an empty or root path the stem is "" while str(path) is "." or "/",
    # which would otherwise never converge.
    name = file_path.name
    while True:
        stem = Path(name).stem
        if stem == name:
            return stem
        name = stem
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pathlib import Path

from utils.get_file_true_stem import get_file_true_stem


class TestGetFileTrueStem:
    """Unit tests for ``get_file_true_stem``."""

    def test_file_path(self) -> None:
        # Absolute path with a single suffix.
        assert get_file_true_stem(Path("/path/to/my.txt")) == "my"

    def test_file_name(self) -> None:
        # Bare file name with a single suffix.
        assert get_file_true_stem(Path("my.txt")) == "my"

    def test_multiple_dot_extension(self) -> None:
        # Every suffix is stripped, not just the last one.
        assert get_file_true_stem(Path("/path/to/my.zstd.parquet")) == "my"

    def test_hidden_file_path(self) -> None:
        # The leading dot of a hidden file is kept.
        assert get_file_true_stem(Path("/path/to/.my.txt")) == ".my"

    def test_hidden_file_name_with_extension(self) -> None:
        assert get_file_true_stem(Path(".my.txt")) == ".my"

    def test_hidden_file_name(self) -> None:
        # A hidden file without a suffix is returned unchanged.
        assert get_file_true_stem(Path(".my")) == ".my"

    def test_dir_file_path(self) -> None:
        # A final component without a suffix is returned as-is.
        assert get_file_true_stem(Path("/path/to/my")) == "my"
Loading