
Do not fail Trino if Kafka event listener cannot connect #23606

Open
wants to merge 3 commits into
base: master

Conversation

losipiuk
Member

@losipiuk losipiuk commented Sep 30, 2024

Description

We do not want Trino to fail startup on intermittent network failure (e.g. DNS not being available).

Not sure if this needs a release note (RN).

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
(x) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## General

* Prevent network issues in the Kafka event listener from causing cluster startup problems. ({issue}`23606`)

```java
@GuardedBy("this")
private Instant lastKafkaProducerTime = Instant.EPOCH;

private final Executor connectExecutor = Executors.newSingleThreadScheduledExecutor(daemonThreadsNamed("kafka-connect-%s"));

public KafkaEventPublisher(KafkaEventListenerConfig config, KafkaProducerFactory producerFactory, KafkaEventListenerJmxStats stats)
```
Member

This is wrapped in a try/catch in io.trino.plugin.eventlistener.kafka.KafkaEventListener#KafkaEventListener. Why isn't that sufficient?

```java
private static final java.time.Duration KAFKA_CONNECT_INTERVAL = Duration.of(10, SECONDS);

@PostConstruct
public void createKafkaProducer()
```
Member

separate commit? Only for logging purposes?

```java
    throw lastKafkaProducerFailure;
}
try {
    kafkaProducer = kafkaProducerSupplier.get();
```
Member

Can this be a long operation that could throttle query concurrency? Should it be async?

Member Author

Yeah - maybe we should try to create it when bootstrapping, and then just have a loop which tries to reinitialize if bootstrapping failed.
Then we would not call kafkaProducerSupplier here.

```java
if (kafkaProducer != null) {
    return kafkaProducer;
}
if (lastKafkaProducerTime.plus(KAFKA_CONNECT_INTERVAL).isAfter(now())) {
```
Member

@sopel39 sopel39 Sep 30, 2024

What does it solve really? I think Kafka listener should either be present or not. There is kafka-event-listener.terminate-on-initialization-failure if we don't want Trino startup to fail in case listener creation fails

Member Author

Oh - I did not know that this mechanism exists. It solves the problem of not terminating Trino if initialization fails, yeah. But do you want it to not try to connect again when initialization failed? It seems like it should retry until it establishes a connection, similar to what it would do if you lose the connection to Kafka while Trino is running.
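The throttled-retry idea under discussion (the `lastKafkaProducerTime.plus(KAFKA_CONNECT_INTERVAL)` check in the diff above) can be sketched roughly as follows. This is a minimal illustrative sketch, not the PR's actual code; `ThrottledSupplier` and its members are made-up names:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Illustrative sketch: lazy creation of a resource (e.g. a Kafka producer),
// throttled to at most one creation attempt per retry interval.
class ThrottledSupplier<T>
{
    private final Supplier<T> delegate;
    private final Duration retryInterval;
    private T value;
    private Instant lastAttempt = Instant.EPOCH;

    ThrottledSupplier(Supplier<T> delegate, Duration retryInterval)
    {
        this.delegate = delegate;
        this.retryInterval = retryInterval;
    }

    // Returns the cached value, or tries to create it at most once per retryInterval.
    synchronized T get()
    {
        if (value != null) {
            return value;
        }
        if (lastAttempt.plus(retryInterval).isAfter(Instant.now())) {
            return null; // too soon to retry; caller drops the event
        }
        lastAttempt = Instant.now();
        try {
            value = delegate.get();
        }
        catch (RuntimeException e) {
            // creation failed; next attempt is allowed once retryInterval elapses
        }
        return value;
    }
}
```

The point of the throttle is that a down broker only costs one connection attempt per interval on the event path, instead of one per published event.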

@marton-bod
Contributor

All bootstrap-related errors should be caught here: https://github.com/trinodb/trino/blob/master/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java#L52-L66
Wouldn't it be sufficient to make terminateOnInitializationFailure default to false? https://github.com/trinodb/trino/blob/master/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListenerConfig.java#L50

A related question: if Kafka event listener cannot connect to the brokers, isn't that sufficient reason to fail the cluster and let the user know so they can investigate? There can be critical observability infra built on top of the event-listener, so having no data coming in (but the cluster executing queries) can be misleading

@losipiuk
Member Author

> All bootstrap-related errors should be caught here: https://github.com/trinodb/trino/blob/master/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java#L52-L66 Wouldn't it be sufficient to make terminateOnInitializationFailure default to false? https://github.com/trinodb/trino/blob/master/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListenerConfig.java#L50
>
> A related question: if Kafka event listener cannot connect to the brokers, isn't that sufficient reason to fail the cluster and let the user know so they can investigate? There can be critical observability infra built on top of the event-listener, so having no data coming in (but the cluster executing queries) can be misleading

How is lack of connectivity at startup different from losing connectivity while the cluster is up and running?

@losipiuk losipiuk force-pushed the lukaszos/drop-unused-variables-a1e32c branch from eaa9f41 to 53c2e78 Compare September 30, 2024 15:14
@losipiuk losipiuk force-pushed the lukaszos/drop-unused-variables-a1e32c branch from 53c2e78 to 29e2c38 Compare September 30, 2024 15:16
@losipiuk
Member Author

@sopel39 restructured a bit - PTAL

@sopel39
Member

sopel39 commented Sep 30, 2024

> How is lack of connectivity at startup different from losing connectivity while the cluster is up and running?

One could be misconfiguration, the other could be an intermittent connectivity issue. E.g. even if delivery of the query-finished event fails, the query will still succeed IIRC, so there isn't much you can do anyway.

```java
}
catch (Exception e) {
    if (config.getTerminateOnInitializationFailure()) {
```
Member

see #23606 (comment). I don't really think we need to complicate it further

```java
while (true) {
    try {
        // success
        KafkaProducer<String, String> kafkaProducer = kafkaProducerSupplier.get();
```
Member

Why do we need to recreate it? Will kafkaProducer stop working permanently in case of some intermittent issues?
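One way to get the self-healing behavior being debated here is to retry producer creation on the single-threaded `connectExecutor` until it succeeds, keeping the retries off the query path entirely. A rough sketch under that assumption; `RetryingInitializer` and its methods are illustrative names, not the PR's code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Illustrative sketch: keep retrying resource creation on a background thread
// until it succeeds, instead of failing startup or retrying per event.
class RetryingInitializer<T>
{
    private final AtomicReference<T> ref = new AtomicReference<>();
    private final CountDownLatch done = new CountDownLatch(1);

    RetryingInitializer(Supplier<T> supplier, ScheduledExecutorService executor, long retryMillis)
    {
        executor.execute(new Runnable()
        {
            @Override
            public void run()
            {
                try {
                    ref.set(supplier.get());
                    done.countDown();
                }
                catch (RuntimeException e) {
                    // creation failed; schedule another attempt instead of failing startup
                    executor.schedule(this, retryMillis, TimeUnit.MILLISECONDS);
                }
            }
        });
    }

    // Non-blocking: returns null until initialization has succeeded.
    T getIfInitialized()
    {
        return ref.get();
    }

    boolean awaitInitialized(long timeoutMillis)
            throws InterruptedException
    {
        return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

With this shape, the event path only ever does a cheap `getIfInitialized()` check and drops events while the producer is still null, which also answers the concurrency concern raised above.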

```
@@ -47,7 +47,7 @@ public class KafkaEventListenerConfig
private Set<String> excludedFields = Collections.emptySet();
private Map<String, String> kafkaClientOverrides = Collections.emptyMap();
private Duration requestTimeout = new Duration(10, SECONDS);
private boolean terminateOnInitializationFailure = true;
```
Member

We could probably just keep this commit, but I still don't understand the rationale. Does it fail in tests or something else?

Contributor

@marton-bod marton-bod Oct 1, 2024

Not 100% sure this is a good change, but I can be convinced. I think that if a user explicitly configures a kafka-event-listener, they would want to know if the brokers are unreachable instead of the plugin silently "failing". They can always set this flag to false explicitly if they are fine with ignoring this problem.
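For reference, the behavior being debated is already controllable per deployment via the `kafka-event-listener.terminate-on-initialization-failure` property mentioned earlier in this thread. An operator who prefers startup to survive listener failures can set it in the event listener's properties file; the exact file layout below is shown as an assumption, not taken from this PR:

```properties
# event-listener.properties (illustrative)
event-listener.name=kafka
kafka-event-listener.terminate-on-initialization-failure=false
```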

@marton-bod
Contributor

@losipiuk To clarify the context, what is the problem we are trying to solve? Is it that if we lose connectivity to Kafka while Trino is already running, then our queries start failing? Because it shouldn't be the case right now, we should just hit a timeout exception here: https://github.com/trinodb/trino/blob/master/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java#L103
Did you observe something different?

@losipiuk
Member Author

losipiuk commented Oct 1, 2024

> @losipiuk To clarify the context, what is the problem we are trying to solve?

The problem (maybe not important) is that we generally want Trino to start up even if there are networking problems at start time.
For that alone we could just change the TerminateOnInitializationFailure flag to be false by default.

But with just that change, if Kafka is not reachable at startup, the listener will not self-heal later once the network problem goes away.
The second commit addresses that.

Queries are not failing, of course.

@marton-bod
Contributor

@losipiuk Thanks, the PR makes more sense to me now.

> Why do we need to recreate it? Will kafkaProducer stop working permanently in case of some intermittent issues?

I would just like to clarify Karol's comment, to make sure the change is really needed, because it does seem to complicate the code logic. Thanks

@sopel39
Member

sopel39 commented Oct 1, 2024

> The problem (maybe not important) is that we generally want Trino to start up even if there are networking problems at start time. For that alone we could just change the TerminateOnInitializationFailure flag to be false by default.
>
> But with just that change, if Kafka is not reachable at startup, the listener will not self-heal later once the network problem goes away. The second commit addresses that.

Is it motivated by a real use case? I still think some startup connectivity validation is needed before the cluster is considered ready for use. Otherwise, if we cannot reach Kafka later on, we should probably fail queries.

@losipiuk
Member Author

losipiuk commented Oct 1, 2024

>> The problem (maybe not important) is that we generally want Trino to start up even if there are networking problems at start time. For that alone we could just change the TerminateOnInitializationFailure flag to be false by default.
>>
>> But with just that change, if Kafka is not reachable at startup, the listener will not self-heal later once the network problem goes away. The second commit addresses that.
>
> Is it motivated by a real use case? I still think some startup connectivity validation is needed before the cluster is considered ready for use. Otherwise, if we cannot reach Kafka later on, we should probably fail queries.

It is motivated by a real use case. But I agree that what should be done in such a case depends on how important it is for the event listener to work. I would say that most of the time we probably care more about queries working than about the event listener - but maybe there are cases where having queries work while the event listener is down can cause lots of problems.

@mosabua
Member

mosabua commented Oct 1, 2024

I added a suggested RN entry. Looks good, @losipiuk?

@losipiuk
Member Author

losipiuk commented Oct 1, 2024

Yeah - looks good. Thanks @mosabua.

@sopel39 do you feel strongly it should not go in? Maybe worth a chat offline?

@sopel39
Member

sopel39 commented Oct 1, 2024

> It is motivated by a real use case. But I agree that what should be done in such a case depends on how important it is for the event listener to work. I would say that most of the time we probably care more about queries working than about the event listener - but maybe there are cases where having queries work while the event listener is down can cause lots of problems.

A similar discussion happened around #22911 (see https://trinodb.slack.com/archives/GKZ8GS0SK/p1723336559476059?thread_ts=1723222353.289599&cid=GKZ8GS0SK) and we've decided that anything that makes events non-durable by default could be a violation of auditing policy and is a breaking change.

This PR adds some complexity, which I'm not sure is needed TBH

@sopel39
Member

sopel39 commented Oct 1, 2024

> I would say that most of the time we probably care more about queries working than about the event listener

That is indeed true. Hence the current check is during cluster startup, not while queries are running already.

5 participants