From ebd1616a9b0e43108a0da71b84fa95789bab8752 Mon Sep 17 00:00:00 2001 From: luweizheng Date: Thu, 12 Dec 2019 09:32:03 +0800 Subject: [PATCH] StockPrice & WikiEdits Example Mapj Transformation --- pom.xml | 24 +++-- .../api/transformations/MapExample.scala | 36 ++++++++ .../demos/stock/StockPriceDemo.scala | 90 +++++++++++++++++++ .../demos/wikiedits/WikipediaAnalysis.scala | 23 +++++ 4 files changed, 165 insertions(+), 8 deletions(-) create mode 100644 src/main/scala/com/flink/tutorials/api/transformations/MapExample.scala create mode 100644 src/main/scala/com/flink/tutorials/demos/stock/StockPriceDemo.scala create mode 100644 src/main/scala/com/flink/tutorials/demos/wikiedits/WikipediaAnalysis.scala diff --git a/pom.xml b/pom.xml index 3edd6ed..449c6c6 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ under the License. 0.1 jar - Flink Quickstart Job + Flink Tutorials http://www.myorganization.org @@ -59,6 +59,11 @@ under the License. ${flink.version} provided + + org.apache.flink + flink-connector-wikiedits_${scala.binary.version} + ${flink.version} + org.apache.flink flink-streaming-scala_${scala.binary.version} @@ -118,12 +123,9 @@ under the License. - - org.apache.flink:force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - log4j:* - + + org.apache.flink:flink-connector-wikiedits_2.11 + @@ -139,7 +141,7 @@ under the License. - quickstart.StreamingJob + com.flink.tutorials.demos.stock.StockPriceDemo @@ -269,6 +271,12 @@ under the License. ${scala.version} compile + + org.apache.flink + flink-connector-wikiedits_${scala.binary.version} + ${flink.version} + compile + diff --git a/src/main/scala/com/flink/tutorials/api/transformations/MapExample.scala b/src/main/scala/com/flink/tutorials/api/transformations/MapExample.scala new file mode 100644 index 0000000..b1d8fb6 --- /dev/null +++ b/src/main/scala/com/flink/tutorials/api/transformations/MapExample.scala @@ -0,0 +1,36 @@ +package com.flink.tutorials.api.transformations + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.streaming.api.scala._ + +object MapExample { + def main(args: Array[String]): Unit = { + // 创建 Flink 执行环境 + val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + + val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8) + + // Lambda函数 => + val lambda = dataStream.map ( input => input * 2 ).print() + + // Lambda函数 _ + val lambda2 = dataStream.map { _ * 2}.print() + + // 继承RichMapFunction + // 第一个参数是输入,第二个参数是输出 + class DoubleMapFunction extends RichMapFunction[Int, Int] { + def map(in: Int):Int = { in * 2 } + }; + + // 匿名函数 + val anonymousFunction = dataStream.map {new RichMapFunction[Int, Int] { + def map(input: Int): Int = { + input * 2 + } + }}.print() + + val richFunction = dataStream.map {new DoubleMapFunction()}.print() + + senv.execute("Basic Map Transformation") + } +} diff --git a/src/main/scala/com/flink/tutorials/demos/stock/StockPriceDemo.scala b/src/main/scala/com/flink/tutorials/demos/stock/StockPriceDemo.scala new file mode 100644 index 0000000..e34b6b6 --- /dev/null +++ b/src/main/scala/com/flink/tutorials/demos/stock/StockPriceDemo.scala @@ -0,0 +1,90 @@ +package com.flink.tutorials.demos.stock + +import java.util.Calendar + +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.functions.source.RichSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows} + +import scala.util.Random + +object StockPriceDemo { + + /** + * Case Class StockPrice + * symbol 股票代号 + * timestamp 时间戳 + * price 价格 + */ + case class StockPrice(symbol: String, timestamp: Long, price: Double) + + def main(args: Array[String]) { + + // 设置执行环境 + val env = StreamExecutionEnvironment.getExecutionEnvironment + + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + // 每5秒生成一个Watermark + env.getConfig.setAutoWatermarkInterval(5000L) + + // 股票价格数据流 + val stockPriceRawStream: DataStream[StockPrice] = env + // 该数据流由StockPriceSource类随机生成 + .addSource(new StockPriceSource) + // 设置 Timestamp 和 Watermark + .assignTimestampsAndWatermarks(new StockPriceTimeAssigner) + + val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream + .keyBy(_.symbol) + // 设置5秒的时间窗口 + .window(TumblingEventTimeWindows.of(Time.seconds(5))) + // 取5秒内某一只股票的最大值 + .max("price") + + // 打印结果 + stockPriceStream.print() + + // 执行程序 + env.execute("Compute max stock price") + } + + class StockPriceSource extends RichSourceFunction[StockPrice]{ + + var isRunning: Boolean = true + + val rand = new Random() + // 初始化股票价格 + var priceList: List[Double] = List(100.0d, 200.0d, 300.0d, 400.0d, 500.0d) + var stockId = 0 + var curPrice = 0.0d + + override def run(srcCtx: SourceContext[StockPrice]): Unit = { + + while (isRunning) { + // 每次从列表中随机选择一只股票 + stockId = rand.nextInt(priceList.size) + + val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05 + priceList = priceList.updated(stockId, curPrice) + val curTime = Calendar.getInstance.getTimeInMillis + + // 将数据源收集写入SourceContext + srcCtx.collect(StockPrice("symbol_" + stockId.toString, curTime, curPrice)) + Thread.sleep(rand.nextInt(10)) + } + } + + override def cancel(): Unit = { + isRunning = false + } + } + + class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) { + override def extractTimestamp(t: StockPrice): Long = t.timestamp + } + +} diff --git a/src/main/scala/com/flink/tutorials/demos/wikiedits/WikipediaAnalysis.scala b/src/main/scala/com/flink/tutorials/demos/wikiedits/WikipediaAnalysis.scala new file mode 100644 index 0000000..d50250a --- /dev/null +++ b/src/main/scala/com/flink/tutorials/demos/wikiedits/WikipediaAnalysis.scala @@ -0,0 +1,23 @@ +package com.flink.tutorials.demos.wikiedits + +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource +object WikipediaAnalysis { + def main(args: Array[String]) : Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + // 需要外网环境 + val edits = env.addSource(new WikipediaEditsSource) + + val result = edits + .map(line =>(line.isBotEdit,1)) + .keyBy(0) + .timeWindow(Time.seconds(1)) + .sum(1) + + result.print() + + env.execute("Wikipedia Edit streaming") + } +}