Skip to content

Commit

Permalink
feat(duckdb): query s3 file (#22611)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongbo-miao authored Jan 11, 2025
1 parent d38c60a commit 61f4392
Show file tree
Hide file tree
Showing 21 changed files with 537 additions and 21 deletions.
1 change: 0 additions & 1 deletion api-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ dependencies = [
"fastapi==0.115.6",
"uvicorn==0.34.0",
"confluent-kafka==2.8.0",
"python-dotenv==1.0.1",
"sentry-sdk[fastapi]==2.19.2",
"pydantic-settings==2.7.1",
]
Expand Down
2 changes: 0 additions & 2 deletions api-python/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
HTTP_AUTH_TOKEN=xxx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
HTTP_AUTH_TOKEN=xxx
13 changes: 13 additions & 0 deletions data-storage/hm-duckdb/query-amazon-s3/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# uv workflow targets. None of them produces a file named after the target,
# so they are declared phony and always run when requested.
.PHONY: uv-install-python uv-update-lock-file uv-install-dependencies
.PHONY: uv-run-dev uv-run-test uv-run-test-coverage

# Toolchain and dependency management
uv-install-python:
	uv python install
uv-update-lock-file:
	uv lock
uv-install-dependencies:
	uv sync --dev

# poe tasks defined in pyproject.toml
uv-run-dev:
	uv run poe dev
uv-run-test:
	uv run poe test
uv-run-test-coverage:
	uv run poe test-coverage
Empty file.
25 changes: 25 additions & 0 deletions data-storage/hm-duckdb/query-amazon-s3/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[project]
name = "query-amazon-s3"
version = "1.0.0"
requires-python = "~=3.13.0"
dependencies = [
"duckdb==1.1.3",
"polars[pyarrow]==1.19.0",
"pydantic-settings==2.7.1",
"xxhash==3.5.0",
]

[dependency-groups]
dev = [
"poethepoet==0.32.1",
"pytest==8.3.4",
"pytest-cov==6.0.0",
]

[tool.uv]
package = false

[tool.poe.tasks]
dev = "python src/main.py"
test = "pytest --verbose --verbose"
test-coverage = "pytest --cov=. --cov-report=xml"
4 changes: 4 additions & 0 deletions data-storage/hm-duckdb/query-amazon-s3/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# https://docs.pytest.org/en/stable/reference/customize.html#pytest-ini

[pytest]
pythonpath = src
16 changes: 16 additions & 0 deletions data-storage/hm-duckdb/query-amazon-s3/src/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import os

from pydantic_settings import BaseSettings


def get_env_file() -> str:
    """Return the dotenv file name selected by the ``ENV`` environment variable.

    ``ENV=production`` selects ``.env.production.local``; any other value, or
    an unset variable, selects ``.env.development.local``.
    """
    if os.getenv("ENV") == "production":
        return ".env.production.local"
    return ".env.development.local"


class Config(BaseSettings):
    """Runtime settings for the script, resolved by pydantic-settings."""

    # Required setting: no default value is declared for it.
    HTTP_AUTH_TOKEN: str

    # get_env_file() runs once, when this class body is evaluated, so the
    # dotenv file is fixed by the value of ENV at import time.
    model_config = {"env_file": get_env_file()}
65 changes: 65 additions & 0 deletions data-storage/hm-duckdb/query-amazon-s3/src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import logging
from pathlib import Path

import duckdb
import xxhash
from config import Config
from utils.clean_table_name import clean_table_name
from utils.get_file_true_stem import get_file_true_stem

logger = logging.getLogger(__name__)


def get_cache_table_name(parquet_url: str) -> str:
    """Build the DuckDB cache table name for a Parquet URL.

    The name is the cleaned "true stem" of the URL's last ``/``-separated
    segment, followed by ``_`` and the xxh128 hex digest of the full URL.
    """
    file_name = parquet_url.rpartition("/")[2]
    stem = get_file_true_stem(Path(file_name))
    digest = xxhash.xxh128(parquet_url.encode()).hexdigest()
    return f"{clean_table_name(stem)}_{digest}"


def _sql_string_literal(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal, doubling any ``'``."""
    return "'" + value.replace("'", "''") + "'"


def main(parquet_url: str) -> None:
    """Cache a remote Parquet file in a local DuckDB database and log its rows.

    Opens (or creates) ``data/cache.duckdb``, registers an HTTP bearer-token
    secret taken from ``Config.HTTP_AUTH_TOKEN``, materializes the Parquet file
    into a table named by ``get_cache_table_name`` unless that table already
    exists, then selects every row from the table and logs the resulting
    Polars DataFrame.

    Args:
        parquet_url: URL of the Parquet file to read and cache.

    Any exception raised while talking to DuckDB is logged with its traceback
    and not re-raised.
    """
    config = Config()
    duckdb_cache_db_path = Path("data/cache.duckdb")
    # Make sure the parent directory exists so a fresh checkout or a different
    # working directory does not fail when the database file is opened.
    duckdb_cache_db_path.parent.mkdir(parents=True, exist_ok=True)
    logger.info("Using DuckDB cache file: %s", duckdb_cache_db_path)

    # Values spliced into SQL text are escaped as string literals so an
    # embedded single quote cannot terminate the literal early.
    bearer_token_literal = _sql_string_literal(config.HTTP_AUTH_TOKEN)
    parquet_url_literal = _sql_string_literal(parquet_url)

    with duckdb.connect(duckdb_cache_db_path) as conn:
        try:
            # Configure DuckDB settings
            conn.execute("set enable_progress_bar=true")
            conn.execute(
                f"""
                create secret if not exists http_auth (
                    type http,
                    bearer_token {bearer_token_literal}
                )
                """,
            )

            # Create DuckDB cache
            logger.info("Loading data...")
            table_name = get_cache_table_name(parquet_url)
            # The identifier is double-quoted because the cleaned file stem may
            # start with a digit, which is not valid as a bare identifier.
            conn.execute(f"""
                create table if not exists "{table_name}" as
                select * from read_parquet({parquet_url_literal})
            """)  # noqa: S608

            # Query
            query = f"""
                select *
                from "{table_name}"
            """  # noqa: S608

            df = conn.execute(query).pl()
            logger.info(df)
        except Exception:
            # Deliberate best effort at the script boundary: log and continue.
            logger.exception("An error occurred")


if __name__ == "__main__":
    # Timestamped INFO-level logging for the whole script run.
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )
    # Example Parquet file to cache and query.
    parquet_url = "https://data-browser.internal.hongbomiao.com/experiments/experiment1.parquet"
    main(parquet_url)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import re


def clean_table_name(s: str) -> str:
    """Lowercase *s* and replace every character outside ``a-z0-9`` with ``_``."""
    allowed = "abcdefghijklmnopqrstuvwxyz0123456789"
    # Each item is a single character, so ``in`` is a plain membership test.
    return "".join(ch if ch in allowed else "_" for ch in s.lower())
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from utils.clean_table_name import clean_table_name


class TestCleanTableName:
    """Unit tests for ``clean_table_name``."""

    def test_input_string_with_only_alphanumeric_characters(self) -> None:
        assert clean_table_name("abc123") == "abc123"

    def test_input_string_with_only_uppercase_alphanumeric_characters(self) -> None:
        assert clean_table_name("ABC123") == "abc123"

    def test_input_string_with_mix_of_alphanumeric_characters_and_underscores(
        self,
    ) -> None:
        assert clean_table_name("abc_123") == "abc_123"

    def test_input_string_with_mix_of_alphanumeric_and_non_alphanumeric_characters(
        self,
    ) -> None:
        # A non-alphanumeric character in the middle is replaced by an underscore.
        assert clean_table_name("abc/123") == "abc_123"

    def test_input_string_with_only_non_alphanumeric_characters(self) -> None:
        # Every character is replaced, one underscore per character.
        assert clean_table_name("/.-") == "___"

    def test_empty_input_string(self) -> None:
        assert clean_table_name("") == ""
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from pathlib import Path


def get_file_true_stem(file_path: Path) -> str:
    """Return the file name of *file_path* with all of its suffixes removed.

    Suffixes are stripped one at a time with ``Path.stem`` until the name stops
    changing, so ``my.zstd.parquet`` yields ``my`` and a hidden file such as
    ``.my.txt`` yields ``.my``.

    Args:
        file_path: A path or bare file name; only its final component is used.

    Returns:
        The final component without suffixes, or an empty string when the path
        has no final component (for example ``Path("")`` or ``Path("/")``).
    """
    # Start from the final component. Comparing the stem with the whole path
    # never converges for paths whose name is empty, which previously
    # recursed without bound.
    name = file_path.name
    while True:
        stem = Path(name).stem
        if stem == name:
            return stem
        # The stem is strictly shorter than the name here, so this terminates.
        name = stem
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pathlib import Path

from utils.get_file_true_stem import get_file_true_stem


class TestGetFileTrueStem:
    """Unit tests for ``get_file_true_stem``."""

    def test_file_path(self) -> None:
        assert get_file_true_stem(Path("/path/to/my.txt")) == "my"

    def test_file_name(self) -> None:
        assert get_file_true_stem(Path("my.txt")) == "my"

    def test_multiple_dot_extension(self) -> None:
        # Every suffix is stripped, not only the last one.
        assert get_file_true_stem(Path("/path/to/my.zstd.parquet")) == "my"

    def test_hidden_file_path(self) -> None:
        # The leading dot of a hidden file is part of the stem.
        assert get_file_true_stem(Path("/path/to/.my.txt")) == ".my"

    def test_hidden_file_name_with_extension(self) -> None:
        assert get_file_true_stem(Path(".my.txt")) == ".my"

    def test_hidden_file_name(self) -> None:
        assert get_file_true_stem(Path(".my")) == ".my"

    def test_dir_file_path(self) -> None:
        assert get_file_true_stem(Path("/path/to/my")) == "my"
Loading

0 comments on commit 61f4392

Please sign in to comment.