
[controller] Emit push job status metrics from controller #1176

Closed

Conversation

m-nagarajan
Contributor

@m-nagarajan commented Sep 14, 2024

Summary

VPJ communicates with the controller to write PushJobDetails to the PUSH_JOB_DETAILS_STORE_NAME system store. This PR introduces new metrics emitted by the parent controller for push job success/failure.

Choice of Stat Type: The original idea is for each event (push job success/failure) to emit a value of 1, which the metrics receiver can aggregate in any manner to calculate push job availability.

  • Using OccurrenceRate(SECOND) gives QPS, which must be multiplied by 60 to get the per-minute count. However, with a small number of push jobs and tight SLAs, this can introduce rounding errors.
  • Using OccurrenceRate(MINUTE) and Gauge both require the metric collection window to be synchronized with the metric fetch interval.
  • Count provides an unwindowed cumulative total over all time. This was chosen, but a windowed stat is also needed for the push job availability % calculation in the receiver.
  • So CountSinceLastMeasurement was added via a Tehuti PR to provide a windowed cumulative count that emits the delta every time it is fetched (every minute in this case) => bumping the dependency to tehuti 0.12.2.

New Metrics Added (Count and CountSinceLastMeasurement):
batch_push_job_success, batch_push_job_failed_user_error, batch_push_job_failed_non_user_error
incremental_push_job_success, incremental_push_job_failed_user_error, incremental_push_job_failed_non_user_error
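
For illustration, here is a minimal sketch of how one of these sensors could be registered with both stat types using Tehuti directly. The real code lives in PushJobStatusStats.java; the wrapper class, metric naming scheme, and the package path of CountSinceLastMeasurement below are assumptions.

```java
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.Count;
import io.tehuti.metrics.stats.CountSinceLastMeasurement; // added in tehuti 0.12.2; package path assumed

public class PushJobStatusStatsSketch {
  private final Sensor batchPushJobSuccessSensor;

  public PushJobStatusStatsSketch(MetricsRepository metricsRepository) {
    // One sensor per outcome. Each sensor exposes an all-time cumulative Count plus a
    // windowed CountSinceLastMeasurement that emits the delta on every fetch (every minute here).
    batchPushJobSuccessSensor = metricsRepository.sensor("batch_push_job_success");
    batchPushJobSuccessSensor.add("batch_push_job_success.Count", new Count());
    batchPushJobSuccessSensor
        .add("batch_push_job_success.CountSinceLastMeasurement", new CountSinceLastMeasurement());
  }

  /** Record a single occurrence for every successful batch push job. */
  public void recordBatchPushJobSuccess() {
    batchPushJobSuccessSensor.record(1);
  }
}
```

The receiver can then compute, for example, batch push availability for a window as success / (success + failed_user_error + failed_non_user_error) from the per-minute CountSinceLastMeasurement deltas.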

The current flow: VPJ checks the push.job.status.upload.enable config and sends PushJobDetails to the /send_push_job_details path on the Venice controller, which writes it to the push job details system store. Approaches considered for emitting metrics:

  1. Derive success/failure in the controller and emit metrics there, tying metric emission to push job details upload via the push.job.status.upload.enable config. This config is enabled everywhere.
  2. Add a new controller endpoint just for emitting metrics, which duplicates the work of talking to the controllers and serializing/deserializing the data.
  3. Add two new controller endpoints: one that only emits metrics when upload is not enabled, and one that does both (emit metrics and write to the push status store) to avoid the duplicate work.
  4. Evolve PushJobDetails to hold the push.job.status.upload.enable config and move the logic of determining success/failure status into VenicePushJob.

Approach 1 was chosen as it is the simplest: unlike the other options, it requires no deployment ordering (controllers -> VPJ) and no schema evolution.
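
As a rough illustration of approach 1, the controller-side hook might look like the placeholder sketch below (the real changes live in the files called out in the reviewer note); every helper, field, and method name here is illustrative, not the actual Venice code.

```java
// Hypothetical controller-side hook, invoked when VPJ uploads PushJobDetails via
// /send_push_job_details. All names below are placeholders.
void onPushJobDetailsReceived(PushJobDetails details) {
  // Existing path: persist the uploaded details into the push job details system store.
  writePushJobDetailsToSystemStore(details);

  // New path: derive the terminal outcome from the uploaded details and bump the matching
  // sensor, so the parent controller emits the metrics listed above.
  boolean incremental = isIncrementalPush(details);   // placeholder helper
  if (completedSuccessfully(details)) {               // placeholder helper
    if (incremental) {
      stats.recordIncrementalPushJobSuccess();
    } else {
      stats.recordBatchPushJobSuccess();
    }
  } else if (failedDueToUserError(details)) {         // placeholder helper
    if (incremental) {
      stats.recordIncrementalPushJobFailedUserError();
    } else {
      stats.recordBatchPushJobFailedUserError();
    }
  } else {
    if (incremental) {
      stats.recordIncrementalPushJobFailedNonUserError();
    } else {
      stats.recordBatchPushJobFailedNonUserError();
    }
  }
}
```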

For Reviewers: Main code changes are in VeniceHelixAdmin.java and PushJobStatusStats.java.

How was this PR tested?

GH CI

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.

m-nagarajan and others added 6 commits September 13, 2024 20:04
* [server] Parallel processing for AA/WC workload

This code change introduces a new thread pool to process the AA/WC workload; essentially,
all AA/WC processing logic is delegated to this new thread pool regardless of the consumer
thread/pool assignment.
The reasons for this feature:
1. Consumption is not the bottleneck and the AA/WC processing logic is CPU-intensive, so even
   with a single consumer, the thread pool can be highly utilized when there are a lot of
   AA/WC messages from that single consumer.
2. In prod, we have seen a few scenarios with long-tail partitions, which slow down
   incremental push or rebalance significantly.

This code change introduces two new configs:
server.aa.wc.workload.parallel.processing.enabled: default false
server.aa.wc.workload.parallel.processing.thread.pool.size: default 8

Here is how the workflow is changed to adopt the new parallel processing pattern:
1. Consumer threads will send a list of messages belonging to the same partition
   to `ConsumptionTask`, which is an existing behavior.
2. When `StoreIngestionTask` (extending `ConsumptionTask`) receives a batch of messages,
   it checks whether all of them are AA/WC messages; if so, and if this feature is enabled,
   it delegates them batch by batch to the new processing thread pool (see the sketch after this list).
   For each batch, here are the steps:
   a. Lock the keys in the batch.
   b. Execute the AA/WC logic; updates to the same key are executed sequentially.
   c. Update the transient cache.
   d. Return a new compatible format: `PubSubMessageProcessedResultWrapper`, which includes
      both the original message and the processed result.
3. Continue with the existing logic in the `delegateConsumerRecord` function.
4. When processing each message, if the message carries a processed result, use it; otherwise,
   process it synchronously.
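
A condensed, illustrative sketch of the batch delegation above (per-key locking plus a dedicated processing pool); all class, method, and type names here are placeholders rather than the actual Venice Server code:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AaWcBatchProcessorSketch<MSG, RESULT> {
  // Pool sized by server.aa.wc.workload.parallel.processing.thread.pool.size (default 8).
  private final ExecutorService aaWcProcessingPool = Executors.newFixedThreadPool(8);

  CompletableFuture<List<RESULT>> processBatch(List<MSG> batch) {
    return CompletableFuture.supplyAsync(() -> {
      // a. Lock the keys in the batch so concurrent batches cannot interleave on the same keys.
      lockKeys(batch);
      try {
        // b. Execute the AA/WC logic; updates to the same key run sequentially within the batch.
        List<RESULT> results = executeAaWcLogic(batch);
        // c. Update the transient cache with the processed results.
        updateTransientCache(results);
        // d. Return the processed results so that `delegateConsumerRecord` can reuse them
        //    (wrapped as `PubSubMessageProcessedResultWrapper` in the real code).
        return results;
      } finally {
        unlockKeys(batch);
      }
    }, aaWcProcessingPool);
  }

  // Placeholder hooks standing in for the real locking, processing, and caching logic.
  void lockKeys(List<MSG> batch) {}
  void unlockKeys(List<MSG> batch) {}
  List<RESULT> executeAaWcLogic(List<MSG> batch) { return List.of(); }
  void updateTransientCache(List<RESULT> results) {}
}
```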

There is another change that performs DIV validation/filtering before step 2, to filter out
duplicate messages as early as possible and reduce unnecessary compute.

This PR also fixes a race condition in the changelog view writer. The fix is simple: the
Venice Server updates the transient cache before writing to the changelog view topic, so that
when the next message looks up the same key, it gets the latest result even if the write to
the version topic is still pending.

* Fixed unit test coverage

* Addressed the review comments

* Fixed rebase issue

* Fixed rebase issue
…inkedin#1178)

If the schema repo returns an invalid schema id, client requests fail. This PR adds a safeguard against such invalid schemas in the metadata handler and the client-side schema reader.
---------
Co-authored-by: Sourav Maji <tester@linkedin.com>
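
A hedged, illustrative sketch of such a safeguard on the client-side schema reader; the method names, validity rule, and exception type are assumptions, not the actual change:

```java
import org.apache.avro.Schema;

// Placeholder names throughout; only the idea (reject invalid schema ids early) mirrors the PR.
Schema fetchValueSchema(String storeName, int schemaId) {
  if (schemaId <= 0) {
    // Guard against an invalid id before it can fail downstream client requests.
    throw new IllegalStateException("Received invalid value schema id " + schemaId + " for store " + storeName);
  }
  Schema schema = schemaRepository.getValueSchema(storeName, schemaId);
  if (schema == null) {
    throw new IllegalStateException("Schema repo returned no schema for store " + storeName + ", id " + schemaId);
  }
  return schema;
}
```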
…1175)

When a store is completely migrated, there may still be clients that had no queries at the time of migration and hence did not migrate to the new cluster. We fixed this issue in linkedin#843, but removed the store existence check from the ACL handler with the expectation that the next layers would redirect requests for the migrated store.

The interface of the ACL handler doesn't specify how to handle the case of a missing store. So, it is possible that the ACL handler could either:
1. Return "false" to the "hasAccess" call
2. Throw an "AclException"

Only in the second case would the request be forwarded to the next handler. This commit adds the store existence check before checking ACLs, so the request can be forwarded to the next handler and the client can be notified of the store move.

This commit unifies common store ACL handler logic into "AbstractStoreAclHandler" and splits router-specific and server-specific logic into their independent implementations: "RouterStoreAclHandler" and "ServerStoreAclHandler".
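
For illustration, a minimal skeleton of the shape such a shared handler could take, with the existence check running before the ACL check; the method signatures, decision enum, and Netty pipeline integration are assumptions:

```java
// Illustrative skeleton only; the real AbstractStoreAclHandler and its
// RouterStoreAclHandler / ServerStoreAclHandler subclasses differ in shape.
public abstract class StoreAclHandlerSketch {
  enum Decision { ALLOW, DENY, PASS_TO_NEXT_HANDLER }

  protected abstract boolean storeExists(String storeName);

  protected abstract boolean hasAccess(String storeName, Object request);

  public Decision handle(String storeName, Object request) {
    // Check store existence before ACLs: a missing store usually means it was migrated or
    // deleted, so let the next handler respond (e.g. point the client at the new cluster)
    // instead of rejecting the request with an ACL error.
    if (!storeExists(storeName)) {
      return Decision.PASS_TO_NEXT_HANDLER;
    }
    return hasAccess(storeName, request) ? Decision.ALLOW : Decision.DENY;
  }
}
```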
Added a valueOf function in EnumUtils to reduce code duplication and
standardize the error message for invalid or unhandled enum values.

Also added unit tests for the mappings of all VeniceEnumValue implementations.
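
A rough sketch of what such a shared valueOf helper could look like for an int-backed enum interface like VeniceEnumValue; the signatures, lookup-map approach, and error message below are assumptions:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the real Venice EnumUtils / VeniceEnumValue may differ in shape.
interface VeniceEnumValueSketch {
  int getValue();
}

final class EnumUtilsSketch {
  private EnumUtilsSketch() {
  }

  /** Builds an int-to-enum lookup once, so each enum does not re-implement the same mapping. */
  static <E extends Enum<E> & VeniceEnumValueSketch> Map<Integer, E> getEnumValuesMapping(Class<E> enumClass) {
    Map<Integer, E> mapping = new HashMap<>();
    for (E constant: enumClass.getEnumConstants()) {
      mapping.put(constant.getValue(), constant);
    }
    return mapping;
  }

  /** Resolves an int to its enum constant, with one standardized error message for invalid values. */
  static <E extends Enum<E> & VeniceEnumValueSketch> E valueOf(Map<Integer, E> mapping, int value, Class<E> enumClass) {
    E result = mapping.get(value);
    if (result == null) {
      throw new IllegalArgumentException("Invalid value " + value + " for enum " + enumClass.getSimpleName());
    }
    return result;
  }
}
```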
@m-nagarajan
Contributor Author

Messed up the PR with updates. Will open a new one.
