This project is an adapter that connects Google Protobuf to Flink's own TypeInformation-based serialization framework.

The test code is copied from the official flink-protobuf module and modified to support dynamic protobuf tests.
Here is an example of creating a table with the Kafka connector and the DynamicProtobuf format. Below is the proto definition file:
```protobuf
syntax = "proto3";

package com.example;

option java_package = "com.example";
option java_multiple_files = true;

message SimpleTest {
  optional int64 uid = 1;
  optional string name = 2;
  optional int32 category_type = 3;
  optional bytes content = 4;
  optional double price = 5;
  map<int64, InnerMessageTest> value_map = 6;
  repeated InnerMessageTest value_arr = 7;

  message InnerMessageTest {
    optional int64 v1 = 1;
    optional int32 v2 = 2;
  }
}
```
Define your table structure. This implementation does not require adding the compiled protobuf classes to the classpath; it parses the protobuf `byte[]` directly based on the field structure of the table. However, there are a few requirements:

- Field indices must start at 1, with no gaps in between.
- Since the original protobuf definition is not available, enum types cannot be used.
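To illustrate why these constraints exist, here is a minimal sketch of descriptor-less decoding (illustration only, not the adapter's actual code): every field on the wire carries its field number in the tag, so the field number alone can be mapped to a table column position — which only works if the indices run contiguously from 1.

```python
# Sketch of descriptor-less protobuf decoding. Wire format: each field is
# a varint tag (field_number << 3 | wire_type) followed by its payload.

def read_varint(buf, pos):
    """Decode a base-128 varint starting at pos; return (value, new_pos)."""
    result = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not (b & 0x80):
            return result, pos
        shift += 7

def decode_by_position(buf, num_columns):
    """Map field number N to table column N-1, which is why the format
    requires contiguous field indices starting at 1."""
    row = [None] * num_columns
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:                    # varint (int32/int64/bool)
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:                  # length-delimited (string/bytes)
            length, pos = read_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        else:
            raise ValueError("wire type %d not handled in this sketch" % wire_type)
        row[field_number - 1] = value
    return row

# uid=1 encoded as field 1 (varint), name="ok" as field 2 (length-delimited):
payload = bytes([0x08, 0x01, 0x12, 0x02]) + b"ok"
print(decode_by_position(payload, 2))  # [1, b'ok']
```

Note how no generated classes are involved: the tag bytes alone tell the decoder which column each value belongs to.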
```sql
CREATE TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map MAP<BIGINT, ROW<v1 BIGINT, v2 INT>>,
  value_arr ARRAY<ROW<v1 BIGINT, v2 INT>>
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'dynamic-protobuf3'
)
```
| Option | Required | Forwarded | Default | Type | Description |
| --- | --- | --- | --- | --- | --- |
| format | required | no | (none) | String | Specify what format to use; here it should be `'dynamic-protobuf3'`. |
| | optional | yes | false | Boolean | If set to true, primitive types will be set to their default values instead of null. |
The following table lists the type mapping from Flink type to Protobuf type.
| Flink SQL type | Protobuf type | Description |
| --- | --- | --- |
| CHAR / VARCHAR / STRING | string | |
| BOOLEAN | bool | |
| BINARY / VARBINARY | bytes | |
| INT | int32 | |
| BIGINT | int64 | |
| FLOAT | float | |
| DOUBLE | double | |
| ARRAY | repeated | Elements cannot be null; the string default value can be specified by `write-null-string-literal`. |
| MAP | map | Keys or values cannot be null; the string default value can be specified by `write-null-string-literal`. |
| ROW | message | |
| ROW<seconds BIGINT, nanos INT> | google.protobuf.timestamp | The google.protobuf.timestamp type can be mapped to seconds and fractions of seconds at nanosecond resolution in UTC epoch time, using the row type as well as the protobuf definition. |
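As a worked example of the timestamp mapping (an illustrative sketch, not the adapter's code): a UTC epoch instant splits into whole seconds and a nanosecond remainder, which is exactly the shape of `ROW<seconds BIGINT, nanos INT>`.

```python
# Sketch: split a UTC epoch instant (in nanoseconds) into the
# (seconds, nanos) pair used by google.protobuf.Timestamp and by the
# ROW<seconds BIGINT, nanos INT> mapping above. Illustration only.

def to_timestamp_row(epoch_nanos):
    """Return (seconds, nanos) with 0 <= nanos < 1_000_000_000."""
    seconds, nanos = divmod(epoch_nanos, 1_000_000_000)
    return seconds, nanos

print(to_timestamp_row(1_700_000_000_123_456_789))  # (1700000000, 123456789)
```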
As protobuf does not permit null values in maps and arrays, we need to auto-generate default values when converting from Flink rows to protobuf.
| Protobuf data type | Default value |
| --- | --- |
| int32 / int64 / float / double | 0 |
| string | "" |
| bool | false |
| binary | ByteString.EMPTY |
| message | MESSAGE.getDefaultInstance() |
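The substitution described by this table can be sketched as follows (illustrative Python with a hypothetical helper name, not the adapter's API):

```python
# Sketch of null-to-default substitution when writing Flink array/map
# elements to protobuf, which forbids null values. Mirrors the
# default-value table above; hypothetical helper, illustration only.

PROTO_DEFAULTS = {
    "int32": 0, "int64": 0, "float": 0.0, "double": 0.0,
    "string": "", "bool": False, "bytes": b"",  # ByteString.EMPTY in Java
}

def fill_nulls(values, proto_type):
    """Replace None elements with the protobuf default for proto_type."""
    default = PROTO_DEFAULTS[proto_type]
    return [default if v is None else v for v in values]

print(fill_nulls([1, None, 3], "int64"))  # [1, 0, 3]
print(fill_nulls(["a", None], "string"))  # ['a', '']
```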
During serialization, there is no guarantee that the Flink fields belonging to the same one-of group contain at most one valid value. Fields are set in the order of the Flink schema, so a field in a higher position overrides a field in a lower position within the same one-of group.
You can refer to the Language Guide (proto3) for more information about Protobuf types.