Skip to content

Commit

Permalink
revert Functions.simpleFn one-liners in Parquet (#4578)
Browse files Browse the repository at this point in the history
* Revert "Update scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala"

This reverts commit e6692e8.

* Revert "use Functions.simpleFn"

This reverts commit 37da210.
  • Loading branch information
clairemcginty authored Oct 28, 2022
1 parent 57d662a commit ee45260
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ object ParquetAvroIO {
val transform = HadoopFormatIO
.read[JBoolean, T]()
// Hadoop input always emit key-value, and `Void` causes NPE in Beam coder
.withKeyTranslation(Functions.simpleFn[Void, JBoolean](_ => true))
.withKeyTranslation(new SimpleFunction[Void, JBoolean]() {
override def apply(input: Void): JBoolean = true
})
.withValueTranslation(new SimpleFunction[A, T]() {
// Workaround for incomplete Avro objects
// `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil}
import com.spotify.scio.testing.TestDataManager
import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.util.FilenamePolicySupplier
import com.spotify.scio.values.SCollection
import me.lyh.parquet.tensorflow.{
ExampleParquetInputFormat,
Expand All @@ -38,6 +39,7 @@ import org.apache.beam.sdk.io._
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.SerializableFunctions
import org.apache.beam.sdk.transforms.SimpleFunction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.filter2.predicate.FilterPredicate
Expand Down Expand Up @@ -116,7 +118,9 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] {
val source = HadoopFormatIO
.read[JBoolean, Example]()
// Hadoop input always emit key-value, and `Void` causes NPE in Beam coder
.withKeyTranslation(Functions.simpleFn[Void, JBoolean](_ => true))
.withKeyTranslation(new SimpleFunction[Void, JBoolean]() {
override def apply(input: Void): JBoolean = true
})
.withConfiguration(job.getConfiguration)
sc.applyTransform(source).map(_.getValue)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil}
import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.util.FilenamePolicySupplier
import com.spotify.scio.values.SCollection
import magnolify.parquet.ParquetType
import org.apache.beam.sdk.io.fs.ResourceId
Expand Down Expand Up @@ -102,7 +103,9 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType](
val source = HadoopFormatIO
.read[JBoolean, T]
// Hadoop input always emit key-value, and `Void` causes NPE in Beam coder
.withKeyTranslation(Functions.simpleFn[Void, JBoolean](_ => true))
.withKeyTranslation(new SimpleFunction[Void, JBoolean]() {
override def apply(input: Void): JBoolean = true
})
.withValueTranslation(
new SimpleFunction[T, T]() {
override def apply(input: T): T = input
Expand Down

0 comments on commit ee45260

Please sign in to comment.