-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka shim #11
base: master
Are you sure you want to change the base?
Kafka shim #11
Conversation
blueox/__init__.py
Outdated
be silently dropped. | ||
|
||
Currently we support logging through the network (and the configured host and port) to a blueoxd instances, or | ||
to the specified recorder function | ||
""" | ||
if recorder: | ||
if int(OVERRIDE_KAFKA_RECORDER) == 1: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not gonna lie, not a fan of magical overrides via env vars. Why not just have it come in as a recorder?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, fair. I was hoping to avoid a bunch of edits throughout postal changing how it's called, but it would be simple enough to call
import blueox
from blueox.recorders import kafka
blueox.configure(recorder=kafka.send)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh huh, wait, if thats the case no go back to your original way lol
blueox/recorders/kafka.py
Outdated
if _kafka_hosts and threadLocal.kp is not None: | ||
try: | ||
log.debug("Sending msg") | ||
threadLocal.kp.send('events', context_data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually going to get really slow and backup blueox and cause an outage.
You need a non blocking way to send these. Also I do worry a little bit about the threadlocal approach, because Python and threading never play well together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer
Per the docs, it's thread-safe and async
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's also a number of kwargs that can configure this better to prevent things from backing up:
-
acks (0, 1, 'all') –
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:0: Producer will not wait for any acknowledgment from the server.
The message will immediately be added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1.
1: Wait for leader to write the record to its local log only.
Broker will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
all: Wait for the full set of in-sync replicas to write the record.
This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
If unset, defaults to acks=1.
-
retries (int) – Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max_in_flight_requests_per_connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Default: 0.
-
max_block_ms (int) – Number of milliseconds to block during send() and partitions_for(). These methods can be blocked either because the buffer is full or metadata unavailable. Blocking in the user-supplied serializers or partitioner will not be counted against this timeout. Default: 60000.
-
retry_backoff_ms (int) – Milliseconds to backoff when retrying on errors. Default: 100.
request_timeout_ms (int) – Client request timeout in milliseconds. Default: 30000.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My $.02, let's not tightly couple postal-main to Kafka. kafka-python uses a thread under the hood to send events and things can go unexpected in the presence of multiprocessing and monkey patching (which we may do some day, who knows).
Proposal:
-
We configure a separate source / sink in cernan / cernan-events for blueox events:
- Kafka bootstrapping is done async.
- We can tune the blueox sources/sinks to discard (or not) in the presence of Kafka slowness as we see fit.
- Financial events are already slated to use cernan, so we are properly entrenched already.
-
Using pycernan forces us to use avro for blueox like we are going to do with everything else.
-
RUST!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works for me. Takes the burden of async off of blueox/postal, and I was considering the avro route anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re: cernan, I was under the impression that we're migrating to fluentbit for log transport?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fluentbit is just replacing cernan when it comes to consuming on disk logs, no?
Now we get into a meaty discussion around whether or not blueox emits logs or events.
There's a semi-simple way to override the
_recorder_function
for all of BlueOx. This PR aims to provide an easily configurable way to access this override to publish events to Pycernan instead of Zmq.Todo:
contrib
to use pycernan first, falling back to zmq