Skip to content

Commit

Permalink
feat: implement backup request (apache#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhao liwei authored Mar 17, 2020
1 parent d4a30a0 commit 28b67ee
Show file tree
Hide file tree
Showing 20 changed files with 536 additions and 211 deletions.
14 changes: 13 additions & 1 deletion src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,19 @@ public long hash(byte[] key) {
}

private PegasusTable getTable(String tableName) throws PException {
return getTable(tableName, 0);
}

private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws PException {
PegasusTable table = tableMap.get(tableName);
if (table == null) {
synchronized (tableMapLock) {
table = tableMap.get(tableName);
if (table == null) {
try {
table = new PegasusTable(this, cluster.openTable(tableName, new PegasusHasher()));
table =
new PegasusTable(
this, cluster.openTable(tableName, new PegasusHasher(), backupRequestDelayMs));
} catch (Throwable e) {
throw new PException(e);
}
Expand Down Expand Up @@ -194,6 +200,12 @@ public PegasusTableInterface openTable(String tableName) throws PException {
return getTable(tableName);
}

@Override
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException {
return getTable(tableName, backupRequestDelayMs);
}

@Override
public Properties getConfiguration() {
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,29 @@ public interface PegasusClientInterface {
*/
public PegasusTableInterface openTable(String tableName) throws PException;

/**
* Open a table, and prepare the sessions and route-table to the replica-servers.
*
* <p>Please notice that pegasus support two kinds of API: 1. the client-interface way, which is
* provided in this class. 2. the table-interface way, which is provided by {@link
* PegasusTableInterface}. With the client-interface, you don't need to create
* PegasusTableInterface by openTable, so you can access the pegasus cluster conveniently.
* However, the client-interface's api also has some restrictions: 1. we don't provide async
* methods in client-interface. 2. the timeout in client-interface isn't as accurate as the
* table-interface. 3. the client-interface may throw an exception when open table fails. It means
* that you may need to handle this exception in every data access operation, which is annoying.
* 4. You can't specify a per-operation timeout. So we recommend you to use the table-interface.
*
* @param tableName the table should be exist on the server, which is created before by the system
* administrator
* @param backupRequestDelayMs the delay time to send backup request. If backupRequestDelayMs <=
* 0, The backup request is disabled.
* @return the table handler
* @throws PException throws exception if any error occurs.
*/
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException;

/**
* Check value exist by key from the cluster
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,9 @@ public void handleReplicaException(
String replicaServer;
try {
replicaServer =
replicaConfiguration.primary.get_ip() + ":" + replicaConfiguration.primary.get_port();
replicaConfiguration.primaryAddress.get_ip()
+ ":"
+ replicaConfiguration.primaryAddress.get_port();
} catch (UnknownHostException e) {
promise.setFailure(new PException(e));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,25 @@
import com.xiaomi.infra.pegasus.thrift.TException;

public abstract class client_operator {
public client_operator(gpid gpid, String tableName) {
public client_operator(gpid gpid, String tableName, boolean enableBackupRequest) {
this.header = new ThriftHeader();
this.meta = new request_meta();
this.meta.setApp_id(gpid.get_app_id());
this.meta.setPartition_index(gpid.get_pidx());
this.pid = gpid;
this.tableName = tableName;
this.rpc_error = new error_code();
this.enableBackupRequest = enableBackupRequest;
}

public client_operator(
gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) {
this(gpid, tableName, enableBackupRequest);
this.meta.setPartition_hash(partitionHash);
}

public client_operator(gpid gpid, String tableName, long partitionHash) {
this(gpid, tableName);
this(gpid, tableName, false);
this.meta.setPartition_hash(partitionHash);
}

Expand All @@ -32,9 +39,12 @@ public final byte[] prepare_thrift_header(int meta_length, int body_length) {
}

public final void prepare_thrift_meta(
com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot, int client_timeout)
com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot,
int client_timeout,
boolean isBackupRequest)
throws TException {
this.meta.setClient_timeout(client_timeout);
this.meta.setIs_backup_request(isBackupRequest);
this.meta.write(oprot);
}

Expand All @@ -51,6 +61,7 @@ public String getQPSCounter() {
mark = "fail";
break;
}

// pegasus.client.put.succ.qps
return new StringBuilder()
.append("pegasus.client.")
Expand Down Expand Up @@ -89,4 +100,5 @@ public abstract void recv_data(com.xiaomi.infra.pegasus.thrift.protocol.TProtoco
public gpid pid;
public String tableName; // only for metrics
public error_code rpc_error;
public boolean enableBackupRequest;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class rrdb_get_operator extends client_operator {
public rrdb_get_operator(
com.xiaomi.infra.pegasus.base.gpid gpid, String tableName, blob request, long partitionHash) {
super(gpid, tableName, partitionHash);
super(gpid, tableName, partitionHash, true);
this.request = request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public rrdb_multi_get_operator(
String tableName,
multi_get_request request,
long partitionHash) {
super(gpid, tableName, partitionHash);
super(gpid, tableName, partitionHash, true);
this.request = request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
public class rrdb_ttl_operator extends client_operator {
public rrdb_ttl_operator(
com.xiaomi.infra.pegasus.base.gpid gpid, String tableName, blob request, long partitionHash) {
super(gpid, tableName, partitionHash);
super(gpid, tableName, partitionHash, true);
this.request = request;
}

Expand Down
Loading

0 comments on commit 28b67ee

Please sign in to comment.