Pauseless ingestion without failure scenarios #14741
1. Changing FSM
2. Changing the 3 steps performed during the commit protocol to update ZK and Ideal state

1. Changes in the commit protocol to start segment commit before the build
2. Changes in the BaseTableDataManager to ensure that the locally built segment is replaced by a downloaded one only when the CRC is present in the ZK Metadata
3. Changes in the download segment method to allow waited download in case of pauseless consumption
…segment commit end metadata call Refactoring code for readability
… ingestion by moving it out of streamConfigMap
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #14741 +/- ##
============================================
+ Coverage 61.75% 63.80% +2.05%
- Complexity 207 1611 +1404
============================================
Files 2436 2707 +271
Lines 133233 151153 +17920
Branches 20636 23345 +2709
============================================
+ Hits 82274 96441 +14167
- Misses 44911 47491 +2580
- Partials 6048 7221 +1173
@@ -119,6 +121,10 @@ public class RealtimeTableDataManager extends BaseTableDataManager {

  public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);

  public static final long TIMEOUT_MINUTES = 5;
  public static final long TIMEOUT_MS = TIMEOUT_MINUTES * 60 * 1000;
  public static final long SLEEP_INTERVAL_MS = 30000; // 30 seconds sleep interval
Let's make as many of these as possible configurable.
+1
Have made it configurable via table config. The tableConfig will be fetched at every download call of a segment only for pauseless tables to ensure that this can be changed without requiring server restarts.
@@ -459,4 +460,12 @@ public Set<String> getAllReferencedColumns() {
    }
    return allColumns;
  }

  public boolean isPauselessConsumptionEnabled() {
I think we can remove this. It is fine to keep it only in StreamIngestionConfig and not support IndexingConfig for backward compatibility.
+1
Done.
import static org.testng.Assert.assertTrue;


public class PauselessRealtimeIngestionIntegrationTest extends BaseClusterIntegrationTest {
We need to add another test as well that verifies nothing goes wrong when a live table that's consuming data is switching from blocking to pauseless and vice-versa
Good job!
Currently, pauseless is determined by the table config on both the controller and the server. Will there be a problem if the user changes the table config from blocking to pauseless in the middle of consumption? Or does the user need to pause consumption first and then make the switch?
@@ -44,6 +44,8 @@ public class SegmentZKMetadata implements ZKMetadata {
  private boolean _endTimeMsCached;
  private long _endTimeMs;

  public static final long DEFAULT_CRC_VALUE = -1;
Seems this is introduced to identify the committing segment. Suggest checking the status() instead.
Thanks for pointing out. Yes we should be relying on status
Have replaced all the checks with status checks.
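The difference between the two checks discussed in this thread can be sketched as follows. This is a minimal illustration, not Pinot code: the `SegmentStatusSketch` class, its method names, and the `IN_PROGRESS`/`DONE` enum values are assumptions made for the example; only `DEFAULT_CRC_VALUE` and the COMMITTING state come from this PR.

```java
// Hypothetical, simplified model; not the actual Pinot classes.
final class SegmentStatusSketch {
  // Assumed set of states; only COMMITTING is named in this PR.
  enum Status { IN_PROGRESS, COMMITTING, DONE }

  // Sentinel the PR first introduced to spot a segment without a CRC yet.
  static final long DEFAULT_CRC_VALUE = -1;

  // Indirect check: infers "committing" from a missing CRC.
  static boolean isCommittingByCrc(long crc) {
    return crc == DEFAULT_CRC_VALUE;
  }

  // Check suggested in the review: ask the segment status directly.
  static boolean isCommittingByStatus(Status status) {
    return status == Status.COMMITTING;
  }
}
```

The status-based variant does not depend on a sentinel CRC value, which is the reason the reviewer gives for preferring it.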
public class PauselessConsumptionUtils {
  public static final String PAUSELESS_CONSUMPTION_ENABLED = "pauselessConsumptionEnabled";
Seems unused
Removed.
   * @return true if pauseless consumption is explicitly enabled, false otherwise
   * @throws NullPointerException if tableConfig is null
   */
  public static boolean isPauselessEnabled(@NotNull TableConfig tableConfig) {
Since this config is newly added, we don't need to add it in 2 places. Let's just keep it within the StreamIngestionConfig
Have removed it from IndexingConfig and retained it only in StreamIngestionConfig
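With the flag kept in a single place, the lookup reduces to one null-safe walk down the config. The sketch below assumes a simplified nesting (table config, then ingestion config, then stream ingestion config); the nested classes are hypothetical stand-ins for illustration and do not reproduce the real Pinot types or their accessors.

```java
import java.util.Objects;

// Hypothetical, simplified config classes; not the actual Pinot types.
final class PauselessConfigSketch {
  static final class StreamIngestionConfig {
    final boolean pauselessConsumptionEnabled;

    StreamIngestionConfig(boolean pauselessConsumptionEnabled) {
      this.pauselessConsumptionEnabled = pauselessConsumptionEnabled;
    }
  }

  static final class IngestionConfig {
    final StreamIngestionConfig streamIngestionConfig; // may be null

    IngestionConfig(StreamIngestionConfig streamIngestionConfig) {
      this.streamIngestionConfig = streamIngestionConfig;
    }
  }

  static final class TableConfig {
    final IngestionConfig ingestionConfig; // may be null

    TableConfig(IngestionConfig ingestionConfig) {
      this.ingestionConfig = ingestionConfig;
    }
  }

  // True only when the flag is explicitly enabled; a missing section means false.
  static boolean isPauselessEnabled(TableConfig tableConfig) {
    Objects.requireNonNull(tableConfig, "tableConfig must not be null");
    IngestionConfig ingestion = tableConfig.ingestionConfig;
    if (ingestion == null || ingestion.streamIngestionConfig == null) {
      return false;
    }
    return ingestion.streamIngestionConfig.pauselessConsumptionEnabled;
  }
}
```

This matches the Javadoc contract quoted above: a null table config throws `NullPointerException`, and anything short of an explicit `true` yields `false`.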
   * from the map, and things start over. In this case, we respond to the server with a 'hold' so
   * that they re-transmit their segmentConsumed() message and start over.
   */
  @Override
The overridden methods in this class are almost identical to the original ones. Suggest isolating the modified part as a separate method to improve the readability.
For this method, can we override committerNotifiedCommit()?
I have refactored the code to reduce repetition.
// this aims to handle the failures during commitSegmentStartMetadata
// we abort the state machine to allow commit protocol to start from the beginning
// the server would then retry the commit protocol from the start
return abortAndReturnFailed();
Do we need to explicitly catch and handle exception here? Seems we are not catching exception in other places
The commitStart call of PauselessFSM is similar to commitEnd call of BlockingFSM. In the commitEnd of BlockingFSM we do catch the exception and abort the FSM along with returning a failure.
Aborting FSM helps other replicas to start the commit. I have kept the behavior same as before.
if (_peerDownloadScheme != null) {
  _logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName());
  try {
    return downloadSegmentFromPeers(zkMetadata);
We should check external view to find the ONLINE replica (i.e. call PeerServerSegmentFinder.getOnlineServersFromExternalView()), then start downloading only if we can find one server online. This way we can prevent the exponential backoff within the method.
Have added the check where presence of Online server is a pre-requisite for us to attempt peer download.
I didn't realize that directly going for peer-download would introduce yet another retry mechanism. We already are retrying.
Thanks for pointing it out.
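The pre-check agreed on in this thread can be sketched as below. It is a simplified illustration under stated assumptions: a plain `Map` of server instance to segment state stands in for one segment's external-view entry, and the class and method names are hypothetical. It does not call `PeerServerSegmentFinder` or any other Pinot API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the map stands in for one segment's external-view entry.
final class PeerDownloadPrecheckSketch {
  // Returns the servers whose state for the segment is ONLINE.
  static List<String> onlineServers(Map<String, String> instanceStates) {
    List<String> online = new ArrayList<>();
    for (Map.Entry<String, String> entry : instanceStates.entrySet()) {
      if ("ONLINE".equals(entry.getValue())) {
        online.add(entry.getKey());
      }
    }
    return online;
  }

  // Attempt a peer download only when at least one replica can serve the segment.
  static boolean shouldAttemptPeerDownload(Map<String, String> instanceStates) {
    return !onlineServers(instanceStates).isEmpty();
  }
}
```

Skipping the attempt when no replica is ONLINE leaves retrying to the caller's existing loop, so the download path does not add a second backoff on top of it.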
  Thread.sleep(Math.min(SLEEP_INTERVAL_MS, timeRemaining));
}

// If we exit the loop without returning, throw an exception
(nit) format
Done
@@ -907,6 +925,22 @@ public void run() {
    }
  }

  boolean startSegmentCommit() {
Suggested change:
- boolean startSegmentCommit() {
+ private boolean startSegmentCommit() {
Done
// See design doc for the new commit protocol:
// https://docs.google.com/document/d/1d-xttk7sXFIOqfyZvYw5W_KeGS6Ztmi8eYevBcCrT_c
Let's link the PR instead of the design doc
Have linked the PR and updated the comment.
The user needs to pause ingestion before enabling or disabling pauseless.
2. Fixing a few formatting issues.
3. Fixing a few comments for better readability.
4. Removing unused code: constructors, constants, etc.

1. Relying on segment status, instead of tableConfig, to decide whether to use waited download.
   - This is needed for pauseless tables, as pauseless can be disabled by changing the table config.
   - Segment status is a better decision criterion.
2. Checking for the presence of an ONLINE server before attempting peer download, to avoid waiting on exponential backoff.
LGTM!
minor: We should address reading some of the configs from the root streamConfig instead of from the first streamConfigMap.
Pauseless Consumption
In the current Apache Pinot architecture, real-time consumption pauses data ingestion until the previous segment's build and upload phases are completed. This process can introduce significant delays, with the build and upload phase potentially taking up to 10 minutes to finish. Consequently, users are unable to query the most recent data, creating a gap in real-time analytics capabilities.
To address this limitation, a new feature is being proposed in this design document. The proposed enhancement would allow Pinot to continue ingesting data during the build and upload phase of the previous segment. Under this design, the system would process new incoming data in a fresh segment while simultaneously completing the build and upload of the older segment. If implemented, this parallel processing approach could provide users with access to more up-to-date information, significantly reducing the lag between data ingestion and query availability.
GH Issue: Link
Scope of the PR
This PR focuses on the happy path of realtime ingestion, i.e. it doesn't cover any failure scenarios. PR (link) tackles the failure scenarios in detail.
Summary
Pauseless ingestion required changing the current segment commit protocol to allow the server to continue ingesting into a new segment while the existing one is being built and uploaded. The following sequence diagrams capture the current and new segment commit protocols. The details of the changes are covered in the following sections.
Old segment commit protocol
New segment commit protocol
Changes Introduced in This PR
Controller Side Changes
New ZK State
In the new commit protocol, the segment is marked COMMITTING at the COMMIT_START call (see the sequence diagram above). This is an intermediate state after the segment has been consumed/ingested but before it is built and uploaded.
PauselessSegmentCompletionFSM
In PauselessSegmentCompletionFSM at the COMMIT_START call, the controller creates ZK metadata for the new consuming segment along with updating the ideal state for the committing (ONLINE) and the new consuming segment (CONSUMING).
For non-committing servers, once COMMIT_START has succeeded, the controller responds with KEEP (continue building the segment) instead of HOLD. This allows the non-committing servers to continue with ingestion while the committing segment is being built and uploaded.
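The response rule for non-committing servers can be sketched as a small decision function. This is an illustration only: the class, the two-value `Response` enum, and the boolean parameters are hypothetical, and the sketch assumes HOLD remains the answer in blocking mode and before COMMIT_START succeeds. The real FSM handles more states and responses than shown here.

```java
// Hypothetical sketch of the controller's answer to a non-committing server.
final class NonCommitterResponseSketch {
  // Only the two responses named in this description are modeled.
  enum Response { HOLD, KEEP }

  static Response respondToNonCommitter(boolean pauseless, boolean commitStartSucceeded) {
    // Pauseless: once COMMIT_START has succeeded, let the replica keep its segment
    // and move on to the new consuming segment.
    if (pauseless && commitStartSucceeded) {
      return Response.KEEP;
    }
    // Assumed fallback: the replica is asked to wait and check back.
    return Response.HOLD;
  }
}
```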
Server Side Changes
Changes in the commit protocol
The sequence of events in the segment commit protocol changes on the server side.
Old: Build Segment -> COMMIT_START -> Upload segment -> COMMIT_END_METADATA
New: COMMIT_START -> Build Segment -> Upload segment -> COMMIT_END_METADATA
This ensures that a new consuming segment is created by the controller even before the committing segment is built.
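The reordering can be made concrete by modeling both sequences as ordered lists. The sketch below is purely illustrative: the `CommitOrderSketch` class and its `Step` enum are hypothetical names that mirror the four steps listed above.

```java
import java.util.List;

// Hypothetical model of the two server-side step orders described above.
final class CommitOrderSketch {
  enum Step { BUILD_SEGMENT, COMMIT_START, UPLOAD_SEGMENT, COMMIT_END_METADATA }

  static final List<Step> OLD = List.of(
      Step.BUILD_SEGMENT, Step.COMMIT_START, Step.UPLOAD_SEGMENT, Step.COMMIT_END_METADATA);

  static final List<Step> NEW = List.of(
      Step.COMMIT_START, Step.BUILD_SEGMENT, Step.UPLOAD_SEGMENT, Step.COMMIT_END_METADATA);

  // Number of steps the server runs before COMMIT_START is sent to the controller.
  static int stepsBeforeCommitStart(List<Step> order) {
    return order.indexOf(Step.COMMIT_START);
  }
}
```

In the old order the build precedes COMMIT_START, so `stepsBeforeCommitStart(OLD)` is 1; in the new order it is 0, meaning the controller is notified before the build starts.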
Segment Download
In normal ingestion the segment is marked ONLINE, in the ideal state, only after it's been uploaded. This is not the case in the above commit protocol. Thus, a waited download has been added that repeatedly does the following for a set duration:
This becomes crucial for slow replicas, as they rely on the download URL or peer download.
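A waited download of this kind can be sketched as a bounded polling loop. The sketch is a minimal illustration, not the PR's implementation: what a single attempt does is abstracted behind a caller-supplied `tryDownload`, the class and parameter names are hypothetical, and the method returns `false` on timeout, whereas the reviewed code throws an exception after the loop.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a bounded retry loop; not the PR's implementation.
final class WaitedDownloadSketch {
  // Retries tryDownload until it succeeds or timeoutMs has elapsed.
  static boolean waitForDownload(BooleanSupplier tryDownload, long timeoutMs, long sleepIntervalMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      if (tryDownload.getAsBoolean()) {
        return true;
      }
      long timeRemaining = deadline - System.currentTimeMillis();
      if (timeRemaining <= 0) {
        return false; // the reviewed code throws here instead
      }
      try {
        // Never sleep past the deadline, as in the reviewed snippet.
        Thread.sleep(Math.min(sleepIntervalMs, timeRemaining));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }
}
```

Passing the timeout and sleep interval as arguments, rather than reading constants, keeps the loop compatible with the reviewers' request to make these values configurable.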
Broker Side Changes
No changes are needed for query.