[controller] Emit push job status metrics from controller #1176
Closed
Conversation
* [server] Parallel processing for AA/WC workload

This code change introduces a new thread pool to process the AA/WC workload; essentially, all AA/WC processing logic is delegated to this new thread pool regardless of the consumer thread/pool assignment. The reasons for this feature:

1. Consumption is not the bottleneck and the AA/WC processing logic is CPU intensive, so even with a single consumer, the thread pool can be largely utilized when that consumer delivers many AA/WC messages.
2. In prod, we have seen a few scenarios with long-tail partitions that slow down inc push or rebalance significantly.

This code change introduces two new configs:

server.aa.wc.workload.parallel.processing.enabled: default false
server.aa.wc.workload.parallel.processing.thread.pool.size: default 8

Here is how the workflow is changed to adopt the new parallel processing pattern (see the sketch below):

1. Consumer threads send a list of messages belonging to the same partition to `ConsumptionTask`, which is existing behavior.
2. When `StoreIngestionTask` (extending `ConsumptionTask`) receives a batch of messages, it checks whether all of them are AA/WC messages; if so and the feature is enabled, it delegates them batch by batch to the new processing thread pool. For each batch, the steps are:
   a. Lock the keys in the batch.
   b. Execute the AA/WC logic; updates to the same key are executed sequentially.
   c. Update the transient cache.
   d. Return a new compatible format, `PubSubMessageProcessedResultWrapper`, which includes both the original message and the processed result.
3. Continue with the existing logic in `delegateConsumerRecord`.
4. When processing each message, if the message carries the processed result, use it; otherwise, continue the processing synchronously.

There is another change that performs DIV validation/filtering before step 2, to filter out duplicate messages as early as possible and reduce unnecessary compute.

This PR also fixes a race condition in the changelog view writer. The fix is simple: the Venice server updates the transient cache before writing to the changelog view topic, so that when the next message looks up the same key, it gets the latest result even if the write to the version topic is still pending.

* Fixed unit test coverage
* Addressed the review comments
* Fixed rebase issue
* Fixed rebase issue
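A minimal sketch of the per-batch delegation pattern described above (lock keys, run the processing logic with per-key sequencing, return the original message alongside its processed result). All class, method, and type names here are illustrative stand-ins, not the actual Venice implementation:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative names only; not the actual Venice classes.
public class ParallelAaWcProcessorSketch {
  // Mirrors server.aa.wc.workload.parallel.processing.thread.pool.size (default 8).
  private final ExecutorService aaWcPool = Executors.newFixedThreadPool(8);
  private final Map<String, ReentrantLock> keyLocks = new ConcurrentHashMap<>();

  record Message(String key, String payload) {}
  record ProcessedResultWrapper(Message original, String processedResult) {}

  /** Hand one batch of same-partition AA/WC messages to the pool as a single task. */
  CompletableFuture<List<ProcessedResultWrapper>> submitBatch(List<Message> batch) {
    return CompletableFuture.supplyAsync(() -> batch.stream().map(msg -> {
      // a. Lock the key; combined with in-order processing within the batch,
      //    this keeps updates to the same key sequential across batches too.
      ReentrantLock lock = keyLocks.computeIfAbsent(msg.key(), k -> new ReentrantLock());
      lock.lock();
      try {
        // b. Stand-in for the CPU-intensive AA/WC merge logic.
        String result = msg.payload().toUpperCase();
        // c. The transient-cache update would happen here (omitted in this sketch).
        // d. Return the original message together with the processed result.
        return new ProcessedResultWrapper(msg, result);
      } finally {
        lock.unlock();
      }
    }).toList(), aaWcPool);
  }
}
```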
…inkedin#1178) If the schema repo returns an invalid schema id, client requests fail. This PR adds a safeguard against such invalid schemas in the metadata handler and the client-side schema reader. --------- Co-authored-by: Sourav Maji <tester@linkedin.com>
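A minimal sketch of the kind of safeguard this describes; the method name and exception type are hypothetical, not the actual Venice code:

```java
// Hypothetical guard, not the actual Venice code: schema ids are expected to be
// positive, so anything else from the schema repo is rejected before it can
// propagate into client requests.
static int validateSchemaId(String storeName, int schemaId) {
  if (schemaId < 1) {
    throw new IllegalStateException(
        "Schema repo returned invalid schema id " + schemaId + " for store " + storeName);
  }
  return schemaId;
}
```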
… version is invalid (linkedin#1181)
…1175) When a store is completely migrated, a client that had no queries at the time of migration may not yet have migrated to the new cluster. We fixed this issue in linkedin#843, but removed the store-existence check from the ACL handler, expecting the next layers to redirect requests for the migrated store. The ACL handler interface doesn't specify how to handle a missing store, so the ACL handler could either:

1. Return "false" from the "hasAccess" call
2. Throw an "AclException"

Only in the second case would the request be forwarded to the next handler. This commit adds the store-existence check before checking ACLs, so the request can be forwarded to the next handler and the client can be notified of the store move. It also unifies common store ACL handler logic into "AbstractStoreAclHandler" and splits router-specific and server-specific logic into independent implementations, "RouterStoreAclHandler" and "ServerStoreAclHandler" (see the sketch below).
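A hedged sketch of that handler shape; the method names and pass-through mechanics are assumptions, not the actual Venice API:

```java
// Illustrative only: the shared handler checks store existence first,
// then delegates the ACL specifics to router-/server-specific subclasses.
abstract class AbstractStoreAclHandlerSketch {
  boolean handle(String storeName, String principal) {
    if (!storeExists(storeName)) {
      // Forward instead of rejecting: a later handler can tell the client
      // that the store has moved to another cluster.
      return forwardToNextHandler();
    }
    return hasAccess(storeName, principal);
  }

  abstract boolean storeExists(String storeName);

  // In the PR, RouterStoreAclHandler and ServerStoreAclHandler supply
  // their own versions of the ACL check.
  abstract boolean hasAccess(String storeName, String principal);

  boolean forwardToNextHandler() {
    return true; // stand-in for passing the request down a Netty-style pipeline
  }
}
```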
Added a valueOf function in EnumUtils to reduce code duplication and standardize the error message for invalid enum values. Also added unit tests for the mappings of all VeniceEnumValue implementations.
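A sketch of the idea: one generic lookup from a wire value to its enum constant, with a single standardized error message. The signature and interface below are assumptions, not the actual EnumUtils/VeniceEnumValue API:

```java
import java.util.List;

// Assumed shape of the value-carrying enum contract.
interface VeniceEnumValueSketch {
  int getValue();
}

final class EnumUtilsSketch {
  /** Map an int wire value to its enum constant, failing with one standard message. */
  static <E extends Enum<E> & VeniceEnumValueSketch> E valueOf(
      List<E> values, int value, Class<E> enumClass) {
    for (E e : values) {
      if (e.getValue() == value) {
        return e;
      }
    }
    throw new IllegalArgumentException(
        "Invalid enum value for " + enumClass.getSimpleName() + ": " + value);
  }
}
```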
Messed up the PR with updates. Will open a new one.
Summary
VPJ communicates with the controller to write PushJobDetails to the PUSH_JOB_DETAILS_STORE_NAME system store. This PR introduces new metrics emitted by the parent controller for push job success/failure.

Choice of Stat Type: The original idea is for each event (push job success/failure) to emit 1, which can be aggregated in the metrics receiver in any manner to calculate push job availability.
- OccurrenceRate(SECOND) gives QPS, which needs to be multiplied by 60 to get the minute's count. However, with a small number of push jobs and tight SLAs, this might lead to rounding errors.
- OccurrenceRate(MINUTE) and Gauge require metric collection time to sync with metric fetch time.
- Count provides an unwindowed cumulative over all time. This was chosen, but something windowed was also needed for the push job availability % calculation in the receiver.
- CountSinceLastMeasurement was added via this Tehuti PR to provide a windowed cumulative, emitting the delta every time it's fetched (every minute in this case) => using tehuti 0.12.2.

New Metrics Added (Count and CountSinceLastMeasurement):
- batch_push_job_success, batch_push_job_failed_user_error, batch_push_job_failed_non_user_error
- incremental_push_job_success, incremental_push_job_failed_user_error, incremental_push_job_failed_non_user_error
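To make the stat choice concrete, here is a hedged sketch of registering one of these metrics with both stats. It assumes Tehuti's MetricsRepository/Sensor API and that CountSinceLastMeasurement lives in io.tehuti.metrics.stats (it requires tehuti 0.12.2+); the real wiring is in PushJobStatusStats.java and may differ:

```java
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.Count;
import io.tehuti.metrics.stats.CountSinceLastMeasurement; // assumed location, tehuti 0.12.2+

public class PushJobStatusStatsSketch {
  private final Sensor batchPushJobSuccessSensor;

  public PushJobStatusStatsSketch(MetricsRepository metricsRepository) {
    batchPushJobSuccessSensor = metricsRepository.sensor("batch_push_job_success");
    // Unwindowed cumulative over all time.
    batchPushJobSuccessSensor.add("batch_push_job_success.Count", new Count());
    // Windowed cumulative: emits the delta since the last fetch (every minute here).
    batchPushJobSuccessSensor.add(
        "batch_push_job_success.CountSinceLastMeasurement", new CountSinceLastMeasurement());
  }

  public void recordBatchPushJobSuccess() {
    batchPushJobSuccessSensor.record(); // one event per successful batch push job
  }
}
```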
Current flow: VPJ checks the push.job.status.upload.enable config and sends PushJobDetails to the /send_push_job_details path in the Venice controller, which writes it to the push job details system store.

Approaches considered to emit metrics:
1. Emit from the controller when it writes the push job details. This relies on the push.job.status.upload.enable config, which is enabled everywhere.
2. Evolve PushJobDetails to hold the push.job.status.upload.enable config and move the logic of determining success/failure status to VenicePushJob.

Chose approach 1 as it's the simplest: unlike the other options, it doesn't require deployment ordering (controllers -> VPJ) and needs no schema evolution. A sketch of this approach follows.
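A hedged sketch of approach 1's shape: classify the terminal push job status at the point where the controller persists the details, and bump the matching sensor. All names below are assumptions; the real changes live in VeniceHelixAdmin.java and PushJobStatusStats.java:

```java
// Illustrative shape only; not the actual VeniceHelixAdmin/PushJobStatusStats code.
public class PushJobDetailsWriterSketch {
  enum Status { SUCCESS, USER_ERROR, NON_USER_ERROR }
  record PushJobDetails(String storeName, Status status) {}

  interface Stats {
    void recordSuccess();
    void recordFailedUserError();
    void recordFailedNonUserError();
  }

  private final Stats stats;

  PushJobDetailsWriterSketch(Stats stats) {
    this.stats = stats;
  }

  void writePushJobDetails(PushJobDetails details) {
    // Existing behavior: persist to the push job details system store (omitted here).
    // New in this PR: emit a metric for the job's terminal status.
    switch (details.status()) {
      case SUCCESS -> stats.recordSuccess();
      case USER_ERROR -> stats.recordFailedUserError();
      case NON_USER_ERROR -> stats.recordFailedNonUserError();
    }
  }
}
```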
For Reviewers: Main code changes are in VeniceHelixAdmin.java and PushJobStatusStats.java.

How was this PR tested?
GH CI
Does this PR introduce any user-facing changes?