Skip to content

Commit

Permalink
feat: multi-az failover2 (#396)
Browse files Browse the repository at this point in the history
  • Loading branch information
sophia-bq authored Feb 7, 2025
1 parent 4e4760d commit 78bcf60
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 174 deletions.
2 changes: 2 additions & 0 deletions common/lib/host_list_provider/host_list_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export type StaticHostListProvider = HostListProvider;

export interface BlockingHostListProvider extends HostListProvider {
forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]>;

clearAll(): Promise<void>;
}

export interface HostListProvider {
Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
return false;
}

protected isBlockingHostListProvider(arg: any): arg is BlockingHostListProvider {
isBlockingHostListProvider(arg: any): arg is BlockingHostListProvider {
return arg;
}

Expand Down
9 changes: 6 additions & 3 deletions common/lib/plugins/failover2/failover2_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import { HostAvailability } from "../../host_availability/host_availability";
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
import { HostRole } from "../../host_role";
import { CanReleaseResources } from "../../can_release_resources";
import { MonitoringRdsHostListProvider } from "../../host_list_provider/monitoring/monitoring_host_list_provider";
import { ReaderFailoverResult } from "../failover/reader_failover_result";
import { HostListProvider } from "../../host_list_provider/host_list_provider";

export class Failover2Plugin extends AbstractConnectionPlugin implements CanReleaseResources {
private static readonly TELEMETRY_WRITER_FAILOVER = "failover to writer instance";
Expand Down Expand Up @@ -401,7 +401,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele

if ((await this.pluginService.getHostRole(writerCandidateClient)) !== HostRole.WRITER) {
try {
await writerCandidateClient.end();
await writerCandidateClient?.end();
} catch (error) {
// Do nothing.
}
Expand Down Expand Up @@ -481,6 +481,9 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
}

async releaseResources(): Promise<void> {
await (this.pluginService.getHostListProvider() as MonitoringRdsHostListProvider).clearAll();
const hostListProvider: HostListProvider = this.pluginService.getHostListProvider();
if (!!this.pluginService.isBlockingHostListProvider(hostListProvider)) {
await hostListProvider.clearAll();
}
}
}
3 changes: 2 additions & 1 deletion common/lib/topology_aware_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ export interface TopologyAwareDatabaseDialect {

getHostRole(client: ClientWrapper): Promise<HostRole>;

getWriterId(client: ClientWrapper): Promise<string | null>;
// Returns the host id of the targetClient if it is connected to a writer, null otherwise.
getWriterId(targetClient: ClientWrapper): Promise<string | null>;
}
31 changes: 27 additions & 4 deletions mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@ import { AwsWrapperError } from "../../../common/lib/utils/errors";
import { TopologyAwareDatabaseDialect } from "../../../common/lib/topology_aware_database_dialect";
import { RdsHostListProvider } from "../../../common/lib/host_list_provider/rds_host_list_provider";
import { FailoverRestriction } from "../../../common/lib/plugins/failover/failover_restriction";
import { WrapperProperties } from "../../../common/lib/wrapper_property";
import { PluginService } from "../../../common/lib/plugin_service";
import { MonitoringRdsHostListProvider } from "../../../common/lib/host_list_provider/monitoring/monitoring_host_list_provider";

export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect implements TopologyAwareDatabaseDialect {
private static readonly TOPOLOGY_QUERY: string = "SELECT id, endpoint, port FROM mysql.rds_topology";
private static readonly TOPOLOGY_TABLE_EXIST_QUERY: string =
"SELECT 1 AS tmp FROM information_schema.tables WHERE" + " table_schema = 'mysql' AND table_name = 'rds_topology'";
// For reader hosts, the query should return a writer host id. For a writer host, the query should return no data.
private static readonly FETCH_WRITER_HOST_QUERY: string = "SHOW REPLICA STATUS";
private static readonly FETCH_WRITER_HOST_QUERY_COLUMN_NAME: string = "Source_Server_Id";
private static readonly HOST_ID_QUERY: string = "SELECT @@server_id AS host";
private static readonly HOST_ID_QUERY_COLUMN_NAME: string = "host";
private static readonly IS_READER_QUERY: string = "SELECT @@read_only";
private static readonly IS_READER_QUERY: string = "SELECT @@read_only AS is_reader";
private static readonly IS_READER_QUERY_COLUMN_NAME: string = "is_reader";

async isDialect(targetClient: ClientWrapper): Promise<boolean> {
const res = await targetClient.query(RdsMultiAZMySQLDatabaseDialect.TOPOLOGY_TABLE_EXIST_QUERY).catch(() => false);
Expand All @@ -48,6 +53,9 @@ export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect impleme
}

getHostListProvider(props: Map<string, any>, originalUrl: string, hostListProviderService: HostListProviderService): HostListProvider {
if (WrapperProperties.PLUGINS.get(props).includes("failover2")) {
return new MonitoringRdsHostListProvider(props, originalUrl, hostListProviderService, <PluginService>hostListProviderService);
}
return new RdsHostListProvider(props, originalUrl, hostListProviderService);
}

Expand Down Expand Up @@ -118,11 +126,26 @@ export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect impleme
}

async getHostRole(client: ClientWrapper): Promise<HostRole> {
return (await this.executeTopologyRelatedQuery(client, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY)) ? HostRole.WRITER : HostRole.READER;
return (await this.executeTopologyRelatedQuery(client, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY_COLUMN_NAME)) == "0" ? HostRole.WRITER : HostRole.READER;
}

getWriterId(client: ClientWrapper): Promise<string> {
throw new Error("Method not implemented.");
async getWriterId(targetClient: ClientWrapper): Promise<string> {
try {
const writerHostId: string = await this.executeTopologyRelatedQuery(
targetClient,
RdsMultiAZMySQLDatabaseDialect.FETCH_WRITER_HOST_QUERY,
RdsMultiAZMySQLDatabaseDialect.FETCH_WRITER_HOST_QUERY_COLUMN_NAME
);
// The above query returns the writer host id if it is a reader, nothing if the writer.
if (!writerHostId) {
const currentConnection = await this.identifyConnection(targetClient);
return currentConnection ?? null;
} else {
return null;
}
} catch (error: any) {
throw new AwsWrapperError(Messages.get("RdsMultiAZMySQLDatabaseDialect.invalidQuery", error.message));
}
}

async identifyConnection(client: ClientWrapper): Promise<string> {
Expand Down
29 changes: 23 additions & 6 deletions pg/lib/dialect/rds_multi_az_pg_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import { RdsHostListProvider } from "../../../common/lib/host_list_provider/rds_
import { PgDatabaseDialect } from "./pg_database_dialect";
import { ErrorHandler } from "../../../common/lib/error_handler";
import { MultiAzPgErrorHandler } from "../multi_az_pg_error_handler";
import { error, info, query } from "winston";
import { WrapperProperties } from "../../../common/lib/wrapper_property";
import { PluginService } from "../../../common/lib/plugin_service";
import { MonitoringRdsHostListProvider } from "../../../common/lib/host_list_provider/monitoring/monitoring_host_list_provider";

export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements TopologyAwareDatabaseDialect {
constructor() {
Expand All @@ -42,7 +44,8 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To
private static readonly FETCH_WRITER_HOST_QUERY_COLUMN_NAME: string = "multi_az_db_cluster_source_dbi_resource_id";
private static readonly HOST_ID_QUERY: string = "SELECT dbi_resource_id FROM rds_tools.dbi_resource_id()";
private static readonly HOST_ID_QUERY_COLUMN_NAME: string = "dbi_resource_id";
private static readonly IS_READER_QUERY: string = "SELECT pg_is_in_recovery()";
private static readonly IS_READER_QUERY: string = "SELECT pg_is_in_recovery() AS is_reader";
private static readonly IS_READER_QUERY_COLUMN_NAME: string = "is_reader";

async isDialect(targetClient: ClientWrapper): Promise<boolean> {
const res = await targetClient.query(RdsMultiAZPgDatabaseDialect.WRITER_HOST_FUNC_EXIST_QUERY).catch(() => false);
Expand All @@ -55,6 +58,9 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To
}

getHostListProvider(props: Map<string, any>, originalUrl: string, hostListProviderService: HostListProviderService): HostListProvider {
if (WrapperProperties.PLUGINS.get(props).includes("failover2")) {
return new MonitoringRdsHostListProvider(props, originalUrl, hostListProviderService, <PluginService>hostListProviderService);
}
return new RdsHostListProvider(props, originalUrl, hostListProviderService);
}

Expand All @@ -77,7 +83,7 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To
}
}

private async executeTopologyRelatedQuery(targetClient: ClientWrapper, query: string, resultColumnName?: string): Promise<string> {
private async executeTopologyRelatedQuery(targetClient: ClientWrapper, query: string, resultColumnName?: string): Promise<any> {
const res = await targetClient.query(query);
const rows: any[] = res.rows;
if (rows.length > 0) {
Expand Down Expand Up @@ -125,11 +131,22 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To
}

async getHostRole(client: ClientWrapper): Promise<HostRole> {
return (await this.executeTopologyRelatedQuery(client, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY)) ? HostRole.WRITER : HostRole.READER;
return (await this.executeTopologyRelatedQuery(client, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY_COLUMN_NAME)) === false ? HostRole.WRITER : HostRole.READER;
}

getWriterId(client: ClientWrapper): Promise<string> {
throw new Error("Method not implemented.");
async getWriterId(targetClient: ClientWrapper): Promise<string> {
try {
const writerHostId: string = await this.executeTopologyRelatedQuery(
targetClient,
RdsMultiAZPgDatabaseDialect.FETCH_WRITER_HOST_QUERY,
RdsMultiAZPgDatabaseDialect.FETCH_WRITER_HOST_QUERY_COLUMN_NAME
);
const currentConnection = await this.identifyConnection(targetClient);

return (currentConnection && currentConnection === writerHostId) ? currentConnection : null;
} catch (error: any) {
throw new AwsWrapperError(Messages.get("RdsMultiAZPgDatabaseDialect.invalidQuery", error.message));
}
}

getErrorHandler(): ErrorHandler {
Expand Down
145 changes: 4 additions & 141 deletions tests/integration/container/tests/aurora_failover2.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import { TestEnvironment } from "./utils/test_environment";
import { DriverHelper } from "./utils/driver_helper";
import { AuroraTestUtility } from "./utils/aurora_test_utility";
import { FailoverSuccessError, TransactionResolutionUnknownError } from "../../../../common/lib/utils/errors";
import { DatabaseEngine } from "./utils/database_engine";
import { QueryResult } from "pg";
import { ProxyHelper } from "./utils/proxy_helper";
import { RdsUtils } from "../../../../common/lib/utils/rds_utils";
import { logger } from "../../../../common/logutils";
import { features, instanceCount } from "./config";
import { TestEnvironmentFeatures } from "./utils/test_environment_features";
import { PluginManager } from "../../../../common/lib";
import { DatabaseEngine } from "./utils/database_engine";
import { TransactionIsolationLevel } from "../../../../common/lib/utils/transaction_isolation_level";
import { RdsUtils } from "../../../../common/lib/utils/rds_utils";
import { QueryResult } from "pg";

const itIf =
features.includes(TestEnvironmentFeatures.FAILOVER_SUPPORTED) &&
Expand All @@ -36,7 +36,6 @@ const itIf =
? it
: it.skip;
const itIfTwoInstance = instanceCount == 2 ? itIf : it.skip;
const itIfMinThreeInstance = instanceCount >= 3 ? itIf : it.skip;

let env: TestEnvironment;
let driver;
Expand Down Expand Up @@ -66,27 +65,6 @@ async function initDefaultConfig(host: string, port: number, connectToProxy: boo
return config;
}

async function initConfigWithRWSplitting(host: string, port: number, connectToProxy: boolean): Promise<any> {
let config: any = {
user: env.databaseInfo.username,
host: host,
database: env.databaseInfo.defaultDbName,
password: env.databaseInfo.password,
port: port,
plugins: "readWriteSplitting,failover2",
failoverTimeoutMs: 400000,
enableTelemetry: true,
telemetryTracesBackend: "OTLP",
telemetryMetricsBackend: "OTLP"
};

if (connectToProxy) {
config["clusterInstanceHostPattern"] = "?." + env.proxyDatabaseInfo.instanceEndpointSuffix;
}
config = DriverHelper.addDriverSpecificConfiguration(config, env.engine);
return config;
}

describe("aurora failover2", () => {
beforeEach(async () => {
logger.info(`Test started: ${expect.getState().currentTestName}`);
Expand All @@ -97,8 +75,6 @@ describe("aurora failover2", () => {
initClientFunc = DriverHelper.getClient(driver);
await ProxyHelper.enableAllConnectivity();
await TestEnvironment.verifyClusterStatus();
await TestEnvironment.verifyAllInstancesHasRightState("available");
await TestEnvironment.verifyAllInstancesUp();

client = null;
secondaryClient = null;
Expand All @@ -112,6 +88,7 @@ describe("aurora failover2", () => {
// pass
}
}

if (secondaryClient !== null) {
try {
await secondaryClient.end();
Expand Down Expand Up @@ -281,118 +258,4 @@ describe("aurora failover2", () => {
},
1320000
);

itIfMinThreeInstance(
"test failover to new writer set read only true false",
async () => {
// Connect to writer instance
const writerConfig = await initConfigWithRWSplitting(
env.proxyDatabaseInfo.writerInstanceEndpoint,
env.proxyDatabaseInfo.instanceEndpointPort,
true
);
client = initClientFunc(writerConfig);
await client.connect();

const initialWriterId = await auroraTestUtility.queryInstanceId(client);
expect(await auroraTestUtility.isDbInstanceWriter(initialWriterId)).toStrictEqual(true);

// Kill all reader instances
for (const host of env.proxyDatabaseInfo.instances) {
if (host.instanceId && host.instanceId !== initialWriterId) {
await ProxyHelper.disableConnectivity(env.engine, host.instanceId);
}
}

// Force internal reader connection to the writer instance
await client.setReadOnly(true);
const currentId0 = await auroraTestUtility.queryInstanceId(client);

expect(currentId0).toStrictEqual(initialWriterId);
await client.setReadOnly(false);

await ProxyHelper.enableAllConnectivity();
// Crash instance 1 and nominate a new writer
await auroraTestUtility.failoverClusterAndWaitUntilWriterChanged();
await TestEnvironment.verifyClusterStatus();

await expect(async () => {
await auroraTestUtility.queryInstanceId(client);
}).rejects.toThrow(FailoverSuccessError);
const newWriterId = await auroraTestUtility.queryInstanceId(client);
expect(await auroraTestUtility.isDbInstanceWriter(newWriterId)).toStrictEqual(true);
expect(newWriterId).not.toBe(initialWriterId);

await client.setReadOnly(true);
const currentReaderId = await auroraTestUtility.queryInstanceId(client);
expect(currentReaderId).not.toBe(newWriterId);

await client.setReadOnly(false);
const currentId = await auroraTestUtility.queryInstanceId(client);
expect(currentId).toStrictEqual(newWriterId);
},
1320000
);

itIfMinThreeInstance(
"test failover to new reader set read only false true",
async () => {
// Connect to writer instance
const writerConfig = await initConfigWithRWSplitting(
env.proxyDatabaseInfo.writerInstanceEndpoint,
env.proxyDatabaseInfo.instanceEndpointPort,
true
);
writerConfig["failoverMode"] = "reader-or-writer";
client = initClientFunc(writerConfig);

await client.connect();
const initialWriterId = await auroraTestUtility.queryInstanceId(client);
expect(await auroraTestUtility.isDbInstanceWriter(initialWriterId)).toStrictEqual(true);
await client.setReadOnly(true);

const readerConnectionId = await auroraTestUtility.queryInstanceId(client);
expect(readerConnectionId).not.toBe(initialWriterId);

// Get a reader instance
let otherReaderId;
for (const host of env.proxyDatabaseInfo.instances) {
if (host.instanceId && host.instanceId !== readerConnectionId && host.instanceId !== initialWriterId) {
otherReaderId = host.instanceId;
break;
}
}

if (!otherReaderId) {
throw new Error("Could not find a reader instance");
}
// Kill all instances except one other reader
for (const host of env.proxyDatabaseInfo.instances) {
if (host.instanceId && host.instanceId !== otherReaderId) {
await ProxyHelper.disableConnectivity(env.engine, host.instanceId);
}
}

await expect(async () => {
await auroraTestUtility.queryInstanceId(client);
}).rejects.toThrow(FailoverSuccessError);

const currentReaderId0 = await auroraTestUtility.queryInstanceId(client);

expect(currentReaderId0).toStrictEqual(otherReaderId);
expect(currentReaderId0).not.toBe(readerConnectionId);

await ProxyHelper.enableAllConnectivity();
await client.setReadOnly(false);

const currentId = await auroraTestUtility.queryInstanceId(client);
expect(currentId).toStrictEqual(initialWriterId);

await client.setReadOnly(true);

const currentReaderId2 = await auroraTestUtility.queryInstanceId(client);
expect(currentReaderId2).toStrictEqual(otherReaderId);
},
1320000
);
});
Loading

0 comments on commit 78bcf60

Please sign in to comment.