From 33c1ba0c9d21fb07e31f13dceb04c4875285ab3c Mon Sep 17 00:00:00 2001 From: Vardhan Thigle Date: Mon, 13 Jan 2025 04:25:19 +0000 Subject: [PATCH] WorkAround for beam/issues/30266 --- ...ndraTableReaderFactoryCassandraIoImpl.java | 5 +- .../iowrapper/SSLOptionsProvider.java | 56 +++++++++++++++++++ ...TableReaderFactoryCassandraIoImplTest.java | 27 +++++---- .../iowrapper/SSLOptionsProviderTest.java | 38 +++++++++++++ 4 files changed, 113 insertions(+), 13 deletions(-) create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/SSLOptionsProvider.java create mode 100644 v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/SSLOptionsProviderTest.java diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraTableReaderFactoryCassandraIoImpl.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraTableReaderFactoryCassandraIoImpl.java index c18815f8b4..a74e10da32 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraTableReaderFactoryCassandraIoImpl.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraTableReaderFactoryCassandraIoImpl.java @@ -118,7 +118,10 @@ protected static CassandraIO.Read setSslOptions( ? profile.getString(TypedDriverOption.SSL_TRUSTSTORE_PASSWORD.getRawOption()) : null; if (profile.getBoolean(TypedDriverOption.SSL_HOSTNAME_VALIDATION.getRawOption())) { - return tableReader.withSsl(getSSLOptions(trustStorePath, trustStorePassword)); + return tableReader.withSsl( + SSLOptionsProvider.buidler() + .setSslOptionsFactory(() -> getSSLOptions(trustStorePath, trustStorePassword)) + .build()); } } return tableReader; diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/SSLOptionsProvider.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/SSLOptionsProvider.java new file mode 100644 index 0000000000..3e4007dd8c --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/SSLOptionsProvider.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper; + +import com.datastax.driver.core.SSLOptions; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.options.ValueProvider; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoValue +abstract class SSLOptionsProvider implements ValueProvider, Serializable { + + abstract SSLOptionsFactory sslOptionsFactory(); + + @Override + public SSLOptions get() { + return sslOptionsFactory().create(); + } + + @Override + public @UnknownKeyFor @NonNull @Initialized boolean isAccessible() { + return true; + } + + public static Builder buidler() { + return new AutoValue_SSLOptionsProvider.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setSslOptionsFactory(SSLOptionsFactory value); + + public abstract SSLOptionsProvider build(); + } + + public interface SSLOptionsFactory extends Serializable { + SSLOptions create(); + } +} diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraTableReaderFactoryCassandraIoImplTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraTableReaderFactoryCassandraIoImplTest.java index 10ef3d4318..bd5cdc98da 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraTableReaderFactoryCassandraIoImplTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraTableReaderFactoryCassandraIoImplTest.java @@ -135,17 +135,17 @@ public void testCassandraTableReaderFactoryBasic() throws RetriableSchemaDiscove } /** - * Validates that existing CassandraIO hits apache/beam/issues/30266 - * - * @throws RetriableSchemaDiscoveryException + * Helps to validate that we can work with SSL enabled cluster and don't get affected by apache/beam/issues/30266. */ @Test public void testCassandraTableReaderFactoryWithSslThrowsIllegalArgumentException() throws RetriableSchemaDiscoveryException { + SourceSchemaReference cassandraSchemaReference = SourceSchemaReference.ofCassandra( CassandraSchemaReference.builder().setKeyspaceName(TEST_KEYSPACE).build()); + DataSource dataSource = DataSource.ofCassandra( CassandraDataSource.builder() @@ -155,11 +155,16 @@ public void testCassandraTableReaderFactoryWithSslThrowsIllegalArgumentException .setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter()) .overrideOptionInOptionsMap(TypedDriverOption.SESSION_KEYSPACE, TEST_KEYSPACE) .overrideOptionInOptionsMap(TypedDriverOption.SSL_HOSTNAME_VALIDATION, true) + .overrideOptionInOptionsMap( + TypedDriverOption.SSL_TRUSTSTORE_PATH, + sharedEmbeddedCassandra.getInstance().getTrustStorePath().toString()) + .overrideOptionInOptionsMap(TypedDriverOption.SSL_TRUSTSTORE_PASSWORD, "cassandra") .build()); CassandraSchemaDiscovery cassandraSchemaDiscovery = new CassandraSchemaDiscovery(); ImmutableMap> discoverTableSchema = cassandraSchemaDiscovery.discoverTableSchema( dataSource, cassandraSchemaReference, ImmutableList.of(PRIMITIVE_TYPES_TABLE)); + SourceSchemaReference sourceSchemaReference = SourceSchemaReference.ofCassandra( CassandraSchemaReference.builder() @@ -173,17 +178,15 @@ public void testCassandraTableReaderFactoryWithSslThrowsIllegalArgumentException (colName, colType) -> sourceTableSchemaBuilder.addSourceColumnNameToSourceColumnType(colName, colType)); SourceTableSchema sourceTableSchema = sourceTableSchemaBuilder.build(); + PTransform> tableReader = new CassandraTableReaderFactoryCassandraIoImpl() .getTableReader(dataSource.cassandra(), sourceSchemaReference, sourceTableSchema); - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> { - testPipelineAsserts.apply(tableReader).apply(Count.globally()); - testPipelineAsserts.run().waitUntilFinish(); - }); - assertThat(exception.getCause().getClass()).isEqualTo(java.io.NotSerializableException.class); + PCollection output = testPipelineAsserts.apply(tableReader); + + PAssert.that(output.apply(Count.globally())) + .containsInAnyOrder(PRIMITIVE_TYPES_TABLE_ROW_COUNT); + testPipelineAsserts.run().waitUntilFinish(); } @Test diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/SSLOptionsProviderTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/SSLOptionsProviderTest.java new file mode 100644 index 0000000000..68c39a009f --- /dev/null +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/SSLOptionsProviderTest.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper; + +import static com.google.common.truth.Truth.assertThat; + +import com.datastax.driver.core.SSLOptions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +/** Test class for {@link SSLOptionsProvider}. */ +@RunWith(MockitoJUnitRunner.class) +public class SSLOptionsProviderTest { + @Mock SSLOptions mockSslOptions; + + @Test + public void testSSLOptionsProviderBasic() { + SSLOptionsProvider sslOptionsProvider = + SSLOptionsProvider.buidler().setSslOptionsFactory(() -> mockSslOptions).build(); + assertThat(sslOptionsProvider.isAccessible()).isTrue(); + assertThat(sslOptionsProvider.get()).isEqualTo(mockSslOptions); + } +}