Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check and warn for invalid partition weight #1033

Merged
merged 12 commits into from
Nov 4, 2024
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.62.0] - 2024-10-28
- Check and take configurable action for invalid partition weight

## [29.61.0] - 2024-10-24
- Disable dark traffic dispatching during dark warmup

Expand Down Expand Up @@ -5752,7 +5755,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.61.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.62.0...master
[29.62.0]: https://github.com/linkedin/rest.li/compare/v29.61.0...v29.62.0
[29.61.0]: https://github.com/linkedin/rest.li/compare/v29.60.0...v29.61.0
[29.60.0]: https://github.com/linkedin/rest.li/compare/v29.59.0...v29.60.0
[29.59.0]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.59.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package com.linkedin.d2.balancer.servers;

import com.google.common.annotations.VisibleForTesting;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Collections;
Expand All @@ -44,6 +47,8 @@
import com.linkedin.util.ArgumentUtil;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -93,6 +98,23 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
private final AtomicLong _markDownStartAtRef = new AtomicLong(Long.MAX_VALUE);

private volatile Map<Integer, PartitionData> _partitionDataMap;
/**
* If not null, it defines two rules for d2 weight validation:
* 1. The maximum d2 weight allowed.
* 2. The maximum number of decimal places allowed. Use 0s on decimal places to indicate it.
* For example, 100.00 means the max weight allowed is 100 and the max number of decimal places is 2.
* CAUTION: BigDecimal yields accurate scale when constructed with a string of the number, instead of a double/float.
* E.g: new BigDecimal("100.00") instead of new BigDecimal(100.00).
*/
private final BigDecimal _maxWeight;
/**
* The action to take when d2 weight breaches validation rules.
*/
private ActionOnWeightBreach _actionOnWeightBreach = ActionOnWeightBreach.IGNORE;
bohhyang marked this conversation as resolved.
Show resolved Hide resolved

private final AtomicInteger _maxWeightBreachedCount = new AtomicInteger(0);
private final AtomicInteger _weightDecimalPlacesBreachedCount = new AtomicInteger(0);

private volatile Map<String, Object> _uriSpecificProperties;

private ServiceDiscoveryEventEmitter _eventEmitter;
Expand All @@ -102,6 +124,12 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
* it will try to bring up the server again on ZK if the connection goes down, or a new store is set
*/
private boolean _isUp;
/**
* Whether the announcer has completed sending a markup intent. NOTE THAT a mark-up intent sent does NOT mean the
* announcement status on service discovery registry is up. Service discovery registry may further process the host
* and determine its status. Check on service discovery registry for the final status.
*/
private final AtomicBoolean _isMarkUpIntentSent = new AtomicBoolean(false);

// Field to indicate if warm up was started. If it is true, it will try to end the warm up
// by marking down on ZK if the connection goes down
Expand All @@ -123,13 +151,17 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
private volatile boolean _markUpFailed;

// ScheduledExecutorService to schedule the end of dark warm-up, defaults to null
private ScheduledExecutorService _executorService;
private final ScheduledExecutorService _executorService;

// Boolean flag to indicate if dark warm-up is enabled, defaults to false
private boolean _isDarkWarmupEnabled;
private final boolean _isDarkWarmupEnabled;
/**
* Whether the announcer has completed sending a dark warmup cluster markup intent.
*/
private final AtomicBoolean _isDarkWarmupMarkUpIntentSent = new AtomicBoolean(false);

// String to store the name of the dark warm-up cluster, defaults to null
private String _warmupClusterName;
private final String _warmupClusterName;
// Similar as _znodePath and _znodeData above but for the warm up cluster.
private final AtomicReference<String> _warmupClusterZnodePathRef = new AtomicReference<>();
private final AtomicReference<String> _warmupClusterZnodeDataRef = new AtomicReference<>();
Expand All @@ -139,7 +171,18 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
private final AtomicLong _warmupClusterMarkDownStartAtRef = new AtomicLong(Long.MAX_VALUE);

// Field to store the dark warm-up time duration in seconds, defaults to zero
private int _warmupDuration;
private final int _warmupDuration;

public enum ActionOnWeightBreach {
// Ignore and no op.
IGNORE,
bohhyang marked this conversation as resolved.
Show resolved Hide resolved
// only log warnings
WARN,
// throw exceptions
THROW,
// rectify the invalid weight (e.g: cap to the max, round to the nearest valid decimal places)
RECTIFY
}

