From a1e4b931a9a8836b3959f16f6fa599ac972bb9ae Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Tue, 22 Oct 2024 10:50:24 +0200 Subject: [PATCH] [proxima-beam-core] O2-Czech-Republic#339 expander fixes --- .../o2/proxima/beam/util/state/ExpandContext.java | 7 ++++--- .../proxima/beam/util/state/MethodCallUtils.java | 14 +++++++++----- .../beam/util/state/ExternalStateExpanderTest.java | 1 + .../TransactionalOnlineAttributeWriterTest.java | 8 ++------ 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/util/state/ExpandContext.java b/beam/core/src/main/java/cz/o2/proxima/beam/util/state/ExpandContext.java index 7c71ba118..38c82db2f 100644 --- a/beam/core/src/main/java/cz/o2/proxima/beam/util/state/ExpandContext.java +++ b/beam/core/src/main/java/cz/o2/proxima/beam/util/state/ExpandContext.java @@ -380,8 +380,9 @@ PTransform, PCollectionTuple> transformedParDo( return new PTransform<>() { @Override public PCollectionTuple expand(PCollection input) { + Coder inputCoder = input.getCoder(); @SuppressWarnings("unchecked") - KvCoder coder = (KvCoder) input.getCoder(); + KvCoder coder = (KvCoder) inputCoder; Coder keyCoder = coder.getKeyCoder(); Coder valueCoder = coder.getValueCoder(); TypeDescriptor> valueDescriptor = @@ -413,7 +414,7 @@ public PCollectionTuple expand(PCollection input) { PCollectionTuple tuple = flattened.apply( "expanded", - ParDo.of(transformedDoFn(doFn, (KvCoder) input.getCoder(), mainOutputTag)) + ParDo.of(transformedDoFn(doFn, (KvCoder) inputCoder, mainOutputTag)) .withOutputTags(mainOutputTag, otherOutputs.and(STATE_TUPLE_TAG))); PCollectionTuple res = PCollectionTuple.empty(input.getPipeline()); for (Entry, PCollection> e : @@ -946,7 +947,7 @@ public void intercept(@This DoFn>, ?> doFn, @AllArguments boolean isNextScheduled = nextFlush != null && nextFlush.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE); if (isNextScheduled) { - flushTimer.set(nextFlush); + flushTimer.withOutputTimestamp(nextFlush).set(nextFlush); nextFlushState.write(nextFlush); } @SuppressWarnings("unchecked") diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/util/state/MethodCallUtils.java b/beam/core/src/main/java/cz/o2/proxima/beam/util/state/MethodCallUtils.java index 57418aa5c..8d2463da7 100644 --- a/beam/core/src/main/java/cz/o2/proxima/beam/util/state/MethodCallUtils.java +++ b/beam/core/src/main/java/cz/o2/proxima/beam/util/state/MethodCallUtils.java @@ -331,16 +331,18 @@ static Map> getStateUpdaters(DoFn d Pair.of( p.getSecond().value(), createUpdater( + p.getSecond().value(), ((StateSpec) ExceptionUtils.uncheckedFactory(() -> p.getFirst().get(doFn)))))) .filter(p -> p.getSecond() != null) .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond)); } - @SuppressWarnings("unchecked") - private static @Nullable BiConsumer createUpdater(StateSpec stateSpec) { + private static @Nullable BiConsumer createUpdater( + String name, StateSpec stateSpec) { + AtomicReference> consumer = new AtomicReference<>(); - stateSpec.bind("dummy", createUpdaterBinder(consumer)); + stateSpec.bind(name, createUpdaterBinder(consumer)); return consumer.get(); } @@ -363,6 +365,7 @@ static LinkedHashMap>> g Pair.of( p.getSecond().value(), createReader( + p.getSecond().value(), ((StateSpec) ExceptionUtils.uncheckedFactory(() -> p.getFirst().get(doFn)))))) .filter(p -> p.getSecond() != null) @@ -372,9 +375,10 @@ static LinkedHashMap>> g @SuppressWarnings("unchecked") private static @Nullable BiFunction> createReader( - StateSpec stateSpec) { + String name, StateSpec stateSpec) { + AtomicReference>> res = new AtomicReference<>(); - stateSpec.bind("dummy", createStateReaderBinder(res)); + stateSpec.bind(name, createStateReaderBinder(res)); return res.get(); } diff --git a/beam/core/src/test/java/cz/o2/proxima/beam/util/state/ExternalStateExpanderTest.java b/beam/core/src/test/java/cz/o2/proxima/beam/util/state/ExternalStateExpanderTest.java index 86de349e5..0ef3e66d3 100644 --- a/beam/core/src/test/java/cz/o2/proxima/beam/util/state/ExternalStateExpanderTest.java +++ b/beam/core/src/test/java/cz/o2/proxima/beam/util/state/ExternalStateExpanderTest.java @@ -224,6 +224,7 @@ public void testSimpleExpandWithStateStore() throws IOException { (int) CoderUtils.decodeFromByteArray( VarIntCoder.of(), second.getValue().getValue().getKey())); + assertEquals("sum", second.getValue().getValue().getName()); } @Test diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java index 5645bf7d9..3557fc4b4 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java @@ -220,9 +220,7 @@ public void testTransactionCreateCommit() throws InterruptedException { } @Test(timeout = 10_000) - public void testTransactionCreateUpdateCommitMultipleOutputs() - throws InterruptedException, TransactionRejectedException { - + public void testTransactionCreateUpdateCommitMultipleOutputs() throws InterruptedException { CachedView view = Optionals.get(direct.getCachedView(status)); view.assign(view.getPartitions()); OnlineAttributeWriter writer = Optionals.get(direct.getWriter(status)); @@ -348,9 +346,7 @@ public void testTransactionCommitReject() throws InterruptedException { } @Test(timeout = 10000) - public void testGlobalTransactionWriter() - throws InterruptedException, TransactionRejectedException { - + public void testGlobalTransactionWriter() throws InterruptedException { TransactionalOnlineAttributeWriter writer = direct.getGlobalTransactionWriter(); assertTrue(user.isTransactional()); // we successfully open and commit the transaction