Fixes Helix threads getting blocked for Realtime Tables #14771
base: master
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #14771 +/- ##
============================================
+ Coverage 61.75% 63.92% +2.17%
- Complexity 207 1610 +1403
============================================
Files 2436 2705 +269
Lines 133233 150986 +17753
Branches 20636 23322 +2686
============================================
+ Hits 82274 96514 +14240
- Misses 44911 47252 +2341
- Partials 6048 7220 +1172
Flags with carried forward coverage won't be shown.
I don't fully follow the analysis. The consuming thread is not a Helix thread. If you find the lock not being released, there should be a different cause for it.
What symptoms did you observe?
@Jackie-Jiang the consuming threads will be interrupted during
I'm curious about why the lock is not released. A slower consumer shouldn't cause the lock to be held. That is the core of this problem.
Yes, I understand the core of the problem is that the semaphore is acquired for too long by the consumers, and we should focus on that. There have been 2 incidents where the blame has been put on partial upserts.
What I am trying to address here is that we should not run into a situation where slow consuming segments block the entire ingestion. For example, if we have
The catchup is allowed only for 30 seconds, after which the segment is downloaded. Correct me if I am wrong.
Yes, correct, but these threads still block the remaining threads, right (the 30s condition is checked only after indexing each message batch)? Do you see any disadvantage to this approach? Doing a timed acquire on the semaphore is better than the current approach for avoiding extremely slow ingestion.
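To make that trade-off concrete, here is a minimal sketch (not the actual Pinot code) contrasting the existing blocking acquire with the timed acquire proposed in this PR. The semaphore name and the 5-minute timeout are taken from the diff further down; everything else is illustrative.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class SemaphoreAcquireSketch {
  private final Semaphore _partitionGroupConsumerSemaphore = new Semaphore(1);

  // Current behavior: the Helix task thread parks here until the slow consumer releases the permit.
  void blockingAcquire() throws InterruptedException {
    _partitionGroupConsumerSemaphore.acquire();
  }

  // Proposed behavior: wake up every 5 minutes so the thread can re-check external state
  // (e.g. the segment's ZK metadata) instead of waiting indefinitely.
  void timedAcquire() throws InterruptedException {
    while (!_partitionGroupConsumerSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
      // Re-check whether the segment was already committed elsewhere before waiting again.
    }
  }
}
```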
// reload segment metadata to get latest status
segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadata(_segmentNameStr);

if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
Change this check to == Status.DONE.
The reason is that in pauseless ingestion, the segment can be in a new COMMITTING state where it still doesn't have a download URL.
// reload segment metadata to get latest status
segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadata(_segmentNameStr);

if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
We want to first check if segmentZKMetadata != null. In corner cases, the latest consuming segment might be deleted, and it can be null.
Also cover the case where the segment status is UPLOADED.
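Putting these three review comments together, a hedged sketch of what the check might look like. The helper method and class are hypothetical (not in the PR), and the import paths are my best guess at the Pinot packages.

```java
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.spi.utils.CommonConstants;

class SegmentStatusCheckSketch {
  // Returns true if the segment was already committed (DONE) or uploaded (UPLOADED),
  // guarding against the corner case where the consuming segment was deleted (null metadata).
  static boolean isSegmentAlreadyCommitted(SegmentZKMetadata segmentZKMetadata) {
    if (segmentZKMetadata == null) {
      return false;
    }
    CommonConstants.Segment.Realtime.Status status = segmentZKMetadata.getStatus();
    return status == CommonConstants.Segment.Realtime.Status.DONE
        || status == CommonConstants.Segment.Realtime.Status.UPLOADED;
  }
}
```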
@@ -1619,7 +1620,17 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf

     // Acquire semaphore to create stream consumers
     try {
-      _partitionGroupConsumerSemaphore.acquire();
+      while (!_partitionGroupConsumerSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
Since we are doing a timed acquire, let's also log a warning if it cannot get the lock, and also log the wait time so far
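A rough sketch of what the suggested logging could look like; the class and method names are hypothetical, and the real code would go through the segment data manager's logger rather than System.err.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TimedAcquireWithWarningSketch {
  static void acquire(Semaphore partitionGroupConsumerSemaphore, String segmentName)
      throws InterruptedException {
    long startMs = System.currentTimeMillis();
    while (!partitionGroupConsumerSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
      long waitedMs = System.currentTimeMillis() - startMs;
      // System.err keeps this sketch self-contained; the real code would use the class logger.
      System.err.printf("Failed to acquire consumer semaphore for segment %s, total wait so far: %d ms%n",
          segmentName, waitedMs);
    }
  }
}
```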
This PR can be used as a workaround. As @9aman pointed out, the catch-up should take only 30 seconds. Maybe there is a bug which prevents the lock from being released when the consumption is slow.
Problem
Consider a scenario for Realtime Tables where
In the above case, on the slow servers, k Helix threads will be in a running state, slowly consuming data from k partitions (a result of catching up to the final offset).
But the fast servers will commit segments and end up well ahead of the slow servers.
In the above case, the slow servers can eventually end up in a state where no Helix task thread is free (MAX_HELIX_TASK_THREADS - k threads will be in a waiting state, as they are waiting to acquire the partition semaphore for the onBecomeConsumingFromOffline event).
Solution
consuming -> online state transition message for the same segment.
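Assembled from the diffs and review comments above, here is a minimal self-contained sketch of the overall flow: do a timed acquire on the partition consumer semaphore, and on every timeout re-read the segment's ZK status so a slow consumer cannot block the Helix thread once the segment has been committed elsewhere. The enum, method, and supplier below are stand-ins for the real Pinot types, not the actual PR code.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class ConsumerStartupSketch {
  enum SegmentStatus { IN_PROGRESS, COMMITTING, DONE, UPLOADED }

  // Waits for the partition consumer semaphore, but re-reads the segment status (via the
  // supplied fetcher) on every timeout. Returns true if consuming should proceed, false if
  // the segment was already committed elsewhere and should be downloaded instead.
  static boolean waitToConsume(Semaphore semaphore, Supplier<SegmentStatus> zkStatusFetcher)
      throws InterruptedException {
    while (!semaphore.tryAcquire(5, TimeUnit.MINUTES)) {
      SegmentStatus status = zkStatusFetcher.get();
      if (status == SegmentStatus.DONE || status == SegmentStatus.UPLOADED) {
        return false; // Don't keep the Helix task thread blocked on a committed segment.
      }
    }
    return true;
  }
}
```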