diff --git a/src/main/java/com/flink/tutorials/java/chapter7/SimpleSourceExample.java b/src/main/java/com/flink/tutorials/java/chapter7/SimpleSourceExample.java index af8657c..0b6fb03 100644 --- a/src/main/java/com/flink/tutorials/java/chapter7/SimpleSourceExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter7/SimpleSourceExample.java @@ -18,6 +18,7 @@ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf); DataStream> countStream = env.addSource(new SimpleSource()); System.out.println("parallelism: " + env.getParallelism()); + countStream.print(); env.execute("source"); } diff --git a/src/main/java/com/flink/tutorials/java/chapter7/TextFileExample.java b/src/main/java/com/flink/tutorials/java/chapter7/TextFileExample.java index 11ad000..dc81983 100644 --- a/src/main/java/com/flink/tutorials/java/chapter7/TextFileExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter7/TextFileExample.java @@ -22,11 +22,18 @@ public static void main(String[] args) throws Exception { TextInputFormat textInputFormat = new TextInputFormat(new org.apache.flink.core.fs.Path(filePath)); // 每隔100毫秒检测一遍 -// DataStream inputStream = env.readFile(textInputFormat, filePath, -// FileProcessingMode.PROCESS_CONTINUOUSLY, 100); - - DataStream readOnceStream = env.readFile(textInputFormat, filePath, - FileProcessingMode.PROCESS_ONCE, 0); +// DataStream inputStream = env.readFile( +// textInputFormat, +// filePath, +// FileProcessingMode.PROCESS_CONTINUOUSLY, +// 100); + + // 只读一次 + DataStream readOnceStream = env.readFile( + textInputFormat, + filePath, + FileProcessingMode.PROCESS_ONCE, + 0); StreamingFileSink fileSink = StreamingFileSink .forRowFormat(new Path(filePath + "output-test"), diff --git a/src/main/java/com/flink/tutorials/java/chapter7/TransactionWriteSinkExample.java b/src/main/java/com/flink/tutorials/java/chapter7/TransactionWriteSinkExample.java index e8533dc..1cc1625 100644 --- a/src/main/java/com/flink/tutorials/java/chapter7/TransactionWriteSinkExample.java +++ b/src/main/java/com/flink/tutorials/java/chapter7/TransactionWriteSinkExample.java @@ -76,7 +76,7 @@ public String beginTransaction() throws Exception { // 创建一个存储本次事务的文件 Files.createFile(preCommitFilePath); transactionWriter = Files.newBufferedWriter(preCommitFilePath); - System.out.println("ransaction File: " + preCommitFilePath); + System.out.println("Transaction File: " + preCommitFilePath); return fileName; } diff --git a/src/main/java/com/flink/tutorials/java/chapter8/UserBehaviorKafkaConnect.java b/src/main/java/com/flink/tutorials/java/chapter8/UserBehaviorKafkaConnect.java index a26e36f..1dc14a1 100644 --- a/src/main/java/com/flink/tutorials/java/chapter8/UserBehaviorKafkaConnect.java +++ b/src/main/java/com/flink/tutorials/java/chapter8/UserBehaviorKafkaConnect.java @@ -25,7 +25,7 @@ public static void main(String[] args) throws Exception { env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); tEnv - // 使用connect函数连接外部系统 + // 使用connect函数连接外部系统 connect将被废弃 .connect( new Kafka() .version("universal") // 必填,合法的参数有"0.8", "0.9", "0.10", "0.11"或"universal" diff --git a/src/main/scala/com/flink/tutorials/scala/chapter5/AssignWatermark.scala b/src/main/scala/com/flink/tutorials/scala/chapter5/AssignWatermark.scala index 18b63cf..f65d17f 100644 --- a/src/main/scala/com/flink/tutorials/scala/chapter5/AssignWatermark.scala +++ b/src/main/scala/com/flink/tutorials/scala/chapter5/AssignWatermark.scala @@ -29,16 +29,18 @@ object AssignWatermark { // 使用下面的方式,部分Intellij需要在设置中添加 -target:jvm-1.8 // Preferences -> Build, Execution, Deployment -> Compiler -> Scala Compiler -> Default / Maven工程 // Additional compiler options 行添加参数: -target:jvm-1.8 - val periodWatermark: DataStream[(String, Long)] = input.assignTimestampsAndWatermarks( - WatermarkStrategy.forGenerator[(String, Long)]( - new WatermarkGeneratorSupplier[(String, Long)] { - override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] = - new MyPeriodicGenerator - } - ).withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] { - override def extractTimestamp(t: (String, Long), l: Long): Long = t._2 - }) - ) + val periodWatermark: DataStream[(String, Long)] = input + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forGenerator[(String, Long)]( + new WatermarkGeneratorSupplier[(String, Long)] { + override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] = + new MyPeriodicGenerator + } + ).withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] { + override def extractTimestamp(t: (String, Long), l: Long): Long = t._2 + }) + ) periodWatermark.print() diff --git a/src/main/scala/com/flink/tutorials/scala/chapter5/IntervalJoinExample.scala b/src/main/scala/com/flink/tutorials/scala/chapter5/IntervalJoinExample.scala index fe51371..fb5d687 100644 --- a/src/main/scala/com/flink/tutorials/scala/chapter5/IntervalJoinExample.scala +++ b/src/main/scala/com/flink/tutorials/scala/chapter5/IntervalJoinExample.scala @@ -1,5 +1,8 @@ package com.flink.tutorials.scala.api.chapter5 +import java.time.Duration + +import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkStrategy} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor @@ -27,11 +30,12 @@ object IntervalJoinExample { } } }.assignTimestampsAndWatermarks( - new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(1)) { - override def extractTimestamp(element: (String, Long, Int)): Long = { - element._2 - } - }) + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(1)) + .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long, Int)] { + override def extractTimestamp(t: (String, Long, Int), l: Long): Long = t._2 + }) + ) val input2: DataStream[(String, Long, Int)] = socketSource2.flatMap { (line: String, out: Collector[(String, Long, Int)]) => { @@ -41,11 +45,12 @@ object IntervalJoinExample { } } }.assignTimestampsAndWatermarks( - new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(1)) { - override def extractTimestamp(element: (String, Long, Int)): Long = { - element._2 - } + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(1)) + .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long, Int)] { + override def extractTimestamp(t: (String, Long, Int), l: Long): Long = t._2 }) + ) val intervalJoinResult: DataStream[String] = input1.keyBy(_._1) .intervalJoin(input2.keyBy(_._1)) diff --git a/src/main/scala/com/flink/tutorials/scala/chapter5/KeyedCoProcessFunctionExample.scala b/src/main/scala/com/flink/tutorials/scala/chapter5/KeyedCoProcessFunctionExample.scala index f4b4d33..9e84fdb 100644 --- a/src/main/scala/com/flink/tutorials/scala/chapter5/KeyedCoProcessFunctionExample.scala +++ b/src/main/scala/com/flink/tutorials/scala/chapter5/KeyedCoProcessFunctionExample.scala @@ -32,15 +32,6 @@ object KeyedCoProcessFunctionExample { }) ) -// // 读入股票数据流 -// val stockStream: DataStream[StockPrice] = env -// .addSource(new StockSource("stock/stock-tick-20200108.csv")) -// .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[StockPrice]() { -// override def extractAscendingTimestamp(stock: StockPrice): Long = { -// stock.ts -// } -// }) - // 读入媒体评价数据流 val mediaStream: DataStream[Media] = env .addSource(new MediaSource) diff --git a/src/main/scala/com/flink/tutorials/scala/chapter7/SimpleSourceExample.scala b/src/main/scala/com/flink/tutorials/scala/chapter7/SimpleSourceExample.scala new file mode 100644 index 0000000..a390d46 --- /dev/null +++ b/src/main/scala/com/flink/tutorials/scala/chapter7/SimpleSourceExample.scala @@ -0,0 +1,43 @@ +package com.flink.tutorials.scala.chapter7 + +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.configuration.{Configuration, RestOptions} +import org.apache.flink.streaming.api.functions.source.SourceFunction + +object SimpleSourceExample { + + def main(args: Array[String]): Unit = { + val conf = new Configuration + // 访问 http://localhost:8082 可以看到Flink Web UI + conf.setInteger(RestOptions.PORT, 8082) + // 创建本地执行环境,并行度为2 + val env = StreamExecutionEnvironment.createLocalEnvironment(2, conf) + val countStream = env.addSource(new SimpleSource) + System.out.println("parallelism: " + env.getParallelism) + + countStream.print() + env.execute("source") + } + + class SimpleSource extends SourceFunction[(String, Integer)] { + private var offset = 0 + private var isRunning = true + + override def run(ctx: SourceFunction.SourceContext[(String, Integer)]): Unit = { + while ( { + isRunning + }) { + Thread.sleep(500) + ctx.collect(new (String, Integer)("" + offset, offset)) + offset += 1 + if (offset == 1000) + isRunning = false + } + } + + override def cancel(): Unit = { + isRunning = false + } + } + +} diff --git a/src/main/scala/com/flink/tutorials/scala/chapter7/TextFileExample.scala b/src/main/scala/com/flink/tutorials/scala/chapter7/TextFileExample.scala index bc70c7d..c2e431a 100644 --- a/src/main/scala/com/flink/tutorials/scala/chapter7/TextFileExample.scala +++ b/src/main/scala/com/flink/tutorials/scala/chapter7/TextFileExample.scala @@ -1,6 +1,5 @@ package com.flink.tutorials.scala.api.chapter7 -import com.flink.tutorials.java.chapter7.TextFileExample import org.apache.flink.api.java.io.TextInputFormat import org.apache.flink.streaming.api.functions.source.FileProcessingMode import org.apache.flink.streaming.api.scala._ @@ -12,15 +11,26 @@ object TextFileExample { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 文件路径 - val filePath = classOf[TextFileExample].getClassLoader.getResource("taobao/UserBehavior-20171201.csv").getPath + val filePath = getClass.getClassLoader.getResource("taobao/UserBehavior-20171201.csv").getPath // 文件为纯文本格式 val textInputFormat = new TextInputFormat(new org.apache.flink.core.fs.Path(filePath)) // 每隔100毫秒检测一遍 - val inputStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100) - - inputStream.print +// val inputStream = env.readFile( +// textInputFormat, +// filePath, +// FileProcessingMode.PROCESS_CONTINUOUSLY, +// 100) + + // 只读一次 + val readOnceStream = env.readFile( + textInputFormat, + filePath, + FileProcessingMode.PROCESS_ONCE, + 0) + + readOnceStream.print() env.execute("read file from path") } } diff --git a/src/main/scala/com/flink/tutorials/scala/chapter7/TransactionWriteSinkExample.scala b/src/main/scala/com/flink/tutorials/scala/chapter7/TransactionWriteSinkExample.scala new file mode 100644 index 0000000..c22f567 --- /dev/null +++ b/src/main/scala/com/flink/tutorials/scala/chapter7/TransactionWriteSinkExample.scala @@ -0,0 +1,90 @@ +package com.flink.tutorials.scala.chapter7 + +import java.io.BufferedWriter +import java.nio.file.{Files, Path, Paths} +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter + +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.api.common.typeutils.base.{StringSerializer, VoidSerializer} +import org.apache.flink.configuration.{Configuration, RestOptions} +import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction} + +object TransactionWriteSinkExample { + + def main(args: Array[String]): Unit = { + val conf = new Configuration + // 访问 http://localhost:8082 可以看到Flink Web UI + conf.setInteger(RestOptions.PORT, 8082) + // 创建本地执行环境,并行度为1 + val env = StreamExecutionEnvironment.createLocalEnvironment(1, conf) + // 每隔5秒进行一次Checkpoint + env.getCheckpointConfig.setCheckpointInterval(5 * 1000) + val countStream = env.addSource(new CheckpointedSourceExample.CheckpointedSource) + // 每隔一定时间模拟一次失败 + val result = countStream.map(new CheckpointedSourceExample.FailingMapper(20)) + // 类Unix系统的临时文件夹在/tmp下 + // Windows用户需要修改这个目录 + val preCommitPath = "/tmp/flink-sink-precommit" + val commitedPath = "/tmp/flink-sink-commited" + if (!Files.exists(Paths.get(preCommitPath))) Files.createDirectory(Paths.get(preCommitPath)) + if (!Files.exists(Paths.get(commitedPath))) Files.createDirectory(Paths.get(commitedPath)) + // 使用Exactly-Once语义的Sink,执行本程序时可以查看相应的输出目录,查看数据 + result.addSink(new TwoPhaseFileSink(preCommitPath, commitedPath)) + // 数据打印到屏幕上,无Exactly-Once保障,有数据重发现象 + result.print() + env.execute("two file sink") + } + + + class TwoPhaseFileSink(var preCommitPath: String, var commitedPath: String) extends TwoPhaseCommitSinkFunction[(String, Int), String, Void](StringSerializer.INSTANCE, VoidSerializer.INSTANCE) { + // 缓存 + var transactionWriter: BufferedWriter = _ + + override def invoke(transaction: String, in: (String, Int), context: SinkFunction.Context[_]): Unit = { + transactionWriter.write(in._1 + " " + in._2 + "\n") + } + + override def beginTransaction: String = { + val time = LocalDateTime.now.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + val subTaskIdx = getRuntimeContext.getIndexOfThisSubtask + val fileName = time + "-" + subTaskIdx + val preCommitFilePath = Paths.get(preCommitPath + "/" + fileName) + // 创建一个存储本次事务的文件 + Files.createFile(preCommitFilePath) + transactionWriter = Files.newBufferedWriter(preCommitFilePath) + System.out.println("Transaction File: " + preCommitFilePath) + fileName + } + + // 将当前数据由内存写入磁盘 + override def preCommit(transaction: String): Unit = { + transactionWriter.flush() + transactionWriter.close() + } + + override def commit(transaction: String): Unit = { + val preCommitFilePath = Paths.get(preCommitPath + "/" + transaction) + if (Files.exists(preCommitFilePath)) { + val commitedFilePath = Paths.get(commitedPath + "/" + transaction) + try + Files.move(preCommitFilePath, commitedFilePath) + catch { + case e: Exception => + System.out.println(e) + } + } + } + + override def abort(transaction: String): Unit = { + val preCommitFilePath = Paths.get(preCommitPath + "/" + transaction) + // 如果中途遇到中断,将文件删除 + if (Files.exists(preCommitFilePath)) try + Files.delete(preCommitFilePath) + catch { + case e: Exception => + System.out.println(e) + } + } + } +} diff --git a/src/main/scala/com/flink/tutorials/scala/chapter8/SimpleExample.scala b/src/main/scala/com/flink/tutorials/scala/chapter8/SimpleExample.scala deleted file mode 100644 index b960ea2..0000000 --- a/src/main/scala/com/flink/tutorials/scala/chapter8/SimpleExample.scala +++ /dev/null @@ -1,5 +0,0 @@ -package com.flink.tutorials.scala.chapter8 - -object SimpleExample { - -} diff --git a/src/main/scala/com/flink/tutorials/scala/projects/stock/StockMediaConnectedDemo.scala b/src/main/scala/com/flink/tutorials/scala/projects/stock/StockMediaConnectedDemo.scala deleted file mode 100644 index ccd445e..0000000 --- a/src/main/scala/com/flink/tutorials/scala/projects/stock/StockMediaConnectedDemo.scala +++ /dev/null @@ -1,105 +0,0 @@ -package com.flink.tutorials.scala.projects.stock - -/* - -object StockMediaConnectedDemo { - - def main(args: Array[String]) { - - // 设置执行环境 - val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()) - - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - // 每5秒生成一个Watermark - env.getConfig.setAutoWatermarkInterval(5000L) - - // 股票价格数据流 - val stockPriceRawStream: DataStream[StockPrice] = env - // 该数据流由StockPriceSource类随机生成 - .addSource(new StockSource("stock/stock-tick-20200108.csv")) - // 设置 Timestamp 和 Watermark - .assignTimestampsAndWatermarks(new StockPriceTimeAssigner) - - val mediaStatusStream: DataStream[Media] = env - .addSource(new MediaSource) - - // 先将两个流connect,再进行keyBy - val keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream - .connect(mediaStatusStream) - .keyBy(0,0) - - // 先keyBy再connect - val keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0) - .connect(mediaStatusStream.keyBy(0)) - - val alert1 = keyByConnect1.flatMap(new AlertFlatMap).print() - - val alerts2 = keyByConnect2.flatMap(new AlertFlatMap).print() - - // 执行程序 - env.execute("connect stock price with media status") - } - - /** 媒体评价 - * - * symbol 股票代号 - * timestamp 时间戳 - * status 评价 正面/一般/负面 - */ - case class Media(symbol: String, timestamp: Long, status: String) - - class MediaSource extends RichSourceFunction[Media]{ - - var isRunning: Boolean = true - - val rand = new Random() - var stockId = 0 - - override def run(srcCtx: SourceContext[Media]): Unit = { - - while (isRunning) { - // 每次从列表中随机选择一只股票 - stockId = rand.nextInt(5) - - var status: String = "NORMAL" - if (rand.nextGaussian() > 0.9) { - status = "POSITIVE" - } else if (rand.nextGaussian() < 0.05) { - status = "NEGATIVE" - } - - val curTime = Calendar.getInstance.getTimeInMillis - - srcCtx.collect(Media(stockId.toString, curTime, status)) - - Thread.sleep(rand.nextInt(100)) - } - } - - override def cancel(): Unit = { - isRunning = false - } - } - - case class Alert(symbol: String, timestamp: Long, alert: String) - - class AlertFlatMap extends RichCoFlatMapFunction[StockPrice, Media, Alert] { - - var priceMaxThreshold: List[Double] = List(101.0d, 201.0d, 301.0d, 401.0d, 501.0d) - - var mediaLevel: String = "NORMAL" - - override def flatMap1(stock: StockPrice, collector: Collector[Alert]) : Unit = { - val stockId = stock.symbol.toInt - if ("POSITIVE".equals(mediaLevel) && stock.price > priceMaxThreshold(stockId)) { - collector.collect(Alert(stock.symbol, stock.ts, "POSITIVE")) - } - } - - override def flatMap2(media: Media, collector: Collector[Alert]): Unit = { - mediaLevel = media.status - } - } - -} -*/ diff --git a/src/main/scala/com/flink/tutorials/scala/projects/stock/StockPriceDemo.scala b/src/main/scala/com/flink/tutorials/scala/projects/stock/StockPriceDemo.scala index 70d6b93..ed53dd0 100644 --- a/src/main/scala/com/flink/tutorials/scala/projects/stock/StockPriceDemo.scala +++ b/src/main/scala/com/flink/tutorials/scala/projects/stock/StockPriceDemo.scala @@ -1,6 +1,7 @@ package com.flink.tutorials.scala.projects.stock import com.flink.tutorials.scala.utils.stock.{StockPrice, StockSource} +import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ @@ -22,7 +23,14 @@ object StockPriceDemo { val stockPriceRawStream: DataStream[StockPrice] = env .addSource(new StockSource("stock/stock-tick-20200108.csv")) // 设置 Timestamp 和 Watermark - .assignTimestampsAndWatermarks(new StockPriceTimeAssigner) + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forMonotonousTimestamps() + .withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] { + override def extractTimestamp(t: StockPrice, l: Long): Long = t.ts + } + ) + ) val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream .keyBy(_.symbol) @@ -38,8 +46,4 @@ object StockPriceDemo { env.execute("Compute max stock price") } - class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) { - override def extractTimestamp(t: StockPrice): Long = t.ts - } - }