Skip to content

Commit

Permalink
WatermarkStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng committed Jul 22, 2020
1 parent ca5bc7b commit c88d660
Show file tree
Hide file tree
Showing 22 changed files with 165 additions and 185 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flink.tutorials.java.chapter5;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
Expand All @@ -9,13 +10,13 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Calendar;
import java.util.Random;

Expand All @@ -31,12 +32,11 @@ public static void main(String[] args) throws Exception {
// 数据流有三个字段:(key, 时间戳, 数值)
DataStream<Tuple3<String, Long, Integer>> input = env
.addSource(new MySource())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Integer>>(Time.seconds(5)) {
@Override
public long extractTimestamp(Tuple3<String, Long, Integer> element) {
return element.f1;
}
});
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Long, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.f1)
);

DataStream<Tuple4<String, String, Integer, String>> allowedLatenessStream = input.keyBy(item -> item.f0)
.timeWindow(Time.seconds(5))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package com.flink.tutorials.java.chapter5;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AssignWatermark {

Expand All @@ -32,50 +31,56 @@ public static void main(String[] args) throws Exception {
.returns(Types.TUPLE(Types.STRING, Types.LONG));

// 第二个字段是时间戳
DataStream<Tuple2<String, Long>> watermark = input.assignTimestampsAndWatermarks(new MyPeriodicAssigner());
DataStream boundedOutOfOrder = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.minutes(1)) {
@Override
public long extractTimestamp(Tuple2<String, Long> element) {
return element.f1;
}
});
DataStream<Tuple2<String, Long>> watermark = input.assignTimestampsAndWatermarks(
WatermarkStrategy.forGenerator((context -> new MyPeriodicGenerator()))
.withTimestampAssigner((event, recordTimestamp) -> event.f1));

watermark.print();

env.execute("periodic and punctuated watermark");
}

public static class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
private long bound = 60 * 1000; // 1分钟
private long maxTs = Long.MIN_VALUE; // 已抽取的Timestamp最大值
// 定期生成Watermark
// 数据流元素 Tuple2<String, Long> 共两个字段
// 第一个字段为数据本身
// 第二个字段是时间戳
public static class MyPeriodicGenerator implements WatermarkGenerator<Tuple2<String, Long>> {

private final long maxOutOfOrderness = 60 * 1000; // 1分钟
private long currentMaxTimestamp; // 已抽取的Timestamp最大值

@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
// 更新maxTs为当前遇到的最大值
maxTs = Math.max(maxTs, element.f1);
return element.f1;
public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
// 更新currentMaxTimestamp为当前遇到的最大值
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}

@Override
public Watermark getCurrentWatermark() {
// Watermark比Timestamp最大值慢1分钟
Watermark watermark = new Watermark(maxTs - bound);
return watermark;
public void onPeriodicEmit(WatermarkOutput output) {
// Watermark比currentMaxTimestamp最大值慢1分钟
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
}

}

// 第二个字段是时间戳,第三个字段判断是否为Watermark的标记
public static class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<Tuple3<String, Long, Boolean>> {
// 逐个检查数据流中的元素,根据元素中的特殊字段,判断是否要生成Watermark
// 数据流元素 Tuple3<String, Long, Boolean> 共三个字段
// 第一个字段为数据本身
// 第二个字段是时间戳
// 第三个字段判断是否为Watermark的标记
public static class MyPunctuatedGenerator implements WatermarkGenerator<Tuple3<String, Long, Boolean>> {

@Override
public long extractTimestamp(Tuple3<String, Long, Boolean> element, long previousElementTimestamp) {
return element.f1;
public void onEvent(Tuple3<String, Long, Boolean> event, long eventTimestamp, WatermarkOutput output) {
if (event.f2) {
output.emitWatermark(new Watermark(event.f1));
}
}

@Override
public Watermark checkAndGetNextWatermark(Tuple3<String, Long, Boolean> element, long extractedTimestamp) {
if (element.f2) {
return new Watermark(extractedTimestamp);
} else {
return null;
}
public void onPeriodicEmit(WatermarkOutput output) {
// 这里不需要做任何事情,因为我们在 onEvent() 方法中生成了Watermark
}

}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package com.flink.tutorials.java.chapter5;

import akka.stream.impl.fusing.Reduce;
import com.flink.tutorials.java.utils.stock.StockPrice;
import com.flink.tutorials.java.utils.stock.StockSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package com.flink.tutorials.java.chapter5;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class IntervalJoinExample {

public static void main(String[] args) throws Exception {
Expand All @@ -33,12 +34,11 @@ public static void main(String[] args) throws Exception {
return Tuple3.of(id, ts, i);
})
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.INT))
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Integer>>(Time.minutes(1)) {
@Override
public long extractTimestamp(Tuple3<String, Long, Integer> element) {
return element.f1;
}
});
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Long, Integer>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
.withTimestampAssigner((event, timestamp) -> event.f1));

