Skip to content

Commit

Permalink
Completing TODO for Cassandra IOWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle committed Jan 6, 2025
1 parent d7d58c6 commit 7c79275
Show file tree
Hide file tree
Showing 7 changed files with 485 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper;

import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;

public class CassandraDefaults {

/** Fluent Backoff for Cassandra Schema Discovery. */
public static final FluentBackoff DEFAULT_CASSANDRA_SCHEMA_DISCOVERY_BACKOFF =
FluentBackoff.DEFAULT.withMaxCumulativeBackoff(Duration.standardMinutes(5L));

private CassandraDefaults() {}
;
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static CassandraIOWrapperFactory fromPipelineOptions(SourceDbToSpannerOpt
/** Create an {@link IoWrapper} instance for a list of SourceTables. */
@Override
public IoWrapper getIOWrapper(List<String> sourceTables, OnSignal<?> waitOnSignal) {
/** TODO(vardhanvthigle@) */
return null;
/** TODO(vardhanvthigle@) incorporate waitOnSignal */
return new CassandraIoWrapper(gcsConfigPath(), sourceTables);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper;

import static com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper.CassandraDefaults.DEFAULT_CASSANDRA_SCHEMA_DISCOVERY_BACKOFF;

import com.google.cloud.teleport.v2.source.reader.io.cassandra.schema.CassandraSchemaDiscovery;
import com.google.cloud.teleport.v2.source.reader.io.datasource.DataSource;
import com.google.cloud.teleport.v2.source.reader.io.exception.SchemaDiscoveryException;
import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow;
import com.google.cloud.teleport.v2.source.reader.io.schema.SchemaDiscovery;
import com.google.cloud.teleport.v2.source.reader.io.schema.SchemaDiscoveryImpl;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchema;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableReference;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapper.MapperType;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.FileNotFoundException;
import java.util.List;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Static Utility Class to provide basic functionality to {@link CassandraIoWrapper}. */
class CassandraIOWrapperHelper {

private static final Logger LOG = LoggerFactory.getLogger(CassandraIOWrapperHelper.class);

static DataSource buildDataSource(String gcsPath) {
DataSource dataSource;
try {
dataSource =
DataSource.ofCassandra(
CassandraDataSource.builder().setOptionsMapFromGcsFile(gcsPath).build());
} catch (FileNotFoundException e) {
LOG.error("Unable to find driver config file in {}. Cause ", gcsPath, e);
throw (new SchemaDiscoveryException(e));
}
return dataSource;
}

static SchemaDiscovery buildSchemaDiscovery() {
return new SchemaDiscoveryImpl(
new CassandraSchemaDiscovery(), DEFAULT_CASSANDRA_SCHEMA_DISCOVERY_BACKOFF);
}

static ImmutableList<String> getTablesToRead(
List<String> sourceTables,
DataSource dataSource,
SchemaDiscovery schemaDiscovery,
SourceSchemaReference sourceSchemaReference) {
ImmutableList<String> tablesToRead;
if (sourceTables.isEmpty()) {
tablesToRead = schemaDiscovery.discoverTables(dataSource, sourceSchemaReference);
LOG.info("Auto Discovered SourceTables = {}, Tables = {}", sourceTables, tablesToRead);
} else {
tablesToRead = ImmutableList.copyOf(sourceTables);
LOG.info("Using passed SourceTables = {}", sourceTables);
}
return tablesToRead;
}

static SourceSchema getSourceSchema(
SchemaDiscovery schemaDiscovery,
DataSource dataSource,
SourceSchemaReference sourceSchemaReference,
ImmutableList<String> tables) {

SourceSchema.Builder sourceSchemaBuilder =
SourceSchema.builder().setSchemaReference(sourceSchemaReference);
ImmutableMap<String, ImmutableMap<String, SourceColumnType>> tableSchemas =
schemaDiscovery.discoverTableSchema(dataSource, sourceSchemaReference, tables);
LOG.info("Found table schemas: {}", tableSchemas);
tableSchemas.entrySet().stream()
.map(
tableEntry -> {
SourceTableSchema.Builder sourceTableSchemaBuilder =
SourceTableSchema.builder(MapperType.CASSANDRA).setTableName(tableEntry.getKey());
tableEntry
.getValue()
.entrySet()
.forEach(
colEntry ->
sourceTableSchemaBuilder.addSourceColumnNameToSourceColumnType(
colEntry.getKey(), colEntry.getValue()));
return sourceTableSchemaBuilder.build();
})
.forEach(sourceSchemaBuilder::addTableSchema);
return sourceSchemaBuilder.build();
}

static ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>>
getTableReaders(DataSource dataSource, SourceSchema sourceSchema) {
CassandraTableReaderFactory cassandraTableReaderFactory = new CassandraTableReaderFactory();
ImmutableMap.Builder<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>>
tableReadersBuilder = ImmutableMap.builder();
SourceSchemaReference sourceSchemaReference = sourceSchema.schemaReference();
sourceSchema
.tableSchemas()
.forEach(
tableSchema -> {
SourceTableReference sourceTableReference =
SourceTableReference.builder()
.setSourceSchemaReference(sourceSchemaReference)
.setSourceTableSchemaUUID(tableSchema.tableSchemaUUID())
.setSourceTableName(tableSchema.tableName())
.build();
var tableReader =
cassandraTableReaderFactory.getTableReader(
dataSource.cassandra(), sourceSchemaReference, tableSchema);
tableReadersBuilder.put(sourceTableReference, tableReader);
});
return tableReadersBuilder.build();
}

private CassandraIOWrapperHelper() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,54 @@
package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper;

import com.google.cloud.teleport.v2.source.reader.io.IoWrapper;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.schema.CassandraSchemaReference;
import com.google.cloud.teleport.v2.source.reader.io.datasource.DataSource;
import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow;
import com.google.cloud.teleport.v2.source.reader.io.schema.SchemaDiscovery;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchema;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

/** IOWrapper for Cassandra Source. */
public class CassandraIoWrapper implements IoWrapper {
public final class CassandraIoWrapper implements IoWrapper {
private SourceSchema sourceSchema;
private ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>>
tableReaders;

public CassandraIoWrapper(String gcsPath, List<String> sourceTables) {
DataSource dataSource = CassandraIOWrapperHelper.buildDataSource(gcsPath);
SchemaDiscovery schemaDiscovery = CassandraIOWrapperHelper.buildSchemaDiscovery();
SourceSchemaReference sourceSchemaReference =
SourceSchemaReference.ofCassandra(
CassandraSchemaReference.builder()
.setKeyspaceName(dataSource.cassandra().loggedKeySpace())
.build());

ImmutableList<String> tablesToRead =
CassandraIOWrapperHelper.getTablesToRead(
sourceTables, dataSource, schemaDiscovery, sourceSchemaReference);
this.sourceSchema =
CassandraIOWrapperHelper.getSourceSchema(
schemaDiscovery, dataSource, sourceSchemaReference, tablesToRead);
this.tableReaders = CassandraIOWrapperHelper.getTableReaders(dataSource, sourceSchema);
}

/** Get a list of reader transforms for Cassandra source. */
@Override
public ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>>
getTableReaders() {
// TODO(vardhanvthigle)
return null;
return tableReaders;
}

/** Discover source schema for Cassandra. */
@Override
public SourceSchema discoverTableSchema() {
// TODO(vardhanvthigle)
return null;
return sourceSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public PTransform<PBegin, PCollection<SourceRow>> getTableReader(
@VisibleForTesting
protected CassandraIO.Read<SourceRow> setCredentials(
CassandraIO.Read<SourceRow> tableReader, DriverExecutionProfile profile) {
if (profile.getString(TypedDriverOption.AUTH_PROVIDER_USER_NAME.getRawOption()) != null) {
if (profile.isDefined(TypedDriverOption.AUTH_PROVIDER_USER_NAME.getRawOption())) {
tableReader =
tableReader.withUsername(
profile.getString(TypedDriverOption.AUTH_PROVIDER_USER_NAME.getRawOption()));
}
if (profile.getString(TypedDriverOption.AUTH_PROVIDER_PASSWORD.getRawOption()) != null) {
if (profile.isDefined(TypedDriverOption.AUTH_PROVIDER_PASSWORD.getRawOption())) {
tableReader =
tableReader.withPassword(
profile.getString(TypedDriverOption.AUTH_PROVIDER_PASSWORD.getRawOption()));
Expand Down
Loading

0 comments on commit 7c79275

Please sign in to comment.