Skip to content

Commit

Permalink
Renaming all forked classes to Local* as per pattern followed in Span…
Browse files Browse the repository at this point in the history
…nerIO
  • Loading branch information
VardhanThigle committed Dec 31, 2024
1 parent c782055 commit 5e0babe
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraFieldMapper;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueExtractor;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueMapper;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraSourceRowMapper;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapping;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.IntervalNano;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.UnifiedMappingProvider;
Expand Down Expand Up @@ -125,10 +126,7 @@ public static ImmutableMap<String, UnifiedTypeMapping> getMapping() {
return CASSANDRA_MAPPINGS.typeMapping();
}

/**
* Field Mappers for {@link
* com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraSourceRowMapper}.
*/
/** Field Mappers for {@link CassandraSourceRowMapper}. */
public static ImmutableMap<String, CassandraFieldMapper<?>> getFieldMapping() {
return CASSANDRA_MAPPINGS.fieldMapping();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.Future;
import org.apache.beam.sdk.io.localcassandra.Mapper;
import org.apache.beam.sdk.io.localcassandra.LocalMapper;
import org.apache.commons.collections4.iterators.TransformIterator;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

@AutoValue
public abstract class CassandraSourceRowMapper implements Mapper<SourceRow>, Serializable {
public abstract class CassandraSourceRowMapper implements LocalMapper<SourceRow>, Serializable {
abstract SourceSchemaReference sourceSchemaReference();

abstract SourceTableSchema sourceTableSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
* <p>Alternatively, one may use {@code LocalCassandraIO.<Person>readAll()
* .withCoder(SerializableCoder.of(Person.class))} to query a subset of the Cassandra database by
* creating a PCollection of {@code LocalCassandraIO.Read<Person>} each with their own query or
* RingRange.
* LocalRingRange.
*
* <h3>Writing to Apache Cassandra</h3>
*
Expand Down Expand Up @@ -188,10 +188,10 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract @Nullable ValueProvider<Integer> readTimeout();

abstract @Nullable SerializableFunction<Session, Mapper> mapperFactoryFn();
abstract @Nullable SerializableFunction<Session, LocalMapper> mapperFactoryFn();

@Nullable
abstract ValueProvider<Set<RingRange>> ringRanges();
abstract ValueProvider<Set<LocalRingRange>> ringRanges();

@Nullable
abstract ValueProvider<SSLOptions> sslOptions();
Expand Down Expand Up @@ -371,21 +371,21 @@ public Read<T> withReadTimeout(ValueProvider<Integer> timeout) {
}

/**
* A factory to create a specific {@link Mapper} for a given Cassandra Session. This is useful
* A factory to create a specific {@link LocalMapper} for a given Cassandra Session. This is useful
* to provide mappers that don't rely on Cassandra annotated objects.
*/
public Read<T> withMapperFactoryFn(SerializableFunction<Session, Mapper> mapperFactory) {
public Read<T> withMapperFactoryFn(SerializableFunction<Session, LocalMapper> mapperFactory) {
checkArgument(
mapperFactory != null,
"LocalCassandraIO.withMapperFactory" + "(withMapperFactory) called with null value");
return builder().setMapperFactoryFn(mapperFactory).build();
}

public Read<T> withRingRanges(Set<RingRange> ringRange) {
return withRingRanges(ValueProvider.StaticValueProvider.of(ringRange));
public Read<T> withRingRanges(Set<LocalRingRange> localRingRange) {
return withRingRanges(ValueProvider.StaticValueProvider.of(localRingRange));
}

public Read<T> withRingRanges(ValueProvider<Set<RingRange>> ringRange) {
public Read<T> withRingRanges(ValueProvider<Set<LocalRingRange>> ringRange) {
return builder().setRingRanges(ringRange).build();
}

Expand Down Expand Up @@ -426,13 +426,13 @@ private static class SplitFn<T> extends DoFn<Read<T>, Read<T>> {
@ProcessElement
public void process(
@Element Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
Set<RingRange> ringRanges = getRingRanges(read);
for (RingRange rr : ringRanges) {
Set<LocalRingRange> localRingRanges = getRingRanges(read);
for (LocalRingRange rr : localRingRanges) {
outputReceiver.output(read.withRingRanges(ImmutableSet.of(rr)));
}
}

private static <T> Set<RingRange> getRingRanges(Read<T> read) {
private static <T> Set<LocalRingRange> getRingRanges(Read<T> read) {
try (Cluster cluster =
getCluster(
read.hosts(),
Expand All @@ -456,10 +456,10 @@ private static <T> Set<RingRange> getRingRanges(Read<T> read) {
cluster.getMetadata().getTokenRanges().stream()
.map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
.collect(Collectors.toList());
SplitGenerator splitGenerator =
new SplitGenerator(cluster.getMetadata().getPartitioner());
LocalSplitGenerator localSplitGenerator =
new LocalSplitGenerator(cluster.getMetadata().getPartitioner());

return splitGenerator.generateSplits(splitCount, tokens).stream()
return localSplitGenerator.generateSplits(splitCount, tokens).stream()
.flatMap(List::stream)
.collect(Collectors.toSet());

Expand All @@ -468,11 +468,11 @@ private static <T> Set<RingRange> getRingRanges(Read<T> read) {
"Only Murmur3Partitioner is supported for splitting, using an unique source for "
+ "the read");
String partitioner = cluster.getMetadata().getPartitioner();
RingRange totalRingRange =
RingRange.of(
SplitGenerator.getRangeMin(partitioner),
SplitGenerator.getRangeMax(partitioner));
return Collections.singleton(totalRingRange);
LocalRingRange totalLocalRingRange =
LocalRingRange.of(
LocalSplitGenerator.getRangeMin(partitioner),
LocalSplitGenerator.getRangeMax(partitioner));
return Collections.singleton(totalLocalRingRange);

Check warning on line 475 in v2/sourcedb-to-spanner/src/main/java/org/apache/beam/sdk/io/localcassandra/LocalCassandraIO.java

View check run for this annotation

Codecov / codecov/patch

v2/sourcedb-to-spanner/src/main/java/org/apache/beam/sdk/io/localcassandra/LocalCassandraIO.java#L470-L475

Added lines #L470 - L475 were not covered by tests
}
}
}
Expand Down Expand Up @@ -510,19 +510,19 @@ abstract static class Builder<T> {

abstract Builder<T> setReadTimeout(ValueProvider<Integer> timeout);

abstract Builder<T> setMapperFactoryFn(SerializableFunction<Session, Mapper> mapperFactoryFn);
abstract Builder<T> setMapperFactoryFn(SerializableFunction<Session, LocalMapper> mapperFactoryFn);

abstract Optional<SerializableFunction<Session, Mapper>> mapperFactoryFn();
abstract Optional<SerializableFunction<Session, LocalMapper>> mapperFactoryFn();

abstract Builder<T> setRingRanges(ValueProvider<Set<RingRange>> ringRange);
abstract Builder<T> setRingRanges(ValueProvider<Set<LocalRingRange>> ringRange);

abstract Builder<T> setSslOptions(ValueProvider<SSLOptions> sslOptions);

abstract Read<T> autoBuild();

public Read<T> build() {
if (!mapperFactoryFn().isPresent() && entity().isPresent()) {
setMapperFactoryFn(new DefaultObjectMapperFactory(entity().get()));
setMapperFactoryFn(new LocalDefaultObjectMapperFactory(entity().get()));
}
return autoBuild();
}
Expand Down Expand Up @@ -568,7 +568,7 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>

abstract @Nullable ValueProvider<SSLOptions> sslOptions();

abstract @Nullable SerializableFunction<Session, Mapper> mapperFactoryFn();
abstract @Nullable SerializableFunction<Session, LocalMapper> mapperFactoryFn();

abstract Builder<T> builder();

Expand Down Expand Up @@ -740,7 +740,7 @@ public Write<T> withReadTimeout(ValueProvider<Integer> timeout) {
return builder().setReadTimeout(timeout).build();

Check warning on line 740 in v2/sourcedb-to-spanner/src/main/java/org/apache/beam/sdk/io/localcassandra/LocalCassandraIO.java

View check run for this annotation

Codecov / codecov/patch

v2/sourcedb-to-spanner/src/main/java/org/apache/beam/sdk/io/localcassandra/LocalCassandraIO.java#L740

Added line #L740 was not covered by tests
}

public Write<T> withMapperFactoryFn(SerializableFunction<Session, Mapper> mapperFactoryFn) {
public Write<T> withMapperFactoryFn(SerializableFunction<Session, LocalMapper> mapperFactoryFn) {
checkArgument(
mapperFactoryFn != null,
"LocalCassandraIO."
Expand Down Expand Up @@ -836,9 +836,9 @@ abstract static class Builder<T> {

abstract Builder<T> setReadTimeout(ValueProvider<Integer> timeout);

abstract Builder<T> setMapperFactoryFn(SerializableFunction<Session, Mapper> mapperFactoryFn);
abstract Builder<T> setMapperFactoryFn(SerializableFunction<Session, LocalMapper> mapperFactoryFn);

abstract Optional<SerializableFunction<Session, Mapper>> mapperFactoryFn();
abstract Optional<SerializableFunction<Session, LocalMapper>> mapperFactoryFn();

abstract Builder<T> setSslOptions(ValueProvider<SSLOptions> sslOptions);

Expand All @@ -847,7 +847,7 @@ abstract static class Builder<T> {
public Write<T> build() {

if (!mapperFactoryFn().isPresent() && entity().isPresent()) {
setMapperFactoryFn(new DefaultObjectMapperFactory(entity().get()));
setMapperFactoryFn(new LocalDefaultObjectMapperFactory(entity().get()));
}
return autoBuild();
}
Expand All @@ -864,7 +864,7 @@ private static class WriteFn<T> extends DoFn<T, Void> {

@Setup
public void setup() {
writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
writer = new Mutator<>(spec, LocalMapper::saveAsync, "writes");
}

@ProcessElement
Expand Down Expand Up @@ -894,7 +894,7 @@ private static class DeleteFn<T> extends DoFn<T, Void> {

@Setup
public void setup() {
deleter = new Mutator<>(spec, Mapper::deleteAsync, "deletes");
deleter = new Mutator<>(spec, LocalMapper::deleteAsync, "deletes");
}

@ProcessElement
Expand Down Expand Up @@ -974,12 +974,12 @@ private static class Mutator<T> {

private final Cluster cluster;
private final Session session;
private final SerializableFunction<Session, Mapper> mapperFactoryFn;
private final SerializableFunction<Session, LocalMapper> mapperFactoryFn;
private List<Future<Void>> mutateFutures;
private final BiFunction<Mapper<T>, T, Future<Void>> mutator;
private final BiFunction<LocalMapper<T>, T, Future<Void>> mutator;
private final String operationName;

Mutator(Write<T> spec, BiFunction<Mapper<T>, T, Future<Void>> mutator, String operationName) {
Mutator(Write<T> spec, BiFunction<LocalMapper<T>, T, Future<Void>> mutator, String operationName) {
this.cluster =
getCluster(
spec.hosts(),
Expand All @@ -999,15 +999,15 @@ private static class Mutator<T> {
}

/**
* Mutate the entity to the Cassandra instance, using {@link Mapper} obtained with the Mapper
* Mutate the entity to the Cassandra instance, using {@link LocalMapper} obtained with the LocalMapper
* factory; the LocalDefaultObjectMapperFactory uses {@link
* com.datastax.driver.mapping.MappingManager}. This method uses {@link
* Mapper#saveAsync(Object)} method, which is asynchronous. Beam will wait for all futures to
* LocalMapper#saveAsync(Object)} method, which is asynchronous. Beam will wait for all futures to
* complete, to guarantee all writes have succeeded.
*/
void mutate(T entity) throws ExecutionException, InterruptedException {
Mapper<T> mapper = mapperFactoryFn.apply(session);
this.mutateFutures.add(mutator.apply(mapper, entity));
LocalMapper<T> localMapper = mapperFactoryFn.apply(session);
this.mutateFutures.add(mutator.apply(localMapper, entity));
if (this.mutateFutures.size() == CONCURRENT_ASYNC_QUERIES) {
// We reached the max number of allowed in flight queries.
// Write methods are synchronous in Beam,
Expand Down Expand Up @@ -1078,7 +1078,7 @@ public PCollection<T> expand(PCollection<Read<T>> input) {
checkArgument(coder() != null, "withCoder() is required");
return input
.apply("Reshuffle", Reshuffle.viaRandomKey())
.apply("Read", ParDo.of(new ReadFn<>()))
.apply("Read", ParDo.of(new LocalReadFn<>()))
.setCoder(this.coder());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class ConnectionManager {
public class LocalConnectionManager {

Check warning on line 30 in v2/sourcedb-to-spanner/src/main/java/org/apache/beam/sdk/io/localcassandra/LocalConnectionManager.java

View check run for this annotation

Codecov / codecov/patch

v2/sourcedb-to-spanner/src/main/java/org/apache/beam/sdk/io/localcassandra/LocalConnectionManager.java#L30

Added line #L30 was not covered by tests

private static final ConcurrentHashMap<String, Cluster> clusterMap =
new ConcurrentHashMap<String, Cluster>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@
/**
* Default Object mapper implementation that uses the <a
* href="https://docs.datastax.com/en/developer/java-driver/3.1/manual/object_mapper">Cassandra
* Object Mapper</a> for mapping POJOs to CRUD events in Cassandra.
* Object Mapper</a> for mapping POJOs to CRUD events in Cassandra.
*
* @see DefaultObjectMapperFactory
* @see LocalDefaultObjectMapperFactory
*/
@SuppressWarnings({
"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
})
class DefaultObjectMapper<T> implements Mapper<T>, Serializable {
class LocalDefaultObjectLocalMapper<T> implements LocalMapper<T>, Serializable {

private final transient com.datastax.driver.mapping.Mapper<T> mapper;

DefaultObjectMapper(com.datastax.driver.mapping.Mapper mapper) {
LocalDefaultObjectLocalMapper(com.datastax.driver.mapping.Mapper mapper) {
this.mapper = mapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,30 @@
import org.apache.beam.sdk.transforms.SerializableFunction;

/**
* Factory implementation that LocalCassandraIO uses to initialize the Default Object Mapper for mapping
* Factory implementation that LocalCassandraIO uses to initialize the {@link LocalDefaultObjectLocalMapper} for mapping
* POJOs to CRUD events in Cassandra.
*
* @see DefaultObjectMapper
* @see LocalDefaultObjectLocalMapper
*/
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class DefaultObjectMapperFactory<T> implements SerializableFunction<Session, Mapper> {
class LocalDefaultObjectMapperFactory<T> implements SerializableFunction<Session, LocalMapper> {

private transient MappingManager mappingManager;
final Class<T> entity;

DefaultObjectMapperFactory(Class<T> entity) {
LocalDefaultObjectMapperFactory(Class<T> entity) {
this.entity = entity;
}

@Override
public Mapper apply(Session session) {
public LocalMapper apply(Session session) {
if (mappingManager == null) {
this.mappingManager = new MappingManager(session);
}

return new DefaultObjectMapper<T>(mappingManager.mapper(entity));
return new LocalDefaultObjectLocalMapper<T>(mappingManager.mapper(entity));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@
* This interface allows you to implement a custom mapper to read and persist elements from/to
* Cassandra.
*
* <p>To Implement a custom mapper you need to: 1) Create an implementation of {@link Mapper}. 2)
* Create a {@link SerializableFunction} that instantiates the {@link Mapper} for a given Session,
* for an example see {@link DefaultObjectMapperFactory}). 3) Pass this function to {@link
* <p>To implement a custom mapper you need to: 1) Create an implementation of {@link LocalMapper}. 2)
* Create a {@link SerializableFunction} that instantiates the {@link LocalMapper} for a given Session
* (for an example see {@link LocalDefaultObjectMapperFactory}). 3) Pass this function to {@link
* LocalCassandraIO.Read#withMapperFactoryFn(SerializableFunction)} in the LocalCassandraIO builder. <br>
* Example:
*
* <pre>{@code
* SerializableFunction<Session, Mapper> factory = new MyCustomFactory();
* SerializableFunction<Session, LocalMapper> factory = new MyCustomFactory();
* pipeline
* .apply(...)
* .apply(LocalCassandraIO.<>read()
* .withMapperFactoryFn(factory));
* }</pre>
*/
public interface Mapper<T> {
public interface LocalMapper<T> {

/**
* This method is called when reading data from Cassandra. It should map a ResultSet into the
Expand Down
Loading

0 comments on commit 5e0babe

Please sign in to comment.