Skip to content

Commit

Permalink
Fix Java KafkaIO Performance test job (apache#33685)
Browse files Browse the repository at this point in the history
* fix java kafkaIO performance tests

* remove duplicated line
  • Loading branch information
akashorabek authored Jan 23, 2025
1 parent b057a63 commit f52f77f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/beam_PerformanceTests_Kafka_IO.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
github.event_name == 'workflow_dispatch' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam') ||
github.event.comment.body == 'Run Java KafkaIO Performance Test'
runs-on: [self-hosted, ubuntu-20.04, main]
runs-on: [self-hosted, ubuntu-20.04, highmem]
timeout-minutes: 120
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
Expand Down Expand Up @@ -142,6 +143,16 @@ public class KafkaIOIT {

private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class);

private static final int RETRIES_CONFIG = 10;

private static final int REQUEST_TIMEOUT_MS_CONFIG = 600000;

private static final int MAX_BLOCK_MS_CONFIG = 300000;

private static final int BUFFER_MEMORY_CONFIG = 100554432;

private static final int RETRY_BACKOFF_MS_CONFIG = 5000;

private static SyntheticSourceOptions sourceOptions;

private static Options options;
Expand Down Expand Up @@ -938,7 +949,14 @@ private KafkaIO.Write<byte[], byte[]> writeToKafka() {
return KafkaIO.<byte[], byte[]>write()
.withBootstrapServers(options.getKafkaBootstrapServerAddresses())
.withKeySerializer(ByteArraySerializer.class)
.withValueSerializer(ByteArraySerializer.class);
.withValueSerializer(ByteArraySerializer.class)
.withProducerConfigUpdates(
ImmutableMap.of(
ProducerConfig.RETRIES_CONFIG, RETRIES_CONFIG,
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS_CONFIG,
ProducerConfig.MAX_BLOCK_MS_CONFIG, MAX_BLOCK_MS_CONFIG,
ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG,
ProducerConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS_CONFIG));
}

private KafkaIO.Read<byte[], byte[]> readFromBoundedKafka() {
Expand Down

0 comments on commit f52f77f

Please sign in to comment.