# Dynamic Protobuf-3.x format support for Apache Flink

This project is an adapter that connects Google Protobuf to Flink's own TypeInformation-based serialization framework.

The test code is copied from the official flink-protobuf module and modified to cover the dynamic protobuf format.

## How to create a table with the DynamicProtobuf format

Here is an example of creating a table using the Kafka connector and the DynamicProtobuf format.

Below is the proto definition file:

```proto
syntax = "proto3";
package com.example;
option java_package = "com.example";
option java_multiple_files = true;

message SimpleTest {
    optional int64 uid = 1;
    optional string name = 2;
    optional int32 category_type = 3;
    optional bytes content = 4;
    optional double price = 5;
    map<int64, InnerMessageTest> value_map = 6;
    repeated InnerMessageTest value_arr = 7;

    message InnerMessageTest {
        optional int64 v1 = 1;
        optional int32 v2 = 2;
    }
}
```

Define your table structure. This implementation does not require adding the compiled protobuf classes to the classpath; it parses the protobuf `byte[]` directly based on the field structure of the table. However, there are a few requirements:

  1. Field numbers must start at 1 and be consecutive, with no gaps in between.
  2. Because the original protobuf definition is not available, enum types are not supported.
```sql
CREATE TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map MAP<BIGINT, ROW<v1 BIGINT, v2 INT>>,
  value_arr ARRAY<ROW<v1 BIGINT, v2 INT>>
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'dynamic-protobuf3'
);
```

## Format Options

| Option | Required | Forwarded | Default | Type | Description |
| ------ | -------- | --------- | ------- | ---- | ----------- |
| `format` | required | no | (none) | String | Specify what format to use; here it should be `'dynamic-protobuf3'`. |
| `protobuf.read-default-values` | optional | yes | false | Boolean | If set to `true`, primitive types will be set to their default values instead of `null`. |
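
For example, to have unset primitive fields read back as protobuf defaults instead of `null`, the option can be added to the `WITH` clause. A minimal sketch (the table name is illustrative; the connector options are reused from the example above):

```sql
CREATE TABLE simple_test_with_defaults (
  uid BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'dynamic-protobuf3',
  'protobuf.read-default-values' = 'true'
);
```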

## Data Type Mapping

The following table lists the type mapping from Flink types to Protobuf types.

| Flink SQL type | Protobuf type | Description |
| -------------- | ------------- | ----------- |
| `CHAR` / `VARCHAR` / `STRING` | `string` | |
| `BOOLEAN` | `bool` | |
| `BINARY` / `VARBINARY` | `bytes` | |
| `INT` | `int32` | |
| `BIGINT` | `int64` | |
| `FLOAT` | `float` | |
| `DOUBLE` | `double` | |
| `ARRAY` | `repeated` | Elements cannot be null; the string default value can be specified by `write-null-string-literal`. |
| `MAP` | `map` | Keys or values cannot be null; the string default value can be specified by `write-null-string-literal`. |
| `ROW` | `message` | |
| `ROW<seconds BIGINT, nanos INT>` | `google.protobuf.Timestamp` | A `google.protobuf.Timestamp` field can be mapped to seconds and fractions of a second at nanosecond resolution in UTC epoch time, using this row type together with the matching protobuf definition. |
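
As an illustration of the timestamp mapping, a column declared with this row type would carry a `google.protobuf.Timestamp` field. A minimal sketch (the table and column names are hypothetical; the connector options are reused from the example above):

```sql
CREATE TABLE timed_events (
  uid BIGINT,
  event_time ROW<seconds BIGINT, nanos INT>
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'dynamic-protobuf3'
);
```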

## Null Values

As protobuf does not permit null values in maps and arrays, we need to auto-generate default values when converting from Flink rows to Protobuf.

| Protobuf data type | Default value |
| ------------------ | ------------- |
| `int32` / `int64` / `float` / `double` | `0` |
| `string` | `""` |
| `bool` | `false` |
| `binary` | `ByteString.EMPTY` |
| `message` | `MESSAGE.getDefaultInstance()` |
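
For instance, a null field inside an array element is replaced by its protobuf default when the row is serialized. A hypothetical sketch against the `simple_test` table above (exact casts may vary by Flink version):

```sql
-- `v1` is NULL in the Flink row; since protobuf does not allow nulls in
-- repeated elements, it is serialized as the int64 default value 0.
INSERT INTO simple_test
SELECT
  CAST(1 AS BIGINT),
  'test',
  0,
  CAST(NULL AS BINARY),
  9.99,
  MAP[CAST(1 AS BIGINT), ROW(CAST(1 AS BIGINT), 2)],
  ARRAY[ROW(CAST(NULL AS BIGINT), 2)];
```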

## OneOf field

In the serialization process, there is no guarantee that the Flink fields of the same one-of group contain at most one valid value. When serializing, each field is set in the order of the Flink schema, so a field in a later position overrides any field in an earlier position within the same one-of group, as the sketch below illustrates.
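
A hypothetical sketch: assume `name` and `nickname` belong to the same one-of group in the target proto definition. Because columns are written in schema order, a non-null `nickname` overwrites a previously set `name` in the encoded message.

```sql
-- `name` and `nickname` are assumed to share a one-of group; if both
-- columns are non-null, `nickname` (later in the schema) wins.
CREATE TABLE oneof_test (
  name STRING,
  nickname STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'dynamic-protobuf3'
);
```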

You can refer to the [Language Guide (proto3)](https://protobuf.dev/programming-guides/proto3/) for more information about Protobuf types.
