From d62f69d5dce52557360ebb9f3722eee885e4d9d4 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 31 Jan 2025 16:34:19 +0100 Subject: [PATCH] Do not reuse parser --- .../scala/com/spotify/scio/coders/avro/AvroCoders.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/scio-avro/src/main/scala/com/spotify/scio/coders/avro/AvroCoders.scala b/scio-avro/src/main/scala/com/spotify/scio/coders/avro/AvroCoders.scala index ba64173917..e2d8abe48e 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/coders/avro/AvroCoders.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/coders/avro/AvroCoders.scala @@ -40,11 +40,10 @@ final private class SlowGenericRecordCoder extends CustomCoder[GenericRecord] { private val sc = StringUtf8Coder.of() // Reuse parsed schemas because avro uses caching - @transient private lazy val parser = new Schema.Parser() @transient private lazy val schemaCache = new TrieMap[String, Schema]() - private val encoder = new EmptyOnDeserializationThreadLocal[BinaryEncoder](); - private val decoder = new EmptyOnDeserializationThreadLocal[BinaryDecoder](); + private val encoder = new EmptyOnDeserializationThreadLocal[BinaryEncoder]() + private val decoder = new EmptyOnDeserializationThreadLocal[BinaryDecoder]() override def encode(value: GenericRecord, os: OutputStream): Unit = { val enc = EncoderFactory.get().directBinaryEncoder(os, encoder.get()) @@ -60,7 +59,7 @@ final private class SlowGenericRecordCoder extends CustomCoder[GenericRecord] { val dec = DecoderFactory.get().directBinaryDecoder(is, decoder.get()) decoder.set(dec) val schemaString = sc.decode(is) - val schema = schemaCache.getOrElseUpdate(schemaString, parser.parse(schemaString)) + val schema = schemaCache.getOrElseUpdate(schemaString, new Schema.Parser().parse(schemaString)) val reader = AvroDatumFactory.generic()(schema, schema) reader.read(null, dec) }