diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 575044cfdf..a60aac54c1 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -42,13 +42,19 @@ public long hash(byte[] key) { } private PegasusTable getTable(String tableName) throws PException { + return getTable(tableName, 0); + } + + private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws PException { PegasusTable table = tableMap.get(tableName); if (table == null) { synchronized (tableMapLock) { table = tableMap.get(tableName); if (table == null) { try { - table = new PegasusTable(this, cluster.openTable(tableName, new PegasusHasher())); + table = + new PegasusTable( + this, cluster.openTable(tableName, new PegasusHasher(), backupRequestDelayMs)); } catch (Throwable e) { throw new PException(e); } @@ -194,6 +200,12 @@ public PegasusTableInterface openTable(String tableName) throws PException { return getTable(tableName); } + @Override + public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) + throws PException { + return getTable(tableName, backupRequestDelayMs); + } + @Override public Properties getConfiguration() { return config; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index 84166c6440..1df0853485 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -41,6 +41,29 @@ public interface PegasusClientInterface { */ public PegasusTableInterface openTable(String tableName) throws PException; + /** + * Open a table, and prepare the sessions and route-table to the replica-servers. + * + *

Please notice that pegasus support two kinds of API: 1. the client-interface way, which is + * provided in this class. 2. the table-interface way, which is provided by {@link + * PegasusTableInterface}. With the client-interface, you don't need to create + * PegasusTableInterface by openTable, so you can access the pegasus cluster conveniently. + * However, the client-interface's api also has some restrictions: 1. we don't provide async + * methods in client-interface. 2. the timeout in client-interface isn't as accurate as the + * table-interface. 3. the client-interface may throw an exception when open table fails. It means + * that you may need to handle this exception in every data access operation, which is annoying. + * 4. You can't specify a per-operation timeout. So we recommend you to use the table-interface. + * + * @param tableName the table should be exist on the server, which is created before by the system + * administrator + * @param backupRequestDelayMs the delay time to send backup request. If backupRequestDelayMs <= + * 0, The backup request is disabled. + * @return the table handler + * @throws PException throws exception if any error occurs. + */ + public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) + throws PException; + /** * Check value exist by key from the cluster * diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 1f8ffaed0a..861458a6e3 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1761,7 +1761,9 @@ public void handleReplicaException( String replicaServer; try { replicaServer = - replicaConfiguration.primary.get_ip() + ":" + replicaConfiguration.primary.get_port(); + replicaConfiguration.primaryAddress.get_ip() + + ":" + + replicaConfiguration.primaryAddress.get_port(); } catch (UnknownHostException e) { promise.setFailure(new PException(e)); return; diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index c325d3c3d6..c1b0546dfe 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -10,7 +10,7 @@ import com.xiaomi.infra.pegasus.thrift.TException; public abstract class client_operator { - public client_operator(gpid gpid, String tableName) { + public client_operator(gpid gpid, String tableName, boolean enableBackupRequest) { this.header = new ThriftHeader(); this.meta = new request_meta(); this.meta.setApp_id(gpid.get_app_id()); @@ -18,10 +18,17 @@ public client_operator(gpid gpid, String tableName) { this.pid = gpid; this.tableName = tableName; this.rpc_error = new error_code(); + this.enableBackupRequest = enableBackupRequest; + } + + public client_operator( + gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) { + this(gpid, tableName, enableBackupRequest); + this.meta.setPartition_hash(partitionHash); } public client_operator(gpid gpid, String tableName, long partitionHash) { - this(gpid, tableName); + this(gpid, tableName, false); this.meta.setPartition_hash(partitionHash); } @@ -32,9 +39,12 @@ public final byte[] prepare_thrift_header(int meta_length, int body_length) { } public final void prepare_thrift_meta( - com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot, int client_timeout) + com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot, + int client_timeout, + boolean isBackupRequest) throws TException { this.meta.setClient_timeout(client_timeout); + this.meta.setIs_backup_request(isBackupRequest); this.meta.write(oprot); } @@ -51,6 +61,7 @@ public String getQPSCounter() { mark = "fail"; break; } + // pegasus.client.put.succ.qps return new StringBuilder() .append("pegasus.client.") @@ -89,4 +100,5 @@ public abstract void recv_data(com.xiaomi.infra.pegasus.thrift.protocol.TProtoco public gpid pid; public String tableName; // only for metrics public error_code rpc_error; + public boolean enableBackupRequest; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_get_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_get_operator.java index a99c0b9b99..f795bdfcbf 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_get_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_get_operator.java @@ -14,7 +14,7 @@ public class rrdb_get_operator extends client_operator { public rrdb_get_operator( com.xiaomi.infra.pegasus.base.gpid gpid, String tableName, blob request, long partitionHash) { - super(gpid, tableName, partitionHash); + super(gpid, tableName, partitionHash, true); this.request = request; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_get_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_get_operator.java index 859d46f315..2a5d23a815 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_get_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_get_operator.java @@ -16,7 +16,7 @@ public rrdb_multi_get_operator( String tableName, multi_get_request request, long partitionHash) { - super(gpid, tableName, partitionHash); + super(gpid, tableName, partitionHash, true); this.request = request; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_ttl_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_ttl_operator.java index 480267c767..b4d9fc43b9 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_ttl_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_ttl_operator.java @@ -13,7 +13,7 @@ public class rrdb_ttl_operator extends client_operator { public rrdb_ttl_operator( com.xiaomi.infra.pegasus.base.gpid gpid, String tableName, blob request, long partitionHash) { - super(gpid, tableName, partitionHash); + super(gpid, tableName, partitionHash, true); this.request = request; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/replication/request_meta.java b/src/main/java/com/xiaomi/infra/pegasus/replication/request_meta.java index 6513ebc4e9..1d4e68ffc7 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/replication/request_meta.java +++ b/src/main/java/com/xiaomi/infra/pegasus/replication/request_meta.java @@ -1,5 +1,5 @@ /** - * Autogenerated by Thrift + * Autogenerated by Thrift Compiler (0.11.0) * *

DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @@ -7,66 +7,62 @@ */ package com.xiaomi.infra.pegasus.replication; -import com.xiaomi.infra.pegasus.thrift.*; -import com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData; -import com.xiaomi.infra.pegasus.thrift.meta_data.FieldValueMetaData; -import com.xiaomi.infra.pegasus.thrift.protocol.*; -import com.xiaomi.infra.pegasus.thrift.scheme.IScheme; -import com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory; -import com.xiaomi.infra.pegasus.thrift.scheme.StandardScheme; -import com.xiaomi.infra.pegasus.thrift.scheme.TupleScheme; -import com.xiaomi.infra.pegasus.thrift.transport.TIOStreamTransport; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Collections; -import java.util.EnumMap; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) +@javax.annotation.Generated( + value = "Autogenerated by Thrift Compiler (0.11.0)", + date = "2020-01-07") public class request_meta - implements TBase, + implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { - private static final TStruct STRUCT_DESC = new TStruct("meta"); - - private static final TField APP_ID_FIELD_DESC = new TField("app_id", TType.I32, (short) 1); - private static final TField PARTITION_INDEX_FIELD_DESC = - new TField("partition_index", TType.I32, (short) 2); - private static final TField CLIENT_TIMEOUT_FIELD_DESC = - new TField("client_timeout", TType.I32, (short) 3); - private static final TField PARTITION_HASH_FIELD_DESC = - new TField("partition_hash", TType.I64, (short) 4); - - private static final Map, SchemeFactory> schemes = - new HashMap, SchemeFactory>(); - - static { - schemes.put(StandardScheme.class, new metaStandardSchemeFactory()); - schemes.put(TupleScheme.class, new metaTupleSchemeFactory()); - } + private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = + new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("request_meta"); + + private static final com.xiaomi.infra.pegasus.thrift.protocol.TField APP_ID_FIELD_DESC = + new com.xiaomi.infra.pegasus.thrift.protocol.TField( + "app_id", com.xiaomi.infra.pegasus.thrift.protocol.TType.I32, (short) 1); + private static final com.xiaomi.infra.pegasus.thrift.protocol.TField PARTITION_INDEX_FIELD_DESC = + new com.xiaomi.infra.pegasus.thrift.protocol.TField( + "partition_index", com.xiaomi.infra.pegasus.thrift.protocol.TType.I32, (short) 2); + private static final com.xiaomi.infra.pegasus.thrift.protocol.TField CLIENT_TIMEOUT_FIELD_DESC = + new com.xiaomi.infra.pegasus.thrift.protocol.TField( + "client_timeout", com.xiaomi.infra.pegasus.thrift.protocol.TType.I32, (short) 3); + private static final com.xiaomi.infra.pegasus.thrift.protocol.TField PARTITION_HASH_FIELD_DESC = + new com.xiaomi.infra.pegasus.thrift.protocol.TField( + "partition_hash", com.xiaomi.infra.pegasus.thrift.protocol.TType.I64, (short) 4); + private static final com.xiaomi.infra.pegasus.thrift.protocol.TField + IS_BACKUP_REQUEST_FIELD_DESC = + new com.xiaomi.infra.pegasus.thrift.protocol.TField( + "is_backup_request", com.xiaomi.infra.pegasus.thrift.protocol.TType.BOOL, (short) 5); + + private static final com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory + STANDARD_SCHEME_FACTORY = new request_metaStandardSchemeFactory(); + private static final com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = + new request_metaTupleSchemeFactory(); public int app_id; // required public int partition_index; // required public int client_timeout; // required public long partition_hash; // required + public boolean is_backup_request; // required /** * The set of fields this struct contains, along with convenience methods for finding and * manipulating them. */ - public enum _Fields implements TFieldIdEnum { + public enum _Fields implements com.xiaomi.infra.pegasus.thrift.TFieldIdEnum { APP_ID((short) 1, "app_id"), PARTITION_INDEX((short) 2, "partition_index"), CLIENT_TIMEOUT((short) 3, "client_timeout"), - PARTITION_HASH((short) 4, "partition_hash"); + PARTITION_HASH((short) 4, "partition_hash"), + IS_BACKUP_REQUEST((short) 5, "is_backup_request"); - private static final Map byName = new HashMap(); + private static final java.util.Map byName = + new java.util.HashMap(); static { - for (_Fields field : EnumSet.allOf(_Fields.class)) { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { byName.put(field.getFieldName(), field); } } @@ -82,6 +78,8 @@ public static _Fields findByThriftId(int fieldId) { return CLIENT_TIMEOUT; case 4: // PARTITION_HASH return PARTITION_HASH; + case 5: // IS_BACKUP_REQUEST + return IS_BACKUP_REQUEST; default: return null; } @@ -91,19 +89,19 @@ public static _Fields findByThriftId(int fieldId) { public static _Fields findByThriftIdOrThrow(int fieldId) { _Fields fields = findByThriftId(fieldId); if (fields == null) - throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); return fields; } /** Find the _Fields constant that matches name, or null if its not found. */ - public static _Fields findByName(String name) { + public static _Fields findByName(java.lang.String name) { return byName.get(name); } private final short _thriftId; - private final String _fieldName; + private final java.lang.String _fieldName; - _Fields(short thriftId, String fieldName) { + _Fields(short thriftId, java.lang.String fieldName) { _thriftId = thriftId; _fieldName = fieldName; } @@ -112,7 +110,7 @@ public short getThriftFieldId() { return _thriftId; } - public String getFieldName() { + public java.lang.String getFieldName() { return _fieldName; } } @@ -122,34 +120,64 @@ public String getFieldName() { private static final int __PARTITION_INDEX_ISSET_ID = 1; private static final int __CLIENT_TIMEOUT_ISSET_ID = 2; private static final int __PARTITION_HASH_ISSET_ID = 3; + private static final int __IS_BACKUP_REQUEST_ISSET_ID = 4; private byte __isset_bitfield = 0; - public static final Map<_Fields, FieldMetaData> metaDataMap; + public static final java.util.Map< + _Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData> + metaDataMap; static { - Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class); + java.util.Map<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData> tmpMap = + new java.util.EnumMap<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData>( + _Fields.class); tmpMap.put( _Fields.APP_ID, - new FieldMetaData( - "app_id", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.I32))); + new com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData( + "app_id", + com.xiaomi.infra.pegasus.thrift.TFieldRequirementType.DEFAULT, + new com.xiaomi.infra.pegasus.thrift.meta_data.FieldValueMetaData( + com.xiaomi.infra.pegasus.thrift.protocol.TType.I32))); tmpMap.put( _Fields.PARTITION_INDEX, - new FieldMetaData( - "partition_index", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.I32))); + new com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData( + "partition_index", + com.xiaomi.infra.pegasus.thrift.TFieldRequirementType.DEFAULT, + new com.xiaomi.infra.pegasus.thrift.meta_data.FieldValueMetaData( + com.xiaomi.infra.pegasus.thrift.protocol.TType.I32))); tmpMap.put( _Fields.CLIENT_TIMEOUT, - new FieldMetaData( - "client_timeout", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.I32))); + new com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData( + "client_timeout", + com.xiaomi.infra.pegasus.thrift.TFieldRequirementType.DEFAULT, + new com.xiaomi.infra.pegasus.thrift.meta_data.FieldValueMetaData( + com.xiaomi.infra.pegasus.thrift.protocol.TType.I32))); tmpMap.put( _Fields.PARTITION_HASH, - new FieldMetaData( - "partition_hash", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.I64))); - metaDataMap = Collections.unmodifiableMap(tmpMap); - FieldMetaData.addStructMetaDataMap(request_meta.class, metaDataMap); + new com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData( + "partition_hash", + com.xiaomi.infra.pegasus.thrift.TFieldRequirementType.DEFAULT, + new com.xiaomi.infra.pegasus.thrift.meta_data.FieldValueMetaData( + com.xiaomi.infra.pegasus.thrift.protocol.TType.I64))); + tmpMap.put( + _Fields.IS_BACKUP_REQUEST, + new com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData( + "is_backup_request", + com.xiaomi.infra.pegasus.thrift.TFieldRequirementType.DEFAULT, + new com.xiaomi.infra.pegasus.thrift.meta_data.FieldValueMetaData( + com.xiaomi.infra.pegasus.thrift.protocol.TType.BOOL))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData.addStructMetaDataMap( + request_meta.class, metaDataMap); } public request_meta() {} - public request_meta(int app_id, int partition_index, int client_timeout, long partition_hash) { + public request_meta( + int app_id, + int partition_index, + int client_timeout, + long partition_hash, + boolean is_backup_request) { this(); this.app_id = app_id; setApp_idIsSet(true); @@ -159,6 +187,8 @@ public request_meta(int app_id, int partition_index, int client_timeout, long pa setClient_timeoutIsSet(true); this.partition_hash = partition_hash; setPartition_hashIsSet(true); + this.is_backup_request = is_backup_request; + setIs_backup_requestIsSet(true); } /** Performs a deep copy on other. */ @@ -168,6 +198,7 @@ public request_meta(request_meta other) { this.partition_index = other.partition_index; this.client_timeout = other.client_timeout; this.partition_hash = other.partition_hash; + this.is_backup_request = other.is_backup_request; } public request_meta deepCopy() { @@ -184,6 +215,8 @@ public void clear() { this.client_timeout = 0; setPartition_hashIsSet(false); this.partition_hash = 0; + setIs_backup_requestIsSet(false); + this.is_backup_request = false; } public int getApp_id() { @@ -197,16 +230,20 @@ public request_meta setApp_id(int app_id) { } public void unsetApp_id() { - __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __APP_ID_ISSET_ID); + __isset_bitfield = + com.xiaomi.infra.pegasus.thrift.EncodingUtils.clearBit(__isset_bitfield, __APP_ID_ISSET_ID); } /** Returns true if field app_id is set (has been assigned a value) and false otherwise */ public boolean isSetApp_id() { - return EncodingUtils.testBit(__isset_bitfield, __APP_ID_ISSET_ID); + return com.xiaomi.infra.pegasus.thrift.EncodingUtils.testBit( + __isset_bitfield, __APP_ID_ISSET_ID); } public void setApp_idIsSet(boolean value) { - __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __APP_ID_ISSET_ID, value); + __isset_bitfield = + com.xiaomi.infra.pegasus.thrift.EncodingUtils.setBit( + __isset_bitfield, __APP_ID_ISSET_ID, value); } public int getPartition_index() { @@ -220,18 +257,23 @@ public request_meta setPartition_index(int partition_index) { } public void unsetPartition_index() { - __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PARTITION_INDEX_ISSET_ID); + __isset_bitfield = + com.xiaomi.infra.pegasus.thrift.EncodingUtils.clearBit( + __isset_bitfield, __PARTITION_INDEX_ISSET_ID); } /** * Returns true if field partition_index is set (has been assigned a value) and false otherwise */ public boolean isSetPartition_index() { - return EncodingUtils.testBit(__isset_bitfield, __PARTITION_INDEX_ISSET_ID); + return com.xiaomi.infra.pegasus.thrift.EncodingUtils.testBit( + __isset_bitfield, __PARTITION_INDEX_ISSET_ID); } public void setPartition_indexIsSet(boolean value) { - __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PARTITION_INDEX_ISSET_ID, value); + __isset_bitfield = + com.xiaomi.infra.pegasus.thrift.EncodingUtils.setBit( + __isset_bitfield, __PARTITION_INDEX_ISSET_ID, value); } public int getClient_timeout() { @@ -245,16 +287,21 @@ public request_meta setClient_timeout(int client_timeout) { } public void unsetClient_timeout() { - __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __CLIENT_TIMEOUT_ISSET_ID); + __isset_bitfield = + com.xiaomi.infra.pegasus.thrift.EncodingUtils.clearBit( + __isset_bitfield, __CLIENT_TIMEOUT_ISSET_ID); } /** Returns true if field client_timeout is set (has been assigned a value) and false otherwise */ public boolean isSetClient_timeout() { - return EncodingUtils.testBit(__isset_bitfield, __CLIENT_TIMEOUT_ISSET_ID); + return com.xiaomi.infra.pegasus.thrift.EncodingUtils.testBit( + __isset_bitfield, __CLIENT_TIMEOUT_ISSET_ID); } public void setClient_timeoutIsSet(boolean value) { - __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __CLIENT_TIMEOUT_ISSET_ID, value); + __isset_bitfield = + com.xiaomi.infra.pegasus.thrift.EncodingUtils.setBit( + __isset_bitfield, __CLIENT_TIMEOUT_ISSET_ID, value); } public long getPartition_hash() { @@ -268,25 +315,60 @@ public request_meta setPartition_hash(long partition_hash) { } public void unsetPartition_hash() { - __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PARTITION_HASH_ISSET_ID); + __isset_bitfield = + com.xiaomi.infra.pegasus.thrift.EncodingUtils.clearBit( + __isset_bitfield, __PARTITION_HASH_ISSET_ID); } /** Returns true if field partition_hash is set (has been assigned a value) and false otherwise */ public boolean isSetPartition_hash() { - return EncodingUtils.testBit(__isset_bitfield, __PARTITION_HASH_ISSET_ID); + return com.xiaomi.infra.pegasus.thrift.EncodingUtils.testBit( + __isset_bitfield, __PARTITION_HASH_ISSET_ID); } public void setPartition_hashIsSet(boolean value) { - __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PARTITION_HASH_ISSET_ID, value); + __isset_bitfield = + com.xiaomi.infra.pegasus.thrift.EncodingUtils.setBit( + __isset_bitfield, __PARTITION_HASH_ISSET_ID, value); } - public void setFieldValue(_Fields field, Object value) { + public boolean isIs_backup_request() { + return this.is_backup_request; + } + + public request_meta setIs_backup_request(boolean is_backup_request) { + this.is_backup_request = is_backup_request; + setIs_backup_requestIsSet(true); + return this; + } + + public void unsetIs_backup_request() { + __isset_bitfield = + com.xiaomi.infra.pegasus.thrift.EncodingUtils.clearBit( + __isset_bitfield, __IS_BACKUP_REQUEST_ISSET_ID); + } + + /** + * Returns true if field is_backup_request is set (has been assigned a value) and false otherwise + */ + public boolean isSetIs_backup_request() { + return com.xiaomi.infra.pegasus.thrift.EncodingUtils.testBit( + __isset_bitfield, __IS_BACKUP_REQUEST_ISSET_ID); + } + + public void setIs_backup_requestIsSet(boolean value) { + __isset_bitfield = + com.xiaomi.infra.pegasus.thrift.EncodingUtils.setBit( + __isset_bitfield, __IS_BACKUP_REQUEST_ISSET_ID, value); + } + + public void setFieldValue(_Fields field, java.lang.Object value) { switch (field) { case APP_ID: if (value == null) { unsetApp_id(); } else { - setApp_id((Integer) value); + setApp_id((java.lang.Integer) value); } break; @@ -294,7 +376,7 @@ public void setFieldValue(_Fields field, Object value) { if (value == null) { unsetPartition_index(); } else { - setPartition_index((Integer) value); + setPartition_index((java.lang.Integer) value); } break; @@ -302,7 +384,7 @@ public void setFieldValue(_Fields field, Object value) { if (value == null) { unsetClient_timeout(); } else { - setClient_timeout((Integer) value); + setClient_timeout((java.lang.Integer) value); } break; @@ -310,13 +392,21 @@ public void setFieldValue(_Fields field, Object value) { if (value == null) { unsetPartition_hash(); } else { - setPartition_hash((Long) value); + setPartition_hash((java.lang.Long) value); + } + break; + + case IS_BACKUP_REQUEST: + if (value == null) { + unsetIs_backup_request(); + } else { + setIs_backup_request((java.lang.Boolean) value); } break; } } - public Object getFieldValue(_Fields field) { + public java.lang.Object getFieldValue(_Fields field) { switch (field) { case APP_ID: return getApp_id(); @@ -329,8 +419,11 @@ public Object getFieldValue(_Fields field) { case PARTITION_HASH: return getPartition_hash(); + + case IS_BACKUP_REQUEST: + return isIs_backup_request(); } - throw new IllegalStateException(); + throw new java.lang.IllegalStateException(); } /** @@ -339,7 +432,7 @@ public Object getFieldValue(_Fields field) { */ public boolean isSet(_Fields field) { if (field == null) { - throw new IllegalArgumentException(); + throw new java.lang.IllegalArgumentException(); } switch (field) { @@ -351,12 +444,14 @@ public boolean isSet(_Fields field) { return isSetClient_timeout(); case PARTITION_HASH: return isSetPartition_hash(); + case IS_BACKUP_REQUEST: + return isSetIs_backup_request(); } - throw new IllegalStateException(); + throw new java.lang.IllegalStateException(); } @Override - public boolean equals(Object that) { + public boolean equals(java.lang.Object that) { if (that == null) return false; if (that instanceof request_meta) return this.equals((request_meta) that); return false; @@ -364,6 +459,7 @@ public boolean equals(Object that) { public boolean equals(request_meta that) { if (that == null) return false; + if (this == that) return true; boolean this_present_app_id = true; boolean that_present_app_id = true; @@ -393,30 +489,32 @@ public boolean equals(request_meta that) { if (this.partition_hash != that.partition_hash) return false; } + boolean this_present_is_backup_request = true; + boolean that_present_is_backup_request = true; + if (this_present_is_backup_request || that_present_is_backup_request) { + if (!(this_present_is_backup_request && that_present_is_backup_request)) return false; + if (this.is_backup_request != that.is_backup_request) return false; + } + return true; } @Override public int hashCode() { - List list = new ArrayList(); + int hashCode = 1; + + hashCode = hashCode * 8191 + app_id; - boolean present_app_id = true; - list.add(present_app_id); - if (present_app_id) list.add(app_id); + hashCode = hashCode * 8191 + partition_index; - boolean present_partition_index = true; - list.add(present_partition_index); - if (present_partition_index) list.add(partition_index); + hashCode = hashCode * 8191 + client_timeout; - boolean present_client_timeout = true; - list.add(present_client_timeout); - if (present_client_timeout) list.add(client_timeout); + hashCode = + hashCode * 8191 + com.xiaomi.infra.pegasus.thrift.TBaseHelper.hashCode(partition_hash); - boolean present_partition_hash = true; - list.add(present_partition_hash); - if (present_partition_hash) list.add(partition_hash); + hashCode = hashCode * 8191 + ((is_backup_request) ? 131071 : 524287); - return list.hashCode(); + return hashCode; } @Override @@ -427,43 +525,66 @@ public int compareTo(request_meta other) { int lastComparison = 0; - lastComparison = Boolean.valueOf(isSetApp_id()).compareTo(other.isSetApp_id()); + lastComparison = java.lang.Boolean.valueOf(isSetApp_id()).compareTo(other.isSetApp_id()); if (lastComparison != 0) { return lastComparison; } if (isSetApp_id()) { - lastComparison = TBaseHelper.compareTo(this.app_id, other.app_id); + lastComparison = + com.xiaomi.infra.pegasus.thrift.TBaseHelper.compareTo(this.app_id, other.app_id); if (lastComparison != 0) { return lastComparison; } } lastComparison = - Boolean.valueOf(isSetPartition_index()).compareTo(other.isSetPartition_index()); + java.lang.Boolean.valueOf(isSetPartition_index()).compareTo(other.isSetPartition_index()); if (lastComparison != 0) { return lastComparison; } if (isSetPartition_index()) { - lastComparison = TBaseHelper.compareTo(this.partition_index, other.partition_index); + lastComparison = + com.xiaomi.infra.pegasus.thrift.TBaseHelper.compareTo( + this.partition_index, other.partition_index); if (lastComparison != 0) { return lastComparison; } } - lastComparison = Boolean.valueOf(isSetClient_timeout()).compareTo(other.isSetClient_timeout()); + lastComparison = + java.lang.Boolean.valueOf(isSetClient_timeout()).compareTo(other.isSetClient_timeout()); if (lastComparison != 0) { return lastComparison; } if (isSetClient_timeout()) { - lastComparison = TBaseHelper.compareTo(this.client_timeout, other.client_timeout); + lastComparison = + com.xiaomi.infra.pegasus.thrift.TBaseHelper.compareTo( + this.client_timeout, other.client_timeout); if (lastComparison != 0) { return lastComparison; } } - lastComparison = Boolean.valueOf(isSetPartition_hash()).compareTo(other.isSetPartition_hash()); + lastComparison = + java.lang.Boolean.valueOf(isSetPartition_hash()).compareTo(other.isSetPartition_hash()); if (lastComparison != 0) { return lastComparison; } if (isSetPartition_hash()) { - lastComparison = TBaseHelper.compareTo(this.partition_hash, other.partition_hash); + lastComparison = + com.xiaomi.infra.pegasus.thrift.TBaseHelper.compareTo( + this.partition_hash, other.partition_hash); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = + java.lang.Boolean.valueOf(isSetIs_backup_request()) + .compareTo(other.isSetIs_backup_request()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetIs_backup_request()) { + lastComparison = + com.xiaomi.infra.pegasus.thrift.TBaseHelper.compareTo( + this.is_backup_request, other.is_backup_request); if (lastComparison != 0) { return lastComparison; } @@ -475,17 +596,19 @@ public _Fields fieldForId(int fieldId) { return _Fields.findByThriftId(fieldId); } - public void read(TProtocol iprot) throws TException { - schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot) + throws com.xiaomi.infra.pegasus.thrift.TException { + scheme(iprot).read(iprot, this); } - public void write(TProtocol oprot) throws TException { - schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot) + throws com.xiaomi.infra.pegasus.thrift.TException { + scheme(oprot).write(oprot, this); } @Override - public String toString() { - StringBuilder sb = new StringBuilder("meta("); + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("request_meta("); boolean first = true; sb.append("app_id:"); @@ -503,86 +626,105 @@ public String toString() { sb.append("partition_hash:"); sb.append(this.partition_hash); first = false; + if (!first) sb.append(", "); + sb.append("is_backup_request:"); + sb.append(this.is_backup_request); + first = false; sb.append(")"); return sb.toString(); } - public void validate() throws TException { + public void validate() throws com.xiaomi.infra.pegasus.thrift.TException { // check for required fields // check for sub-struct validity } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { try { - write(new TCompactProtocol(new TIOStreamTransport(out))); - } catch (TException te) { + write( + new com.xiaomi.infra.pegasus.thrift.protocol.TCompactProtocol( + new com.xiaomi.infra.pegasus.thrift.transport.TIOStreamTransport(out))); + } catch (com.xiaomi.infra.pegasus.thrift.TException te) { throw new java.io.IOException(te); } } private void readObject(java.io.ObjectInputStream in) - throws java.io.IOException, ClassNotFoundException { + throws java.io.IOException, java.lang.ClassNotFoundException { try { // it doesn't seem like you should have to do this, but java serialization is wacky, and // doesn't call the default constructor. __isset_bitfield = 0; - read(new TCompactProtocol(new TIOStreamTransport(in))); - } catch (TException te) { + read( + new com.xiaomi.infra.pegasus.thrift.protocol.TCompactProtocol( + new com.xiaomi.infra.pegasus.thrift.transport.TIOStreamTransport(in))); + } catch (com.xiaomi.infra.pegasus.thrift.TException te) { throw new java.io.IOException(te); } } - private static class metaStandardSchemeFactory implements SchemeFactory { - public metaStandardScheme getScheme() { - return new metaStandardScheme(); + private static class request_metaStandardSchemeFactory + implements com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory { + public request_metaStandardScheme getScheme() { + return new request_metaStandardScheme(); } } - private static class metaStandardScheme extends StandardScheme { + private static class request_metaStandardScheme + extends com.xiaomi.infra.pegasus.thrift.scheme.StandardScheme { - public void read(TProtocol iprot, request_meta struct) throws TException { - TField schemeField; + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot, request_meta struct) + throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TField schemeField; iprot.readStructBegin(); while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == TType.STOP) { + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { case 1: // APP_ID - if (schemeField.type == TType.I32) { + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.I32) { struct.app_id = iprot.readI32(); struct.setApp_idIsSet(true); } else { - TProtocolUtil.skip(iprot, schemeField.type); + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; case 2: // PARTITION_INDEX - if (schemeField.type == TType.I32) { + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.I32) { struct.partition_index = iprot.readI32(); struct.setPartition_indexIsSet(true); } else { - TProtocolUtil.skip(iprot, schemeField.type); + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; case 3: // CLIENT_TIMEOUT - if (schemeField.type == TType.I32) { + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.I32) { struct.client_timeout = iprot.readI32(); struct.setClient_timeoutIsSet(true); } else { - TProtocolUtil.skip(iprot, schemeField.type); + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; case 4: // PARTITION_HASH - if (schemeField.type == TType.I64) { + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.I64) { struct.partition_hash = iprot.readI64(); struct.setPartition_hashIsSet(true); } else { - TProtocolUtil.skip(iprot, schemeField.type); + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 5: // IS_BACKUP_REQUEST + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.BOOL) { + struct.is_backup_request = iprot.readBool(); + struct.setIs_backup_requestIsSet(true); + } else { + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; default: - TProtocolUtil.skip(iprot, schemeField.type); + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } iprot.readFieldEnd(); } @@ -592,7 +734,8 @@ public void read(TProtocol iprot, request_meta struct) throws TException { struct.validate(); } - public void write(TProtocol oprot, request_meta struct) throws TException { + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot, request_meta struct) + throws com.xiaomi.infra.pegasus.thrift.TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); @@ -608,23 +751,30 @@ public void write(TProtocol oprot, request_meta struct) throws TException { oprot.writeFieldBegin(PARTITION_HASH_FIELD_DESC); oprot.writeI64(struct.partition_hash); oprot.writeFieldEnd(); + oprot.writeFieldBegin(IS_BACKUP_REQUEST_FIELD_DESC); + oprot.writeBool(struct.is_backup_request); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } } - private static class metaTupleSchemeFactory implements SchemeFactory { - public metaTupleScheme getScheme() { - return new metaTupleScheme(); + private static class request_metaTupleSchemeFactory + implements com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory { + public request_metaTupleScheme getScheme() { + return new request_metaTupleScheme(); } } - private static class metaTupleScheme extends TupleScheme { + private static class request_metaTupleScheme + extends com.xiaomi.infra.pegasus.thrift.scheme.TupleScheme { @Override - public void write(TProtocol prot, request_meta struct) throws TException { - TTupleProtocol oprot = (TTupleProtocol) prot; - BitSet optionals = new BitSet(); + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot, request_meta struct) + throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol oprot = + (com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); if (struct.isSetApp_id()) { optionals.set(0); } @@ -637,7 +787,10 @@ public void write(TProtocol prot, request_meta struct) throws TException { if (struct.isSetPartition_hash()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.isSetIs_backup_request()) { + optionals.set(4); + } + oprot.writeBitSet(optionals, 5); if (struct.isSetApp_id()) { oprot.writeI32(struct.app_id); } @@ -650,12 +803,17 @@ public void write(TProtocol prot, request_meta struct) throws TException { if (struct.isSetPartition_hash()) { oprot.writeI64(struct.partition_hash); } + if (struct.isSetIs_backup_request()) { + oprot.writeBool(struct.is_backup_request); + } } @Override - public void read(TProtocol prot, request_meta struct) throws TException { - TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(4); + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot, request_meta struct) + throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol iprot = + (com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(5); if (incoming.get(0)) { struct.app_id = iprot.readI32(); struct.setApp_idIsSet(true); @@ -672,6 +830,18 @@ public void read(TProtocol prot, request_meta struct) throws TException { struct.partition_hash = iprot.readI64(); struct.setPartition_hashIsSet(true); } + if (incoming.get(4)) { + struct.is_backup_request = iprot.readBool(); + struct.setIs_backup_requestIsSet(true); + } } } + + private static S scheme( + com.xiaomi.infra.pegasus.thrift.protocol.TProtocol proto) { + return (com.xiaomi.infra.pegasus.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) + ? STANDARD_SCHEME_FACTORY + : TUPLE_SCHEME_FACTORY) + .getScheme(); + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java index 0aa05166bf..858eff95e7 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java @@ -66,7 +66,7 @@ public static Cluster createCluster(Properties config) throws IllegalArgumentExc public abstract String[] getMetaList(); - public abstract Table openTable(String name, KeyHasher function) + public abstract Table openTable(String name, KeyHasher function, int backupRequestDelayMs) throws ReplicationException, TException; public abstract void close(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java index 00cafb49b3..15db2c47d5 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClientRequestRound.java @@ -6,6 +6,7 @@ import com.xiaomi.infra.pegasus.metrics.MetricsManager; import com.xiaomi.infra.pegasus.operator.client_operator; import com.xiaomi.infra.pegasus.rpc.Table; +import java.util.concurrent.ScheduledFuture; import org.slf4j.Logger; /** Created by weijiesun on 16-11-25. */ @@ -19,6 +20,9 @@ public final class ClientRequestRound { boolean enableCounter; long createNanoTime; long expireNanoTime; + boolean isCompleted; + ScheduledFuture backupRequestTask; + /** * Constructor. * @@ -37,6 +41,8 @@ public ClientRequestRound( this.enableCounter = enableCounter; createNanoTime = System.nanoTime(); expireNanoTime = createNanoTime + timeoutMs * 1000000L; + isCompleted = false; + backupRequestTask = null; } public com.xiaomi.infra.pegasus.operator.client_operator getOperator() { diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index 0c1d5b96b1..ecbce5add0 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -131,8 +131,9 @@ public String[] getMetaList() { } @Override - public TableHandler openTable(String name, KeyHasher h) throws ReplicationException { - return new TableHandler(this, name, h); + public TableHandler openTable(String name, KeyHasher h, int backupRequestDelayMs) + throws ReplicationException { + return new TableHandler(this, name, h, backupRequestDelayMs); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java index 8df5cdb777..2fb7b88144 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java @@ -122,7 +122,8 @@ public void run() { onFinishQueryMeta(round); } }, - eachQueryTimeoutInMills); + eachQueryTimeoutInMills, + false); } void onFinishQueryMeta(final MetaRequestRound round) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index a91c494a55..0a87d9ea18 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -29,6 +29,7 @@ public static class RequestEntry { public Runnable callback; public ScheduledFuture timeoutTask; public long timeoutMs; + public boolean isBackupRequest; } public enum ConnState { @@ -79,7 +80,11 @@ public void setMessageResponseFilter(MessageResponseFilter filter) { this.filter = filter; } - public int asyncSend(client_operator op, Runnable callbackFunc, long timeoutInMilliseconds) { + public int asyncSend( + client_operator op, + Runnable callbackFunc, + long timeoutInMilliseconds, + boolean isBackupRequest) { RequestEntry entry = new RequestEntry(); entry.sequenceId = seqId.getAndIncrement(); entry.op = op; @@ -89,6 +94,7 @@ public int asyncSend(client_operator op, Runnable callbackFunc, long timeoutInMi pendingResponse.put(entry.sequenceId, entry); entry.timeoutTask = addTimer(entry.sequenceId, timeoutInMilliseconds); entry.timeoutMs = timeoutInMilliseconds; + entry.isBackupRequest = isBackupRequest; // We store the connection_state & netty channel in a struct so that they can fetch and update // in atomic. diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 5bfa9880a3..1dbbaa4a77 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -18,8 +18,7 @@ import com.xiaomi.infra.pegasus.rpc.Table; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.EventExecutor; -import java.util.ArrayList; -import java.util.Arrays; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -31,8 +30,9 @@ public class TableHandler extends Table { public static final class ReplicaConfiguration { public gpid pid = new gpid(); public long ballot = 0; - public rpc_address primary = new rpc_address(); - public ReplicaSession session = null; + public rpc_address primaryAddress = new rpc_address(); + public ReplicaSession primarySession = null; + public List secondarySessions = new ArrayList<>(); } static final class TableConfiguration { @@ -47,8 +47,10 @@ static final class TableConfiguration { AtomicReference tableConfig_; AtomicBoolean inQuerying_; long lastQueryTime_; + int backupRequestDelayMs; - public TableHandler(ClusterManager mgr, String name, KeyHasher h) throws ReplicationException { + public TableHandler(ClusterManager mgr, String name, KeyHasher h, int backupRequestDelayMs) + throws ReplicationException { int i = 0; for (; i < name.length(); i++) { char c = name.charAt(i); @@ -92,6 +94,10 @@ public TableHandler(ClusterManager mgr, String name, KeyHasher h) throws Replica // members of this manager_ = mgr; executor_ = manager_.getExecutor(name, 1); + this.backupRequestDelayMs = backupRequestDelayMs; + if (backupRequestDelayMs > 0) { + logger.info("the delay time of backup request is \"{}\"", backupRequestDelayMs); + } tableConfig_ = new AtomicReference(null); initTableConfiguration(resp); @@ -105,6 +111,7 @@ public ReplicaConfiguration getReplicaConfig(int index) { } // update the table configuration & appID_ according to to queried response + // there should only be one thread to do the table config update void initTableConfiguration(query_cfg_response resp) { TableConfiguration oldConfig = tableConfig_.get(); @@ -118,22 +125,29 @@ void initTableConfiguration(query_cfg_response resp) { newConfig.replicas.add(newReplicaConfig); } - // set partition configuration by resp, and create sessions + // create sessions for primary and secondaries FutureGroup futureGroup = new FutureGroup<>(resp.getPartition_count()); for (partition_configuration pc : resp.getPartitions()) { ReplicaConfiguration s = newConfig.replicas.get(pc.getPid().get_pidx()); s.ballot = pc.ballot; - s.primary = pc.primary; - if (pc.primary.isInvalid()) { - s.session = null; - } else { - if (s.session == null || !s.session.getAddress().equals(pc.primary)) { - // reset to new primary - s.session = manager_.getReplicaSession(pc.primary); - ChannelFuture fut = s.session.tryConnect(); - if (fut != null) { - futureGroup.add(fut); - } + + // If the primary address is invalid, we don't create secondary session either. + // Because all of these sessions will be recreated later. + s.primaryAddress = pc.primary; + if (!pc.primary.isInvalid()) { + s.primarySession = tryConnect(pc.primary, futureGroup); + + // backup request is enabled, get all secondary sessions + s.secondarySessions.clear(); + if (isBackupRequestEnabled()) { + // secondary sessions + pc.secondaries.forEach( + secondary -> { + ReplicaSession session = tryConnect(secondary, futureGroup); + if (session != null) { + s.secondarySessions.add(session); + } + }); } } } @@ -151,6 +165,20 @@ void initTableConfiguration(query_cfg_response resp) { tableConfig_.set(newConfig); } + public ReplicaSession tryConnect(final rpc_address addr, FutureGroup futureGroup) { + if (addr.isInvalid()) { + return null; + } + + ReplicaSession session = manager_.getReplicaSession(addr); + ChannelFuture fut = session.tryConnect(); + if (fut != null) { + futureGroup.add(fut); + } + + return session; + } + void onUpdateConfiguration(final query_cfg_operator op) { error_types err = MetaSession.getMetaServiceError(op); if (err != error_types.ERR_OK) { @@ -207,12 +235,26 @@ public void run() { } void onRpcReply( - ClientRequestRound round, - int tryId, - ReplicaConfiguration cachedHandle, - long cachedConfigVersion) { - client_operator operator = round.getOperator(); + ClientRequestRound round, int tryId, long cachedConfigVersion, String serverAddr) { + // judge if it is the first response + if (round.isCompleted) { + return; + } else { + synchronized (round) { + // the fastest response has been received + if (round.isCompleted) { + return; + } + round.isCompleted = true; + } + } + + // cancel the backup request task + if (round.backupRequestTask != null) { + round.backupRequestTask.cancel(true); + } + client_operator operator = round.getOperator(); boolean needQueryMeta = false; switch (operator.rpc_error.errno) { case ERR_OK: @@ -224,7 +266,7 @@ void onRpcReply( logger.warn( "{}: replica server({}) rpc timeout for gpid({}), operator({}), try({}), error_code({}), not retry", tableName_, - cachedHandle.session.name(), + serverAddr, operator.get_gpid().toString(), operator, tryId, @@ -238,7 +280,7 @@ void onRpcReply( logger.warn( "{}: replica server({}) doesn't serve gpid({}), operator({}), try({}), error_code({}), need query meta", tableName_, - cachedHandle.session.name(), + serverAddr, operator.get_gpid().toString(), operator, tryId, @@ -252,7 +294,7 @@ void onRpcReply( logger.warn( "{}: replica server({}) can't serve writing for gpid({}), operator({}), try({}), error_code({}), retry later", tableName_, - cachedHandle.session.name(), + serverAddr, operator.get_gpid().toString(), operator, tryId, @@ -264,7 +306,7 @@ void onRpcReply( logger.error( "{}: replica server({}) fails for gpid({}), operator({}), try({}), error_code({}), not retry", tableName_, - cachedHandle.session.name(), + serverAddr, operator.get_gpid().toString(), operator, tryId, @@ -277,7 +319,11 @@ void onRpcReply( tryQueryMeta(cachedConfigVersion); } - tryDelayCall(round, tryId + 1); + // must use new round here, because round.isSuccess is true now + tryDelayCall( + new ClientRequestRound( + round.operator, round.callback, round.enableCounter, round.timeoutMs), + tryId + 1); } void tryDelayCall(final ClientRequestRound round, final int tryId) { @@ -307,16 +353,24 @@ void call(final ClientRequestRound round, final int tryId) { final TableConfiguration tableConfig = tableConfig_.get(); final ReplicaConfiguration handle = tableConfig.replicas.get(round.getOperator().get_gpid().get_pidx()); - if (handle.session != null) { - handle.session.asyncSend( + + if (handle.primarySession != null) { + // if backup request is enabled, schedule to send to secondary + if (round.operator.enableBackupRequest && isBackupRequestEnabled()) { + backupCall(round, tryId); + } + + // send request to primary + handle.primarySession.asyncSend( round.getOperator(), new Runnable() { @Override public void run() { - onRpcReply(round, tryId, handle, tableConfig.updateVersion); + onRpcReply(round, tryId, tableConfig.updateVersion, handle.primarySession.name()); } }, - round.timeoutMs); + round.timeoutMs, + false); } else { logger.warn( "{}: no primary for gpid({}), operator({}), try({}), retry later", @@ -329,6 +383,37 @@ public void run() { } } + void backupCall(final ClientRequestRound round, final int tryId) { + final TableConfiguration tableConfig = tableConfig_.get(); + final ReplicaConfiguration handle = + tableConfig.replicas.get(round.getOperator().get_gpid().get_pidx()); + + round.backupRequestTask = + executor_.schedule( + new Runnable() { + @Override + public void run() { + // pick a secondary at random + ReplicaSession secondarySession = + handle.secondarySessions.get( + new Random().nextInt(handle.secondarySessions.size())); + secondarySession.asyncSend( + round.getOperator(), + new Runnable() { + @Override + public void run() { + onRpcReply( + round, tryId, tableConfig.updateVersion, secondarySession.name()); + } + }, + round.timeoutMs, + true); + } + }, + backupRequestDelayMs, + TimeUnit.MILLISECONDS); + } + @Override public int getPartitionCount() { return tableConfig_.get().replicas.size(); @@ -413,4 +498,8 @@ private void handleMetaException(error_types err_type, ClusterManager mgr, Strin } throw new ReplicationException(err_type, header + message); } + + private boolean isBackupRequestEnabled() { + return backupRequestDelayMs > 0; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java index 3345f0fa96..8ad321c234 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java @@ -33,7 +33,7 @@ protected void encode(ChannelHandlerContext ctx, ReplicaSession.RequestEntry e, TBinaryProtocol protocol = new TBinaryProtocol(new TByteBufTransport(out)); // write meta - e.op.prepare_thrift_meta(protocol, (int) e.timeoutMs); + e.op.prepare_thrift_meta(protocol, (int) e.timeoutMs, e.isBackupRequest); int meta_length = out.readableBytes() - ThriftHeader.HEADER_LENGTH; // write body diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java index e52d999107..d0772ef3d8 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java @@ -51,7 +51,7 @@ public void testVersion() { public void testHandleReplicationException() throws Exception { String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}; ClusterManager manager = new ClusterManager(1000, 1, false, null, 60, metaList); - TableHandler table = manager.openTable("temp", KeyHasher.DEFAULT); + TableHandler table = manager.openTable("temp", KeyHasher.DEFAULT, 0); DefaultPromise promise = table.newPromise(); update_request req = new update_request(new blob(), new blob(), 100); gpid gpid = table.getGpidByHash(1); @@ -66,7 +66,8 @@ public void testHandleReplicationException() throws Exception { promise.get(); } catch (ExecutionException e) { TableHandler.ReplicaConfiguration replicaConfig = table.getReplicaConfig(gpid.get_pidx()); - String server = replicaConfig.primary.get_ip() + ":" + replicaConfig.primary.get_port(); + String server = + replicaConfig.primaryAddress.get_ip() + ":" + replicaConfig.primaryAddress.get_port(); String msg = String.format( @@ -86,7 +87,7 @@ public void testTimeOutIsZero() throws Exception { // timeout is 0. String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}; ClusterManager manager = new ClusterManager(1000, 1, false, null, 60, metaList); - TableHandler table = manager.openTable("temp", KeyHasher.DEFAULT); + TableHandler table = manager.openTable("temp", KeyHasher.DEFAULT, 0); DefaultPromise promise = table.newPromise(); update_request req = new update_request(new blob(), new blob(), 100); gpid gpid = table.getGpidByHash(1); @@ -99,7 +100,8 @@ public void testTimeOutIsZero() throws Exception { promise.get(); } catch (Exception e) { TableHandler.ReplicaConfiguration replicaConfig = table.getReplicaConfig(gpid.get_pidx()); - String server = replicaConfig.primary.get_ip() + ":" + replicaConfig.primary.get_port(); + String server = + replicaConfig.primaryAddress.get_ip() + ":" + replicaConfig.primaryAddress.get_port(); String msg = String.format( diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java index 7a2eb683e6..7faa8e61ef 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java @@ -47,7 +47,7 @@ public void testOpenTable() throws Exception { TableHandler result = null; try { - result = testManager.openTable("testName", KeyHasher.DEFAULT); + result = testManager.openTable("testName", KeyHasher.DEFAULT, 0); } catch (ReplicationException e) { Assert.assertEquals(error_code.error_types.ERR_SESSION_RESET, e.getErrorType()); } finally { @@ -61,7 +61,7 @@ public void testOpenTable() throws Exception { }; testManager = new ClusterManager(1000, 1, false, null, 60, addr_list2); try { - result = testManager.openTable("hehe", KeyHasher.DEFAULT); + result = testManager.openTable("hehe", KeyHasher.DEFAULT, 0); } catch (ReplicationException e) { Assert.assertEquals(error_code.error_types.ERR_OBJECT_NOT_FOUND, e.getErrorType()); } finally { @@ -70,7 +70,7 @@ public void testOpenTable() throws Exception { // test open an valid table try { - result = testManager.openTable("temp", KeyHasher.DEFAULT); + result = testManager.openTable("temp", KeyHasher.DEFAULT, 0); } catch (ReplicationException e) { Assert.fail(); } finally { diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java index 9f8d41d4b7..4dd8997e5a 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java @@ -72,7 +72,7 @@ public Void call() throws Exception { }); callbacks.add(cb); - rs.asyncSend(op, cb, 1000); + rs.asyncSend(op, cb, 1000, false); } for (FutureTask cb : callbacks) { @@ -129,7 +129,7 @@ public Void call() throws Exception { }); callbacks.add(cb); - rs.asyncSend(op, cb, 500); + rs.asyncSend(op, cb, 500, false); } for (int i = 0; i < 80; ++i) { @@ -151,7 +151,7 @@ public Void call() throws Exception { callbacks.add(cb); // these requests have longer timeout, so they should be responsed later than the server is // killed - rs.asyncSend(op, cb, 2000); + rs.asyncSend(op, cb, 2000, false); } for (FutureTask cb : callbacks) { @@ -206,7 +206,7 @@ public Void call() throws Exception { return null; } }); - rs.asyncSend(op, cb, 2000); + rs.asyncSend(op, cb, 2000, false); Tools.waitUninterruptable(cb, Integer.MAX_VALUE); } } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java index 22906763bb..f2d89c6fba 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java @@ -60,7 +60,7 @@ public void testOperateOp() throws Exception { System.out.println("TableHandlerTest#testOperateOp"); TableHandler table = null; try { - table = testManager.openTable("temp", KeyHasher.DEFAULT); + table = testManager.openTable("temp", KeyHasher.DEFAULT, 0); } catch (ReplicationException e) { Assert.fail(); } @@ -77,14 +77,14 @@ public void testOperateOp() throws Exception { ReplicaConfiguration handle = table.getReplicaConfig(pid.get_pidx()); // 1. modify the replica handler to a not exist one - final com.xiaomi.infra.pegasus.base.rpc_address old_addr = handle.session.getAddress(); + final com.xiaomi.infra.pegasus.base.rpc_address old_addr = handle.primarySession.getAddress(); logger.info("the right primary for {} is {}", pid.toString(), old_addr.toString()); com.xiaomi.infra.pegasus.base.rpc_address addr = new com.xiaomi.infra.pegasus.base.rpc_address(); addr.fromString("127.0.0.1:123"); handle.ballot--; - handle.session = testManager.getReplicaSession(addr); + handle.primarySession = testManager.getReplicaSession(addr); client_operator op = new Toollet.test_operator(pid, request); @@ -103,7 +103,8 @@ public void testOperateOp() throws Exception { new Toollet.BoolCallable() { @Override public boolean call() { - ReplicaSession session = finalTableRef.getReplicaConfig(pid.get_pidx()).session; + ReplicaSession session = + finalTableRef.getReplicaConfig(pid.get_pidx()).primarySession; if (session == null) return false; return session.getAddress().equals(old_addr); } @@ -128,7 +129,7 @@ public boolean call() { Assert.assertFalse(addr.equals(old_addr)); handle.ballot--; - handle.session = testManager.getReplicaSession(addr); + handle.primarySession = testManager.getReplicaSession(addr); op = new Toollet.test_operator(pid, request); try { @@ -146,7 +147,7 @@ public void testTryQueryMeta() throws Exception { TableHandler table = null; try { - table = testManager.openTable("temp", KeyHasher.DEFAULT); + table = testManager.openTable("temp", KeyHasher.DEFAULT, 0); } catch (ReplicationException e) { Assert.fail(); } @@ -157,14 +158,14 @@ public void testTryQueryMeta() throws Exception { for (int i = 0; i < tableConfig.replicas.size(); ++i) { ReplicaConfiguration handle = tableConfig.replicas.get(i); Assert.assertNotNull(handle); - Assert.assertNotNull(handle.session); + Assert.assertNotNull(handle.primarySession); } // mark a handler to inactive ReplicaConfiguration handle = tableConfig.replicas.get(0); long oldBallot = handle.ballot - 1; handle.ballot = oldBallot; - handle.session = null; + handle.primarySession = null; boolean doTheQuerying = table.tryQueryMeta(tableConfig.updateVersion); Assert.assertTrue(doTheQuerying); @@ -175,7 +176,7 @@ public void testTryQueryMeta() throws Exception { new Toollet.BoolCallable() { @Override public boolean call() { - return finalRef.getReplicaConfig(0).session != null; + return finalRef.getReplicaConfig(0).primarySession != null; } }, 10)); @@ -190,7 +191,7 @@ public void testConnectAfterQueryMeta() throws Exception { TableHandler table = null; try { - table = testManager.openTable("temp", KeyHasher.DEFAULT); + table = testManager.openTable("temp", KeyHasher.DEFAULT, 0); } catch (ReplicationException e) { Assert.fail(); } @@ -199,7 +200,7 @@ public void testConnectAfterQueryMeta() throws Exception { Thread.sleep(100); ArrayList replicas = table.tableConfig_.get().replicas; for (ReplicaConfiguration r : replicas) { - Assert.assertEquals(r.session.getState(), ReplicaSession.ConnState.CONNECTED); + Assert.assertEquals(r.primarySession.getState(), ReplicaSession.ConnState.CONNECTED); } } } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java index c1d1e0be85..dcc18dcb5b 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java @@ -66,7 +66,7 @@ public void timeoutChecker() { TableHandler handle; try { - handle = manager.openTable("temp", KeyHasher.DEFAULT); + handle = manager.openTable("temp", KeyHasher.DEFAULT, 0); } catch (ReplicationException e) { e.printStackTrace(); Assert.fail();