Skip to content

Commit

Permalink
Do not reuse parser
Browse files Browse the repository at this point in the history
  • Loading branch information
Michel Davit committed Jan 31, 2025
1 parent b2e72cb commit d62f69d
Showing 1 changed file with 3 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ final private class SlowGenericRecordCoder extends CustomCoder[GenericRecord] {
private val sc = StringUtf8Coder.of()

// Reuse parsed schemas because avro uses caching
@transient private lazy val parser = new Schema.Parser()
@transient private lazy val schemaCache = new TrieMap[String, Schema]()

private val encoder = new EmptyOnDeserializationThreadLocal[BinaryEncoder]();
private val decoder = new EmptyOnDeserializationThreadLocal[BinaryDecoder]();
private val encoder = new EmptyOnDeserializationThreadLocal[BinaryEncoder]()
private val decoder = new EmptyOnDeserializationThreadLocal[BinaryDecoder]()

override def encode(value: GenericRecord, os: OutputStream): Unit = {
val enc = EncoderFactory.get().directBinaryEncoder(os, encoder.get())
Expand All @@ -60,7 +59,7 @@ final private class SlowGenericRecordCoder extends CustomCoder[GenericRecord] {
val dec = DecoderFactory.get().directBinaryDecoder(is, decoder.get())
decoder.set(dec)
val schemaString = sc.decode(is)
val schema = schemaCache.getOrElseUpdate(schemaString, parser.parse(schemaString))
val schema = schemaCache.getOrElseUpdate(schemaString, new Schema.Parser().parse(schemaString))
val reader = AvroDatumFactory.generic()(schema, schema)
reader.read(null, dec)
}
Expand Down

0 comments on commit d62f69d

Please sign in to comment.