Skip to content

Commit

Permalink
WorkAround for beam/issues/30266
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle committed Jan 13, 2025
1 parent 0fa0f30 commit 33c1ba0
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ protected static CassandraIO.Read<SourceRow> setSslOptions(
? profile.getString(TypedDriverOption.SSL_TRUSTSTORE_PASSWORD.getRawOption())
: null;
if (profile.getBoolean(TypedDriverOption.SSL_HOSTNAME_VALIDATION.getRawOption())) {
return tableReader.withSsl(getSSLOptions(trustStorePath, trustStorePassword));
return tableReader.withSsl(
SSLOptionsProvider.buidler()
.setSslOptionsFactory(() -> getSSLOptions(trustStorePath, trustStorePassword))
.build());
}
}
return tableReader;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper;

import com.datastax.driver.core.SSLOptions;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.apache.beam.sdk.options.ValueProvider;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

@AutoValue
abstract class SSLOptionsProvider implements ValueProvider<SSLOptions>, Serializable {

abstract SSLOptionsFactory sslOptionsFactory();

@Override
public SSLOptions get() {
return sslOptionsFactory().create();
}

@Override
public @UnknownKeyFor @NonNull @Initialized boolean isAccessible() {
return true;
}

public static Builder buidler() {
return new AutoValue_SSLOptionsProvider.Builder();
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setSslOptionsFactory(SSLOptionsFactory value);

public abstract SSLOptionsProvider build();
}

public interface SSLOptionsFactory extends Serializable {
SSLOptions create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,17 @@ public void testCassandraTableReaderFactoryBasic() throws RetriableSchemaDiscove
}

/**
* Validates that existing CassandraIO hits <a
* href=https://github.com/apache/beam/issues/30266>apache/beam/issues/30266</a>
*
* @throws RetriableSchemaDiscoveryException
* Helps to validate that we can work with SSL enabled cluster and don't get affected by <a
* href=https://github.com/apache/beam/issues/30266>apache/beam/issues/30266</a>.
*/
@Test
public void testCassandraTableReaderFactoryWithSslThrowsIllegalArgumentException()
throws RetriableSchemaDiscoveryException {

SourceSchemaReference cassandraSchemaReference =
SourceSchemaReference.ofCassandra(
CassandraSchemaReference.builder().setKeyspaceName(TEST_KEYSPACE).build());

DataSource dataSource =
DataSource.ofCassandra(
CassandraDataSource.builder()
Expand All @@ -155,11 +155,16 @@ public void testCassandraTableReaderFactoryWithSslThrowsIllegalArgumentException
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
.overrideOptionInOptionsMap(TypedDriverOption.SESSION_KEYSPACE, TEST_KEYSPACE)
.overrideOptionInOptionsMap(TypedDriverOption.SSL_HOSTNAME_VALIDATION, true)
.overrideOptionInOptionsMap(
TypedDriverOption.SSL_TRUSTSTORE_PATH,
sharedEmbeddedCassandra.getInstance().getTrustStorePath().toString())
.overrideOptionInOptionsMap(TypedDriverOption.SSL_TRUSTSTORE_PASSWORD, "cassandra")
.build());
CassandraSchemaDiscovery cassandraSchemaDiscovery = new CassandraSchemaDiscovery();
ImmutableMap<String, ImmutableMap<String, SourceColumnType>> discoverTableSchema =
cassandraSchemaDiscovery.discoverTableSchema(
dataSource, cassandraSchemaReference, ImmutableList.of(PRIMITIVE_TYPES_TABLE));

SourceSchemaReference sourceSchemaReference =
SourceSchemaReference.ofCassandra(
CassandraSchemaReference.builder()
Expand All @@ -173,17 +178,15 @@ public void testCassandraTableReaderFactoryWithSslThrowsIllegalArgumentException
(colName, colType) ->
sourceTableSchemaBuilder.addSourceColumnNameToSourceColumnType(colName, colType));
SourceTableSchema sourceTableSchema = sourceTableSchemaBuilder.build();

PTransform<PBegin, PCollection<SourceRow>> tableReader =
new CassandraTableReaderFactoryCassandraIoImpl()
.getTableReader(dataSource.cassandra(), sourceSchemaReference, sourceTableSchema);
IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() -> {
testPipelineAsserts.apply(tableReader).apply(Count.globally());
testPipelineAsserts.run().waitUntilFinish();
});
assertThat(exception.getCause().getClass()).isEqualTo(java.io.NotSerializableException.class);
PCollection<SourceRow> output = testPipelineAsserts.apply(tableReader);

PAssert.that(output.apply(Count.globally()))
.containsInAnyOrder(PRIMITIVE_TYPES_TABLE_ROW_COUNT);
testPipelineAsserts.run().waitUntilFinish();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper;

import static com.google.common.truth.Truth.assertThat;

import com.datastax.driver.core.SSLOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

/** Test class for {@link SSLOptionsProvider}. */
@RunWith(MockitoJUnitRunner.class)
public class SSLOptionsProviderTest {
@Mock SSLOptions mockSslOptions;

@Test
public void testSSLOptionsProviderBasic() {
SSLOptionsProvider sslOptionsProvider =
SSLOptionsProvider.buidler().setSslOptionsFactory(() -> mockSslOptions).build();
assertThat(sslOptionsProvider.isAccessible()).isTrue();
assertThat(sslOptionsProvider.get()).isEqualTo(mockSslOptions);
}
}

0 comments on commit 33c1ba0

Please sign in to comment.