Skip to content

Commit

Permalink
[BEAM-12164] Support querying against Postgres for the SpannerIO chan…
Browse files Browse the repository at this point in the history
…ge streams connector (apache#24390)

* Initial commit

* Added all

* SecondCommit

* thirdCommit

* commitFive

* Addressed Thiago's comments

* Disable integration test

* Addressed more comments

* formatted files

* Removed testing changes

* Update ChangeStreamRecordMapper.java

* Update ChangeStreamDao.java

* Update SpannerChangeStreamPostgresIT.java

* Update ChangeStreamDao.java

* fix failing tests

* fixed errors

Co-authored-by: Nancy Xu <nancyxu@google.com>
  • Loading branch information
nancyxu123 and nancyxu825 authored Dec 1, 2022
1 parent 61f9667 commit 57e5b69
Show file tree
Hide file tree
Showing 19 changed files with 1,847 additions and 359 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.ServiceFactory;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
Expand Down Expand Up @@ -1598,10 +1599,11 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
getMetadataInstance(), changeStreamDatabaseId.getInstanceId().getInstance());
final String partitionMetadataDatabaseId =
MoreObjects.firstNonNull(getMetadataDatabase(), changeStreamDatabaseId.getDatabase());
final String partitionMetadataTableName =
MoreObjects.firstNonNull(
getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId));

final DatabaseId fullPartitionMetadataDatabaseId =
DatabaseId.of(
getSpannerConfig().getProjectId().get(),
partitionMetadataInstanceId,
partitionMetadataDatabaseId);
SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
// Set default retryable errors for ReadChangeStream
if (changeStreamSpannerConfig.getRetryableCodes() == null) {
Expand All @@ -1628,6 +1630,21 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
.setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId))
.setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId))
.build();
Dialect changeStreamDatabaseDialect = getDialect(changeStreamSpannerConfig);
Dialect metadataDatabaseDialect = getDialect(partitionMetadataSpannerConfig);
LOG.info(
"The Spanner database "
+ changeStreamDatabaseId
+ " has dialect "
+ changeStreamDatabaseDialect);
LOG.info(
"The Spanner database "
+ fullPartitionMetadataDatabaseId
+ " has dialect "
+ metadataDatabaseDialect);
final String partitionMetadataTableName =
MoreObjects.firstNonNull(
getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId));
final String changeStreamName = getChangeStreamName();
final Timestamp startTimestamp = getInclusiveStartAt();
// Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the
Expand All @@ -1636,7 +1653,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
? MAX_INCLUSIVE_END_AT
: getInclusiveEndAt();
final MapperFactory mapperFactory = new MapperFactory();
final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
final DaoFactory daoFactory =
Expand All @@ -1646,7 +1663,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
partitionMetadataSpannerConfig,
partitionMetadataTableName,
rpcPriority,
input.getPipeline().getOptions().getJobName());
input.getPipeline().getOptions().getJobName(),
changeStreamDatabaseDialect,
metadataDatabaseDialect);
final ActionFactory actionFactory = new ActionFactory();

final InitializeDoFn initializeDoFn =
Expand Down Expand Up @@ -1696,6 +1715,11 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
}
}

/**
 * Looks up the SQL dialect of the database described by the given configuration.
 *
 * @param spannerConfig the connection settings of the database to inspect
 * @return the dialect reported by the database client obtained for {@code spannerConfig}
 */
private static Dialect getDialect(SpannerConfig spannerConfig) {
  final SpannerAccessor accessor = SpannerAccessor.getOrCreate(spannerConfig);
  final DatabaseClient client = accessor.getDatabaseClient();
  return client.getDialect();
}

/**
* Interface to display the name of the metadata table on Dataflow UI. This is only used for
* internal purpose. This should not be used to pass the name of the metadata table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,28 @@
*/
public class NameGenerator {

  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";
  private static final int MAX_TABLE_NAME_LENGTH = 63;

