diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala index 2e83a0a242..4c3fdf8de1 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala @@ -250,7 +250,9 @@ object ParquetAvroIO { val transform = HadoopFormatIO .read[JBoolean, T]() // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder - .withKeyTranslation(Functions.simpleFn[Void, JBoolean](_ => true)) + .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { + override def apply(input: Void): JBoolean = true + }) .withValueTranslation(new SimpleFunction[A, T]() { // Workaround for incomplete Avro objects // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala index bb3703516d..9cbe664879 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala @@ -24,7 +24,8 @@ import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil} import com.spotify.scio.testing.TestDataManager -import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} +import com.spotify.scio.util.ScioUtil +import com.spotify.scio.util.FilenamePolicySupplier import com.spotify.scio.values.SCollection import me.lyh.parquet.tensorflow.{ ExampleParquetInputFormat, @@ -38,6 +39,7 @@ import org.apache.beam.sdk.io._ import org.apache.beam.sdk.io.fs.ResourceId import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider import org.apache.beam.sdk.transforms.SerializableFunctions +import org.apache.beam.sdk.transforms.SimpleFunction import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.filter2.predicate.FilterPredicate @@ -116,7 +118,9 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { val source = HadoopFormatIO .read[JBoolean, Example]() // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder - .withKeyTranslation(Functions.simpleFn[Void, JBoolean](_ => true)) + .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { + override def apply(input: Void): JBoolean = true + }) .withConfiguration(job.getConfiguration) sc.applyTransform(source).map(_.getValue) } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala index 5b882b0bdc..1069cdc2e8 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala @@ -23,7 +23,8 @@ import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil} -import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} +import com.spotify.scio.util.ScioUtil +import com.spotify.scio.util.FilenamePolicySupplier import com.spotify.scio.values.SCollection import magnolify.parquet.ParquetType import org.apache.beam.sdk.io.fs.ResourceId @@ -102,7 +103,9 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType]( val source = HadoopFormatIO .read[JBoolean, T] // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder - .withKeyTranslation(Functions.simpleFn[Void, JBoolean](_ => true)) + .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { + override def apply(input: Void): JBoolean = true + }) .withValueTranslation( new SimpleFunction[T, T]() { override def apply(input: T): T = input