From c88d6600cbcc5e0930041853f276e85ae6e3c510 Mon Sep 17 00:00:00 2001 From: luweizheng Date: Wed, 22 Jul 2020 09:06:55 +0800 Subject: [PATCH] WatermarkStrategy --- .../java/chapter5/AllowLatenessExample.java | 14 ++-- .../java/chapter5/AssignWatermark.java | 71 ++++++++++--------- .../chapter5/IncrementalProcessExample.java | 3 - .../java/chapter5/IntervalJoinExample.java | 26 ++++--- .../chapter5/KeyCoProcessFunctonExample.java | 25 ++++--- .../java/chapter5/ProcessFunctionExample.java | 17 +++-- .../java/chapter5/ReduceFunctionExample.java | 2 +- .../java/chapter5/SideOutputExample.java | 15 ++-- .../java/chapter6/MapStateExample.java | 2 - .../java/chapter8/InsertExample.java | 18 ++--- .../java/chapter8/RegularJoinExample.java | 14 ++-- .../java/chapter8/ScalarFunctionExample.java | 26 +++---- .../java/chapter8/SystemFunctionExample.java | 14 ++-- .../java/chapter8/TableFunctionExample.java | 13 ++-- .../chapter8/TemporalTableJoinExample.java | 25 ++++--- .../java/chapter8/TimeWindowJoinExample.java | 26 +++---- .../chapter8/UserBehaviorFromDataStream.java | 20 ++---- .../java/chapter8/WeightedAggExample.java | 13 ++-- .../java/projects/iot/IoTSQLDemo.java | 1 - .../java/projects/stock/StockPriceDemo.java | 2 +- .../taobao/UserBehaviorKafkaProducer.java | 1 - .../wordcount/WordCountKafkaInStdOut.java | 2 - 22 files changed, 165 insertions(+), 185 deletions(-) diff --git a/src/main/java/com/flink/tutorials/java/chapter5/AllowLatenessExample.java b/src/main/java/com/flink/tutorials/java/chapter5/AllowLatenessExample.java index 03fb64e..ddf3934 100644 --- a/src/main/java/com/flink/tutorials/java/chapter5/AllowLatenessExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter5/AllowLatenessExample.java @@ -1,5 +1,6 @@ package com.flink.tutorials.java.chapter5; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; @@ -9,13 +10,13 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; +import java.time.Duration; import java.util.Calendar; import java.util.Random; @@ -31,12 +32,11 @@ public static void main(String[] args) throws Exception { // 数据流有三个字段:(key, 时间戳, 数值) DataStream> input = env .addSource(new MySource()) - .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>(Time.seconds(5)) { - @Override - public long extractTimestamp(Tuple3 element) { - return element.f1; - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forBoundedOutOfOrderness(Duration.ofSeconds(5)) + .withTimestampAssigner((event, timestamp) -> event.f1) + ); DataStream> allowedLatenessStream = input.keyBy(item -> item.f0) .timeWindow(Time.seconds(5)) diff --git a/src/main/java/com/flink/tutorials/java/chapter5/AssignWatermark.java b/src/main/java/com/flink/tutorials/java/chapter5/AssignWatermark.java index ac43d75..fe6d2d1 100644 --- a/src/main/java/com/flink/tutorials/java/chapter5/AssignWatermark.java +++ b/src/main/java/com/flink/tutorials/java/chapter5/AssignWatermark.java @@ -1,16 +1,15 @@ package com.flink.tutorials.java.chapter5; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.api.windowing.time.Time; public class AssignWatermark { @@ -32,50 +31,56 @@ public static void main(String[] args) throws Exception { .returns(Types.TUPLE(Types.STRING, Types.LONG)); // 第二个字段是时间戳 - DataStream> watermark = input.assignTimestampsAndWatermarks(new MyPeriodicAssigner()); - DataStream boundedOutOfOrder = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>(Time.minutes(1)) { - @Override - public long extractTimestamp(Tuple2 element) { - return element.f1; - } - }); + DataStream> watermark = input.assignTimestampsAndWatermarks( + WatermarkStrategy.forGenerator((context -> new MyPeriodicGenerator())) + .withTimestampAssigner((event, recordTimestamp) -> event.f1)); + + watermark.print(); env.execute("periodic and punctuated watermark"); } - public static class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks> { - private long bound = 60 * 1000; // 1分钟 - private long maxTs = Long.MIN_VALUE; // 已抽取的Timestamp最大值 + // 定期生成Watermark + // 数据流元素 Tuple2 共两个字段 + // 第一个字段为数据本身 + // 第二个字段是时间戳 + public static class MyPeriodicGenerator implements WatermarkGenerator> { + + private final long maxOutOfOrderness = 60 * 1000; // 1分钟 + private long currentMaxTimestamp; // 已抽取的Timestamp最大值 @Override - public long extractTimestamp(Tuple2 element, long previousElementTimestamp) { - // 更新maxTs为当前遇到的最大值 - maxTs = Math.max(maxTs, element.f1); - return element.f1; + public void onEvent(Tuple2 event, long eventTimestamp, WatermarkOutput output) { + // 更新currentMaxTimestamp为当前遇到的最大值 + currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp); } @Override - public Watermark getCurrentWatermark() { - // Watermark比Timestamp最大值慢1分钟 - Watermark watermark = new Watermark(maxTs - bound); - return watermark; + public void onPeriodicEmit(WatermarkOutput output) { + // Watermark比currentMaxTimestamp最大值慢1分钟 + output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness)); } + } - // 第二个字段是时间戳,第三个字段判断是否为Watermark的标记 - public static class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks> { + // 逐个检查数据流中的元素,根据元素中的特殊字段,判断是否要生成Watermark + // 数据流元素 Tuple3 共三个字段 + // 第一个字段为数据本身 + // 第二个字段是时间戳 + // 第三个字段判断是否为Watermark的标记 + public static class MyPunctuatedGenerator implements WatermarkGenerator> { + @Override - public long extractTimestamp(Tuple3 element, long previousElementTimestamp) { - return element.f1; + public void onEvent(Tuple3 event, long eventTimestamp, WatermarkOutput output) { + if (event.f2) { + output.emitWatermark(new Watermark(event.f1)); + } } @Override - public Watermark checkAndGetNextWatermark(Tuple3 element, long extractedTimestamp) { - if (element.f2) { - return new Watermark(extractedTimestamp); - } else { - return null; - } + public void onPeriodicEmit(WatermarkOutput output) { + // 这里不需要做任何事情,因为我们在 onEvent() 方法中生成了Watermark } + } } diff --git a/src/main/java/com/flink/tutorials/java/chapter5/IncrementalProcessExample.java b/src/main/java/com/flink/tutorials/java/chapter5/IncrementalProcessExample.java index 8116628..8d22b07 100644 --- a/src/main/java/com/flink/tutorials/java/chapter5/IncrementalProcessExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter5/IncrementalProcessExample.java @@ -1,12 +1,9 @@ package com.flink.tutorials.java.chapter5; -import akka.stream.impl.fusing.Reduce; import com.flink.tutorials.java.utils.stock.StockPrice; import com.flink.tutorials.java.utils.stock.StockSource; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; diff --git a/src/main/java/com/flink/tutorials/java/chapter5/IntervalJoinExample.java b/src/main/java/com/flink/tutorials/java/chapter5/IntervalJoinExample.java index f7b3032..3c45b3a 100644 --- a/src/main/java/com/flink/tutorials/java/chapter5/IntervalJoinExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter5/IntervalJoinExample.java @@ -1,16 +1,17 @@ package com.flink.tutorials.java.chapter5; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; -import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; +import java.time.Duration; + public class IntervalJoinExample { public static void main(String[] args) throws Exception { @@ -33,12 +34,11 @@ public static void main(String[] args) throws Exception { return Tuple3.of(id, ts, i); }) .returns(Types.TUPLE(Types.STRING, Types.LONG, Types.INT)) - .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>(Time.minutes(1)) { - @Override - public long extractTimestamp(Tuple3 element) { - return element.f1; - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forBoundedOutOfOrderness(Duration.ofMinutes(1)) + .withTimestampAssigner((event, timestamp) -> event.f1)); + DataStream> input2 = socketSource2.map( line -> { String[] arr = line.split(" "); @@ -48,12 +48,10 @@ public long extractTimestamp(Tuple3 element) { return Tuple3.of(id, ts, i); }) .returns(Types.TUPLE(Types.STRING, Types.LONG, Types.INT)) - .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>(Time.minutes(1)) { - @Override - public long extractTimestamp(Tuple3 element) { - return element.f1; - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forBoundedOutOfOrderness(Duration.ofMinutes(1)) + .withTimestampAssigner((event, timestamp) -> event.f1)); DataStream intervalJoinResult = input1.keyBy(i -> i.f0) .intervalJoin(input2.keyBy(i -> i.f0)) diff --git a/src/main/java/com/flink/tutorials/java/chapter5/KeyCoProcessFunctonExample.java b/src/main/java/com/flink/tutorials/java/chapter5/KeyCoProcessFunctonExample.java index 34c25fc..8a70710 100644 --- a/src/main/java/com/flink/tutorials/java/chapter5/KeyCoProcessFunctonExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter5/KeyCoProcessFunctonExample.java @@ -4,6 +4,7 @@ import com.flink.tutorials.java.utils.stock.MediaSource; import com.flink.tutorials.java.utils.stock.StockPrice; import com.flink.tutorials.java.utils.stock.StockSource; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; @@ -12,7 +13,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.util.Collector; public class KeyCoProcessFunctonExample { @@ -27,27 +27,26 @@ public static void main(String[] args) throws Exception { // 读入股票数据流 DataStream stockStream = env .addSource(new StockSource("stock/stock-tick-20200108.csv")) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { - @Override - public long extractAscendingTimestamp(StockPrice stockPrice) { - return stockPrice.ts; - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.ts) + ); // 读入媒体评价数据流 DataStream mediaStream = env .addSource(new MediaSource()) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { - @Override - public long extractAscendingTimestamp(Media media) { - return media.ts; - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.ts) + ); DataStream joinStream = stockStream.connect(mediaStream) .keyBy("symbol", "symbol") // 调用process函数 .process(new JoinStockMediaProcessFunction()); + joinStream.print(); env.execute("coprocess function"); diff --git a/src/main/java/com/flink/tutorials/java/chapter5/ProcessFunctionExample.java b/src/main/java/com/flink/tutorials/java/chapter5/ProcessFunctionExample.java index 6677737..30566f7 100644 --- a/src/main/java/com/flink/tutorials/java/chapter5/ProcessFunctionExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter5/ProcessFunctionExample.java @@ -2,6 +2,7 @@ import com.flink.tutorials.java.utils.stock.StockPrice; import com.flink.tutorials.java.utils.stock.StockSource; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; @@ -11,7 +12,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; @@ -31,26 +31,25 @@ public static void main(String[] args) throws Exception { // 读入数据流 DataStream inputStream = env .addSource(new StockSource("stock/stock-tick-20200108.csv")) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { - @Override - public long extractAscendingTimestamp(StockPrice stockPrice) { - return stockPrice.ts; - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.ts) + ); DataStream warnings = inputStream .keyBy(stock -> stock.symbol) // 调用process函数 .process(new IncreaseAlertFunction(3000)); -// warnings.print(); + warnings.print(); OutputTag outputTag = new OutputTag("high-volume-trade") {}; SingleOutputStreamOperator mainDataStream = (SingleOutputStreamOperator)inputStream; DataStreamsideOutputStream = mainDataStream.getSideOutput(outputTag); - sideOutputStream.print(); +// sideOutputStream.print(); env.execute("process function"); } diff --git a/src/main/java/com/flink/tutorials/java/chapter5/ReduceFunctionExample.java b/src/main/java/com/flink/tutorials/java/chapter5/ReduceFunctionExample.java index 7f7a7e2..8fb1469 100644 --- a/src/main/java/com/flink/tutorials/java/chapter5/ReduceFunctionExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter5/ReduceFunctionExample.java @@ -1,7 +1,7 @@ package com.flink.tutorials.java.chapter5; -import com.flink.tutorials.java.utils.stock.StockSource; import com.flink.tutorials.java.utils.stock.StockPrice; +import com.flink.tutorials.java.utils.stock.StockSource; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/src/main/java/com/flink/tutorials/java/chapter5/SideOutputExample.java b/src/main/java/com/flink/tutorials/java/chapter5/SideOutputExample.java index 455445f..b7eee86 100644 --- a/src/main/java/com/flink/tutorials/java/chapter5/SideOutputExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter5/SideOutputExample.java @@ -1,13 +1,13 @@ package com.flink.tutorials.java.chapter5; -import com.flink.tutorials.java.utils.stock.StockSource; import com.flink.tutorials.java.utils.stock.StockPrice; +import com.flink.tutorials.java.utils.stock.StockSource; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; @@ -23,12 +23,11 @@ public static void main(String[] args) throws Exception { DataStream inputStream = env .addSource(new StockSource("stock/stock-tick-20200108.csv")) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { - @Override - public long extractAscendingTimestamp(StockPrice stockPrice) { - return stockPrice.ts; - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.ts) + ); SingleOutputStreamOperator mainStream = inputStream .keyBy(stock -> stock.symbol) diff --git a/src/main/java/com/flink/tutorials/java/chapter6/MapStateExample.java b/src/main/java/com/flink/tutorials/java/chapter6/MapStateExample.java index 2445133..5085dc8 100644 --- a/src/main/java/com/flink/tutorials/java/chapter6/MapStateExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter6/MapStateExample.java @@ -8,14 +8,12 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.util.Collector; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; public class MapStateExample { diff --git a/src/main/java/com/flink/tutorials/java/chapter8/InsertExample.java b/src/main/java/com/flink/tutorials/java/chapter8/InsertExample.java index d9390b3..8ec5e9b 100644 --- a/src/main/java/com/flink/tutorials/java/chapter8/InsertExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter8/InsertExample.java @@ -1,10 +1,10 @@ package com.flink.tutorials.java.chapter8; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -33,17 +33,17 @@ public static void main(String[] args) throws Exception { DataStream> userBehaviorStream = env .fromCollection(userBehaviorData) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple4 element) { - return element.f3.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f3.getTime()) + ); + Table userBehaviorTable = tEnv.fromDataStream(userBehaviorStream, "user_id, item_id, behavior,ts.rowtime"); tEnv.createTemporaryView("user_behavior", userBehaviorTable); - tEnv.sqlUpdate("CREATE TABLE behavior_cnt (\n" + + tEnv.executeSql("CREATE TABLE behavior_cnt (\n" + " user_id BIGINT,\n" + " cnt BIGINT" + ") WITH (\n" + @@ -52,7 +52,7 @@ public long extractAscendingTimestamp(Tuple4 elem " 'format.type' = 'csv' -- 数据源格式为 json\n" + ")"); - tEnv.sqlUpdate("INSERT INTO behavior_cnt SELECT user_id, COUNT(behavior) AS cnt FROM user_behavior GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)"); + tEnv.executeSql("INSERT INTO behavior_cnt SELECT user_id, COUNT(behavior) AS cnt FROM user_behavior GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)"); env.execute("table api"); } diff --git a/src/main/java/com/flink/tutorials/java/chapter8/RegularJoinExample.java b/src/main/java/com/flink/tutorials/java/chapter8/RegularJoinExample.java index 4a012a2..d0b7046 100644 --- a/src/main/java/com/flink/tutorials/java/chapter8/RegularJoinExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter8/RegularJoinExample.java @@ -1,11 +1,11 @@ package com.flink.tutorials.java.chapter8; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -42,12 +42,12 @@ public static void main(String[] args) throws Exception { DataStream> userBehaviorStream = env .fromCollection(userBehaviorData) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple4 element) { - return element.f3.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f3.getTime()) + ); + Table userBehaviorTable = tEnv.fromDataStream(userBehaviorStream, "user_id, item_id, behavior, ts.rowtime"); tEnv.createTemporaryView("user_behavior", userBehaviorTable); diff --git a/src/main/java/com/flink/tutorials/java/chapter8/ScalarFunctionExample.java b/src/main/java/com/flink/tutorials/java/chapter8/ScalarFunctionExample.java index f0ceea5..1d87ffd 100644 --- a/src/main/java/com/flink/tutorials/java/chapter8/ScalarFunctionExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter8/ScalarFunctionExample.java @@ -2,11 +2,11 @@ import com.flink.tutorials.java.chapter8.function.IsInFourRing; import com.flink.tutorials.java.chapter8.function.TimeDiff; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -34,12 +34,12 @@ public static void main(String[] args) throws Exception { DataStream> geoStream = env .fromCollection(geoList) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple4 element) { - return element.f3.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f3.getTime()) + ); + Table geoTable = tEnv.fromDataStream(geoStream, "id, long, alt, ts.rowtime, proc.proctime"); tEnv.createTemporaryView("geo", geoTable); @@ -63,12 +63,12 @@ public long extractAscendingTimestamp(Tuple4 el DataStream> geoStrStream = env .fromCollection(geoStrList) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple4 element) { - return element.f3.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f3.getTime()) + ); + Table geoStrTable = tEnv.fromDataStream(geoStrStream, "id, long, alt, ts.rowtime, proc.proctime"); tEnv.createTemporaryView("geo_str", geoStrTable); diff --git a/src/main/java/com/flink/tutorials/java/chapter8/SystemFunctionExample.java b/src/main/java/com/flink/tutorials/java/chapter8/SystemFunctionExample.java index 8d079ae..55d9582 100644 --- a/src/main/java/com/flink/tutorials/java/chapter8/SystemFunctionExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter8/SystemFunctionExample.java @@ -1,10 +1,10 @@ package com.flink.tutorials.java.chapter8; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -34,12 +34,12 @@ public static void main(String[] args) throws Exception { DataStream> userBehaviorStream = env .fromCollection(userBehaviorData) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple4 element) { - return element.f3.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f3.getTime()) + ); + Table userBehaviorTable = tEnv.fromDataStream(userBehaviorStream, "user_id, item_id, behavior,ts.rowtime"); tEnv.createTemporaryView("user_behavior", userBehaviorTable); diff --git a/src/main/java/com/flink/tutorials/java/chapter8/TableFunctionExample.java b/src/main/java/com/flink/tutorials/java/chapter8/TableFunctionExample.java index fcf7ab5..b899aab 100644 --- a/src/main/java/com/flink/tutorials/java/chapter8/TableFunctionExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter8/TableFunctionExample.java @@ -1,11 +1,11 @@ package com.flink.tutorials.java.chapter8; import com.flink.tutorials.java.chapter8.function.TableFunc; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -32,12 +32,11 @@ public static void main(String[] args) throws Exception { DataStream> stream = env .fromCollection(list) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple4 element) { - return element.f3.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f3.getTime()) + ); Table table = tEnv.fromDataStream(stream, "id, long, str, ts.rowtime"); diff --git a/src/main/java/com/flink/tutorials/java/chapter8/TemporalTableJoinExample.java b/src/main/java/com/flink/tutorials/java/chapter8/TemporalTableJoinExample.java index 68cd751..00c4bb7 100644 --- a/src/main/java/com/flink/tutorials/java/chapter8/TemporalTableJoinExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter8/TemporalTableJoinExample.java @@ -1,11 +1,11 @@ package com.flink.tutorials.java.chapter8; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -42,23 +42,22 @@ public static void main(String[] args) throws Exception { DataStream> userBehaviorStream = env .fromCollection(userBehaviorData) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple4 element) { - return element.f3.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f3.getTime()) + ); + Table userBehaviorTable = tEnv.fromDataStream(userBehaviorStream, "user_id, item_id, behavior,ts.rowtime"); tEnv.createTemporaryView("user_behavior", userBehaviorTable); DataStream> itemStream = env .fromCollection(itemData) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple3 element) { - return element.f2.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) + ); Table itemTable = tEnv.fromDataStream(itemStream, "item_id, price, versionTs.rowtime"); // 注册 Temporal Table Function diff --git a/src/main/java/com/flink/tutorials/java/chapter8/TimeWindowJoinExample.java b/src/main/java/com/flink/tutorials/java/chapter8/TimeWindowJoinExample.java index fbb5fb0..24ae903 100644 --- a/src/main/java/com/flink/tutorials/java/chapter8/TimeWindowJoinExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter8/TimeWindowJoinExample.java @@ -1,11 +1,11 @@ package com.flink.tutorials.java.chapter8; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -42,23 +42,23 @@ public static void main(String[] args) throws Exception { DataStream> userBehaviorStream = env .fromCollection(userBehaviorData) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple4 element) { - return element.f3.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f3.getTime()) + ); + Table userBehaviorTable = tEnv.fromDataStream(userBehaviorStream, "user_id, item_id, behavior,ts.rowtime"); tEnv.createTemporaryView("user_behavior", userBehaviorTable); DataStream> chatStream = env .fromCollection(chatData) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple3 element) { - return element.f2.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) + ); + Table chatTable = tEnv.fromDataStream(chatStream, "buyer_id, item_id, ts.rowtime"); tEnv.createTemporaryView("chat", chatTable); diff --git a/src/main/java/com/flink/tutorials/java/chapter8/UserBehaviorFromDataStream.java b/src/main/java/com/flink/tutorials/java/chapter8/UserBehaviorFromDataStream.java index c703e09..ed27720 100644 --- a/src/main/java/com/flink/tutorials/java/chapter8/UserBehaviorFromDataStream.java +++ b/src/main/java/com/flink/tutorials/java/chapter8/UserBehaviorFromDataStream.java @@ -2,20 +2,14 @@ import com.flink.tutorials.java.utils.taobao.UserBehavior; import com.flink.tutorials.java.utils.taobao.UserBehaviorSource; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; -import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.descriptors.Csv; -import org.apache.flink.table.descriptors.FileSystem; -import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.Row; public class UserBehaviorFromDataStream { @@ -31,13 +25,11 @@ public static void main(String[] args) throws Exception { DataStream userBehaviorDataStream = env .addSource(new UserBehaviorSource("taobao/UserBehavior-20171201.csv")) // 在DataStream里设置时间戳和Watermark - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { - @Override - public long extractAscendingTimestamp(UserBehavior userBehavior) { - // 原始数据单位为秒,乘以1000转换成毫秒 - return userBehavior.timestamp * 1000; - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.timestamp) + ); tEnv.createTemporaryView("user_behavior", userBehaviorDataStream, "userId as user_id, itemId as item_id, categoryId as category_id, behavior, ts.rowtime"); diff --git a/src/main/java/com/flink/tutorials/java/chapter8/WeightedAggExample.java b/src/main/java/com/flink/tutorials/java/chapter8/WeightedAggExample.java index f4dfbe1..933dffe 100644 --- a/src/main/java/com/flink/tutorials/java/chapter8/WeightedAggExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter8/WeightedAggExample.java @@ -1,12 +1,12 @@ package com.flink.tutorials.java.chapter8; import com.flink.tutorials.java.chapter8.function.WeightedAvg; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -33,12 +33,11 @@ public static void main(String[] args) throws Exception { DataStream> stream = env .fromCollection(list) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { - @Override - public long extractAscendingTimestamp(Tuple4 element) { - return element.f3.getTime(); - } - }); + .assignTimestampsAndWatermarks( + WatermarkStrategy + .>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.f3.getTime()) + ); Table table = tEnv.fromDataStream(stream, "id, v, w, ts.rowtime"); diff --git a/src/main/java/com/flink/tutorials/java/projects/iot/IoTSQLDemo.java b/src/main/java/com/flink/tutorials/java/projects/iot/IoTSQLDemo.java index 0ce63c1..9a7186e 100644 --- a/src/main/java/com/flink/tutorials/java/projects/iot/IoTSQLDemo.java +++ b/src/main/java/com/flink/tutorials/java/projects/iot/IoTSQLDemo.java @@ -5,7 +5,6 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.types.Row; public class IoTSQLDemo { diff --git a/src/main/java/com/flink/tutorials/java/projects/stock/StockPriceDemo.java b/src/main/java/com/flink/tutorials/java/projects/stock/StockPriceDemo.java index 718498d..b25ee5a 100644 --- a/src/main/java/com/flink/tutorials/java/projects/stock/StockPriceDemo.java +++ b/src/main/java/com/flink/tutorials/java/projects/stock/StockPriceDemo.java @@ -1,7 +1,7 @@ package com.flink.tutorials.java.projects.stock; -import com.flink.tutorials.java.utils.stock.StockSource; import com.flink.tutorials.java.utils.stock.StockPrice; +import com.flink.tutorials.java.utils.stock.StockSource; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/src/main/java/com/flink/tutorials/java/projects/taobao/UserBehaviorKafkaProducer.java b/src/main/java/com/flink/tutorials/java/projects/taobao/UserBehaviorKafkaProducer.java index e4b4cde..a56dcff 100644 --- a/src/main/java/com/flink/tutorials/java/projects/taobao/UserBehaviorKafkaProducer.java +++ b/src/main/java/com/flink/tutorials/java/projects/taobao/UserBehaviorKafkaProducer.java @@ -11,7 +11,6 @@ import java.io.*; import java.time.Instant; import java.util.Properties; -import java.util.TimeZone; import java.util.function.Consumer; public class UserBehaviorKafkaProducer { diff --git a/src/main/java/com/flink/tutorials/java/projects/wordcount/WordCountKafkaInStdOut.java b/src/main/java/com/flink/tutorials/java/projects/wordcount/WordCountKafkaInStdOut.java index d2d60a7..f8aa60f 100644 --- a/src/main/java/com/flink/tutorials/java/projects/wordcount/WordCountKafkaInStdOut.java +++ b/src/main/java/com/flink/tutorials/java/projects/wordcount/WordCountKafkaInStdOut.java @@ -3,8 +3,6 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.RestOptions; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time;