DataStream<Tuple3<String, Long, Integer>> input2 = socketSource2.map(
line -> {
String[] arr = line.split(" ");
Expand All @@ -48,12 +48,10 @@ public long extractTimestamp(Tuple3<String, Long, Integer> element) {
return Tuple3.of(id, ts, i);
})
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.INT))
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Integer>>(Time.minutes(1)) {
@Override
public long extractTimestamp(Tuple3<String, Long, Integer> element) {
return element.f1;
}
});
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Long, Integer>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
.withTimestampAssigner((event, timestamp) -> event.f1));

DataStream<String> intervalJoinResult = input1.keyBy(i -> i.f0)
.intervalJoin(input2.keyBy(i -> i.f0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.flink.tutorials.java.utils.stock.MediaSource;
import com.flink.tutorials.java.utils.stock.StockPrice;
import com.flink.tutorials.java.utils.stock.StockSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
Expand All @@ -12,7 +13,6 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;

public class KeyCoProcessFunctonExample {
Expand All @@ -27,27 +27,26 @@ public static void main(String[] args) throws Exception {
// 读入股票数据流
DataStream<StockPrice> stockStream = env
.addSource(new StockSource("stock/stock-tick-20200108.csv"))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<StockPrice>() {
@Override
public long extractAscendingTimestamp(StockPrice stockPrice) {
return stockPrice.ts;
}
});
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<StockPrice>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.ts)
);

// 读入媒体评价数据流
DataStream<Media> mediaStream = env
.addSource(new MediaSource())
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Media>() {
@Override
public long extractAscendingTimestamp(Media media) {
return media.ts;
}
});
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Media>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.ts)
);

DataStream<StockPrice> joinStream = stockStream.connect(mediaStream)
.keyBy("symbol", "symbol")
// 调用process函数
.process(new JoinStockMediaProcessFunction());

joinStream.print();

env.execute("coprocess function");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.flink.tutorials.java.utils.stock.StockPrice;
import com.flink.tutorials.java.utils.stock.StockSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
Expand All @@ -11,7 +12,6 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

Expand All @@ -31,26 +31,25 @@ public static void main(String[] args) throws Exception {
// 读入数据流
DataStream<StockPrice> inputStream = env
.addSource(new StockSource("stock/stock-tick-20200108.csv"))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<StockPrice>() {
@Override
public long extractAscendingTimestamp(StockPrice stockPrice) {
return stockPrice.ts;
}
});
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<StockPrice>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.ts)
);

DataStream<String> warnings = inputStream
.keyBy(stock -> stock.symbol)
// 调用process函数
.process(new IncreaseAlertFunction(3000));

// warnings.print();
warnings.print();

OutputTag<StockPrice> outputTag = new OutputTag<StockPrice>("high-volume-trade") {};

SingleOutputStreamOperator<StockPrice> mainDataStream = (SingleOutputStreamOperator)inputStream;
DataStream<StockPrice>sideOutputStream = mainDataStream.getSideOutput(outputTag);

sideOutputStream.print();
// sideOutputStream.print();

env.execute("process function");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.flink.tutorials.java.chapter5;

import com.flink.tutorials.java.utils.stock.StockSource;
import com.flink.tutorials.java.utils.stock.StockPrice;
import com.flink.tutorials.java.utils.stock.StockSource;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.flink.tutorials.java.chapter5;

import com.flink.tutorials.java.utils.stock.StockSource;
import com.flink.tutorials.java.utils.stock.StockPrice;
import com.flink.tutorials.java.utils.stock.StockSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

Expand All @@ -23,12 +23,11 @@ public static void main(String[] args) throws Exception {

DataStream<StockPrice> inputStream = env
.addSource(new StockSource("stock/stock-tick-20200108.csv"))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<StockPrice>() {
@Override
public long extractAscendingTimestamp(StockPrice stockPrice) {
return stockPrice.ts;
}
});
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<StockPrice>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.ts)
);

SingleOutputStreamOperator<String> mainStream = inputStream
.keyBy(stock -> stock.symbol)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

public class MapStateExample {

Expand Down
Loading

0 comments on commit c88d660

Please sign in to comment.