Skip to content

Commit

Permalink
fix: connection tracker update after failover
Browse files Browse the repository at this point in the history
  • Loading branch information
joyc-bq committed Jan 6, 2025
1 parent 9e0cfe9 commit 958cff4
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export class ConnectionStringHostListProvider implements StaticHostListProvider
return Promise.resolve(this.hostList);
}

getHostRole(client: AwsClient): Promise<HostRole> {
getHostRole(client: ClientWrapper): Promise<HostRole> {
throw new AwsWrapperError("ConnectionStringHostListProvider does not support getHostRole.");
}

Expand Down
2 changes: 1 addition & 1 deletion common/lib/host_list_provider/host_list_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export interface HostListProvider {

forceRefresh(client: ClientWrapper): Promise<HostInfo[]>;

getHostRole(client: AwsClient, dialect: DatabaseDialect): Promise<HostRole>;
getHostRole(client: ClientWrapper, dialect: DatabaseDialect): Promise<HostRole>;

identifyConnection(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo | null>;

Expand Down
8 changes: 4 additions & 4 deletions common/lib/host_list_provider/rds_host_list_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ export class RdsHostListProvider implements DynamicHostListProvider {
throw new AwsWrapperError("Could not retrieve targetClient.");
}

async getHostRole(client: AwsClient, dialect: DatabaseDialect): Promise<HostRole> {
async getHostRole(client: ClientWrapper, dialect: DatabaseDialect): Promise<HostRole> {
if (!this.isTopologyAwareDatabaseDialect(dialect)) {
throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect"));
}

if (client.targetClient) {
return dialect.getHostRole(client.targetClient);
if (client) {
return await dialect.getHostRole(client);
} else {
throw new AwsWrapperError(Messages.get("AwsClient targetClient not defined."));
throw new AwsWrapperError(Messages.get("AwsClient.targetClientNotDefined"));
}
}

Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import { ClientWrapper } from "./client_wrapper";
import { logger } from "../logutils";
import { Messages } from "./utils/messages";
import { DatabaseDialectCodes } from "./database_dialect/database_dialect_codes";
import { getWriter } from "./utils/utils";
import { getWriter, logTopology } from "./utils/utils";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";
import { ConfigurationProfile } from "./profile/configuration_profile";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ export class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin impl

private async checkWriterChanged(): Promise<void> {
const hostInfoAfterFailover = this.getWriter(this.pluginService.getHosts());

if (this.currentWriter === null) {
this.currentWriter = hostInfoAfterFailover;
this.needUpdateCurrentWriter = false;
Expand All @@ -127,10 +126,12 @@ export class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin impl
async notifyHostListChanged(changes: Map<string, Set<HostChangeOptions>>): Promise<void> {
for (const [key, _] of changes.entries()) {
const hostChanges = changes.get(key);

if (hostChanges) {
if (hostChanges.has(HostChangeOptions.PROMOTED_TO_READER)) {
await this.tracker.invalidateAllConnectionsMultipleHosts(key);
}

if (hostChanges.has(HostChangeOptions.PROMOTED_TO_WRITER)) {
this.needUpdateCurrentWriter = true;
}
Expand Down
20 changes: 18 additions & 2 deletions common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
private static readonly TELEMETRY_WRITER_FAILOVER = "failover to writer instance";
private static readonly TELEMETRY_READER_FAILOVER = "failover to replica";
private static readonly METHOD_END = "end";

private static readonly subscribedMethods: Set<string> = new Set([
"initHostProvider",
"connect",
Expand Down Expand Up @@ -392,7 +393,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
await this.pluginService.setCurrentClient(result.client, result.newHost);
this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases));
await this.updateTopology(true);
this.failoverReaderSuccessCounter.inc();
await this.throwFailoverSuccessError();
} catch (error: any) {
this.failoverReaderFailedCounter.inc();
throw error;
Expand All @@ -405,6 +406,21 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
}
}

/**
 * Reports a completed failover to the caller by throwing.
 *
 * Every path through this method throws, so it never resolves normally. Which error is raised
 * depends on whether a transaction was in progress according to either this plugin's own
 * `_isInTransaction` flag or the plugin service.
 *
 * @throws TransactionResolutionUnknownError if a transaction was in progress; the plugin
 *   service's in-transaction flag is reset to `false` before throwing.
 * @throws FailoverSuccessError if no transaction was in progress.
 */
async throwFailoverSuccessError() {
  if (this._isInTransaction || this.pluginService.isInTransaction()) {
    this.pluginService.setInTransaction(false);

    // "Transaction resolution unknown. Please re-configure session state if required and try
    // restarting the transaction."
    // The locale bundle registers this message under a key with a capital "T"
    // ("Failover.TransactionResolutionUnknownError"); the key must match exactly.
    // Resolve it once so the logged text and the thrown text cannot drift apart.
    const message = Messages.get("Failover.TransactionResolutionUnknownError");
    logger.debug(message);
    throw new TransactionResolutionUnknownError(message);
  }

  // "The active SQL connection has changed due to a connection failure. Please re-configure
  // session state if required."
  throw new FailoverSuccessError(Messages.get("Failover.connectionChangedError"));
}

async failoverWriter() {
logger.debug(Messages.get("Failover.startWriterFailover"));

Expand Down Expand Up @@ -439,7 +455,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
await this.pluginService.setCurrentClient(result.client, writerHostInfo);
logger.debug(Messages.get("Failover.establishedConnection", this.pluginService.getCurrentHostInfo()?.host ?? ""));
await this.pluginService.refreshHostList();
this.failoverWriterSuccessCounter.inc();
await this.throwFailoverSuccessError();
} catch (error: any) {
this.failoverWriterFailedCounter.inc();
throw error;
Expand Down
33 changes: 18 additions & 15 deletions common/lib/plugins/failover/reader_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,14 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
// ensure new connection is to a reader host
await this.pluginService.refreshHostList();
const topology = this.pluginService.getHosts();

for (let i = 0; i < topology.length; i++) {
const host = topology[i];
if (host.host === result.newHost.host) {
// found new connection host in the latest topology
if (host.role === HostRole.READER) {
return result;
}
try {
if ((await this.pluginService.getHostRole(result.client)) !== HostRole.READER) {
return result;
}
} catch (error) {
logger.debug(Messages.get("ClusterAwareReaderFailoverHandler.errorGettingHostRole", error.message));
}

// New host is not found in the latest topology. There are few possible reasons for that.
// - Host is not yet presented in the topology due to failover process in progress
// - Host is in the topology but its role isn't a
// READER (that is not acceptable option due to this.strictReader setting)
// Need to continue this loop and to make another try to connect to a reader.

try {
await this.pluginService.abortTargetClient(result.client);
} catch (error) {
Expand Down Expand Up @@ -276,7 +267,9 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler

const hostsByPriority: HostInfo[] = [...activeReaders];
const numReaders: number = activeReaders.length + downHostList.length;
if (writerHost && (!this.enableFailoverStrictReader || numReaders === 0)) {
// Since the writer instance may change during failover, the original writer is likely now a reader. We will include
// it and then verify the role once connected if using "strict-reader".
if (writerHost || numReaders === 0) {
hostsByPriority.push(writerHost);
}
hostsByPriority.push(...downHostList);
Expand Down Expand Up @@ -322,6 +315,16 @@ class ConnectionAttemptTask {
);
try {
this.targetClient = await this.pluginService.forceConnect(this.newHost, copy);

// Ensure that new connection is a connection to a reader host
try {
if ((await this.pluginService.getHostRole(this.targetClient)) === HostRole.READER) {
return this.targetClient;
}
} catch (error: any) {
logger.debug(Messages.get("ClusterAwareReaderFailoverHandler.errorGettingHostRole", error.message));
}

this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.AVAILABLE);
logger.info(Messages.get("ClusterAwareReaderFailoverHandler.successfulReaderConnection", this.newHost.host));
if (this.taskHandler.getSelectedConnectionAttemptTask(this.failoverTaskId) === -1) {
Expand Down
4 changes: 3 additions & 1 deletion common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"ClusterAwareWriterFailoverHandler.standaloneHost": "[TaskB] Host %s is not yet connected to a cluster. The cluster is still being reconfigured.",
"ClusterAwareWriterFailoverHandler.taskBAttemptConnectionToNewWriter": "[TaskB] Trying to connect to a new writer: '%s'",
"ClusterAwareWriterFailoverHandler.alreadyWriter": "Current reader connection is actually a new writer connection.",
"ClusterAwareReaderFailoverHandler.errorGettingHostRole": "An error occurred while trying to determine the role of the reader candidate: %s.",
"Failover.TransactionResolutionUnknownError": "Transaction resolution unknown. Please re-configure session state if required and try restarting the transaction.",
"Failover.connectionChangedError": "The active SQL connection has changed due to a connection failure. Please re-configure session state if required.",
"Failover.parameterValue": "%s = %s",
Expand Down Expand Up @@ -193,5 +194,6 @@
"ConfigurationProfileBuilder.notFound": "Configuration profile '%s' not found.",
"ConfigurationProfileBuilder.profileNameRequired": "Profile name is required.",
"ConfigurationProfileBuilder.canNotUpdateKnownPreset": "Can't add or update a built-in preset configuration profile '%s'.",
"AwsClient.configurationProfileNotFound": "Configuration profile '%s' not found."
"AwsClient.configurationProfileNotFound": "Configuration profile '%s' not found.",
"AwsClient.targetClientNotDefined": "AwsClient targetClient not defined."
}
2 changes: 1 addition & 1 deletion pg/lib/dialect/aurora_pg_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class AuroraPgDatabaseDialect extends PgDatabaseDialect implements Topolo

async getHostRole(targetClient: ClientWrapper): Promise<HostRole> {
const res = await targetClient.query(AuroraPgDatabaseDialect.IS_READER_QUERY);
return Promise.resolve(res.rows[0]["is_reader"] === "true" ? HostRole.READER : HostRole.WRITER);
return Promise.resolve(res.rows[0]["is_reader"] === true ? HostRole.READER : HostRole.WRITER);
}

async isDialect(targetClient: ClientWrapper): Promise<boolean> {
Expand Down

0 comments on commit 958cff4

Please sign in to comment.