DM-44907: Update to use boto3 #10

Open · wants to merge 1 commit into base: main
32 changes: 2 additions & 30 deletions .github/workflows/test.yml
@@ -12,7 +12,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.8
python-version: 3.11

- name: Install package
run: pip install .[dev]
@@ -28,38 +28,10 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.8
python-version: 3.11

- name: Install package
run: pip install .[dev]

- name: Run unit tests
run: pytest --log-format="%(asctime)s %(levelname)s %(message)s" --log-date-format="%Y-%m-%d %H:%M:%S"

integration-test:
runs-on: ubuntu-latest
concurrency: integration-test
steps:
- uses: actions/checkout@v2

- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.8

- name: Install package
run: pip install .[dev]

- name: Set up cloud SDK
uses: google-github-actions/setup-gcloud@master
with:
project_id: ${{ secrets.GCP_PROJECT_ID }}
service_account_key: ${{ secrets.GCP_SA_KEY }}
export_default_credentials: true

- name: Run integration tests
run: pytest --log-format="%(asctime)s %(levelname)s %(message)s" --log-date-format="%Y-%m-%d %H:%M:%S"
env:
ALERT_INGEST_TEST_GCP_PROJECT: ${{ secrets.GCP_PROJECT_ID }}
ALERT_INGEST_TEST_KAFKA_URL: ${{ secrets.KAFKA_TEST_URL }}
ALERT_INGEST_TEST_REGISTRY_URL: ${{ secrets.KAFKA_REGISTRY_URL }}
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
@@ -2,31 +2,31 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.0.1
rev: v5.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml

- repo: https://github.com/PyCQA/flake8
rev: 3.9.2
rev: 7.1.1
hooks:
- id: flake8

- repo: https://github.com/PyCQA/isort
rev: 5.8.0
rev: 5.13.2
hooks:
- id: isort
additional_dependencies:
- toml

- repo: https://github.com/psf/black
rev: 21.7b0
rev: 24.10.0
hooks:
- id: black

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910
rev: v1.13.0
hooks:
- id: mypy
additional_dependencies:
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM python:3.9.7-buster AS base-image
FROM python:3.11-buster AS base-image

# Install system dependencies
RUN apt-get update -y && apt-get install -y libsnappy-dev
36 changes: 22 additions & 14 deletions README.md
@@ -17,7 +17,7 @@ Clone, and install with pip (probably in a virtualenv):
```
git clone git@github.com:lsst-dm/alert_database_ingester.git
cd alert_database_ingester
python -m virtualenv virtualenv
python -m venv virtualenv
source virtualenv/bin/activate
pip install .
```
@@ -26,7 +26,8 @@ pip install .

The ingester is installed by `pip` as a command named `alertdb-ingester`:
```
usage: alertdb-ingester [-h] [--gcp-project GCP_PROJECT] [--gcp-bucket GCP_BUCKET]
usage: alertdb-ingester [-h] [--endpoint-url ENDPOINT_URL]
[--bucket-alerts BUCKET_ALERTS] [--bucket-schemas BUCKET_SCHEMAS]
[--kafka-host KAFKA_HOST] [--kafka-topic KAFKA_TOPIC]
[--kafka-group KAFKA_GROUP]
[--kafka-auth-mechanism {mtls,scram}]
@@ -40,17 +41,13 @@ Run a worker to copy alerts from Kafka into an object store backend.

optional arguments:
-h, --help show this help message and exit
--gcp-project GCP_PROJECT
when using the google-cloud backend, the name of the GCP
project (default: alert-stream)
--gcp-bucket GCP_BUCKET
when using the google-cloud backend, the name of the Google
Cloud Storage bucket (default: rubin-alert-archive)
--endpoint-url ENDPOINT_URL
when using a remote bucket, the url where the bucket is
located.
--kafka-host KAFKA_HOST
kafka host with alert data (default: alertbroker-
scratch.lsst.codes)
kafka host with alert data (default: usdf-alert-stream-dev.lsst.cloud:9094)
--kafka-topic KAFKA_TOPIC
name of the Kafka topic with alert data (default: alerts)
name of the Kafka topic with alert data (default: alerts-simulated)
--kafka-group KAFKA_GROUP
Name of a Kafka Consumer group to run under (default:
alertdb-ingester)
@@ -70,7 +67,7 @@ optional arguments:
cert. Only used if --kafka-auth-mechanism=scram. (default: )
--schema-registry-address SCHEMA_REGISTRY_ADDRESS
Address of a Confluent Schema Registry server hosting
schemas (default: https://alertschemas-scratch.lsst.codes:443)
schemas (default: https://usdf-alert-schemas-dev.slac.stanford.edu)
```

The ingester needs a Kafka password. It gets this from you via an environment variable, `ALERTDB_KAFKA_PASSWORD`.
@@ -123,5 +120,16 @@ export ALERT_INGEST_TEST_REGISTRY_URL=https://username:password@alertschemas-scr
export ALERT_INGEST_TEST_GCP_PROJECT=alert-stream
```

Then, `pytest .` will run the integration tests, which create temporary Kafka,
Google Cloud, and Schema Registry resources and run against them.
Then, `pytest .` will run the integration tests, which create temporary Kafka
and Schema Registry resources and run against them. You must have a test bucket
created in order to run the tests against MinIO.
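
For a local test setup, the buckets can be created ahead of time with boto3. This is a minimal sketch, assuming a local MinIO instance at `http://localhost:9000` with default `minioadmin` credentials; the bucket names match the ingester's defaults (`alerts` and `schema`):

```
import boto3

# Assumed local MinIO endpoint and test credentials; adjust for your setup.
s3 = boto3.resource(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)

# The ingester's default bucket names are "alerts" and "schema".
for name in ("alerts", "schema"):
    s3.create_bucket(Bucket=name)
```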

If running in a new test environment, you will need to install:

pip install lsst-alert-packet
pip install --upgrade kafka-python
pip install aiokafka
pip install pytest

Then run the tests with:
env pytest TEST_NAME
40 changes: 20 additions & 20 deletions alertingest/bin/alertdb_ingester.py
@@ -5,7 +5,7 @@

from alertingest.ingester import IngestWorker, KafkaConnectionParams
from alertingest.schema_registry import SchemaRegistryClient
from alertingest.storage import GoogleObjectStorageBackend
from alertingest.storage import USDFObjectStorageBackend


def main():
@@ -15,33 +15,33 @@ def main():
description="Run a worker to copy alerts from Kafka into an object store backend.",
)
parser.add_argument(
"--gcp-project",
"--endpoint-url",
type=str,
default="alert-stream",
help="when using the google-cloud backend, the name of the GCP project",
default=None,
help="when using a remote bucket, the url where the bucket is located",
)
parser.add_argument(
"--gcp-bucket-alerts",
"--bucket-alerts",
type=str,
default="alert-packets",
help="when using the google-cloud backend, the name of the GCS bucket for alert packets",
default="alerts",
help="when using the usdf backend, the name of the s3 bucket for alert packets",
)
parser.add_argument(
"--gcp-bucket-schemas",
"--bucket-schemas",
type=str,
default="alert-schemas",
help="when using the google-cloud backend, the name of the GCS bucket for alert schemas",
default="schema",
help="when using the usdf backend, the name of the s3 bucket for alert schemas",
)
parser.add_argument(
"--kafka-host",
type=str,
default="alertbroker-scratch.lsst.codes",
default="usdf-alert-stream-dev.lsst.cloud:9094",
help="kafka host with alert data",
)
parser.add_argument(
"--kafka-topic",
type=str,
default="alerts",
default="alerts-simulated",
help="name of the Kafka topic with alert data",
)
parser.add_argument(
@@ -60,7 +60,7 @@ parser.add_argument(
parser.add_argument(
"--kafka-username",
type=str,
default="admin",
default="kafka-admin",
help="Username to use when connecting to Kafka. Only used if --kafka-auth-mechanism=ssl",
)
parser.add_argument(
@@ -93,11 +93,11 @@ def main():
parser.add_argument(
"--schema-registry-address",
type=str,
default="https://alertschemas-scratch.lsst.codes:443",
default="https://usdf-alert-schemas-dev.slac.stanford.edu",
help="Address of a Confluent Schema Registry server hosting schemas",
)
parser.add_argument("--verbose", type="store_true", help="log a bunch")
parser.add_argument("--debug", type="store_true", help="log even more")
parser.add_argument("--verbose", action="store_true", help="log a bunch")
parser.add_argument("--debug", action="store_true", help="log even more")

args = parser.parse_args()

@@ -128,10 +128,10 @@ def main():
else:
raise AssertionError("--kafka-auth-mechanism must be either scram or mtls")

backend = GoogleObjectStorageBackend(
args.gcp_project,
args.gcp_bucket_alerts,
args.gcp_bucket_schemas,
backend = USDFObjectStorageBackend(
endpoint_url=args.endpoint_url,
alert_bucket_name=args.bucket_alerts,
schema_bucket_name=args.bucket_schemas,
)
registry = SchemaRegistryClient(args.schema_registry_address)

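
The `storage.py` diff is not shown in this view, but since the PR replaces the Google Cloud backend with boto3, `USDFObjectStorageBackend` presumably wraps an S3 client. The sketch below is illustrative only: the constructor keywords match the call site above, while the method names and object key layout are assumptions.

```
import boto3


class USDFObjectStorageBackend:
    """Hypothetical sketch of a boto3-backed object store backend.

    Only the constructor keywords are taken from the call site above; the
    method names and key layout are illustrative assumptions.
    """

    def __init__(self, endpoint_url, alert_bucket_name, schema_bucket_name):
        # endpoint_url may be None, in which case boto3 uses the standard AWS endpoint.
        self._s3 = boto3.client("s3", endpoint_url=endpoint_url)
        self._alert_bucket = alert_bucket_name
        self._schema_bucket = schema_bucket_name

    def store_alert(self, alert_id, alert_payload: bytes):
        # Assumed key scheme for serialized alert packets.
        self._s3.put_object(
            Bucket=self._alert_bucket,
            Key=f"alerts/{alert_id}.avro.gz",
            Body=alert_payload,
        )

    def store_schema(self, schema_id: int, schema_payload: bytes):
        # Assumed key scheme for Avro schemas, keyed by schema registry ID.
        self._s3.put_object(
            Bucket=self._schema_bucket,
            Key=f"schemas/{schema_id}.json",
            Body=schema_payload,
        )
```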
7 changes: 4 additions & 3 deletions alertingest/ingester.py
@@ -1,6 +1,7 @@
"""
A worker which copies alerts and schemas into an object store backend.
"""

import io
import logging
import ssl
@@ -169,9 +170,9 @@ def _create_scram_consumer(self, auto_offset_reset):
group_id=self.kafka_params.group,
sasl_plain_username=self.kafka_params.username,
sasl_plain_password=self.kafka_params.password,
sasl_mechanism="SCRAM-SHA-256",
security_protocol="SASL_SSL",
ssl_context=ssl_ctx,
sasl_mechanism="SCRAM-SHA-512",
security_protocol="SASL_PLAINTEXT",
ssl_context=None,
enable_auto_commit=False,
auto_offset_reset=auto_offset_reset,
)
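
Pulled out of the diff above, the new SCRAM connection settings amount to the consumer configuration below. This is a standalone sketch for illustration: the client library shown (kafka-python) is an assumption, the topic, broker, group, and username values come from the defaults elsewhere in the PR, and the password handling is assumed.

```
from kafka import KafkaConsumer

# Sketch of the SCRAM consumer settings implied by the diff above; the real
# code builds its consumer inside _create_scram_consumer.
consumer = KafkaConsumer(
    "alerts-simulated",
    bootstrap_servers="usdf-alert-stream-dev.lsst.cloud:9094",
    group_id="alertdb-ingester",
    sasl_plain_username="kafka-admin",
    sasl_plain_password="<ALERTDB_KAFKA_PASSWORD>",  # assumed to come from the env var
    sasl_mechanism="SCRAM-SHA-512",      # was SCRAM-SHA-256
    security_protocol="SASL_PLAINTEXT",  # was SASL_SSL, so no ssl_context is needed
    enable_auto_commit=False,
    auto_offset_reset="earliest",        # the real value is passed in by the caller
)
```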
13 changes: 8 additions & 5 deletions alertingest/schema_registry.py
@@ -18,11 +18,14 @@ def __init__(self, address: str):

def get_raw_schema(self, schema_id: int) -> bytes:
url = f"{self.address}/schemas/ids/{schema_id}"
logger.debug("making request to %s", url)
response = requests.get(url, timeout=5)
response.raise_for_status()
parsed_body = json.loads(response.content)
return parsed_body["schema"]
try:
logger.debug("making request to %s", url)
response = requests.get(url, timeout=5)
response.raise_for_status()
parsed_body = json.loads(response.content)
return parsed_body["schema"]
except requests.exceptions.HTTPError:
raise KeyError(f"Schema ID {schema_id} not found")

def get_schema_decoder(self, schema_id: int) -> Decoder:
# If we've already constructed a decoder, use it.
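
With this change, a schema that is missing from the registry surfaces to callers as a `KeyError` rather than a raw `requests` HTTPError. A minimal usage sketch follows; the registry URL is the PR's default and the schema ID is an arbitrary example:

```
from alertingest.schema_registry import SchemaRegistryClient

registry = SchemaRegistryClient("https://usdf-alert-schemas-dev.slac.stanford.edu")
try:
    raw_schema = registry.get_raw_schema(123)  # 123 is an arbitrary example ID
except KeyError:
    # The schema ID is not registered; handle the missing schema here.
    raw_schema = None
```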