
Fixes Helix threads getting blocked for Realtime Tables #14771

Open

noob-se7en wants to merge 4 commits into master

Conversation

@noob-se7en (Contributor) commented Jan 7, 2025

Problem
Consider a scenario for Realtime Tables where

  1. Table replication > 1
  2. Table consuming from 'k' Kafka partitions
  3. A subset of servers is slow in consuming segments, for various reasons such as partial-upsert handling logic.
  4. The fast servers have committed multiple segments.

In this case, on the slow servers, k Helix threads will be in a running state, slowly consuming data from the k partitions (as a result of catching up to the final offset).
Meanwhile, the fast servers will keep committing segments and will be well ahead of the slow servers.
The slow servers can eventually end up in a state where no Helix task thread is free: the remaining MAX_HELIX_TASK_THREADS - k threads will be waiting to acquire the partition semaphore for the onBecomeConsumingFromOffline event.
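
To make the blocking concrete, here is a simplified model of the per-partition semaphore. This is only an illustration with made-up names (semaphorePerPartition, partitionGroupId); it is not Pinot's actual code:

// One permit per stream partition; the segment currently consuming from that partition
// holds the permit until it finishes, including the catch-up to the final offset that
// runs during the CONSUMING -> ONLINE transition. (InterruptedException handling omitted.)
Semaphore partitionGroupConsumerSemaphore = semaphorePerPartition.get(partitionGroupId);

// OFFLINE -> CONSUMING transition for the next segment of the same partition, executed
// on a Helix task thread (HelixTaskExecutor-message_handle_thread):
partitionGroupConsumerSemaphore.acquire();  // blocks this Helix thread until the slow
                                            // catch-up above releases the permit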

Solution

  1. Change the semaphore acquire logic so that if a Helix thread fails to acquire the semaphore within a bounded wait, it checks whether a fast server has already committed the segment. If yes, it simply returns, since the slow server will eventually receive a CONSUMING -> ONLINE state transition message for the same segment (see the sketch below).
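
A minimal sketch of the proposed acquire path, assembled from the snippets in this PR's diff; the surrounding control flow and the use of the new SegmentAlreadyExistsException (including its constructor signature) are assumptions:

// Inside RealtimeSegmentDataManager: timed acquire instead of an unbounded blocking acquire.
try {
  while (!_partitionGroupConsumerSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
    // Could not get the permit yet: reload the segment's ZK metadata to see whether a
    // faster replica has already committed this segment.
    segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadata(_segmentNameStr);
    if (segmentZKMetadata != null
        && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
      // A CONSUMING -> ONLINE transition for this segment will arrive later and download
      // the committed copy, so free this Helix thread instead of keeping it blocked.
      throw new SegmentAlreadyExistsException("Segment " + _segmentNameStr + " is already committed");
    }
  }
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  throw new RuntimeException("Interrupted while acquiring the partition consumer semaphore", e);
}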

@noob-se7en changed the title from "Fixes Helix threads getting blocked edge case" to "Fixes Helix threads getting blocked edge case for Realtime Tables" on Jan 7, 2025
@noob-se7en changed the title from "Fixes Helix threads getting blocked edge case for Realtime Tables" to "Fixes Helix threads getting blocked for Realtime Tables" on Jan 7, 2025
@codecov-commenter commented Jan 7, 2025

Codecov Report

Attention: Patch coverage is 0% with 12 lines in your changes missing coverage. Please review.

Project coverage is 63.92%. Comparing base (59551e4) to head (922f6e2).
Report is 1557 commits behind head on master.

Files with missing lines | Patch % | Lines
...a/manager/realtime/RealtimeSegmentDataManager.java | 0.00% | 4 Missing and 1 partial ⚠️
...ata/manager/realtime/RealtimeTableDataManager.java | 0.00% | 5 Missing ⚠️
...anager/realtime/SegmentAlreadyExistsException.java | 0.00% | 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14771      +/-   ##
============================================
+ Coverage     61.75%   63.92%   +2.17%     
- Complexity      207     1610    +1403     
============================================
  Files          2436     2705     +269     
  Lines        133233   150986   +17753     
  Branches      20636    23322    +2686     
============================================
+ Hits          82274    96514   +14240     
- Misses        44911    47252    +2341     
- Partials       6048     7220    +1172     
Flag | Coverage Δ
custom-integration1 | 100.00% <ø> (+99.99%) ⬆️
integration | 100.00% <ø> (+99.99%) ⬆️
integration1 | 100.00% <ø> (+99.99%) ⬆️
integration2 | 0.00% <ø> (ø)
java-11 | 63.84% <0.00%> (+2.13%) ⬆️
java-21 | 63.78% <0.00%> (+2.16%) ⬆️
skip-bytebuffers-false | 63.90% <0.00%> (+2.15%) ⬆️
skip-bytebuffers-true | 63.72% <0.00%> (+35.99%) ⬆️
temurin | 63.92% <0.00%> (+2.17%) ⬆️
unittests | 63.91% <0.00%> (+2.17%) ⬆️
unittests1 | 56.29% <0.00%> (+9.40%) ⬆️
unittests2 | 34.20% <0.00%> (+6.47%) ⬆️

Flags with carried forward coverage won't be shown.


@Jackie-Jiang (Contributor) left a comment

I don't fully follow the analysis. The consuming thread is not a Helix thread. If you find the lock is not being released, there should be a different cause for it.
What symptoms did you observe?

@noob-se7en (Contributor, Author)

@Jackie-Jiang the consuming threads are interrupted during the CONSUMING -> ONLINE transition, and the catch-up to the final offset is done by the Helix thread (HelixTaskExecutor-message_handle_thread).

@Jackie-Jiang (Contributor)

I'm curious about why the lock is not released. A slower consumer shouldn't cause the lock to be held. That is the core of this problem.

@noob-se7en (Contributor, Author)

I'm curious about why the lock is not released. A slower consumer shouldn't cause the lock to be held. That is the core of this problem.

Yes, I understand the core of the problem is that the semaphore is held for too long by the consumers, and we should focus on that. There have been 2 incidents where the blame was put on partial upserts.
For instance:

  1. A table with partial upserts enabled resulted in all Helix threads being blocked.
  2. A server took 16 hours to load a consuming segment for a table that had partial upsert enabled.

What I am trying to address here is that we should not run into a situation where slow consuming segments block the entire ingestion. For example, if we have K Kafka partitions, the K Helix threads holding the semaphore (catching up to the offset) should not stop the downloading of other segments for the same table or the consumption of segments for different tables.

@9aman (Contributor) commented Jan 8, 2025


The catch-up is allowed only for 30 seconds, after which the segment is downloaded. Correct me if I am wrong.
How is that blocking everything?

@noob-se7en (Contributor, Author)

The catch-up is allowed only for 30 seconds, after which the segment is downloaded. Correct me if I am wrong. How is that blocking everything?

Yes, correct, but these threads still block the remaining threads, right? (The 30s condition is only checked after indexing each message batch.) Do you see any disadvantage with this approach? A timed acquire on the semaphore is better than the current blocking acquire for avoiding extremely slow ingestion.

// reload segment metadata to get latest status
segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadata(_segmentNameStr);

if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
Contributor:

Change this check to == Status.DONE.

The reason is that in pauseless ingestion, the segment can be in a new COMMITTING state, where it still doesn't have a download URL.

// reload segment metadata to get latest status
segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadata(_segmentNameStr);

if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
Contributor:

We want to first check that segmentZKMetadata != null. In corner cases, the latest consuming segment might have been deleted, and it can be null.

@KKcorps (Contributor) commented Jan 13, 2025:

Also cover the case where segment status is UPLOADED
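
Putting these review suggestions together, the guard might look like the following sketch; the status values come from the snippets above, but how the PR ultimately combines them is an assumption:

// Reload the latest ZK metadata; it can be null if the consuming segment was deleted in
// the meantime (the corner case mentioned above).
SegmentZKMetadata segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadata(_segmentNameStr);
if (segmentZKMetadata != null
    && (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE
        || segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.UPLOADED)) {
  // The segment is already sealed (committed or uploaded), so there is no need to keep
  // this Helix thread blocked on the partition semaphore; bail out here instead of retrying.
}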

@@ -1619,7 +1620,17 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf

// Acquire semaphore to create stream consumers
try {
-  _partitionGroupConsumerSemaphore.acquire();
+  while (!_partitionGroupConsumerSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
Contributor:

Since we are doing a timed acquire, let's also log a warning when it cannot get the lock, and include the wait time so far.
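
A sketch of what the suggested logging could look like, assuming the class's existing _segmentLogger; the 5-minute interval per attempt comes from the diff above, while the message wording and elapsed-time tracking are assumptions:

long startTimeMs = System.currentTimeMillis();
while (!_partitionGroupConsumerSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
  // Warn on every failed attempt and report how long this thread has waited in total.
  _segmentLogger.warn("Failed to acquire the partition consumer semaphore for segment: {}, total wait so far: {} ms",
      _segmentNameStr, System.currentTimeMillis() - startTimeMs);
  // (the committed-segment check from the diff above would go here)
}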

@Jackie-Jiang (Contributor)

This PR can be used as a workaround. As @9aman pointed out, the catch-up should take only 30 seconds. Maybe there is a bug that prevents the lock from being released when the consumption is slow.


5 participants