Iceberg sql json #88

Open · wants to merge 20 commits into base: main
1 change: 1 addition & 0 deletions .gitattributes
@@ -0,0 +1 @@
flink-application-properties-dev.json filter=arn-filter
3 changes: 3 additions & 0 deletions .gitignore
@@ -14,3 +14,6 @@ venv/
/pyflink/

/.run/

clean.sh
smudge.sh
86 changes: 86 additions & 0 deletions java/Iceberg/IcebergDataStreamSource/README.md
@@ -0,0 +1,86 @@
# Flink Iceberg Source using DataStream API

* Flink version: 1.20.0
* Flink API: DataStream API
* Iceberg 1.6.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)

This example demonstrates how to use the
[Flink Iceberg Source Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog.
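
As a rough orientation for how the Glue Data Catalog is typically wired to the Iceberg Flink connector, here is a minimal, hypothetical sketch of the write path this README describes. It is not the code from this PR: the bucket, database and table names are placeholders, and the upsert settings mirror the runtime parameters described further below.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

import java.util.List;
import java.util.Map;

public class IcebergSinkSketch {

    // Sketch only: attach an Iceberg sink backed by the Glue Data Catalog to a RowData stream.
    static void attachSink(DataStream<RowData> rowDataStream) {
        Map<String, String> catalogProperties = Map.of(
                "warehouse", "s3://my-example-bucket/iceberg",   // hypothetical bucket prefix
                "io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

        // Use the Iceberg GlueCatalog implementation from the iceberg-aws module
        CatalogLoader glueCatalog = CatalogLoader.custom(
                "glue", catalogProperties, new Configuration(),
                "org.apache.iceberg.aws.glue.GlueCatalog");
        TableLoader tableLoader = TableLoader.fromCatalog(
                glueCatalog, TableIdentifier.of("default", "prices_iceberg"));

        FlinkSink.forRowData(rowDataStream)
                .tableLoader(tableLoader)
                .upsert(true)                             // operation = upsert
                .equalityFieldColumns(List.of("symbol"))  // upsert.equality.fields
                .append();
    }
}
```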

For simplicity, the application internally generates synthetic data (random stock prices).
Data is generated as AVRO Generic Records, simulating a real source, for example a Kafka source that receives records
serialized with AVRO.
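
As an illustration of the kind of record the generator produces, here is a minimal sketch of building one stock-price event as an AVRO Generic Record. The field names used below are assumptions for this sketch and may not match the actual `price.avsc`.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.time.Instant;

public class PriceRecordSketch {

    // Sketch: build one synthetic price event as an AVRO GenericRecord.
    // The schema is assumed to define the fields used below.
    static GenericRecord randomPrice(Schema priceSchema) {
        GenericRecord record = new GenericData.Record(priceSchema);
        record.put("timestamp", Instant.now().toEpochMilli()); // assumed long timestamp field
        record.put("symbol", "AMZN");                          // assumed partition/upsert key
        record.put("price", Math.random() * 100);              // assumed double price field
        return record;
    }
}
```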

### Prerequisites

The application expects the following resources:
* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default").
  The application creates the table, but the catalog database must already exist.
* An S3 bucket where the Iceberg table is written.

#### IAM Permissions

The application must have IAM permissions to:
* Show and alter Glue Data Catalog databases, and show and create Glue Data Catalog tables.
  See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html).
* Read from and write to the S3 bucket.

### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from the application's Runtime Properties.

When running locally, the configuration is read from the
[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file;
a sample is shown after the parameter table below.

Runtime parameters:

| Group ID  | Key                      | Default          | Description                                                                                                              |
|-----------|--------------------------|------------------|--------------------------------------------------------------------------------------------------------------------------|
| `DataGen` | `records.per.sec`        | `10.0`           | Records per second generated.                                                                                            |
| `Iceberg` | `bucket.prefix`          | (mandatory)      | S3 bucket prefix, for example `s3://my-bucket/iceberg`.                                                                  |
| `Iceberg` | `catalog.db`             | `default`        | Name of the Glue Data Catalog database.                                                                                  |
| `Iceberg` | `catalog.table`          | `prices_iceberg` | Name of the Glue Data Catalog table.                                                                                     |
| `Iceberg` | `partition.fields`       | `symbol`         | Comma-separated list of partition fields.                                                                                |
| `Iceberg` | `sort.field`             | `timestamp`      | Sort field.                                                                                                              |
| `Iceberg` | `operation`              | `upsert`         | Iceberg operation. One of `upsert`, `append` or `overwrite`.                                                             |
| `Iceberg` | `upsert.equality.fields` | `symbol`         | Comma-separated list of fields used for upsert. They must match the partition fields. Required if `operation` = `upsert`. |
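
When running locally, a `flink-application-properties-dev.json` along these lines would supply the groups and keys above. This is a sample, not the file from this PR: it follows the property-group format used by the local dev files in this repository, and the bucket name is a placeholder.

```json
[
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": {
      "records.per.sec": "10.0"
    }
  },
  {
    "PropertyGroupId": "Iceberg",
    "PropertyMap": {
      "bucket.prefix": "s3://my-example-bucket/iceberg",
      "catalog.db": "default",
      "catalog.table": "prices_iceberg",
      "partition.fields": "symbol",
      "sort.field": "timestamp",
      "operation": "upsert",
      "upsert.equality.fields": "symbol"
    }
  }
]
```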


### Checkpoints

Checkpointing must be enabled. Iceberg commits writes on checkpoint.

When running locally, the application programmatically enables checkpoints every 10 seconds.
When deployed to Amazon Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
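
For the local case, a minimal sketch of how checkpointing might be enabled programmatically (the `isLocal` flag is a hypothetical helper, not part of this PR):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {

    // Sketch: enable a 10-second checkpoint interval only when running outside Managed Service for Apache Flink.
    static StreamExecutionEnvironment createEnvironment(boolean isLocal) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        if (isLocal) {
            env.enableCheckpointing(10_000L); // 10 seconds; Iceberg commits data files on each checkpoint
        }
        return env;
    }
}
```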


### Known limitations

At the moment, the Flink Iceberg integration has the following limitations:
* It does not support Iceberg tables with hidden partitioning.
* It does not support adding, removing, renaming, or changing columns.

### Schema and schema evolution

The application must "know" the AVRO schema at start.
The schema cannot be dynamically inferred from the incoming records, for example using a schema registry.
This is due to a limitation of the Flink Iceberg integration, which requires knowing the table schema upfront.

This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible:
the application keeps deserializing incoming records using the schema it knows, and any new field in the incoming records is discarded.
Schema changes are not propagated to Iceberg.

In this example, the schema is loaded from a schema definition file, [price.avsc](./src/main/resources/price.avsc), embedded
in the application.
It is technically possible to fetch the schema at application start from an external source, like a schema registry or a
schema definition file in an S3 bucket, but this is beyond the scope of this example.
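
A minimal sketch of loading the embedded schema with the plain AVRO parser (the class name is hypothetical; the resource name matches `price.avsc` above):

```java
import org.apache.avro.Schema;

import java.io.IOException;
import java.io.InputStream;

public class SchemaLoaderSketch {

    // Sketch: parse the AVRO schema bundled with the application from the classpath.
    static Schema loadPriceSchema() throws IOException {
        try (InputStream in = SchemaLoaderSketch.class.getClassLoader().getResourceAsStream("price.avsc")) {
            if (in == null) {
                throw new IOException("price.avsc not found on the classpath");
            }
            return new Schema.Parser().parse(in);
        }
    }
}
```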

### Running locally, in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
212 changes: 212 additions & 0 deletions java/Iceberg/IcebergDataStreamSource/pom.xml
@@ -0,0 +1,212 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>iceberg-datastream-source</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>

        <flink.version>1.20.0</flink.version>
        <avro.version>1.11.3</avro.version>
        <hadoop.version>3.4.0</hadoop.version>
        <iceberg.version>1.6.1</iceberg.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.23.1</log4j.version>
        <junit5.version>5.8.1</junit5.version>
    </properties>

    <dependencies>
        <!-- Flink Core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink Iceberg uses DropWizard metrics -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- AVRO -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Iceberg dependencies -->
        <!-- DO NOT include the iceberg-flink-runtime-* dependency, because it contains a shaded version of Avro -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-1.19</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-aws-bundle</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-aws</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
                <!-- Excluded to prevent an SLF4J multiple-bindings conflict -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-reload4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Tests -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${junit5.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Shade plugin to build the fat-jar including all required dependencies -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.amazonaws.services.msf.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>