From 76cdba2d2571ba2edb57b9c55c1a1fbd6f7045d0 Mon Sep 17 00:00:00 2001
From: luweizheng
Date: Mon, 20 Jan 2020 05:19:57 +0800
Subject: [PATCH] time & type

---
 .../tutorials/java/api/types/StockPrice.java       |  14 ++
 .../tutorials/java/api/types/StockPrice1.java      |  22 +++
 .../tutorials/java/api/types/StockPrice2.java      |  17 +++
 .../java/api/types/TupleExample.java               |  27 ++++
 .../tutorials/java/api/types/TypeCheck.java        |  16 +++
 .../api/transformations/FlatMapExample.scala       |  34 -----
 .../api/time/AggregateFunctionExample.scala        |  55 ++++++++
 .../scala/api/time/CoGroupExample.scala            |  63 +++++++++
 .../api/time/IncrementalProcessExample.scala       |  61 ++++++++
 .../scala/api/time/IntervalJoinExample.scala       |  73 ++++++++++
 .../scala/api/time/JoinExample.scala               |  58 ++++++++
 .../scala/api/time/LateExample.scala               | 133 ++++++++++++++++++
 .../scala/api/time/PeriodicWatermark.scala         |  84 +++++++++++
 .../time/ProcessWindowFunctionExample.scala        |  65 +++++++++
 .../api/time/ReduceFunctionExample.scala           |  39 +++++
 .../scala/api/time/TriggerExample.scala            | 102 ++++++++++++++
 .../transformations/AggregationExample.scala       |   2 +-
 .../api/transformations/FilterExample.scala        |   2 +-
 .../api/transformations/FlatMapExample.scala       | 101 +++++++++++
 .../api/transformations/KeyByExample.scala         |  14 +-
 .../api/transformations/MapExample.scala           |   2 +-
 .../PartitionCustomExample.scala                   |  57 ++++++++
 .../api/transformations/ReduceExample.scala        |   2 +-
 .../SimpleConnectExample.scala                     |   2 +-
 .../scala/api/types/TupleExample.scala             |  21 +++
 .../demos/stock/StockMediaConnectedDemo.scala      |   4 +-
 .../demos/stock/StockPriceDemo.scala               |   2 +-
 .../demos/wikiedits/WikipediaAnalysis.scala        |   2 +-
 28 files changed, 1028 insertions(+), 46 deletions(-)
 create mode 100644 src/main/java/com/flink/tutorials/java/api/types/StockPrice.java
 create mode 100644 src/main/java/com/flink/tutorials/java/api/types/StockPrice1.java
 create mode 100644 src/main/java/com/flink/tutorials/java/api/types/StockPrice2.java
 create mode 100644 src/main/java/com/flink/tutorials/java/api/types/TupleExample.java
 create mode 100644 src/main/java/com/flink/tutorials/java/api/types/TypeCheck.java
 delete mode 100644 src/main/scala/com/flink/tutorials/api/transformations/FlatMapExample.scala
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/time/AggregateFunctionExample.scala
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/time/CoGroupExample.scala
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/time/IncrementalProcessExample.scala
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/time/IntervalJoinExample.scala
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/time/JoinExample.scala
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/time/LateExample.scala
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/time/PeriodicWatermark.scala
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/time/ProcessWindowFunctionExample.scala
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/time/ReduceFunctionExample.scala
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/time/TriggerExample.scala
 rename src/main/scala/com/flink/tutorials/{ => scala}/api/transformations/AggregationExample.scala (95%)
 rename src/main/scala/com/flink/tutorials/{ => scala}/api/transformations/FilterExample.scala (95%)
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/transformations/FlatMapExample.scala
 rename src/main/scala/com/flink/tutorials/{ => scala}/api/transformations/KeyByExample.scala (52%)
 rename src/main/scala/com/flink/tutorials/{ => scala}/api/transformations/MapExample.scala (96%)
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/transformations/PartitionCustomExample.scala
 rename src/main/scala/com/flink/tutorials/{ => scala}/api/transformations/ReduceExample.scala (95%)
 rename src/main/scala/com/flink/tutorials/{ => scala}/api/transformations/SimpleConnectExample.scala (94%)
 create mode 100644 src/main/scala/com/flink/tutorials/scala/api/types/TupleExample.scala
 rename src/main/scala/com/flink/tutorials/{ => scala}/demos/stock/StockMediaConnectedDemo.scala (95%)
 rename src/main/scala/com/flink/tutorials/{ => scala}/demos/stock/StockPriceDemo.scala (98%)
 rename src/main/scala/com/flink/tutorials/{ => scala}/demos/wikiedits/WikipediaAnalysis.scala (92%)

diff --git a/src/main/java/com/flink/tutorials/java/api/types/StockPrice.java b/src/main/java/com/flink/tutorials/java/api/types/StockPrice.java
new file mode 100644
index 0000000..b4b0ae9
--- /dev/null
+++ b/src/main/java/com/flink/tutorials/java/api/types/StockPrice.java
@@ -0,0 +1,14 @@
+package com.flink.tutorials.java.api.types;
+
+public class StockPrice {
+    public String symbol;
+    public Long timestamp;
+    public Double price;
+
+    public StockPrice() {}
+    public StockPrice(String symbol, Long timestamp, Double price) {
+        this.symbol = symbol;
+        this.timestamp = timestamp;
+        this.price = price;
+    }
+}
diff --git a/src/main/java/com/flink/tutorials/java/api/types/StockPrice1.java b/src/main/java/com/flink/tutorials/java/api/types/StockPrice1.java
new file mode 100644
index 0000000..de304d9
--- /dev/null
+++ b/src/main/java/com/flink/tutorials/java/api/types/StockPrice1.java
@@ -0,0 +1,22 @@
+package com.flink.tutorials.java.api.types;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// NOT a POJO
+public class StockPrice1 {
+
+    // LOGGER is private and has no getter/setter
+    private Logger LOGGER = LoggerFactory.getLogger(StockPrice1.class);
+
+    public String symbol;
+    public Long timestamp;
+    public Double price;
+
+    public StockPrice1() {}
+    public StockPrice1(String symbol, Long timestamp, Double price) {
+        this.symbol = symbol;
+        this.timestamp = timestamp;
+        this.price = price;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/flink/tutorials/java/api/types/StockPrice2.java b/src/main/java/com/flink/tutorials/java/api/types/StockPrice2.java
new file mode 100644
index 0000000..d329f7e
--- /dev/null
+++ b/src/main/java/com/flink/tutorials/java/api/types/StockPrice2.java
@@ -0,0 +1,17 @@
+package com.flink.tutorials.java.api.types;
+
+// NOT a POJO
+public class StockPrice2 {
+
+    public String symbol;
+    public Long timestamp;
+    public Double price;
+
+    // the no-argument constructor is missing
+
+    public StockPrice2(String symbol, Long timestamp, Double price) {
+        this.symbol = symbol;
+        this.timestamp = timestamp;
+        this.price = price;
+    }
+}
diff --git a/src/main/java/com/flink/tutorials/java/api/types/TupleExample.java b/src/main/java/com/flink/tutorials/java/api/types/TupleExample.java
new file mode 100644
index 0000000..2bdd7b6
--- /dev/null
+++ b/src/main/java/com/flink/tutorials/java/api/types/TupleExample.java
@@ -0,0 +1,27 @@
+package com.flink.tutorials.java.api.types;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class TupleExample {
+
+    // Java Tuple Example
+    public static void main(String[] args) throws Exception {
+
+        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Tuple3<String, Long, Double>> dataStream = senv.fromElements(
+                Tuple3.of("0001", 0L, 121.2),
+                Tuple3.of("0002", 1L, 201.8),
+                Tuple3.of("0003", 2L, 10.3),
+                Tuple3.of("0004", 3L, 99.6)
+        );
+
+        dataStream.filter(item -> item.f2 > 100).print();
+
+        dataStream.filter(item -> ((Double) item.getField(2)) > 100).print();
+
+        senv.execute("java tuple");
+    }
+}
diff --git a/src/main/java/com/flink/tutorials/java/api/types/TypeCheck.java b/src/main/java/com/flink/tutorials/java/api/types/TypeCheck.java
new file mode 100644
index 0000000..33b1691
--- /dev/null
+++ b/src/main/java/com/flink/tutorials/java/api/types/TypeCheck.java
@@ -0,0 +1,16 @@
+package com.flink.tutorials.java.api.types;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+public class TypeCheck {
+
+    public static void main(String[] args) {
+
+        System.out.println(TypeInformation.of(StockPrice.class).createSerializer(new ExecutionConfig()));
+
+        System.out.println(TypeInformation.of(StockPrice1.class).createSerializer(new ExecutionConfig()));
+
+        System.out.println(TypeInformation.of(StockPrice2.class).createSerializer(new ExecutionConfig()));
+    }
+}
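Running TypeCheck shows which serializer Flink picks: StockPrice fulfils the POJO rules (public class, public no-argument constructor, fields public or reachable via getters/setters) and should get a PojoSerializer, while StockPrice1 and StockPrice2 should fall back to the generic KryoSerializer. For comparison, a minimal Scala sketch (not part of this patch; the object and case-class names are made up) showing that the Scala API analyzes case classes natively, so they never hit the Kryo fallback:

    package com.flink.tutorials.scala.api.types

    import org.apache.flink.api.scala._

    object CaseClassTypeCheck {

      case class StockPriceCase(symbol: String, timestamp: Long, price: Double)

      def main(args: Array[String]): Unit = {
        // createTypeInformation is the Scala counterpart of TypeInformation.of;
        // for a case class it yields a case-class serializer, not a Kryo-based one
        println(createTypeInformation[StockPriceCase])
      }
    }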
diff --git a/src/main/scala/com/flink/tutorials/api/transformations/FlatMapExample.scala b/src/main/scala/com/flink/tutorials/api/transformations/FlatMapExample.scala
deleted file mode 100644
index d57692b..0000000
--- a/src/main/scala/com/flink/tutorials/api/transformations/FlatMapExample.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.flink.tutorials.api.transformations
-
-import org.apache.flink.streaming.api.scala._
-
-object FlatMapExample {
-
-  def main(args: Array[String]): Unit = {
-    // create the Flink execution environment
-    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val dataStream: DataStream[String] = senv.fromElements("Hello World", "Hello this is Flink")
-
-    // for the input "Hello World", split outputs the list ["Hello", "World"]
-    // flatMap then emits every element of that list
-    // the final output is ["Hello", "World", "Hello", "this", "is", "Flink"]
-    val words = dataStream.flatMap ( input => input.split(" ") )
-
-    val words2 = dataStream.map { _.split(" ") }
-
-    // only process sentences longer than 15 characters
-    val longSentenceWords = dataStream.flatMap {
-      input => {
-        if (input.size > 15) {
-          input.split(" ")
-        } else {
-          Seq.empty
-        }
-      }
-    }
-
-    senv.execute("basic flatMap transformation")
-  }
-
-}
diff --git a/src/main/scala/com/flink/tutorials/scala/api/time/AggregateFunctionExample.scala b/src/main/scala/com/flink/tutorials/scala/api/time/AggregateFunctionExample.scala
new file mode 100644
index 0000000..a268c42
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/time/AggregateFunctionExample.scala
@@ -0,0 +1,55 @@
+package com.flink.tutorials.scala.api.time
+
+import org.apache.flink.api.common.functions.AggregateFunction
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.util.Collector
+
+object AggregateFunctionExample {
+
+  case class StockPrice(symbol: String, price: Double)
+
+  // IN:  StockPrice
+  // ACC: (String, Double, Int) - (symbol, sum, count)
+  // OUT: (String, Double)      - (symbol, average)
+  class AverageAggregate extends AggregateFunction[StockPrice, (String, Double, Int), (String, Double)] {
+
+    override def createAccumulator() = ("", 0.0, 0)
+
+    override def add(item: StockPrice, accumulator: (String, Double, Int)) =
+      (item.symbol, accumulator._2 + item.price, accumulator._3 + 1)
+
+    override def getResult(accumulator: (String, Double, Int)) = (accumulator._1, accumulator._2 / accumulator._3)
+
+    override def merge(a: (String, Double, Int), b: (String, Double, Int)) =
+      (a._1, a._2 + b._2, a._3 + b._3)
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val senv = StreamExecutionEnvironment.getExecutionEnvironment
+    senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val socketSource = senv.socketTextStream("localhost", 9000)
+
+    val input: DataStream[StockPrice] = socketSource.flatMap {
+      (line: String, out: Collector[StockPrice]) => {
+        val array = line.split(" ")
+        if (array.size == 2) {
+          out.collect(StockPrice(array(0), array(1).toDouble))
+        }
+      }
+    }
+
+    val average = input
+      .keyBy(s => s.symbol)
+      .timeWindow(Time.seconds(10))
+      .aggregate(new AverageAggregate)
+
+    average.print()
+
+    senv.execute("window aggregate function")
+  }
+
+}
diff --git a/src/main/scala/com/flink/tutorials/scala/api/time/CoGroupExample.scala b/src/main/scala/com/flink/tutorials/scala/api/time/CoGroupExample.scala
new file mode 100644
index 0000000..fb39fda
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/time/CoGroupExample.scala
@@ -0,0 +1,63 @@
+package com.flink.tutorials.scala.api.time
+
+import java.lang
+
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.util.Collector
+import collection.JavaConverters._
+
+object CoGroupExample {
+
+  class MyCoGroupFunction extends CoGroupFunction[(String, Int), (String, Int), String] {
+
+    // the inputs are Java Iterables: import collection.JavaConverters._ and convert them to Scala
+    override def coGroup(input1: lang.Iterable[(String, Int)], input2: lang.Iterable[(String, Int)], out: Collector[String]): Unit = {
+      input1.asScala.foreach(element => out.collect("input1 :" + element.toString))
+      input2.asScala.foreach(element => out.collect("input2 :" + element.toString))
+    }
+
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val senv = StreamExecutionEnvironment.getExecutionEnvironment
+    senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val socketSource1 = senv.socketTextStream("localhost", 9000)
+    val socketSource2 = senv.socketTextStream("localhost", 9001)
+
+    val input1: DataStream[(String, Int)] = socketSource1.flatMap {
+      (line: String, out: Collector[(String, Int)]) => {
+        val array = line.split(" ")
+        if (array.size == 2) {
+          out.collect((array(0), array(1).toInt))
+        }
+      }
+    }
+
+    val input2: DataStream[(String, Int)] = socketSource2.flatMap {
+      (line: String, out: Collector[(String, Int)]) => {
+        val array = line.split(" ")
+        if (array.size == 2) {
+          out.collect((array(0), array(1).toInt))
+        }
+      }
+    }
+
+    val coGroupResult = input1.coGroup(input2)
+      .where(i1 => i1._1)
+      .equalTo(i2 => i2._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
+      .apply(new MyCoGroupFunction)
+
+    coGroupResult.print()
+
+    senv.execute("window cogroup function")
+  }
+
+}
diff --git a/src/main/scala/com/flink/tutorials/scala/api/time/IncrementalProcessExample.scala b/src/main/scala/com/flink/tutorials/scala/api/time/IncrementalProcessExample.scala
new file mode 100644
index 0000000..386e6e4
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/time/IncrementalProcessExample.scala
@@ -0,0 +1,61 @@
+package com.flink.tutorials.scala.api.time
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+object IncrementalProcessExample {
+
+  case class StockPrice(symbol: String, price: Double)
+
+  case class MaxMinPrice(symbol: String, max: Double, min: Double, windowEndTs: Long)
+
+  class WindowEndProcessFunction extends ProcessWindowFunction[(String, Double, Double), MaxMinPrice, String, TimeWindow] {
+
+    override def process(key: String,
+                         context: Context,
+                         elements: Iterable[(String, Double, Double)],
+                         out: Collector[MaxMinPrice]): Unit = {
+      val maxMinItem = elements.head
+      val windowEndTs = context.window.getEnd
+      out.collect(MaxMinPrice(key, maxMinItem._2, maxMinItem._3, windowEndTs))
+    }
+
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val senv = StreamExecutionEnvironment.getExecutionEnvironment
+    senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val socketSource = senv.socketTextStream("localhost", 9000)
+
+    val input: DataStream[StockPrice] = socketSource.flatMap {
+      (line: String, out: Collector[StockPrice]) => {
+        val array = line.split(" ")
+        if (array.size == 2) {
+          out.collect(StockPrice(array(0), array(1).toDouble))
+        }
+      }
+    }
+
+    // reduce must return the same type as its input,
+    // so StockPrice is first mapped to a triple of (symbol, max, min)
+    val maxMin = input
+      .map(s => (s.symbol, s.price, s.price))
+      .keyBy(s => s._1)
+      .timeWindow(Time.seconds(10))
+      .reduce(
+        ((s1: (String, Double, Double), s2: (String, Double, Double)) => (s1._1, Math.max(s1._2, s2._2), Math.min(s1._3, s2._3))),
+        new WindowEndProcessFunction
+      )
+
+    maxMin.print()
+
+    senv.execute("combine reduce and process function")
+  }
+
+}
diff --git a/src/main/scala/com/flink/tutorials/scala/api/time/IntervalJoinExample.scala b/src/main/scala/com/flink/tutorials/scala/api/time/IntervalJoinExample.scala
new file mode 100644
index 0000000..6c4b868
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/time/IntervalJoinExample.scala
@@ -0,0 +1,73 @@
+package com.flink.tutorials.scala.api.time
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.util.Collector
+
+object IntervalJoinExample {
+
+  class MyProcessFunction extends ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String] {
+    override def processElement(input1: (String, Long, Int),
+                                input2: (String, Long, Int),
+                                context: ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String]#Context,
+                                out: Collector[String]): Unit = {
+
+      out.collect("input 1: " + input1.toString + ", input 2: " + input2.toString)
+
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val senv = StreamExecutionEnvironment.getExecutionEnvironment
+    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    senv.getConfig.setAutoWatermarkInterval(2000L)
+
+    val socketSource1 = senv.socketTextStream("localhost", 9000)
+    val socketSource2 = senv.socketTextStream("localhost", 9001)
+
+    // each stream element has three fields: (key, timestamp, value)
+    val input1: DataStream[(String, Long, Int)] = socketSource1.flatMap {
+      (line: String, out: Collector[(String, Long, Int)]) => {
+        val array = line.split(" ")
+        if (array.size == 3) {
+          out.collect((array(0), array(1).toLong, array(2).toInt))
+        }
+      }
+    }.assignTimestampsAndWatermarks(
+      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(1)) {
+        override def extractTimestamp(element: (String, Long, Int)): Long = {
+          element._2
+        }
+      })
+
+    val input2: DataStream[(String, Long, Int)] = socketSource2.flatMap {
+      (line: String, out: Collector[(String, Long, Int)]) => {
+        val array = line.split(" ")
+        if (array.size == 3) {
+          out.collect((array(0), array(1).toLong, array(2).toInt))
+        }
+      }
+    }.assignTimestampsAndWatermarks(
+      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(1)) {
+        override def extractTimestamp(element: (String, Long, Int)): Long = {
+          element._2
+        }
+      })
+
+    val intervalJoinResult = input1.keyBy(_._1)
+      .intervalJoin(input2.keyBy(_._1))
+      .between(Time.milliseconds(-5), Time.milliseconds(10))
+      .process(new MyProcessFunction)
+
+    intervalJoinResult.print()
+
+    senv.execute("interval join function")
+  }
+
+}
diff --git a/src/main/scala/com/flink/tutorials/scala/api/time/JoinExample.scala b/src/main/scala/com/flink/tutorials/scala/api/time/JoinExample.scala
new file mode 100644
index 0000000..89eead0
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/time/JoinExample.scala
@@ -0,0 +1,58 @@
+package com.flink.tutorials.scala.api.time
+
+import org.apache.flink.api.common.functions.JoinFunction
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.util.Collector
+
+object JoinExample {
+
+  class MyJoinFunction extends JoinFunction[(String, Int), (String, Int), String] {
+
+    override def join(input1: (String, Int), input2: (String, Int)): String = {
+      "input 1 :" + input1._2 + ", input 2 :" + input2._2
+    }
+
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val senv = StreamExecutionEnvironment.getExecutionEnvironment
+    senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val socketSource1 = senv.socketTextStream("localhost", 9000)
+    val socketSource2 = senv.socketTextStream("localhost", 9001)
+
+    val input1: DataStream[(String, Int)] = socketSource1.flatMap {
+      (line: String, out: Collector[(String, Int)]) => {
+        val array = line.split(" ")
+        if (array.size == 2) {
+          out.collect((array(0), array(1).toInt))
+        }
+      }
+    }
+
+    val input2: DataStream[(String, Int)] = socketSource2.flatMap {
+      (line: String, out: Collector[(String, Int)]) => {
+        val array = line.split(" ")
+        if (array.size == 2) {
+          out.collect((array(0), array(1).toInt))
+        }
+      }
+    }
+
+    val joinResult = input1.join(input2)
+      .where(i1 => i1._1)
+      .equalTo(i2 => i2._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
+      .apply(new MyJoinFunction)
+
+    joinResult.print()
+
+    senv.execute("window join function")
+  }
+
+}
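IntervalJoinExample above and LateExample below both derive event-time watermarks from BoundedOutOfOrdernessTimestampExtractor. A minimal standalone sketch of the arithmetic it implements (the bound and the timestamps are made-up values): the emitted watermark always trails the largest timestamp seen so far by the configured bound.

    object WatermarkArithmetic {
      def main(args: Array[String]): Unit = {
        val bound = 1000L                 // 1 second of allowed out-of-orderness
        var maxTs = Long.MinValue + bound // keep maxTs - bound from underflowing
        for (ts <- Seq(1000L, 4000L, 3000L, 9000L)) {
          maxTs = math.max(maxTs, ts)
          // a window [0, 5000) only fires once the watermark reaches 5000,
          // so the out-of-order element with timestamp 3000 is still included
          println(s"event @ $ts ms -> watermark ${maxTs - bound} ms")
        }
      }
    }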
diff --git a/src/main/scala/com/flink/tutorials/scala/api/time/LateExample.scala b/src/main/scala/com/flink/tutorials/scala/api/time/LateExample.scala
new file mode 100644
index 0000000..883b318
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/time/LateExample.scala
@@ -0,0 +1,133 @@
+package com.flink.tutorials.scala.api.time
+
+import java.text.SimpleDateFormat
+import java.util.Calendar
+
+import org.apache.flink.api.common.functions.AggregateFunction
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+import scala.util.Random
+
+object LateExample {
+
+  class CountAggregate extends AggregateFunction[(String, Long, Int), (String, Int), (String, Int)] {
+
+    override def createAccumulator() = ("", 0)
+
+    override def add(item: (String, Long, Int), accumulator: (String, Int)) =
+      (item._1, accumulator._2 + 1)
+
+    override def getResult(accumulator: (String, Int)) = accumulator
+
+    override def merge(a: (String, Int), b: (String, Int)) =
+      (a._1, a._2 + b._2)
+  }
+
+  class MySource extends RichSourceFunction[(String, Long, Int)] {
+
+    var isRunning: Boolean = true
+
+    val rand = new Random()
+
+    override def run(srcCtx: SourceContext[(String, Long, Int)]): Unit = {
+
+      while (isRunning) {
+
+        val curTime = Calendar.getInstance.getTimeInMillis
+        // perturb the event time by up to 10 seconds to simulate out-of-order data
+        val eventTime = curTime + rand.nextInt(10000)
+
+        // emit the element through the SourceContext
+        srcCtx.collect(("1", eventTime, rand.nextInt()))
+        Thread.sleep(100)
+      }
+    }
+
+    override def cancel(): Unit = {
+      isRunning = false
+    }
+  }
+
+  // the type parameters of ProcessWindowFunction are: [input, output, key, window]
+  class AllowedLatenessFunction extends ProcessWindowFunction[
+    (String, Long, Int), (String, String, Int, String), String, TimeWindow] {
+
+    override def process(key: String,
+                         context: Context,
+                         elements: Iterable[(String, Long, Int)],
+                         out: Collector[(String, String, Int, String)]): Unit = {
+
+      // whether this window has already been updated by late data
+      val isUpdated = context.windowState.getState(
+        new ValueStateDescriptor[Boolean]("isUpdated", Types.of[Boolean])
+      )
+      val count = elements.size
+      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+
+      if (isUpdated.value() == false) {
+        // Boolean state defaults to false, so the first call for this window takes this branch
+        out.collect((key, format.format(Calendar.getInstance().getTime), count, "first"))
+        isUpdated.update(true)
+      } else {
+        // isUpdated is now true: calls triggered by late data take this branch
+        out.collect((key, format.format(Calendar.getInstance().getTime), count, "updated"))
+      }
+
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val senv = StreamExecutionEnvironment.getExecutionEnvironment
+    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    senv.setParallelism(1)
+    senv.getConfig.setAutoWatermarkInterval(2000L)
+
+    // each stream element has three fields: (key, timestamp, value)
+    val input: DataStream[(String, Long, Int)] = senv
+      .addSource(new MySource)
+      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(5)) {
+        override def extractTimestamp(element: (String, Long, Int)): Long = {
+          element._2
+        }
+      })
+
+    val mainStream = input.keyBy(item => item._1)
+      .timeWindow(Time.seconds(5))
+      // route late elements to the side output "late-elements"
+      .sideOutputLateData(new OutputTag[(String, Long, Int)]("late-elements"))
+      .aggregate(new CountAggregate)
+
+    // read the side output back as its own stream
+    val lateStream: DataStream[(String, Long, Int)] = mainStream.getSideOutput(new OutputTag[(String, Long, Int)]("late-elements"))
+
+//    mainStream.print()
+//    lateStream.print()
+
+    val allowedLatenessStream = input.keyBy(item => item._1)
+      .timeWindow(Time.seconds(5))
+      .allowedLateness(Time.seconds(5))
+      .process(new AllowedLatenessFunction)
+
+    allowedLatenessStream.print()
+
+    senv.execute("late elements")
+  }
+
+}
diff --git a/src/main/scala/com/flink/tutorials/scala/api/time/PeriodicWatermark.scala b/src/main/scala/com/flink/tutorials/scala/api/time/PeriodicWatermark.scala
new file mode 100644
index 0000000..3ad6980
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/time/PeriodicWatermark.scala
@@ -0,0 +1,84 @@
+package com.flink.tutorials.scala.api.time
+
+import java.text.SimpleDateFormat
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+
+object PeriodicWatermark {
+
+  def main(args: Array[String]): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    // generate a watermark every 5000 milliseconds
+    env.getConfig.setAutoWatermarkInterval(5000L)
+
+    val socketSource = env.socketTextStream("localhost", 9000)
+
+    val input = socketSource.map {
+      line => {
+        val arr = line.split(" ")
+        val id = arr(0)
+        val time = arr(1).toLong
+        (id, time)
+      }
+    }
+
+    val watermark = input.assignTimestampsAndWatermarks(new MyPeriodicAssigner)
+    val boundedOutOfOrder = input.assignTimestampsAndWatermarks(
+      new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.minutes(1)) {
+        override def extractTimestamp(element: (String, Long)): Long = {
+          element._2
+        }
+      })
+
+    env.execute("periodic and punctuated watermark")
+
+  }
+
+  // assume elements are (String, Long) pairs whose second field is the element's timestamp
+  class MyPeriodicAssigner extends AssignerWithPeriodicWatermarks[(String, Long)] {
+    val bound: Long = 60 * 1000 // one minute
+    var maxTs: Long = Long.MinValue // largest timestamp extracted so far
+
+    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+
+    override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
+      println("extractTimestamp is invoked for element " + element._1 + "@" + element._2)
+      // update maxTs with the largest timestamp seen so far
+      maxTs = maxTs.max(element._2)
+      // use the second field as this element's event time
+      element._2
+    }
+
+    override def getCurrentWatermark: Watermark = {
+      println("getCurrentWatermark method is invoked @:" + formatter.format(System.currentTimeMillis()))
+      // the watermark trails the largest timestamp by one minute
+      val watermark = new Watermark(maxTs - bound)
+      println("watermark timestamp @:" + formatter.format(watermark.getTimestamp))
+      watermark
+    }
+  }
+
+  // the second field is the timestamp; the third flags whether a watermark should be emitted
+  class MyPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[(String, Long, Boolean)] {
+
+    override def extractTimestamp(element: (String, Long, Boolean), previousElementTimestamp: Long): Long = {
+      element._2
+    }
+
+    override def checkAndGetNextWatermark(element: (String, Long, Boolean), extractedTimestamp: Long): Watermark = {
+      if (element._3)
+        new Watermark(extractedTimestamp)
+      else
+        null
+    }
+  }
+
+}
diff --git a/src/main/scala/com/flink/tutorials/scala/api/time/ProcessWindowFunctionExample.scala b/src/main/scala/com/flink/tutorials/scala/api/time/ProcessWindowFunctionExample.scala
new file mode 100644
index 0000000..86a96d2
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/time/ProcessWindowFunctionExample.scala
@@ -0,0 +1,65 @@
+package com.flink.tutorials.scala.api.time
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+
+object ProcessWindowFunctionExample {
+
+  case class StockPrice(symbol: String, price: Double)
+
+  // the type parameters of ProcessWindowFunction are: [input, output, key, window]
+  class FrequencyProcessFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {
+
+    override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {
+
+      // map from stock price to the number of times that price occurred
+      var countMap = scala.collection.mutable.Map[Double, Int]()
+
+      for (element <- elements) {
+        val count = countMap.getOrElse(element.price, 0)
+        countMap(element.price) = count + 1
+      }
+
+      // sort by occurrence count in descending order
+      val sortedMap = countMap.toSeq.sortWith(_._2 > _._2)
+
+      // emit the most frequent price to the Collector
+      if (sortedMap.nonEmpty) {
+        out.collect((key, sortedMap(0)._1))
+      }
+
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val senv = StreamExecutionEnvironment.getExecutionEnvironment
+    senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val socketSource = senv.socketTextStream("localhost", 9000)
+
+    val input: DataStream[StockPrice] = socketSource.flatMap {
+      (line: String, out: Collector[StockPrice]) => {
+        val array = line.split(" ")
+        if (array.size == 2) {
+          out.collect(StockPrice(array(0), array(1).toDouble))
+        }
+      }
+    }
+
+    val frequency = input
+      .keyBy(s => s.symbol)
+      .timeWindow(Time.seconds(10))
+      .process(new FrequencyProcessFunction)
+
+    frequency.print()
+
+    senv.execute("window process function")
+  }
+
+}
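A ProcessWindowFunction such as FrequencyProcessFunction buffers every element of the window in state. When only an aggregate plus window metadata is needed, the window API also accepts an AggregateFunction paired with a ProcessWindowFunction, the aggregate-based twin of IncrementalProcessExample above. A minimal sketch (AverageAggregate as in AggregateFunctionExample; WindowEndEmitter and the surrounding pipeline are hypothetical):

    // the ProcessWindowFunction receives only the single pre-aggregated value
    class WindowEndEmitter extends ProcessWindowFunction[(String, Double), (String, Double, Long), String, TimeWindow] {
      override def process(key: String,
                           context: Context,
                           elements: Iterable[(String, Double)],
                           out: Collector[(String, Double, Long)]): Unit = {
        // elements holds exactly one element: the aggregated average
        out.collect((key, elements.head._2, context.window.getEnd))
      }
    }

    val averageWithWindowEnd = input
      .keyBy(s => s.symbol)
      .timeWindow(Time.seconds(10))
      .aggregate(new AverageAggregate, new WindowEndEmitter)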
diff --git a/src/main/scala/com/flink/tutorials/scala/api/time/ReduceFunctionExample.scala b/src/main/scala/com/flink/tutorials/scala/api/time/ReduceFunctionExample.scala
new file mode 100644
index 0000000..349c419
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/time/ReduceFunctionExample.scala
@@ -0,0 +1,39 @@
+package com.flink.tutorials.scala.api.time
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.util.Collector
+
+object ReduceFunctionExample {
+
+  case class StockPrice(symbol: String, price: Double)
+
+  def main(args: Array[String]): Unit = {
+
+    val senv = StreamExecutionEnvironment.getExecutionEnvironment
+    senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val socketSource = senv.socketTextStream("localhost", 9000)
+
+    val input: DataStream[StockPrice] = socketSource.flatMap {
+      (line: String, out: Collector[StockPrice]) => {
+        val array = line.split(" ")
+        if (array.size == 2) {
+          out.collect(StockPrice(array(0), array(1).toDouble))
+        }
+      }
+    }
+
+    // reduce must return the same type as its input, StockPrice
+    val sum = input
+      .keyBy(s => s.symbol)
+      .timeWindow(Time.seconds(10))
+      .reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))
+
+    sum.print()
+
+    senv.execute("window reduce function")
+  }
+
+}
diff --git a/src/main/scala/com/flink/tutorials/scala/api/time/TriggerExample.scala b/src/main/scala/com/flink/tutorials/scala/api/time/TriggerExample.scala
new file mode 100644
index 0000000..61ca1c7
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/time/TriggerExample.scala
@@ -0,0 +1,102 @@
+package com.flink.tutorials.scala.api.time
+
+import org.apache.flink.api.common.functions.AggregateFunction
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+object TriggerExample {
+
+  case class StockPrice(symbol: String, price: Double)
+
+  // IN:  StockPrice
+  // ACC: (String, Double, Int) - (symbol, sum, count)
+  // OUT: (String, Double)      - (symbol, average)
+  class AverageAggregate extends AggregateFunction[StockPrice, (String, Double, Int), (String, Double)] {
+
+    override def createAccumulator() = ("", 0.0, 0)
+
+    override def add(item: StockPrice, accumulator: (String, Double, Int)) =
+      (item.symbol, accumulator._2 + item.price, accumulator._3 + 1)
+
+    override def getResult(accumulator: (String, Double, Int)) = (accumulator._1, accumulator._2 / accumulator._3)
+
+    override def merge(a: (String, Double, Int), b: (String, Double, Int)) =
+      (a._1, a._2 + b._2, a._3 + b._3)
+  }
+
+  class MyTrigger extends Trigger[StockPrice, TimeWindow] {
+
+    override def onElement(element: StockPrice,
+                           time: Long,
+                           window: TimeWindow,
+                           triggerContext: Trigger.TriggerContext): TriggerResult = {
+      val lastPriceState: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPriceState", classOf[Double]))
+
+      // the default result is CONTINUE
+      var triggerResult: TriggerResult = TriggerResult.CONTINUE
+
+      // lastPriceState is empty on first use, so it must be checked first:
+      // the state value is produced on the Java side and is null when empty,
+      // so wrap it in an Option before reading it as a Scala Double
+      if (Option(lastPriceState.value()).isDefined) {
+        if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.05) {
+          // a price drop of more than 5% fires and purges the window immediately
+          triggerResult = TriggerResult.FIRE_AND_PURGE
+        } else if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.01) {
+          // a drop of more than 1% registers a timer at the next 10-second boundary
+          val t = triggerContext.getCurrentProcessingTime + (10 * 1000 - (triggerContext.getCurrentProcessingTime % (10 * 1000)))
+          triggerContext.registerProcessingTimeTimer(t)
+        }
+      }
+      lastPriceState.update(element.price)
+      triggerResult
+    }
+
+    // event time is not used here, so just return CONTINUE
+    override def onEventTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
+      TriggerResult.CONTINUE
+    }
+
+    override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
+      TriggerResult.FIRE_AND_PURGE
+    }
+
+    override def clear(window: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
+      val lastPriceState: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPriceState", classOf[Double]))
+      lastPriceState.clear()
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val senv = StreamExecutionEnvironment.getExecutionEnvironment
+    senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val socketSource = senv.socketTextStream("localhost", 9000)
+
+    val input: DataStream[StockPrice] = socketSource.flatMap {
+      (line: String, out: Collector[StockPrice]) => {
+        val array = line.split(" ")
+        if (array.size == 2) {
+          out.collect(StockPrice(array(0), array(1).toDouble))
+        }
+      }
+    }
+
+    val average = input
+      .keyBy(s => s.symbol)
+      .timeWindow(Time.seconds(60))
+      .trigger(new MyTrigger)
+      .aggregate(new AverageAggregate)
+
+    average.print()
+
+    senv.execute("trigger")
+  }
+
+}
diff --git a/src/main/scala/com/flink/tutorials/api/transformations/AggregationExample.scala b/src/main/scala/com/flink/tutorials/scala/api/transformations/AggregationExample.scala
similarity index 95%
rename from src/main/scala/com/flink/tutorials/api/transformations/AggregationExample.scala
rename to src/main/scala/com/flink/tutorials/scala/api/transformations/AggregationExample.scala
index 9b12126..1695079 100644
--- a/src/main/scala/com/flink/tutorials/api/transformations/AggregationExample.scala
+++ b/src/main/scala/com/flink/tutorials/scala/api/transformations/AggregationExample.scala
@@ -1,4 +1,4 @@
-package com.flink.tutorials.api.transformations
+package com.flink.tutorials.scala.api.transformations
 
 import org.apache.flink.streaming.api.scala._
 
diff --git a/src/main/scala/com/flink/tutorials/api/transformations/FilterExample.scala b/src/main/scala/com/flink/tutorials/scala/api/transformations/FilterExample.scala
similarity index 95%
rename from src/main/scala/com/flink/tutorials/api/transformations/FilterExample.scala
rename to src/main/scala/com/flink/tutorials/scala/api/transformations/FilterExample.scala
index 91c8382..d98c661 100644
--- a/src/main/scala/com/flink/tutorials/api/transformations/FilterExample.scala
+++ b/src/main/scala/com/flink/tutorials/scala/api/transformations/FilterExample.scala
@@ -1,4 +1,4 @@
-package com.flink.tutorials.api.transformations
+package com.flink.tutorials.scala.api.transformations
 
 import org.apache.flink.api.common.functions.RichFilterFunction
 import org.apache.flink.streaming.api.scala._
diff --git a/src/main/scala/com/flink/tutorials/scala/api/transformations/FlatMapExample.scala b/src/main/scala/com/flink/tutorials/scala/api/transformations/FlatMapExample.scala
new file mode 100644
index 0000000..3b6d193
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/transformations/FlatMapExample.scala
@@ -0,0 +1,101 @@
+package com.flink.tutorials.scala.api.transformations
+
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.extensions._
+import org.apache.flink.util.Collector
+
+object FlatMapExample {
+
+  def main(args: Array[String]): Unit = {
+    // create the Flink execution environment
+    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val dataStream: DataStream[String] = senv.fromElements("Hello World", "Hello this is Flink")
+
+    // for the input "Hello World", split outputs the list ["Hello", "World"]
+    // flatMap then emits every element of that list
+    // the final output is ["Hello", "World", "Hello", "this", "is", "Flink"]
+    val words = dataStream.flatMap ( input => input.split(" ") )
+
+    val words2 = dataStream.map { _.split(" ") }
+
+    // only process sentences longer than 15 characters
+    val longSentenceWords = dataStream.flatMap {
+      input => {
+        if (input.size > 15) {
+          // the output type is TraversableOnce, so a collection must be returned;
+          // the Array[String] is converted to a Seq[String] here
+          input.split(" ").toSeq
+        } else {
+          // an empty collection must be returned here,
+          // otherwise the result does not match TraversableOnce!
+          Seq.empty
+        }
+      }
+    }
+
+    val flatMapWith = dataStream.flatMapWith {
+      case (sentence: String) => {
+        if (sentence.size > 15) {
+          sentence.split(" ").toSeq
+        } else {
+          Seq.empty
+        }
+      }
+    }
+
+    val function = dataStream.flatMap(new WordSplitFlatMap(10))
+
+    val lambda = dataStream.flatMap {
+      (value: String, out: Collector[String]) => {
+        if (value.size > 10) {
+          value.split(" ").foreach(out.collect)
+        }
+      }
+    }
+
+    val richFunction = dataStream.flatMap(new WordSplitRichFlatMap(10))
+
+    val jobExecuteResult = senv.execute("basic flatMap transformation")
+
+    // after the job finishes, read the accumulator result
+    val lines: Int = jobExecuteResult.getAccumulatorResult("num-of-lines")
+    println("num of lines: " + lines)
+  }
+
+  // a FlatMapFunction that only tokenizes strings longer than limit
+  class WordSplitFlatMap(limit: Int) extends FlatMapFunction[String, String] {
+    override def flatMap(value: String, out: Collector[String]): Unit = {
+      if (value.size > limit) {
+        // split returns an Array;
+        // collecting each element through Collector.collect flattens it into the output
+        value.split(" ").foreach(out.collect)
+      }
+    }
+  }
+
+  // a RichFlatMapFunction variant with an accumulator
+  class WordSplitRichFlatMap(limit: Int) extends RichFlatMapFunction[String, String] {
+    // create an accumulator
+    val numOfLines: IntCounter = new IntCounter(0)
+
+    override def open(parameters: Configuration): Unit = {
+      // register the accumulator with the RuntimeContext
+      getRuntimeContext.addAccumulator("num-of-lines", this.numOfLines)
+    }
+
+    override def flatMap(value: String, out: Collector[String]): Unit = {
+      // update the accumulator while the job is running
+      this.numOfLines.add(1)
+      if (value.size > limit) {
+        value.split(" ").foreach(out.collect)
+      }
+    }
+  }
+
+}
diff --git a/src/main/scala/com/flink/tutorials/api/transformations/KeyByExample.scala b/src/main/scala/com/flink/tutorials/scala/api/transformations/KeyByExample.scala
similarity index 52%
rename from src/main/scala/com/flink/tutorials/api/transformations/KeyByExample.scala
rename to src/main/scala/com/flink/tutorials/scala/api/transformations/KeyByExample.scala
index 4862ba4..d57ba75 100644
--- a/src/main/scala/com/flink/tutorials/api/transformations/KeyByExample.scala
+++ b/src/main/scala/com/flink/tutorials/scala/api/transformations/KeyByExample.scala
@@ -1,5 +1,6 @@
-package com.flink.tutorials.api.transformations
+package com.flink.tutorials.scala.api.transformations
 
+import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.streaming.api.scala._
 
 object KeyByExample {
@@ -11,12 +12,19 @@ object KeyByExample {
     val dataStream: DataStream[(Int, Double)] = senv.fromElements((1, 1.0), (2, 3.2), (1, 5.5), (3, 10.0), (3, 12.5))
 
     // use a field position as the key: group by the first field
-    val keyedStream = dataStream.keyBy(0).sum(1)
+    val keyedStream = dataStream.keyBy(0).sum(1).print()
 
-    keyedStream.print()
+    // use a KeySelector
+    val keySelectorStream = dataStream.keyBy(new MyKeySelector).sum(1).print()
 
     senv.execute("basic keyBy transformation")
   }
 
+  class MyKeySelector extends KeySelector[(Int, Double), Int] {
+    override def getKey(in: (Int, Double)): Int = {
+      in._1
+    }
+  }
+
 }
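In the Scala API the same grouping is usually written with a lambda key selector; position-based keyBy(0) is deprecated in later Flink releases in favor of selectors (a note about newer versions, not something this patch depends on). A minimal sketch reusing dataStream from KeyByExample above:

    // a lambda key selector, equivalent to MyKeySelector above
    val byFirstField = dataStream.keyBy(_._1).sum(1)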
diff --git a/src/main/scala/com/flink/tutorials/api/transformations/MapExample.scala b/src/main/scala/com/flink/tutorials/scala/api/transformations/MapExample.scala
similarity index 96%
rename from src/main/scala/com/flink/tutorials/api/transformations/MapExample.scala
rename to src/main/scala/com/flink/tutorials/scala/api/transformations/MapExample.scala
index fddca9f..82cf7b1 100644
--- a/src/main/scala/com/flink/tutorials/api/transformations/MapExample.scala
+++ b/src/main/scala/com/flink/tutorials/scala/api/transformations/MapExample.scala
@@ -1,4 +1,4 @@
-package com.flink.tutorials.api.transformations
+package com.flink.tutorials.scala.api.transformations
 
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.streaming.api.scala._
diff --git a/src/main/scala/com/flink/tutorials/scala/api/transformations/PartitionCustomExample.scala b/src/main/scala/com/flink/tutorials/scala/api/transformations/PartitionCustomExample.scala
new file mode 100644
index 0000000..502262f
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/transformations/PartitionCustomExample.scala
@@ -0,0 +1,57 @@
+package com.flink.tutorials.scala.api.transformations
+
+import org.apache.flink.api.common.functions.Partitioner
+import org.apache.flink.streaming.api.scala._
+
+object PartitionCustomExample {
+
+  /**
+    * Partitioner[T], where the type parameter T is the type of the key field.
+    * Override partition() to redistribute all stream elements by that field.
+    * */
+  class MyPartitioner extends Partitioner[String] {
+
+    val rand = scala.util.Random
+
+    /**
+      * key: the field of type T used for redistribution, here the String of (Int, String)
+      * numPartitions: the number of parallel instances downstream
+      * the returned Int is the index of the downstream instance the element is sent to
+      * */
+    override def partition(key: String, numPartitions: Int): Int = {
+      val randomNum = rand.nextInt(numPartitions / 2)
+
+      // elements whose string contains a digit are routed to the first half of the
+      // instances, all others to the second half
+      if (key.exists(_.isDigit)) {
+        randomNum
+      } else {
+        randomNum + numPartitions / 2
+      }
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // the default parallelism of the current execution environment
+    val defaultParallelism = senv.getParallelism
+
+    // set the parallelism of all operators to 4, i.e. four parallel instances per operator
+    senv.setParallelism(4)
+
+    val dataStream: DataStream[(Int, String)] = senv.fromElements((1, "123"), (2, "abc"), (3, "256"), (4, "zyx")
+      , (5, "bcd"), (6, "666"))
+
+    // apply the redistribution logic of MyPartitioner to the second field of (Int, String)
+    val partitioned = dataStream.partitionCustom(new MyPartitioner, 1)
+
+    partitioned.print()
+
+    senv.execute("partition custom transformation")
+
+  }
+
+}
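partitionCustom also has an overload that takes a key selector instead of a field position, which avoids the bare index 1 above. A minimal sketch reusing dataStream and MyPartitioner from PartitionCustomExample:

    // select the String field explicitly rather than by position
    val partitionedBySelector = dataStream.partitionCustom(new MyPartitioner, _._2)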
diff --git a/src/main/scala/com/flink/tutorials/api/transformations/ReduceExample.scala b/src/main/scala/com/flink/tutorials/scala/api/transformations/ReduceExample.scala
similarity index 95%
rename from src/main/scala/com/flink/tutorials/api/transformations/ReduceExample.scala
rename to src/main/scala/com/flink/tutorials/scala/api/transformations/ReduceExample.scala
index fcc4ee0..550f548 100644
--- a/src/main/scala/com/flink/tutorials/api/transformations/ReduceExample.scala
+++ b/src/main/scala/com/flink/tutorials/scala/api/transformations/ReduceExample.scala
@@ -1,4 +1,4 @@
-package com.flink.tutorials.api.transformations
+package com.flink.tutorials.scala.api.transformations
 
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.streaming.api.scala._
diff --git a/src/main/scala/com/flink/tutorials/api/transformations/SimpleConnectExample.scala b/src/main/scala/com/flink/tutorials/scala/api/transformations/SimpleConnectExample.scala
similarity index 94%
rename from src/main/scala/com/flink/tutorials/api/transformations/SimpleConnectExample.scala
rename to src/main/scala/com/flink/tutorials/scala/api/transformations/SimpleConnectExample.scala
index 0879aaf..996b6eb 100644
--- a/src/main/scala/com/flink/tutorials/api/transformations/SimpleConnectExample.scala
+++ b/src/main/scala/com/flink/tutorials/scala/api/transformations/SimpleConnectExample.scala
@@ -1,4 +1,4 @@
-package com.flink.tutorials.api.transformations
+package com.flink.tutorials.scala.api.transformations
 
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.scala._
diff --git a/src/main/scala/com/flink/tutorials/scala/api/types/TupleExample.scala b/src/main/scala/com/flink/tutorials/scala/api/types/TupleExample.scala
new file mode 100644
index 0000000..f2dd7f2
--- /dev/null
+++ b/src/main/scala/com/flink/tutorials/scala/api/types/TupleExample.scala
@@ -0,0 +1,21 @@
+package com.flink.tutorials.scala.api.types
+
+import org.apache.flink.streaming.api.scala._
+
+object TupleExample {
+
+  // Scala Tuple Example
+  def main(args: Array[String]): Unit = {
+
+    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val dataStream: DataStream[(String, Long, Double)] =
+      senv.fromElements(("0001", 0L, 121.2), ("0002", 1L, 201.8),
+        ("0003", 2L, 10.3), ("0004", 3L, 99.6))
+
+    dataStream.filter(item => item._3 > 100)
+
+    senv.execute("scala tuple")
+  }
+
+}
diff --git a/src/main/scala/com/flink/tutorials/demos/stock/StockMediaConnectedDemo.scala b/src/main/scala/com/flink/tutorials/scala/demos/stock/StockMediaConnectedDemo.scala
similarity index 95%
rename from src/main/scala/com/flink/tutorials/demos/stock/StockMediaConnectedDemo.scala
rename to src/main/scala/com/flink/tutorials/scala/demos/stock/StockMediaConnectedDemo.scala
index 5c4de9d..dcd7946 100644
--- a/src/main/scala/com/flink/tutorials/demos/stock/StockMediaConnectedDemo.scala
+++ b/src/main/scala/com/flink/tutorials/scala/demos/stock/StockMediaConnectedDemo.scala
@@ -1,8 +1,8 @@
-package com.flink.tutorials.demos.stock
+package com.flink.tutorials.scala.demos.stock
 
 import java.util.Calendar
 
-import com.flink.tutorials.demos.stock.StockPriceDemo.{StockPrice, StockPriceSource, StockPriceTimeAssigner}
+import StockPriceDemo.{StockPrice, StockPriceSource, StockPriceTimeAssigner}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
diff --git a/src/main/scala/com/flink/tutorials/demos/stock/StockPriceDemo.scala b/src/main/scala/com/flink/tutorials/scala/demos/stock/StockPriceDemo.scala
similarity index 98%
rename from src/main/scala/com/flink/tutorials/demos/stock/StockPriceDemo.scala
rename to src/main/scala/com/flink/tutorials/scala/demos/stock/StockPriceDemo.scala
index c6544e9..74c7a0a 100644
--- a/src/main/scala/com/flink/tutorials/demos/stock/StockPriceDemo.scala
+++ b/src/main/scala/com/flink/tutorials/scala/demos/stock/StockPriceDemo.scala
@@ -1,4 +1,4 @@
-package com.flink.tutorials.demos.stock
+package com.flink.tutorials.scala.demos.stock
 
 import java.util.Calendar
 
diff --git a/src/main/scala/com/flink/tutorials/demos/wikiedits/WikipediaAnalysis.scala b/src/main/scala/com/flink/tutorials/scala/demos/wikiedits/WikipediaAnalysis.scala
similarity index 92%
rename from src/main/scala/com/flink/tutorials/demos/wikiedits/WikipediaAnalysis.scala
rename to src/main/scala/com/flink/tutorials/scala/demos/wikiedits/WikipediaAnalysis.scala
index d50250a..9110902 100644
--- a/src/main/scala/com/flink/tutorials/demos/wikiedits/WikipediaAnalysis.scala
+++ b/src/main/scala/com/flink/tutorials/scala/demos/wikiedits/WikipediaAnalysis.scala
@@ -1,4 +1,4 @@
-package com.flink.tutorials.demos.wikiedits
+package com.flink.tutorials.scala.demos.wikiedits
 
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.windowing.time.Time