Skip to content

Minor Refactoring and fixes #15419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 15, 2025
Merged

Minor Refactoring and fixes #15419

merged 11 commits into from
Apr 15, 2025

Conversation

noob-se7en
Copy link
Contributor

Minor refactoring post #15404

// before acquiring semaphore. These scenarios can lead to creation of small segments because segment
// consumption is terminated early.
_consumeStartTime = now();
setConsumeEndTime(_segmentZKMetadata, _consumeStartTime);
Copy link
Contributor Author

@noob-se7en noob-se7en Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not much benefit of this. Since mostly the endCriteriaTime is determined from zkMetadata only and extra consume time would be max 10 minutes in rare cases.
Probably need a revisit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has side effect of different replicas stops at different time. We need to use the time in the constructor so that the time is relatively aligned for the commit protocol to work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Although this can happen currently as well where The consuming segment was just about to be completed, but that server went down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@noob-se7en noob-se7en changed the title Refresh segment consumption end time Minor Refactoring Mar 31, 2025
@codecov-commenter
Copy link

codecov-commenter commented Mar 31, 2025

Codecov Report

Attention: Patch coverage is 75.00000% with 2 lines in your changes missing coverage. Please review.

Project coverage is 62.90%. Comparing base (9145e92) to head (d8023c7).
Report is 22 commits behind head on master.

Files with missing lines Patch % Lines
...a/manager/realtime/RealtimeSegmentDataManager.java 85.71% 0 Missing and 1 partial ⚠️
...ata/manager/realtime/RealtimeTableDataManager.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #15419      +/-   ##
============================================
- Coverage     62.95%   62.90%   -0.05%     
- Complexity     1381     1383       +2     
============================================
  Files          2848     2863      +15     
  Lines        161547   162439     +892     
  Branches      24752    24879     +127     
============================================
+ Hits         101700   102182     +482     
- Misses        52167    52573     +406     
- Partials       7680     7684       +4     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 62.82% <75.00%> (-0.09%) ⬇️
java-21 62.87% <75.00%> (-0.07%) ⬇️
skip-bytebuffers-false 62.83% <75.00%> (-0.12%) ⬇️
skip-bytebuffers-true 62.86% <75.00%> (-0.04%) ⬇️
temurin 62.90% <75.00%> (-0.05%) ⬇️
unittests 62.90% <75.00%> (-0.05%) ⬇️
unittests1 55.86% <75.00%> (-0.06%) ⬇️
unittests2 33.55% <0.00%> (-0.13%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@noob-se7en noob-se7en requested a review from Jackie-Jiang April 10, 2025 16:36
@noob-se7en noob-se7en changed the title Minor Refactoring Minor Refactoring and fixes Apr 10, 2025
@noob-se7en noob-se7en changed the title Minor Refactoring and fixes Minor Refactoring and fixes in Ingestion Apr 10, 2025
@@ -518,15 +518,13 @@ public void addConsumingSegment(String segmentName)

private void doAddConsumingSegment(String segmentName)
throws AttemptsExceededException, RetriableOperationException {
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
if (!_enforceConsumptionInOrder && zkMetadata.getStatus().isCompleted()) {
SegmentZKMetadata zkMetadata = fetchZKMetadataNullable(segmentName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do want it to throw exception when ZK metadata is removed. This is an unexpected scenario

Copy link
Contributor Author

@noob-se7en noob-se7en Apr 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do want it to throw exception when ZK metadata is removed.

That will put segment in error state.

This is an unexpected scenario

Why? Like this is valid scenario:

  1. offline -> consuming
  2. consuming -> dropped

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't conclude actually if this is unexpected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to be consistent with other state transition handling. When a segment is deleted, we throw exception during ZK metadata check, and then ERROR -> DROPPED state transition should be able to remove it properly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g. in doAddOnlineSegment() we don't allow null ZK metadata

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it. fixed this.

} catch (Throwable t) {
// In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
// Hence releasing the semaphore here to unblock reset operation via Helix Admin.
releaseConsumerSemaphore();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

@@ -1722,16 +1726,13 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
_state = State.INITIAL_CONSUMING;
_latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
_consumeStartTime = now();
setConsumeEndTime(segmentZKMetadata, _consumeStartTime);
setConsumeEndTime(segmentZKMetadata, now());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) This can be reverted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually _consumeStartTime should not be initiliased in contructor.

@@ -747,6 +747,10 @@ public void run() {

_segmentLogger.info("Acquired consumer semaphore.");

_consumeStartTime = now();
_segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor)

Suggested change
_segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",
_segmentLogger.info("Starting consumption on segment: {}, maxRowCount: {}, maxEndTime: {}",

@@ -311,7 +311,7 @@ public void deleteSegmentFile() {
private final AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0);
private final String _instanceId;
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
private final long _consumeStartTime;
private long _consumeStartTime;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) Move it in front of _lastLogTime

if (_timeCheckCounter.incrementAndGet() <= FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 4) {
// now() is called once in the run() method, then once on setting consumeStartTime, once before each batch
// reading and once for every row indexation
if (_timeCheckCounter.incrementAndGet() <= FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 5) {
Copy link
Contributor Author

@noob-se7en noob-se7en Apr 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried removing this painful test depending on _timeCheckCounter but the usage of it is spread across and requires some good refactoring and was taking longer than expected

@noob-se7en noob-se7en requested a review from Jackie-Jiang April 10, 2025 21:08
@noob-se7en noob-se7en changed the title Minor Refactoring and fixes in Ingestion Minor Refactoring and fixes Apr 11, 2025
@Jackie-Jiang Jackie-Jiang merged commit 1fe72ae into apache:master Apr 15, 2025
21 of 22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants