Add kafka native record headers (#135)
This commit introduces two new Kafka native record headers:
  1. "_t" -- the timestamp at which the message was created by the producer.
  2. "_lm" -- indicates that the message is part of a large message.
viswamy committed Sep 6, 2019
1 parent 0311e2e commit 00cf733
Showing 13 changed files with 466 additions and 30 deletions.
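
Before the diffs, a rough illustration of how a consuming application could read the new timestamp header. This is a hypothetical helper, not part of this commit: it assumes that Constants.TIMESTAMP_HEADER resolves to the "_t" key described above and that the header value is the long encoding produced by PrimitiveEncoderDecoder, both of which appear in the changes below.

import com.linkedin.kafka.clients.utils.Constants;
import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// Hypothetical helper: read the producer-side creation time from the "_t" header.
public final class TimestampHeaderReader {
  public static Long producerTimestamp(ConsumerRecord<?, ?> record) {
    Header header = record.headers().lastHeader(Constants.TIMESTAMP_HEADER);
    if (header == null) {
      // The record was produced by a client that does not set native headers.
      return null;
    }
    // The value is an encoded long, decoded the same way the tests in this commit decode it.
    return PrimitiveEncoderDecoder.decodeLong(header.value(), 0);
  }
}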
1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
#gradle
.gradle/
build/
.mario/

#idea
.idea/
@@ -4,17 +4,19 @@

package com.linkedin.kafka.clients.largemessage;

import com.linkedin.kafka.clients.common.LargeMessageHeaderValue;
import com.linkedin.kafka.clients.consumer.LiKafkaConsumer;
import com.linkedin.kafka.clients.producer.LiKafkaProducer;
import com.linkedin.kafka.clients.utils.Constants;
import com.linkedin.kafka.clients.utils.LiKafkaClientsTestUtils;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder;
import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness;
import com.linkedin.kafka.clients.utils.tests.KafkaTestUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -29,8 +31,10 @@
import java.util.Set;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;


/**
* The integration test for large message.
*/
@@ -65,6 +69,7 @@ public void tearDown() {

@Test
public void testLargeMessage() {
long startTime = System.currentTimeMillis();
Properties props = new Properties();
props.setProperty("large.message.enabled", "true");
props.setProperty("max.message.segment.size", "200");
@@ -76,33 +81,29 @@ public void testLargeMessage() {

/* The test will send 100 different large messages to broker, consume from broker and verify the message contents.
Here for simplicity we use a large message segment as a large message, and chunk this */
Map<String, String> messages = new HashMap<String, String>();
Map<String, String> messages = new HashMap<>();
int numberOfLargeMessages = 100;
int largeMessageSize = 1000;
final Set<String> ackedMessages = new HashSet<String>();
final Set<String> ackedMessages = new HashSet<>();
// Produce large messages.
for (int i = 0; i < numberOfLargeMessages; i++) {
final String messageId = LiKafkaClientsUtils.randomUUID().toString().replace("-", "");
String message = messageId + KafkaTestUtils.getRandomString(largeMessageSize);
messages.put(messageId, message);
largeMessageProducer.send(new ProducerRecord<String, String>(TOPIC, message),
new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// The callback should have been invoked only once.
assertFalse(ackedMessages.contains(messageId));
if (e == null) {
ackedMessages.add(messageId);
}
}
});
largeMessageProducer.send(new ProducerRecord<>(TOPIC, message), (recordMetadata, e) -> {
// The callback should have been invoked only once.
assertFalse(ackedMessages.contains(messageId));
if (e == null) {
ackedMessages.add(messageId);
}
});
}
largeMessageProducer.close();
// All messages should have been sent.
assertEquals(ackedMessages.size(), messages.size());

// Consume and verify the large messages
List<TopicPartition> partitions = new ArrayList<TopicPartition>();
List<TopicPartition> partitions = new ArrayList<>();
for (int i = 0; i < NUM_PARTITIONS; i++) {
partitions.add(new TopicPartition(TOPIC, i));
}
@@ -116,6 +117,18 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
consumptionStarted = true;
}
for (ConsumerRecord<String, String> consumerRecord : records) {
// Verify headers
Map<String, byte[]> headers = LiKafkaClientsTestUtils.fetchSpecialHeaders(consumerRecord.headers());
assertTrue(headers.containsKey(Constants.TIMESTAMP_HEADER));
assertEquals(PrimitiveEncoderDecoder.LONG_SIZE, headers.get(Constants.TIMESTAMP_HEADER).length);
long eventTimestamp = PrimitiveEncoderDecoder.decodeLong(headers.get(Constants.TIMESTAMP_HEADER), 0);
assertTrue(eventTimestamp >= startTime && eventTimestamp <= System.currentTimeMillis());
assertTrue(headers.containsKey(Constants.LARGE_MESSAGE_HEADER));
LargeMessageHeaderValue largeMessageHeaderValue = LargeMessageHeaderValue.fromBytes(headers.get(Constants.LARGE_MESSAGE_HEADER));
assertEquals(largeMessageHeaderValue.getSegmentNumber(), -1);
assertEquals(largeMessageHeaderValue.getNumberOfSegments(), 6);
assertEquals(largeMessageHeaderValue.getType(), LargeMessageHeaderValue.LEGACY);

String messageId = consumerRecord.value().substring(0, 32);
String origMessage = messages.get(messageId);
assertEquals(consumerRecord.value(), origMessage, "Messages should be the same");
@@ -6,6 +6,9 @@

import com.linkedin.kafka.clients.consumer.LiKafkaConsumer;
import com.linkedin.kafka.clients.largemessage.errors.SkippableException;
import com.linkedin.kafka.clients.utils.Constants;
import com.linkedin.kafka.clients.utils.LiKafkaClientsTestUtils;
import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder;
import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness;
import java.io.IOException;
import java.util.BitSet;
@@ -54,6 +57,7 @@ public void tearDown() {
*/
@Test
public void testSend() throws IOException, InterruptedException {
long startTime = System.currentTimeMillis();
Properties props = new Properties();
props.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
final String tempTopic = "testTopic" + new Random().nextInt(1000000);
@@ -74,6 +78,11 @@ public void testSend() throws IOException, InterruptedException {
while (messageCount < RECORD_COUNT && System.currentTimeMillis() < startMs + 30000) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
Map<String, byte[]> headers = LiKafkaClientsTestUtils.fetchSpecialHeaders(record.headers());
assertTrue(headers.containsKey(Constants.TIMESTAMP_HEADER));
long eventTimestamp = PrimitiveEncoderDecoder.decodeLong(headers.get(Constants.TIMESTAMP_HEADER), 0);
assertTrue(eventTimestamp >= startTime && eventTimestamp <= System.currentTimeMillis());

int index = Integer.parseInt(record.value());
counts.set(index);
messageCount++;
@@ -8,8 +8,12 @@

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import static org.testng.Assert.assertEquals;

@@ -52,4 +56,26 @@ public static String getRandomString(int length) {
}
return stringBuiler.toString();
}

/**
* Special header keys have a "_" prefix and are managed internally by the clients.
* @param headers the record headers to scan
* @return a map from each special ("_"-prefixed) header key to its value
*/
public static Map<String, byte[]> fetchSpecialHeaders(Headers headers) {
Map<String, byte[]> map = new HashMap<>();
for (Header header : headers) {

if (!header.key().startsWith("_")) {
// skip any non special header
continue;
}

if (map.containsKey(header.key())) {
throw new IllegalStateException("Duplicate special header found " + header.key());
}
map.put(header.key(), header.value());
}
return map;
}
}
@@ -0,0 +1,86 @@
/*
* Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
* See License in the project root for license information.
*/

package com.linkedin.kafka.clients.common;

import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder;
import java.util.UUID;


/**
* This class represents the header value for a large message.
* Every large message header takes up 25 bytes and is structured as follows
*
* | Type | UUID | segmentNumber | numberOfSegments |
* | 1 byte | 16 bytes | 4 bytes | 4 bytes |
*
* The large message header value will eventually be used to support large messages natively
* (as opposed to encoding large segment metadata info inside the payload).
*/
public class LargeMessageHeaderValue {
public static final UUID EMPTY_UUID = new UUID(0L, 0L);
public static final int INVALID_SEGMENT_ID = -1;
private final byte _type;
private final UUID _uuid;
private final int _segmentNumber;
private final int _numberOfSegments;

// This indicates that the large message framework is using
// SegmentSerializer/SegmentDeserializer interface to split
// and assemble large message segments.
public static final byte LEGACY = (byte) 0;

public LargeMessageHeaderValue(byte type, UUID uuid, int segmentNumber, int numberOfSegments) {
_type = type;
_uuid = uuid;
_segmentNumber = segmentNumber;
_numberOfSegments = numberOfSegments;
}

public int getSegmentNumber() {
return _segmentNumber;
}

public int getNumberOfSegments() {
return _numberOfSegments;
}

public UUID getUuid() {
return _uuid;
}

public byte getType() {
return _type;
}

public static byte[] toBytes(LargeMessageHeaderValue largeMessageHeaderValue) {
byte[] serialized = new byte[25];
int byteOffset = 0;
serialized[byteOffset] = largeMessageHeaderValue.getType();
byteOffset += 1; // for type
PrimitiveEncoderDecoder.encodeLong(largeMessageHeaderValue.getUuid().getLeastSignificantBits(), serialized, byteOffset);
byteOffset += PrimitiveEncoderDecoder.LONG_SIZE; // for UUID(least significant bits)
PrimitiveEncoderDecoder.encodeLong(largeMessageHeaderValue.getUuid().getMostSignificantBits(), serialized, byteOffset);
byteOffset += PrimitiveEncoderDecoder.LONG_SIZE; // for UUID(most significant bits)
PrimitiveEncoderDecoder.encodeInt(largeMessageHeaderValue.getSegmentNumber(), serialized, byteOffset);
byteOffset += PrimitiveEncoderDecoder.INT_SIZE; // for segment number
PrimitiveEncoderDecoder.encodeInt(largeMessageHeaderValue.getNumberOfSegments(), serialized, byteOffset);
return serialized;
}

public static LargeMessageHeaderValue fromBytes(byte[] bytes) {
int byteOffset = 0;

byte type = bytes[byteOffset];
byteOffset += 1;
long leastSignificantBits = PrimitiveEncoderDecoder.decodeLong(bytes, byteOffset);
byteOffset += PrimitiveEncoderDecoder.LONG_SIZE;
long mostSignificantBits = PrimitiveEncoderDecoder.decodeLong(bytes, byteOffset);
byteOffset += PrimitiveEncoderDecoder.LONG_SIZE;
int segmentNumber = PrimitiveEncoderDecoder.decodeInt(bytes, byteOffset);
byteOffset += PrimitiveEncoderDecoder.INT_SIZE;
int numberOfSegments = PrimitiveEncoderDecoder.decodeInt(bytes, byteOffset);
return new LargeMessageHeaderValue(type, new UUID(mostSignificantBits, leastSignificantBits), segmentNumber, numberOfSegments);
}
}
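
A minimal round-trip sketch of the layout documented above (illustrative only, not part of the commit); it exercises toBytes and fromBytes to show that every field of the 1 + 16 + 4 + 4 = 25 byte encoding survives:

import com.linkedin.kafka.clients.common.LargeMessageHeaderValue;
import java.util.UUID;

public class LargeMessageHeaderValueRoundTrip {
  public static void main(String[] args) {
    UUID id = UUID.randomUUID();
    LargeMessageHeaderValue original =
        new LargeMessageHeaderValue(LargeMessageHeaderValue.LEGACY, id, 2, 6);

    byte[] serialized = LargeMessageHeaderValue.toBytes(original);     // 25 bytes
    LargeMessageHeaderValue parsed = LargeMessageHeaderValue.fromBytes(serialized);

    // Every field survives the round trip.
    assert serialized.length == 25;
    assert parsed.getType() == LargeMessageHeaderValue.LEGACY;
    assert parsed.getUuid().equals(id);
    assert parsed.getSegmentNumber() == 2;
    assert parsed.getNumberOfSegments() == 6;
  }
}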
@@ -6,7 +6,9 @@

import com.linkedin.kafka.clients.auditing.AuditType;
import com.linkedin.kafka.clients.auditing.Auditor;
import com.linkedin.kafka.clients.common.LargeMessageHeaderValue;
import com.linkedin.kafka.clients.largemessage.errors.SkippableException;
import com.linkedin.kafka.clients.utils.Constants;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import java.util.Collections;
import java.util.Objects;
@@ -17,6 +19,9 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -417,6 +422,21 @@ private ConsumerRecord<K, V> handleConsumerRecord(ConsumerRecord<byte[], byte[]>
}

_partitionConsumerHighWatermark.computeIfAbsent(tp, _storedConsumerHighWatermark)._currentConsumerHighWatermark = consumerRecord.offset();
// Create a new copy of the headers
Headers headers = new RecordHeaders(consumerRecord.headers());
Header largeMessageHeader = headers.lastHeader(Constants.LARGE_MESSAGE_HEADER);
if (largeMessageHeader != null) {
LargeMessageHeaderValue largeMessageHeaderValue = LargeMessageHeaderValue.fromBytes(largeMessageHeader.value());
// Once the large message header value is parsed, remove any such key from record headers
headers.remove(Constants.LARGE_MESSAGE_HEADER);
largeMessageHeaderValue = new LargeMessageHeaderValue(
largeMessageHeaderValue.getType(),
LargeMessageHeaderValue.EMPTY_UUID,
LargeMessageHeaderValue.INVALID_SEGMENT_ID,
largeMessageHeaderValue.getNumberOfSegments()
);
headers.add(Constants.LARGE_MESSAGE_HEADER, LargeMessageHeaderValue.toBytes(largeMessageHeaderValue));
}

handledRecord = new ConsumerRecord<>(
consumerRecord.topic(),
@@ -428,7 +448,9 @@ private ConsumerRecord<K, V> handleConsumerRecord(ConsumerRecord<byte[], byte[]>
consumerRecord.serializedKeySize(),
valueBytes == null ? 0 : valueBytes.length,
_keyDeserializer.deserialize(consumerRecord.topic(), consumerRecord.key()),
value);
value,
headers
);
}
return handledRecord;
}
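
The net effect of the block above: once the segments are reassembled, the consumer rewrites the "_lm" header so downstream code sees the segment count but not the internal UUID or segment number. A hedged sketch of what an application would observe on a returned record (the helper class is illustrative; the constants and accessors come from this diff):

import com.linkedin.kafka.clients.common.LargeMessageHeaderValue;
import com.linkedin.kafka.clients.utils.Constants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public final class LargeMessageHeaderInspector {
  // Illustrative only: inspect the scrubbed "_lm" header on a record returned by the consumer.
  public static void inspect(ConsumerRecord<?, ?> record) {
    Header header = record.headers().lastHeader(Constants.LARGE_MESSAGE_HEADER);
    if (header == null) {
      return; // the record was not delivered as a large message
    }
    LargeMessageHeaderValue value = LargeMessageHeaderValue.fromBytes(header.value());
    // value.getUuid()             -> LargeMessageHeaderValue.EMPTY_UUID
    // value.getSegmentNumber()    -> LargeMessageHeaderValue.INVALID_SEGMENT_ID (-1)
    // value.getNumberOfSegments() -> number of segments the message was split into (6 in the test above)
  }
}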
@@ -8,6 +8,8 @@

import java.util.List;
import java.util.UUID;
import org.apache.kafka.common.header.Headers;


/**
* Message splitter for large messages
@@ -84,14 +86,19 @@ List<ProducerRecord<byte[], byte[]>> split(String topic,
* this large message.
* @param key The key associated with the message.
* @param serializedRecord the serialized bytes of large message to split
* @param headers headers for the producer record.
* If headers is null, new record headers are created and the large-message-specific values are added.
* If headers is not null, any existing large message keys are removed and the values generated by the
* current #split() call are added.
* @return A list of IndexedRecord each contains a chunk of the original large message.
*/
List<ProducerRecord<byte[], byte[]>> split(String topic,
Integer partition,
Long timestamp,
UUID messageId,
byte[] key,
byte[] serializedRecord);
byte[] serializedRecord,
Headers headers);

/**
* Split the large message into several {@link org.apache.kafka.clients.producer.ProducerRecord}
@@ -106,6 +113,10 @@ List<ProducerRecord<byte[], byte[]>> split(String topic,
* @param key The key associated with the message.
* @param serializedRecord the serialized bytes of large message to split
* @param maxSegmentSize the max segment size to use to split the message
* @param headers headers for the producer record.
* If headers is null, new record headers are created and the large-message-specific values are added.
* If headers is not null, any existing large message keys are removed and the values generated by the
* current #split() call are added.
* @return A list of IndexedRecord each contains a chunk of the original large message.
*/
List<ProducerRecord<byte[], byte[]>> split(String topic,
@@ -114,7 +125,7 @@ List<ProducerRecord<byte[], byte[]>> split(String topic,
UUID messageId,
byte[] key,
byte[] serializedRecord,
int maxSegmentSize);

}
int maxSegmentSize,
Headers headers);

}
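
For illustration, a hypothetical call site for the extended split(). The MessageSplitter interface and package names are assumed from context, 'splitter' stands for whatever implementation the producer wires in, and the topic and key are made up:

import com.linkedin.kafka.clients.largemessage.MessageSplitter;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public final class SplitterUsageSketch {
  // Illustrative only: split a serialized large message into producer records that
  // carry the new large message header alongside any caller-supplied headers.
  public static List<ProducerRecord<byte[], byte[]>> splitExample(MessageSplitter splitter,
                                                                  byte[] serializedLargeMessage) {
    Headers headers = new RecordHeaders();   // may also be null, per the Javadoc above
    return splitter.split("my-topic",                        // topic
                          null,                              // partition: let the partitioner decide
                          System.currentTimeMillis(),        // timestamp
                          LiKafkaClientsUtils.randomUUID(),  // message id
                          "my-key".getBytes(StandardCharsets.UTF_8),
                          serializedLargeMessage,
                          headers);
  }
}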