Skip to content

Commit

Permalink
add engine agnostic GroupWithoutRepartition transform (#133)
Browse files Browse the repository at this point in the history
* experiment

* enable engine agnostic GroupWithoutRepartition

* update version

* add deprecate flag
  • Loading branch information
minxhe authored Dec 11, 2024
1 parent b983516 commit 81a585c
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
* Samza runner. For example:
*
* <p>input.apply(GroupWithoutRepartition.of(Count.perKey()));
*
* @deprecated use {@link org.apache.beam.sdk.transforms.GroupWithoutRepartition} instead.
*/
@Deprecated
public class GroupWithoutRepartition<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
private final PTransform<InputT, OutputT> transform;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ private static boolean needRepartition(TransformHierarchy.Node node, Translation
return true;
}

if (node.getTransform() instanceof GroupWithoutRepartition) {
if (node.getTransform() instanceof GroupWithoutRepartition
|| node.getTransform() instanceof org.apache.beam.sdk.transforms.GroupWithoutRepartition) {
return false;
} else {
return needRepartition(node.getEnclosingNode(), ctx);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;

import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;

/**
* A wrapper transform of {@link org.apache.beam.sdk.transforms.GroupByKey} or {@link
* org.apache.beam.sdk.transforms.join.CoGroupByKey} to indicate there is no repartition needed for
* Flink runner. For example:
*
* <p>input.apply(GroupWithoutRepartition.of(Count.perKey()));
*
* @deprecated this transform's sole purposes is to maintain backward compatibility
* for existing LinkedIn internal use cases. Do not use this class for new development.
*/
@Deprecated
public class GroupWithoutRepartition<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
private final PTransform<InputT, OutputT> transform;

public static <InputT extends PInput, OutputT extends POutput>
GroupWithoutRepartition<InputT, OutputT> of(PTransform<InputT, OutputT> transform) {
return new GroupWithoutRepartition<>(transform);
}

private GroupWithoutRepartition(PTransform<InputT, OutputT> transform) {
this.transform = transform;
}

@Override
@SuppressWarnings("unchecked")
public OutputT expand(InputT input) {
if (input instanceof PCollection) {
return (OutputT) ((PCollection) input).apply(transform);
} else if (input instanceof KeyedPCollectionTuple) {
return (OutputT) ((KeyedPCollectionTuple) input).apply(transform);
} else {
throw new RuntimeException(
transform.getName()
+ " is not supported with "
+ GroupWithoutRepartition.class.getSimpleName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,38 @@ public void testGroupByKey() {
p.run();
}

@Test
@Category(ValidatesRunner.class)
public void testGroupWithoutRepartition() {
List<KV<String, Integer>> ungroupedPairs =
Arrays.asList(
KV.of("k1", 3),
KV.of("k5", Integer.MAX_VALUE),
KV.of("k5", Integer.MIN_VALUE),
KV.of("k2", 66),
KV.of("k1", 4),
KV.of("k2", -33),
KV.of("k3", 0));

PCollection<KV<String, Integer>> input =
p.apply(
Create.of(ungroupedPairs)
.withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));

PCollection<KV<String, Iterable<Integer>>> output = input.apply(GroupWithoutRepartition.of(GroupByKey.create()));

SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> checker =
containsKvs(
kv("k1", 3, 4),
kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE),
kv("k2", 66, -33),
kv("k3", 0));
PAssert.that(output).satisfies(checker);
PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker);

p.run();
}

@Test
@Category(ValidatesRunner.class)
public void testGroupByKeyEmpty() {
Expand Down

0 comments on commit 81a585c

Please sign in to comment.