-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Do not fail Trino is Kafka event listener cannot connect #23606
base: master
Are you sure you want to change the base?
Do not fail Trino is Kafka event listener cannot connect #23606
Conversation
@GuardedBy("this") | ||
private Instant lastKafkaProducerTime = Instant.EPOCH; | ||
|
||
private final Executor connectExecutor = Executors.newSingleThreadScheduledExecutor(daemonThreadsNamed("kafka-connect-%s")); | ||
|
||
public KafkaEventPublisher(KafkaEventListenerConfig config, KafkaProducerFactory producerFactory, KafkaEventListenerJmxStats stats) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is wrapped in try catch
in io.trino.plugin.eventlistener.kafka.KafkaEventListener#KafkaEventListener
. Why it's not sufficient?
private static final java.time.Duration KAFKA_CONNECT_INTERVAL = Duration.of(10, SECONDS); | ||
|
||
@PostConstruct | ||
public void createKafkaProducer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
separate commit? Only for logging purposes?
throw lastKafkaProducerFailure; | ||
} | ||
try { | ||
kafkaProducer = kafkaProducerSupplier.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can it be long operation that could throttle query concurrency. Should it be async?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah - maybe we should try to create it when bootstrapping, and then just have a loop which tries to reinitialize if boostrapping failed.
Then we would not call kafkaProducerSupplier
here.
if (kafkaProducer != null) { | ||
return kafkaProducer; | ||
} | ||
if (lastKafkaProducerTime.plus(KAFKA_CONNECT_INTERVAL).isAfter(now())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does it solve really? I think Kafka listener should either be present or not. There is kafka-event-listener.terminate-on-initialization-failure
if we don't want Trino startup to fail in case listener creation fails
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh - I did not know that there is this mechanism. It solves problem of not terminating trino if initialization fails, yeah. But do you want it to not try to connect again when intialization failed? Seems like it should try until it establishes connection. Similar like it would if you loose connection to Kafka while Trino is running.
All bootstrap-related errors should be caught here: https://github.com/trinodb/trino/blob/master/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java#L52-L66 A related question: if Kafka event listener cannot connect to the brokers, isn't that sufficient reason to fail the cluster and let the user know so they can investigate? There can be critical observability infra built on top of the event-listener, so having no data coming in (but the cluster executing queries) can be misleading |
How is lack of connectivity at startup different from loosing connectivity while cluster is up and running? |
eaa9f41
to
53c2e78
Compare
53c2e78
to
29e2c38
Compare
@sopel39 restrucutred a bit - PTAL |
One could be misconfiguration, the other could be intermittent connectivity issue. E.g. Even if query finished event delivery fails the query will still succeed IIRC, so there isn't much you can do anyway |
} | ||
catch (Exception e) { | ||
if (config.getTerminateOnInitializationFailure()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see #23606 (comment). I don't really think we need to complicate it further
while (true) { | ||
try { | ||
// success | ||
KafkaProducer<String, String> kafkaProducer = kafkaProducerSupplier.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to recreate it? Will kafkaProducer
stop working permanently in case of some intermittent issues?
@@ -47,7 +47,7 @@ public class KafkaEventListenerConfig | |||
private Set<String> excludedFields = Collections.emptySet(); | |||
private Map<String, String> kafkaClientOverrides = Collections.emptyMap(); | |||
private Duration requestTimeout = new Duration(10, SECONDS); | |||
private boolean terminateOnInitializationFailure = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably could just keep this commit, but I still don't understand the rationale? Does it fail in tests or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a 100% sure this is a good change, but I can be convinced. I think that if a user explicitly configures a kafka-event-listener, they would want to know if the brokers are unreachable instead of the plugin silently "failing". They can always set this flag to false explicitly, if they are fine with ignoring this problem.
@losipiuk To clarify the context, what is the problem we are trying to solve? Is it that if we lose connectivity to Kafka while Trino is already running, then our queries start failing? Because it shouldn't be the case right now, we should just hit a timeout exception here: https://github.com/trinodb/trino/blob/master/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java#L103 |
The problem (maybe not important) is that we generally want Trino to start up if there are networking problems during start time. But with just this change if we see kafka is not running on startup it will not self-heal later on if network problem goes away. Queries are not failing of course. |
@losipiuk Thanks, the PR makes more sense to me now.
I would just like to clarify Karol's comment, to make sure the change is really needed, because it does seem to complicate the code logic. Thanks |
Is it motivate by real use case? I still think some startup connectivity validation is needed before cluster is considered ready for use. Otherwise if we cannot reach kafka later on, we should probably fail queries |
It is motivated by the real use case. But I agree the decision what should be done in such a case how important it is for event-listener to work. I would say that most of the time we probably care about queries working more than about event listener - but maybe there are cases where having queries working but event listener not, can cause lots of problems. |
I added a suggested RN entry .. looks good @losipiuk ? |
Similar discussion was around #22911 (see https://trinodb.slack.com/archives/GKZ8GS0SK/p1723336559476059?thread_ts=1723222353.289599&cid=GKZ8GS0SK) and we've decided that anything that makes events non-durable by default could be violation of auditing policy and is a breaking change. This PR adds some complexity, which I'm not sure is needed TBH |
That is indeed true. Hence the current check is during cluster startup, not while queries are running already. |
Description
We do not want Trino to fail startup on intermittent network failure (e.g. DNS not being available).
Not sure if that need RN.
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
(x) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: