From e18596c7fc498e7f05ac3c32bb67cfc264fc76f2 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Fri, 30 Aug 2024 20:40:36 -0400
Subject: [PATCH] Temporarily remove BQMS catalog until it is open-sourced
 (#32386)

---
 .../IO_Iceberg_Integration_Tests.json         |   2 +-
 sdks/java/io/expansion-service/build.gradle   |   2 -
 .../io/iceberg/bigquerymetastore/build.gradle |  56 ----
 sdks/java/io/iceberg/build.gradle             |   6 -
 .../iceberg/BigQueryMetastoreCatalogIT.java   | 274 ------------------
 settings.gradle.kts                           |   2 -
 6 files changed, 1 insertion(+), 341 deletions(-)
 delete mode 100644 sdks/java/io/iceberg/bigquerymetastore/build.gradle
 delete mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BigQueryMetastoreCatalogIT.java

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 253d9796f902..1efc8e9e4405 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 8
+  "modification": 1
 }
diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle
index 37ee03eba00d..498950b3dc47 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -52,8 +52,6 @@ dependencies {
   // Needed for HiveCatalog
   runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2")
   runtimeOnly project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow")
-  // Needed for BigQuery Metastore catalog (this isn't supported for java 8)
-  runtimeOnly project(path: ":sdks:java:io:iceberg:bigquerymetastore", configuration: "shadow")
 
   runtimeOnly library.java.kafka_clients
   runtimeOnly library.java.slf4j_jdk14
diff --git a/sdks/java/io/iceberg/bigquerymetastore/build.gradle b/sdks/java/io/iceberg/bigquerymetastore/build.gradle
deleted file mode 100644
index 20e4a33b09f2..000000000000
--- a/sdks/java/io/iceberg/bigquerymetastore/build.gradle
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-plugins {
-  id 'org.apache.beam.module'
-}
-
-def bqmsLocation = "$buildDir/libs"
-
-applyJavaNature(
-  automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms',
-  shadowClosure: {
-    dependencies {
-      include(dependency(files("$bqmsLocation/iceberg-bigquery-catalog-1.5.2-0.1.0.jar")))
-    }
-    relocate 'com.google.guava', getJavaRelocatedPath('iceberg.bqms.com.google.guava')
-  },
-  validateShadowJar: false
-)
-
-description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: BigQuery Metastore"
-ext.summary = "A copy of the BQMS catalog with some popular libraries relocated."
-
-task downloadBqmsJar(type: Copy) {
-  def jarUrl = 'https://storage.googleapis.com/spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-0.1.0.jar'
-  def outputDir = file("$bqmsLocation")
-  outputDir.mkdirs()
-
-  ant.get(src: jarUrl, dest: outputDir)
-}
-
-repositories {
-  flatDir {
-    dirs "$bqmsLocation"
-  }
-}
-
-compileJava.dependsOn downloadBqmsJar
-
-dependencies {
-  implementation files("$bqmsLocation/iceberg-bigquery-catalog-1.5.2-0.1.0.jar")
-}
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 97e81afe7129..3d653d6b276e 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -55,7 +55,6 @@ dependencies {
   implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
   implementation library.java.hadoop_common
 
-  testImplementation project(path: ":sdks:java:io:iceberg:bigquerymetastore", configuration: "shadow")
   testImplementation library.java.hadoop_client
   testImplementation library.java.bigdataoss_gcsio
   testImplementation library.java.bigdataoss_gcs_connector
@@ -110,11 +109,6 @@ task integrationTest(type: Test) {
   outputs.upToDateWhen { false }
 
   include '**/*IT.class'
-  // BQ metastore catalog doesn't support java 8
-  if (project.findProperty('testJavaVersion') == '8' ||
-      JavaVersion.current().equals(JavaVersion.VERSION_1_8)) {
-    exclude '**/BigQueryMetastoreCatalogIT.class'
-  }
 
   maxParallelForks 4
   classpath = sourceSets.test.runtimeClasspath
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BigQueryMetastoreCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BigQueryMetastoreCatalogIT.java
deleted file mode 100644
index 7abf2671082f..000000000000
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BigQueryMetastoreCatalogIT.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.managed.Managed;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.ManifestWriter;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.encryption.InputFilesDecryptor;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Integration tests for reading and writing Iceberg tables using the BigQuery Metastore Catalog.
- */
-public class BigQueryMetastoreCatalogIT {
-  private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
-      Schema.builder()
-          .addStringField("doubly_nested_str")
-          .addInt64Field("doubly_nested_float")
-          .build();
-
-  private static final Schema NESTED_ROW_SCHEMA =
-      Schema.builder()
-          .addStringField("nested_str")
-          .addInt32Field("nested_int")
-          .addFloatField("nested_float")
-          .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
-          .build();
-  private static final Schema BEAM_SCHEMA =
-      Schema.builder()
-          .addStringField("str")
-          .addBooleanField("bool")
-          .addNullableInt32Field("nullable_int")
-          .addNullableInt64Field("nullable_long")
-          .addArrayField("arr_long", Schema.FieldType.INT64)
-          .addRowField("row", NESTED_ROW_SCHEMA)
-          .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
-          .build();
-
-  private static final SimpleFunction<Long, Row> ROW_FUNC =
-      new SimpleFunction<Long, Row>() {
-        @Override
-        public Row apply(Long num) {
-          String strNum = Long.toString(num);
-          Row nestedRow =
-              Row.withSchema(NESTED_ROW_SCHEMA)
-                  .addValue("nested_str_value_" + strNum)
-                  .addValue(Integer.valueOf(strNum))
-                  .addValue(Float.valueOf(strNum + "." + strNum))
-                  .addValue(
-                      Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
-                          .addValue("doubly_nested_str_value_" + strNum)
-                          .addValue(num)
-                          .build())
-                  .build();
-
-          return Row.withSchema(BEAM_SCHEMA)
-              .addValue("str_value_" + strNum)
-              .addValue(num % 2 == 0)
-              .addValue(Integer.valueOf(strNum))
-              .addValue(num)
-              .addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList()))
-              .addValue(nestedRow)
-              .addValue(num % 2 == 0 ? null : nestedRow)
-              .build();
-        }
-      };
-
-  private static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
-      IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
-  private static final SimpleFunction<Row, Record> RECORD_FUNC =
-      new SimpleFunction<Row, Record>() {
-        @Override
-        public Record apply(Row input) {
-          return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
-        }
-      };
-
-  @Rule public TestPipeline writePipeline = TestPipeline.create();
-
-  @Rule public TestPipeline readPipeline = TestPipeline.create();
-
-  private static final String TEST_CATALOG = "beam_test_" + System.nanoTime();
-  private static final String DATASET = "iceberg_bigquerymetastore_test_" + System.nanoTime();
-  @Rule public TestName testName = new TestName();
-  private static final String WAREHOUSE = TestPipeline.testingPipelineOptions().getTempLocation();
-  private static Catalog catalog;
-  private static Map<String, String> catalogProps;
-  private TableIdentifier tableIdentifier;
-
-  @BeforeClass
-  public static void setUp() {
-    GcpOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
-    catalogProps =
-        ImmutableMap.<String, String>builder()
-            .put("gcp_project", options.getProject())
-            .put("gcp_location", "us-central1")
-            .put("catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
-            .put("warehouse", WAREHOUSE)
-            .build();
-    catalog =
-        CatalogUtil.loadCatalog(
-            catalogProps.get("catalog-impl"), TEST_CATALOG, catalogProps, new Configuration());
-    catalog.initialize(TEST_CATALOG, catalogProps);
-    ((SupportsNamespaces) catalog).createNamespace(Namespace.of(DATASET));
-  }
-
-  @After
-  public void cleanup() {
-    // We need to clean up tables first before deleting the dataset
-    catalog.dropTable(tableIdentifier);
-  }
-
-  @AfterClass
-  public static void tearDown() {
-    ((SupportsNamespaces) catalog).dropNamespace(Namespace.of(DATASET));
-  }
-
-  private Map<String, Object> getManagedIcebergConfig(TableIdentifier table) {
-    return ImmutableMap.<String, Object>builder()
-        .put("table", table.toString())
-        .put("catalog_name", TEST_CATALOG)
-        .put("catalog_properties", catalogProps)
-        .build();
-  }
-
-  @Test
-  public void testReadWithBqmsCatalog() throws IOException {
-    tableIdentifier =
-        TableIdentifier.parse(String.format("%s.%s", DATASET, testName.getMethodName()));
-    Table table = catalog.createTable(tableIdentifier, ICEBERG_SCHEMA);
-
-    List<Row> expectedRows =
-        LongStream.range(1, 1000).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());
-    List<Record> records =
-        expectedRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());
-
-    // write iceberg records with bqms catalog
-    String filepath = table.location() + "/" + UUID.randomUUID();
-    DataWriter<Record> writer =
-        Parquet.writeData(table.io().newOutputFile(filepath))
-            .schema(ICEBERG_SCHEMA)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .overwrite()
-            .withSpec(table.spec())
-            .build();
-    for (Record rec : records) {
-      writer.write(rec);
-    }
-    writer.close();
-    AppendFiles appendFiles = table.newAppend();
-    String manifestFilename = FileFormat.AVRO.addExtension(filepath + ".manifest");
-    OutputFile outputFile = table.io().newOutputFile(manifestFilename);
-    ManifestWriter<DataFile> manifestWriter;
-    try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(table.spec(), outputFile)) {
-      openWriter.add(writer.toDataFile());
-      manifestWriter = openWriter;
-    }
-    appendFiles.appendManifest(manifestWriter.toManifestFile());
-    appendFiles.commit();
-
-    // Run Managed Iceberg read
-    PCollection<Row> outputRows =
-        readPipeline
-            .apply(
-                Managed.read(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)))
-            .getSinglePCollection();
-    PAssert.that(outputRows).containsInAnyOrder(expectedRows);
-    readPipeline.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testWriteWithBqmsCatalog() {
-    tableIdentifier =
-        TableIdentifier.parse(String.format("%s.%s", DATASET, testName.getMethodName()));
-    catalog.createTable(tableIdentifier, IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA));
-
-    List<Row> inputRows =
-        LongStream.range(1, 1000).mapToObj(ROW_FUNC::apply).collect(Collectors.toList());
-    List<Record> expectedRecords =
-        inputRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());
-
-    // Run Managed Iceberg write
-    writePipeline
-        .apply(Create.of(inputRows))
-        .setRowSchema(BEAM_SCHEMA)
-        .apply(Managed.write(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)));
-    writePipeline.run().waitUntilFinish();
-
-    // read back the records and check everything's there
-    Table table = catalog.loadTable(tableIdentifier);
-    TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA);
-    List<Record> writtenRecords = new ArrayList<>();
-    for (CombinedScanTask task : tableScan.planTasks()) {
-      InputFilesDecryptor decryptor =
-          new InputFilesDecryptor(task, table.io(), table.encryption());
-      for (FileScanTask fileTask : task.files()) {
-        InputFile inputFile = decryptor.getInputFile(fileTask);
-        CloseableIterable<Record> iterable =
-            Parquet.read(inputFile)
-                .split(fileTask.start(), fileTask.length())
-                .project(ICEBERG_SCHEMA)
-                .createReaderFunc(
-                    fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema))
-                .filter(fileTask.residual())
-                .build();
-
-        for (Record rec : iterable) {
-          writtenRecords.add(rec);
-        }
-      }
-    }
-    assertThat(expectedRecords, containsInAnyOrder(writtenRecords.toArray()));
-  }
-}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 1a32a8f111c4..65a55885afa7 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -369,5 +369,3 @@ include("sdks:java:io:iceberg:hive")
 findProject(":sdks:java:io:iceberg:hive")?.name = "hive"
 include("sdks:java:io:iceberg:hive:exec")
 findProject(":sdks:java:io:iceberg:hive:exec")?.name = "exec"
-include("sdks:java:io:iceberg:bigquerymetastore")
-findProject(":sdks:java:io:iceberg:bigquerymetastore")?.name = "bigquerymetastore"