  /**
   * Generates a unique name for the partition metadata table in the form of {@code
   * "Metadata_<databaseId>_<uuid>"}. Every hyphen in the result (from the database id or the
   * UUID) is replaced by an underscore, and the result is truncated to at most 63 characters.
   *
   * @param databaseId The database id where the table will be created
   * @return the unique generated name of the partition metadata table
   */
  public static String generatePartitionMetadataTableName(String databaseId) {
    // The name format contributes 10 literal characters ("Metadata_" plus the "_" separator).
    // UUID always generates a String with 36 characters.
    // The untruncated name is therefore 46 characters plus the length of the database id.
    // Since the Postgres table name length is 63, the name is capped at that length; for database
    // ids longer than 17 characters this drops trailing characters of the UUID.
    final String fullName =
        String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
            .replace('-', '_');
    return fullName.substring(0, Math.min(fullName.length(), MAX_TABLE_NAME_LENGTH));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public ProcessContinuation run(
while (resultSet.next()) {
final List<ChangeStreamRecord> records =
changeStreamRecordMapper.toChangeStreamRecords(
updatedPartition, resultSet.getCurrentRowAsStruct(), resultSet.getMetadata());
updatedPartition, resultSet, resultSet.getMetadata());

Optional<ProcessContinuation> maybeContinuation;
for (final ChangeStreamRecord record : records) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.ResultSet;
Expand All @@ -35,6 +36,7 @@ public class ChangeStreamDao {
private final DatabaseClient databaseClient;
private final RpcPriority rpcPriority;
private final String jobName;
private final Dialect dialect;

/**
* Constructs a change stream dao. All the queries performed by this class will be for the given
Expand All @@ -50,11 +52,13 @@ public class ChangeStreamDao {
String changeStreamName,
DatabaseClient databaseClient,
RpcPriority rpcPriority,
String jobName) {
String jobName,
Dialect dialect) {
this.changeStreamName = changeStreamName;
this.databaseClient = databaseClient;
this.rpcPriority = rpcPriority;
this.jobName = jobName;
this.dialect = dialect;
}

/**
Expand Down Expand Up @@ -84,33 +88,54 @@ public ChangeStreamResultSet changeStreamQuery(
final String partitionTokenOrNull =
InitialPartition.isInitialPartition(partitionToken) ? null : partitionToken;

final String query =
"SELECT * FROM READ_"
+ changeStreamName
+ "("
+ " start_timestamp => @startTimestamp,"
+ " end_timestamp => @endTimestamp,"
+ " partition_token => @partitionToken,"
+ " read_options => null,"
+ " heartbeat_milliseconds => @heartbeatMillis"
+ ")";
String query = "";
Statement statement;
if (this.isPostgres()) {
query =
"SELECT * FROM \"spanner\".\"read_json_" + changeStreamName + "\"($1, $2, $3, $4, null)";
statement =
Statement.newBuilder(query)
.bind("p1")
.to(startTimestamp)
.bind("p2")
.to(endTimestamp)
.bind("p3")
.to(partitionTokenOrNull)
.bind("p4")
.to(heartbeatMillis)
.build();
} else {
query =
"SELECT * FROM READ_"
+ changeStreamName
+ "("
+ " start_timestamp => @startTimestamp,"
+ " end_timestamp => @endTimestamp,"
+ " partition_token => @partitionToken,"
+ " read_options => null,"
+ " heartbeat_milliseconds => @heartbeatMillis"
+ ")";
statement =
Statement.newBuilder(query)
.bind("startTimestamp")
.to(startTimestamp)
.bind("endTimestamp")
.to(endTimestamp)
.bind("partitionToken")
.to(partitionTokenOrNull)
.bind("heartbeatMillis")
.to(heartbeatMillis)
.build();
}
final ResultSet resultSet =
databaseClient
.singleUse()
.executeQuery(
Statement.newBuilder(query)
.bind("startTimestamp")
.to(startTimestamp)
.bind("endTimestamp")
.to(endTimestamp)
.bind("partitionToken")
.to(partitionTokenOrNull)
.bind("heartbeatMillis")
.to(heartbeatMillis)
.build(),
Options.priority(rpcPriority),
Options.tag("job=" + jobName));
.executeQuery(statement, Options.priority(rpcPriority), Options.tag("job=" + jobName));

return new ChangeStreamResultSet(resultSet);
}

/** Tells whether the dialect configured for this DAO is PostgreSQL. */
private boolean isPostgres() {
  return Dialect.POSTGRESQL == dialect;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,31 @@ public boolean next() {
* <p>If {@link ChangeStreamResultSet#next()} was not called or if it was called but there are no
* more records in the stream, null will be returned.
*
* <p>Should only be used for GoogleSQL databases.
*
* @return a change stream record as a {@link Struct} or null
*/
public Struct getCurrentRowAsStruct() {
  // Stamp the read time before fetching the row from the wrapped result set.
  recordReadAt = Timestamp.now();
  final Struct currentRow = resultSet.getCurrentRowAsStruct();
  return currentRow;
}

/**
 * Returns the PostgreSQL JSONB value stored in the given column of the record at the current
 * pointer, represented as a {@link String}. It also updates the timestamp at which the record
 * was read.
 *
 * <p>If {@link ChangeStreamResultSet#next()} was not called or if it was called but there are no
 * more records in the stream, null will be returned.
 *
 * <p>Should only be used for PostgreSQL databases.
 *
 * @param index the index of the column to read from the current row
 * @return the JSONB column value of the change stream record as a {@link String} or null
 */
public String getPgJsonb(int index) {
  recordReadAt = Timestamp.now();
  return resultSet.getPgJsonb(index);
}

/**
* Returns the gathered metadata for the change stream query so far.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;

import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Options.RpcPriority;
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
Expand Down Expand Up @@ -46,6 +47,8 @@ public class DaoFactory implements Serializable {
private final String partitionMetadataTableName;
private final RpcPriority rpcPriority;
private final String jobName;
private final Dialect spannerChangeStreamDatabaseDialect;
private final Dialect metadataDatabaseDialect;

/**
* Constructs a {@link DaoFactory} with the configuration to be used for the underlying instances.
Expand All @@ -63,7 +66,9 @@ public DaoFactory(
SpannerConfig metadataSpannerConfig,
String partitionMetadataTableName,
RpcPriority rpcPriority,
String jobName) {
String jobName,
Dialect spannerChangeStreamDatabaseDialect,
Dialect metadataDatabaseDialect) {
if (metadataSpannerConfig.getInstanceId() == null) {
throw new IllegalArgumentException("Metadata instance can not be null");
}
Expand All @@ -76,6 +81,8 @@ public DaoFactory(
this.partitionMetadataTableName = partitionMetadataTableName;
this.rpcPriority = rpcPriority;
this.jobName = jobName;
this.spannerChangeStreamDatabaseDialect = spannerChangeStreamDatabaseDialect;
this.metadataDatabaseDialect = metadataDatabaseDialect;
}

/**
Expand All @@ -95,7 +102,8 @@ public synchronized PartitionMetadataAdminDao getPartitionMetadataAdminDao() {
databaseAdminClient,
metadataSpannerConfig.getInstanceId().get(),
metadataSpannerConfig.getDatabaseId().get(),
partitionMetadataTableName);
partitionMetadataTableName,
this.metadataDatabaseDialect);
}
return partitionMetadataAdminDao;
}
Expand All @@ -112,7 +120,9 @@ public synchronized PartitionMetadataDao getPartitionMetadataDao() {
if (partitionMetadataDaoInstance == null) {
partitionMetadataDaoInstance =
new PartitionMetadataDao(
this.partitionMetadataTableName, spannerAccessor.getDatabaseClient());
this.partitionMetadataTableName,
spannerAccessor.getDatabaseClient(),
this.metadataDatabaseDialect);
}
return partitionMetadataDaoInstance;
}
Expand All @@ -129,7 +139,11 @@ public synchronized ChangeStreamDao getChangeStreamDao() {
if (changeStreamDaoInstance == null) {
changeStreamDaoInstance =
new ChangeStreamDao(
this.changeStreamName, spannerAccessor.getDatabaseClient(), rpcPriority, jobName);
this.changeStreamName,
spannerAccessor.getDatabaseClient(),
rpcPriority,
jobName,
this.spannerChangeStreamDatabaseDialect);
}
return changeStreamDaoInstance;
}
Expand Down
Loading

0 comments on commit 57e5b69

Please sign in to comment.