/**
* @deprecated Use the constructor {@link #ZooKeeperAnnouncer(LoadBalancerServer)} instead.
Expand Down Expand Up @@ -195,26 +238,18 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
_server = server;
// initialIsUp is used for delay mark up. If it's false, there won't be markup when the announcer is started.
_isUp = initialIsUp;
_isWarmingUp = false;
_isRetryWarmup = false;
_pendingMarkDown = new ArrayDeque<>();
_pendingMarkUp = new ArrayDeque<>();
_pendingWarmupMarkDown = new ArrayDeque<>();

_isDarkWarmupEnabled = isDarkWarmupEnabled;
_warmupClusterName = warmupClusterName;
_warmupDuration = warmupDuration;
_executorService = executorService;
_eventEmitter = eventEmitter;

server.setServiceDiscoveryEventHelper(this);
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, eventEmitter, null, ActionOnWeightBreach.IGNORE);
}

public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, eventEmitter, null, ActionOnWeightBreach.IGNORE);
}

public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService,
ServiceDiscoveryEventEmitter eventEmitter, BigDecimal maxWeight, ActionOnWeightBreach actionOnWeightBreach)
{
_server = server;
// initialIsUp is used for delay mark up. If it's false, there won't be markup when the announcer is started.
Expand All @@ -231,6 +266,12 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
_executorService = executorService;
_eventEmitter = eventEmitter;

_maxWeight = maxWeight;
if (actionOnWeightBreach != null)
{
_actionOnWeightBreach = actionOnWeightBreach;
}

if (server instanceof ZooKeeperServer)
{
((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this);
Expand Down Expand Up @@ -354,6 +395,7 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
_isMarkUpIntentSent.set(true);
emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, true, true, _markUpStartAtRef.get());
_markUpFailed = false;
_log.info("markUp for uri = {} on cluster {} succeeded.", _uri, _cluster);
Expand Down Expand Up @@ -413,6 +455,7 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
_isDarkWarmupMarkUpIntentSent.set(false);
emitSDStatusActiveUpdateIntentAndWriteEvents(_warmupClusterName, false, true, _warmupClusterMarkDownStartAtRef.get());
// Mark _isWarmingUp to false to indicate warm up has completed
_isWarmingUp = false;
Expand Down Expand Up @@ -463,6 +506,7 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
_isDarkWarmupMarkUpIntentSent.set(true);
emitSDStatusActiveUpdateIntentAndWriteEvents(_warmupClusterName, true, true, _warmupClusterMarkUpStartAtRef.get());
_log.info("markUp for uri {} on warm-up cluster {} succeeded", _uri, _warmupClusterName);
// Mark _isWarmingUp to true to indicate warm up is in progress
Expand Down Expand Up @@ -537,6 +581,7 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
_isMarkUpIntentSent.set(false);
emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, false, true, _markDownStartAtRef.get());
_log.info("markDown for uri = {} succeeded.", _uri);
// Note that the pending callbacks we see at this point are
Expand Down Expand Up @@ -750,12 +795,12 @@ public void setWeight(double weight)

Map<Integer, PartitionData> partitionDataMap = new HashMap<>(1);
partitionDataMap.put(partitionId, new PartitionData(weight));
_partitionDataMap = Collections.unmodifiableMap(partitionDataMap);
setPartitionData(partitionDataMap);
}

public void setPartitionData(Map<Integer, PartitionData> partitionData)
{
_partitionDataMap = Collections.unmodifiableMap(new HashMap<>(partitionData));
_partitionDataMap = Collections.unmodifiableMap(new HashMap<>(validatePartitionData(partitionData)));
}

public Map<Integer, PartitionData> getPartitionData()
Expand All @@ -774,6 +819,26 @@ public boolean isMarkUpFailed()
return _markUpFailed;
}

public boolean isMarkUpIntentSent()
{
return _isMarkUpIntentSent.get();
}

public boolean isDarkWarmupMarkUpIntentSent()
{
return _isDarkWarmupMarkUpIntentSent.get();
}

public int getMaxWeightBreachedCount()
{
return _maxWeightBreachedCount.get();
}

public int getWeightDecimalPlacesBreachedCount()
{
return _weightDecimalPlacesBreachedCount.get();
}

public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) {
_eventEmitter = emitter;
}
Expand Down Expand Up @@ -833,4 +898,70 @@ private ImmutablePair<String, String> getZnodePathAndData(String cluster) {
public boolean isWarmingUp() {
return _isWarmingUp;
}

@VisibleForTesting
Map<Integer, PartitionData> validatePartitionData(Map<Integer, PartitionData> partitionData) {
Map<Integer, PartitionData> res = new HashMap<>(partitionData); // modifiable copy in case the input is unmodifiable
for (Map.Entry<Integer, PartitionData> entry : res.entrySet()) {
BigDecimal weight = BigDecimal.valueOf(entry.getValue().getWeight());
// check negative weight
if (weight.compareTo(BigDecimal.ZERO) < 0) {
throw new IllegalArgumentException(String.format("Weight %s in Partition %d is negative. Please correct it.",
weight, entry.getKey()));
}

if (_maxWeight == null) {
break;
}

// check max weight
if (weight.compareTo(_maxWeight) > 0) {
_maxWeightBreachedCount.incrementAndGet();
switch (_actionOnWeightBreach) {
case WARN:
_log.warn("", getMaxWeightBreachException(weight, entry.getKey()));
break;
case THROW:
throw getMaxWeightBreachException(weight, entry.getKey());
case RECTIFY:
entry.setValue(new PartitionData(_maxWeight.intValue()));
bohhyang marked this conversation as resolved.
Show resolved Hide resolved
weight = _maxWeight;
_log.warn("Capped weight {} in Partition {} to the max weight allowed: {}.", weight, entry.getKey(),
_maxWeight);
break;
case IGNORE:
default:
break;
}
}

// check decimal places
if (weight.scale() > _maxWeight.scale()) {
_weightDecimalPlacesBreachedCount.incrementAndGet();
switch (_actionOnWeightBreach) {
case WARN: // both WARN and THROW only log the warning. Don't throw exception for decimal places.
case THROW:
_log.warn("", new IllegalArgumentException(String.format("Weight %s in Partition %d has more than %d"
+ " decimal places. It will be rounded in the future.", weight, entry.getKey(), _maxWeight.scale())));
break;
case RECTIFY:
double newWeight = weight.setScale(_maxWeight.scale(), RoundingMode.HALF_UP).doubleValue();
entry.setValue(new PartitionData(newWeight));
bohhyang marked this conversation as resolved.
Show resolved Hide resolved
_log.warn("Rounded weight {} in Partition {} to {} decimal places: {}.", weight, entry.getKey(),
_maxWeight.scale(), newWeight);
break;
case IGNORE:
default:
break;
}
}
}
return res;
}

private IllegalArgumentException getMaxWeightBreachException(BigDecimal weight, int partition) {
return new IllegalArgumentException(String.format("[ACTION NEEDED] Weight %s in Partition %d is greater"
+ " than the max weight allowed: %s. Please correct the weight. It will be force-capped to the max weight "
+ "in the future.", weight, partition, _maxWeight));
}
}
23 changes: 23 additions & 0 deletions d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,27 @@ public Map<Integer, PartitionData> getPartitionData()
public boolean isMarkUpFailed() {
return _announcer.isMarkUpFailed();
}

@Override
public boolean isMarkUpIntentSent()
{
return _announcer.isMarkUpIntentSent();
}

@Override
public boolean isDarkWarmupMarkUpIntentSent() {
return _announcer.isDarkWarmupMarkUpIntentSent();
}

@Override
public int getMaxWeightBreachedCount()
{
return _announcer.getMaxWeightBreachedCount();
}

@Override
public int getWeightDecimalPlacesBreachedCount()
{
return _announcer.getWeightDecimalPlacesBreachedCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,27 @@ void setPartitionDataUsingJson(String partitionDataJson)
void setPartitionData(Map<Integer, PartitionData> partitionData);

boolean isMarkUpFailed();

/**
* @return true if the announcer has completed sending a markup intent. NOTE THAT a mark-up intent sent does NOT mean the
* announcement status on service discovery registry is up. Service discovery registry may further process the host
* and determine its status. Check on service discovery registry for the final status.
*/
boolean isMarkUpIntentSent();

/**
* @return true if the announcer has completed sending a dark warmup cluster markup intent.
*/
boolean isDarkWarmupMarkUpIntentSent();

/**
* @return the times that the max weight has been breached.
*/
int getMaxWeightBreachedCount();

/**
*
* @return the times that the max number of decimal places on weight has been breached.
*/
int getWeightDecimalPlacesBreachedCount();
}
Loading
Loading