Pauseless ingestion without failure scenarios #14741

Merged

@9aman 9aman commented Jan 2, 2025

Pauseless Consumption

In the current Apache Pinot architecture, real-time consumption pauses data ingestion until the previous segment's build and upload phases are completed. This process can introduce significant delays, with the build and upload phase potentially taking up to 10 minutes to finish. Consequently, users are unable to query the most recent data, creating a gap in real-time analytics capabilities.

To address this limitation, a new feature is being proposed in this design document. The proposed enhancement would allow Pinot to continue ingesting data during the build and upload phase of the previous segment. Under this design, the system would process new incoming data in a fresh segment while simultaneously completing the build and upload of the older segment. If implemented, this parallel processing approach could provide users with access to more up-to-date information, significantly reducing the lag between data ingestion and query availability.

GH Issue: Link

Scope of the PR

This PR covers only the happy path of real-time ingestion; it does not handle any failure scenarios. A separate PR (link) tackles the failure scenarios in detail.

Summary

Pauseless ingestion requires changing the current segment commit protocol so that the server can continue ingesting into a new segment while the existing one is being built and uploaded. The following sequence diagrams capture the current and the new segment commit protocols. The details of the changes are covered in the following sections.

Old segment commit protocol

[Image: sequence diagram of the old segment commit protocol]

New segment commit protocol

[Image: sequence diagram of the new segment commit protocol]

Changes Introduced in This PR

Controller Side Changes

New ZK State

In the new commit protocol, the segment is marked COMMITTING at the COMMIT_START call (see the sequence diagram above).
This is an intermediate state: the segment has been fully consumed (ingested) but has not yet been built and uploaded.
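
A minimal sketch of where the intermediate state sits in the segment lifecycle. Only the name COMMITTING and its position (after consumption, before build and upload) come from the description above; the enum name, the other two constants and the transition rules are assumptions made for illustration and are not Pinot's actual classes.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the segment status lifecycle; not Pinot's real enum.
enum SegmentStatusSketch {
  IN_PROGRESS,  // segment is still consuming (assumed name)
  COMMITTING,   // consumption finished, build and upload still pending
  DONE;         // segment built and uploaded (assumed name)

  // Statuses reachable from this one under the assumed pauseless flow.
  Set<SegmentStatusSketch> next() {
    switch (this) {
      case IN_PROGRESS:
        return EnumSet.of(COMMITTING);
      case COMMITTING:
        return EnumSet.of(DONE);
      default:
        return EnumSet.noneOf(SegmentStatusSketch.class);
    }
  }

  boolean canMoveTo(SegmentStatusSketch target) {
    return next().contains(target);
  }
}
```

Modelling COMMITTING as its own status lets readers of the metadata tell "consumed but not yet uploaded" apart from "still consuming" without inspecting other fields.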

PauselessSegmentCompletionFSM

In PauselessSegmentCompletionFSM, at the COMMIT_START call, the controller creates the ZK metadata for the new consuming segment and updates the ideal state for both the committing segment (ONLINE) and the new consuming segment (CONSUMING).

[Image: Screenshot 2025-01-02 at 6 01 29 PM]

For non-committing servers, once COMMIT_START has succeeded the controller responds with KEEP (continue building the segment) instead of HOLD. This allows the non-committing servers to continue with ingestion while the committing segment is being built and uploaded.
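
The controller-side bookkeeping described above can be sketched roughly as follows. The class, its two maps and the method names are hypothetical stand-ins for ZK metadata and the ideal state; the sketch also ignores offsets and replica selection, which the real FSM has to consider, and the IN_PROGRESS status for the new segment is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the controller's view; not Pinot's API.
final class CommitStartSketch {
  final Map<String, String> zkStatus = new HashMap<>();    // segment -> ZK status
  final Map<String, String> idealState = new HashMap<>();  // segment -> ideal state

  // Applies the COMMIT_START step: mark the old segment COMMITTING and
  // register the next consuming segment before anything is built or uploaded.
  void onCommitStart(String committingSegment, String nextSegment) {
    zkStatus.put(committingSegment, "COMMITTING");
    zkStatus.put(nextSegment, "IN_PROGRESS"); // assumed status for a consuming segment
    idealState.put(committingSegment, "ONLINE");
    idealState.put(nextSegment, "CONSUMING");
  }

  // Response for a non-committing server reporting that it finished consuming:
  // KEEP once COMMIT_START has succeeded, HOLD before that.
  String respondToNonCommitter(String segment) {
    return "COMMITTING".equals(zkStatus.get(segment)) ? "KEEP" : "HOLD";
  }
}
```

In this simplified model a non-committing replica stops waiting as soon as the committing segment is marked COMMITTING, which is what lets it move on to the next consuming segment.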

Server Side Changes

Changes in the commit protocol

The order of steps in the segment commit protocol changes on the server side:

Old: Build Segment -> COMMIT_START -> Upload segment -> COMMIT_END_METADATA
New: COMMIT_START -> Build Segment -> Upload segment -> COMMIT_END_METADATA

This ensures that a new consuming segment is created by the controller even before the committing segment is built.
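
As a small illustration of the reordering, the two sequences can be written down as data. The step names follow the Old/New lines above; the class and helper are hypothetical and exist only to make the difference explicit.

```java
import java.util.List;

// Step names follow the description above; the class itself is illustrative.
final class CommitOrderSketch {
  static final List<String> OLD_ORDER =
      List.of("BUILD_SEGMENT", "COMMIT_START", "UPLOAD_SEGMENT", "COMMIT_END_METADATA");
  static final List<String> NEW_ORDER =
      List.of("COMMIT_START", "BUILD_SEGMENT", "UPLOAD_SEGMENT", "COMMIT_END_METADATA");

  // Number of steps the server performs before it sends COMMIT_START.
  static int stepsBeforeCommitStart(List<String> order) {
    return order.indexOf("COMMIT_START");
  }
}
```

With the old order the server builds the segment before COMMIT_START; with the new order COMMIT_START comes first, so the build no longer delays it.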

Segment Download

In normal ingestion, the segment is marked ONLINE in the ideal state only after it has been uploaded. This is not the case in the commit protocol above, where the segment can be ONLINE before any download URL exists. Thus, a waited download has been added that repeatedly does the following for a set duration:

  1. Checks for the presence of a download URL in the segment ZK metadata. If present, it downloads the segment.
  2. Tries peer download, if enabled.

This is crucial for slow replicas, as they rely on the download URL or on peer download.
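
The retry loop described above can be sketched as a generic "wait until available" helper. This is a sketch under stated assumptions, not the code in this PR: the class and method names are hypothetical, and the supplier stands in for "use the download URL from ZK metadata if present, otherwise try peer download", returning empty when neither is possible yet.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper illustrating a waited download; not Pinot's API.
final class WaitedDownloadSketch {
  // Retries the attempt until it yields a result or the timeout elapses.
  static <T> T waitFor(Supplier<Optional<T>> attempt, long timeoutMs, long sleepMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      Optional<T> result = attempt.get();
      if (result.isPresent()) {
        return result.get();
      }
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        throw new IllegalStateException("Segment not available after " + timeoutMs + " ms");
      }
      try {
        // Never sleep past the deadline.
        Thread.sleep(Math.min(sleepMs, remaining));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for segment", e);
      }
    }
  }
}
```

Capping the sleep at the remaining time keeps the total wait close to the configured timeout even when the sleep interval is large relative to it.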

Broker side changes

No changes are needed for query.

9aman added 5 commits January 2, 2025 16:57
1. Changing FSM
2. Changing the 3 steps performed during the commit protocol to update ZK and Ideal state
1. Changes in the commit protocol to start segment commit before the build
2. Changes in the BaseTableDataManager to ensure that the locally built segment is replaced by a downloaded one
   only when the CRC is present in the ZK Metadata
3. Changes in the download segment method to allow waited download in case of pauseless consumption
…segment commit end metadata call

Refactoring code for readability
… ingestion by moving it out of streamConfigMap

codecov-commenter commented Jan 2, 2025

Codecov Report

Attention: Patch coverage is 29.81651% with 153 lines in your changes missing coverage. Please review.

Project coverage is 63.80%. Comparing base (59551e4) to head (81ea9ee).
Report is 1557 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...x/core/realtime/PauselessSegmentCompletionFSM.java | 0.00% | 42 Missing ⚠️ |
| ...ata/manager/realtime/RealtimeTableDataManager.java | 2.32% | 42 Missing ⚠️ |
| .../core/realtime/PinotLLCRealtimeSegmentManager.java | 59.45% | 28 Missing and 2 partials ⚠️ |
| ...a/manager/realtime/RealtimeSegmentDataManager.java | 0.00% | 15 Missing and 1 partial ⚠️ |
| ...ta/manager/realtime/PauselessSegmentCommitter.java | 0.00% | 13 Missing ⚠️ |
| ...ix/core/realtime/BlockingSegmentCompletionFSM.java | 62.50% | 6 Missing ⚠️ |
| .../pinot/core/data/manager/BaseTableDataManager.java | 60.00% | 0 Missing and 2 partials ⚠️ |
| ...data/manager/realtime/SegmentCommitterFactory.java | 0.00% | 1 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14741      +/-   ##
============================================
+ Coverage     61.75%   63.80%   +2.05%     
- Complexity      207     1611    +1404     
============================================
  Files          2436     2707     +271     
  Lines        133233   151153   +17920     
  Branches      20636    23345    +2709     
============================================
+ Hits          82274    96441   +14167     
- Misses        44911    47491    +2580     
- Partials       6048     7221    +1173     
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | 100.00% <ø> (+99.99%) ⬆️ |
| integration | 100.00% <ø> (+99.99%) ⬆️ |
| integration1 | 100.00% <ø> (+99.99%) ⬆️ |
| integration2 | 0.00% <ø> (ø) |
| java-11 | 63.78% <29.81%> (+2.07%) ⬆️ |
| java-21 | 63.68% <29.81%> (+2.05%) ⬆️ |
| skip-bytebuffers-false | 63.80% <29.81%> (+2.05%) ⬆️ |
| skip-bytebuffers-true | 63.66% <29.81%> (+35.93%) ⬆️ |
| temurin | 63.80% <29.81%> (+2.05%) ⬆️ |
| unittests | 63.79% <29.81%> (+2.05%) ⬆️ |
| unittests1 | 56.27% <11.76%> (+9.38%) ⬆️ |
| unittests2 | 34.10% <25.68%> (+6.36%) ⬆️ |


@9aman 9aman marked this pull request as ready for review January 2, 2025 14:08
@KKcorps KKcorps requested a review from Jackie-Jiang January 3, 2025 03:40
@@ -119,6 +121,10 @@ public class RealtimeTableDataManager extends BaseTableDataManager {

public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);

public static final long TIMEOUT_MINUTES = 5;
public static final long TIMEOUT_MS = TIMEOUT_MINUTES * 60 * 1000;
public static final long SLEEP_INTERVAL_MS = 30000; // 30 seconds sleep interval
Contributor

Let's make as many of these configurable as possible.

Contributor

+1

Contributor Author

Have made these configurable via the table config. The table config is fetched on every segment download call, and only for pauseless tables, so that the values can be changed without requiring a server restart.
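
A minimal sketch of reading such settings with fallbacks. The defaults mirror the constants in the diff above (5 minutes and 30 seconds); the class name, the plain `Map` standing in for the table config, and the property keys used in the usage note are all hypothetical, since the actual config keys are not shown in this thread.

```java
import java.util.Map;

// Hypothetical config reader; key names are illustrative, not Pinot's.
final class DownloadWaitConfigSketch {
  static final long DEFAULT_TIMEOUT_MS = 5 * 60 * 1000L;   // 5 minutes
  static final long DEFAULT_SLEEP_INTERVAL_MS = 30_000L;   // 30 seconds

  // Returns the configured value in milliseconds, or the default when the key
  // is missing, not a number, or not positive.
  static long readMs(Map<String, String> props, String key, long defaultMs) {
    String raw = props.get(key);
    if (raw == null) {
      return defaultMs;
    }
    try {
      long value = Long.parseLong(raw.trim());
      return value > 0 ? value : defaultMs;
    } catch (NumberFormatException e) {
      return defaultMs;
    }
  }
}
```

For example, `readMs(props, "download.timeout.ms", DEFAULT_TIMEOUT_MS)` (with a made-up key) would fall back to 5 minutes when the property is absent. Reading the values at call time rather than caching them at startup is what allows a change to take effect without a restart.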

@@ -459,4 +460,12 @@ public Set<String> getAllReferencedColumns() {
}
return allColumns;
}

public boolean isPauselessConsumptionEnabled() {
Contributor

I think we can remove this. It is fine to keep it only in StreamIngestionConfig and not support IndexingConfig for backward compatibility.

Contributor

+1

Contributor Author

Done.

import static org.testng.Assert.assertTrue;


public class PauselessRealtimeIngestionIntegrationTest extends BaseClusterIntegrationTest {
Contributor

We also need to add a test that verifies nothing goes wrong when a live table that is consuming data switches from blocking to pauseless and vice versa.

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Good job!

Currently, pauseless is determined by the table config on both the controller and the server. Will it be a problem if a user changes the table config from blocking to pauseless in the middle of consumption? Or does the user need to pause consumption first and then make the switch?

@@ -44,6 +44,8 @@ public class SegmentZKMetadata implements ZKMetadata {
private boolean _endTimeMsCached;
private long _endTimeMs;

public static final long DEFAULT_CRC_VALUE = -1;
Contributor

Seems this is introduced to identify the committing segment. Suggest checking the status() instead.

Contributor Author

Thanks for pointing this out. Yes, we should be relying on the status.

Contributor Author

Have replaced all the checks with status checks.



public class PauselessConsumptionUtils {
public static final String PAUSELESS_CONSUMPTION_ENABLED = "pauselessConsumptionEnabled";
Contributor

Seems unused

Contributor Author

Removed.

* @return true if pauseless consumption is explicitly enabled, false otherwise
* @throws NullPointerException if tableConfig is null
*/
public static boolean isPauselessEnabled(@NotNull TableConfig tableConfig) {
Contributor

Since this config is newly added, we don't need to add it in 2 places. Let's just keep it within the StreamIngestionConfig

Contributor Author

Have removed it from IndexingConfig and retained it only in StreamIngestionConfig

* from the map, and things start over. In this case, we respond to the server with a 'hold' so
* that they re-transmit their segmentConsumed() message and start over.
*/
@Override
Contributor

The overridden methods in this class are almost identical to the original ones. Suggest isolating the modified part as a separate method to improve the readability.
For this method, can we override committerNotifiedCommit()?

Contributor Author

I have refactored the code to reduce repetition.

// this aims to handle the failures during commitSegmentStartMetadata
// we abort the state machine to allow commit protocol to start from the beginning
// the server would then retry the commit protocol from the start
return abortAndReturnFailed();
Contributor

Do we need to explicitly catch and handle the exception here? It seems we are not catching exceptions in other places.

Contributor Author

The commitStart call of PauselessFSM is similar to the commitEnd call of BlockingFSM. In the commitEnd of BlockingFSM we do catch the exception, abort the FSM, and return a failure.

Aborting the FSM lets other replicas start the commit. I have kept the behavior the same as before.
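
The abort-on-failure behavior described here can be sketched with a heavily simplified state machine. The class, the state names and the response strings are illustrative assumptions; the real FSM tracks far more (replicas, offsets, timing) and this sketch only shows the catch, abort and fail pattern.

```java
// Hypothetical, heavily simplified FSM; names are illustrative, not Pinot's.
final class AbortOnFailureSketch {
  enum State { COMMITTER_NOTIFIED, COMMITTING, ABORTED }

  State state = State.COMMITTER_NOTIFIED;

  // Runs the metadata update for COMMIT_START. Any failure aborts the FSM so
  // that the commit protocol can restart from the beginning with any replica.
  String commitStart(Runnable updateMetadata) {
    try {
      updateMetadata.run();
      state = State.COMMITTING;
      return "COMMIT_CONTINUE";
    } catch (RuntimeException e) {
      state = State.ABORTED;
      return "FAILED";
    }
  }
}
```

Leaving the FSM in a terminal aborted state, rather than letting the exception propagate, means a later attempt starts from a clean slate instead of a half-updated one.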

if (_peerDownloadScheme != null) {
_logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName());
try {
return downloadSegmentFromPeers(zkMetadata);
Contributor

We should check the external view to find an ONLINE replica (i.e. call PeerServerSegmentFinder.getOnlineServersFromExternalView()), and start downloading only if at least one server is online. This way we can avoid the exponential backoff within the method.

Contributor Author

Have added a check that makes the presence of an ONLINE server a prerequisite for attempting peer download.
I didn't realize that going directly to peer download would introduce yet another retry mechanism. We are already retrying.

Thanks for pointing it out.
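
A rough sketch of the precondition discussed in this thread. It works on a plain map of server instance to segment state as a stand-in for the external view and does not call PeerServerSegmentFinder; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper over a plain map (server instance -> segment state).
final class OnlineReplicaCheckSketch {
  // Collects the servers that currently report the segment as ONLINE.
  static List<String> onlineServers(Map<String, String> externalViewForSegment) {
    List<String> online = new ArrayList<>();
    for (Map.Entry<String, String> entry : externalViewForSegment.entrySet()) {
      if ("ONLINE".equals(entry.getValue())) {
        online.add(entry.getKey());
      }
    }
    return online;
  }

  // Peer download is attempted only when it is enabled and some replica is ONLINE.
  static boolean shouldAttemptPeerDownload(boolean peerDownloadEnabled,
      Map<String, String> externalViewForSegment) {
    return peerDownloadEnabled && !onlineServers(externalViewForSegment).isEmpty();
  }
}
```

Checking first keeps the outer waited-download loop as the only retry mechanism: when no replica is ONLINE yet, the caller simply sleeps and tries again on the next iteration.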

@@ -119,6 +121,10 @@ public class RealtimeTableDataManager extends BaseTableDataManager {

public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);

public static final long TIMEOUT_MINUTES = 5;
public static final long TIMEOUT_MS = TIMEOUT_MINUTES * 60 * 1000;
public static final long SLEEP_INTERVAL_MS = 30000; // 30 seconds sleep interval
Contributor

+1

Thread.sleep(Math.min(SLEEP_INTERVAL_MS, timeRemaining));
}

// If we exit the loop without returning, throw an exception
Contributor

(nit) format

Contributor Author

Done

@@ -907,6 +925,22 @@ public void run() {
}
}

boolean startSegmentCommit() {
Contributor

Suggested change
- boolean startSegmentCommit() {
+ private boolean startSegmentCommit() {

Contributor Author

Done

Comment on lines 854 to 855
// See design doc for the new commit protocol:
// https://docs.google.com/document/d/1d-xttk7sXFIOqfyZvYw5W_KeGS6Ztmi8eYevBcCrT_c
Contributor

Let's link the PR instead of the design doc

Contributor Author

Have linked the PR and updated the comment.

Contributor Author

9aman commented Jan 7, 2025

> Good job!
>
> Currently, pauseless is determined by the table config on both the controller and the server. Will it be a problem if a user changes the table config from blocking to pauseless in the middle of consumption? Or does the user need to pause consumption first and then make the switch?

The user needs to pause ingestion before enabling or disabling pauseless consumption.

9aman added 5 commits January 7, 2025 13:43
2. Fixing few formatting issues.
3. Fixing few comments for better readability.
4. Removing unused code: constructors, constants etc
1. Relying on segment status to make the call on whether to use waited download instead of tableConfig.
   - This is needed for pauseless tables as the pauseless can be disabled by changing the table config
   - Segment status is a better decision criteria.
2. Checking presence of an ONLINE server before attempting peer download to prevent waiting for exponential backoff.
Contributor

@KKcorps KKcorps left a comment

LGTM!
minor: We should address reading some of the configs from the root streamConfig instead of from the first streamConfigMap.

@Jackie-Jiang Jackie-Jiang merged commit d953d7c into apache:master Jan 14, 2025
21 checks passed
Labels: documentation, feature, ingestion, real-time, release-notes

4 participants