-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Minor Refactoring and fixes #15419
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Minor Refactoring and fixes #15419
Conversation
// before acquiring semaphore. These scenarios can lead to creation of small segments because segment | ||
// consumption is terminated early. | ||
_consumeStartTime = now(); | ||
setConsumeEndTime(_segmentZKMetadata, _consumeStartTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not much benefit of this. Since mostly the endCriteriaTime is determined from zkMetadata only and extra consume time would be max 10 minutes in rare cases.
Probably need a revisit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has side effect of different replicas stops at different time. We need to use the time in the constructor so that the time is relatively aligned for the commit protocol to work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Although this can happen currently as well where The consuming segment was just about to be completed, but that server went down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #15419 +/- ##
============================================
- Coverage 62.95% 62.90% -0.05%
- Complexity 1381 1383 +2
============================================
Files 2848 2863 +15
Lines 161547 162439 +892
Branches 24752 24879 +127
============================================
+ Hits 101700 102182 +482
- Misses 52167 52573 +406
- Partials 7680 7684 +4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
@@ -518,15 +518,13 @@ public void addConsumingSegment(String segmentName) | |||
|
|||
private void doAddConsumingSegment(String segmentName) | |||
throws AttemptsExceededException, RetriableOperationException { | |||
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName); | |||
if (!_enforceConsumptionInOrder && zkMetadata.getStatus().isCompleted()) { | |||
SegmentZKMetadata zkMetadata = fetchZKMetadataNullable(segmentName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do want it to throw exception when ZK metadata is removed. This is an unexpected scenario
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do want it to throw exception when ZK metadata is removed.
That will put segment in error state.
This is an unexpected scenario
Why? Like this is valid scenario:
- offline -> consuming
- consuming -> dropped
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't conclude actually if this is unexpected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to be consistent with other state transition handling. When a segment is deleted, we throw exception during ZK metadata check, and then ERROR -> DROPPED state transition should be able to remove it properly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
E.g. in doAddOnlineSegment()
we don't allow null
ZK metadata
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it. fixed this.
} catch (Throwable t) { | ||
// In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from | ||
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released. | ||
// Hence releasing the semaphore here to unblock reset operation via Helix Admin. | ||
releaseConsumerSemaphore(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch
@@ -1722,16 +1726,13 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf | |||
_state = State.INITIAL_CONSUMING; | |||
_latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000); | |||
_consumeStartTime = now(); | |||
setConsumeEndTime(segmentZKMetadata, _consumeStartTime); | |||
setConsumeEndTime(segmentZKMetadata, now()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) This can be reverted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually _consumeStartTime
should not be initiliased in contructor.
@@ -747,6 +747,10 @@ public void run() { | |||
|
|||
_segmentLogger.info("Acquired consumer semaphore."); | |||
|
|||
_consumeStartTime = now(); | |||
_segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor)
_segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", | |
_segmentLogger.info("Starting consumption on segment: {}, maxRowCount: {}, maxEndTime: {}", |
@@ -311,7 +311,7 @@ public void deleteSegmentFile() { | |||
private final AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0); | |||
private final String _instanceId; | |||
private final ServerSegmentCompletionProtocolHandler _protocolHandler; | |||
private final long _consumeStartTime; | |||
private long _consumeStartTime; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) Move it in front of _lastLogTime
if (_timeCheckCounter.incrementAndGet() <= FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 4) { | ||
// now() is called once in the run() method, then once on setting consumeStartTime, once before each batch | ||
// reading and once for every row indexation | ||
if (_timeCheckCounter.incrementAndGet() <= FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 5) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tried removing this painful test depending on _timeCheckCounter but the usage of it is spread across and requires some good refactoring and was taking longer than expected
Minor refactoring post #15404