Skip to content

Commit

Permalink
StockPrice & WikiEdits Example
Browse files Browse the repository at this point in the history
Mapj Transformation
  • Loading branch information
luweizheng committed Dec 12, 2019
1 parent 98d8efe commit ebd1616
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 8 deletions.
24 changes: 16 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ under the License.
<version>0.1</version>
<packaging>jar</packaging>

<name>Flink Quickstart Job</name>
<name>Flink Tutorials</name>
<url>http://www.myorganization.org</url>

<repositories>
Expand Down Expand Up @@ -59,6 +59,11 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -118,12 +123,9 @@ under the License.
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
<includes>
<include>org.apache.flink:flink-connector-wikiedits_2.11</include>
</includes>
</artifactSet>
<filters>
<filter>
Expand All @@ -139,7 +141,7 @@ under the License.
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>quickstart.StreamingJob</mainClass>
<mainClass>com.flink.tutorials.demos.stock.StockPriceDemo</mainClass>
</transformer>
</transformers>
</configuration>
Expand Down Expand Up @@ -269,6 +271,12 @@ under the License.
<version>${scala.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.flink.tutorials.api.transformations

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._

object MapExample {
def main(args: Array[String]): Unit = {
// 创建 Flink 执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)

// Lambda函数 =>
val lambda = dataStream.map ( input => input * 2 ).print()

// Lambda函数 _
val lambda2 = dataStream.map { _ * 2}.print()

// 继承RichMapFunction
// 第一个参数是输入,第二个参数是输出
class DoubleMapFunction extends RichMapFunction[Int, Int] {
def map(in: Int):Int = { in * 2 }
};

// 匿名函数
val anonymousFunction = dataStream.map {new RichMapFunction[Int, Int] {
def map(input: Int): Int = {
input * 2
}
}}.print()

val richFunction = dataStream.map {new DoubleMapFunction()}.print()

senv.execute("Basic Map Transformation")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.flink.tutorials.demos.stock

import java.util.Calendar

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}

import scala.util.Random

object StockPriceDemo {

/**
* Case Class StockPrice
* symbol 股票代号
* timestamp 时间戳
* price 价格
*/
case class StockPrice(symbol: String, timestamp: Long, price: Double)

def main(args: Array[String]) {

// 设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 每5秒生成一个Watermark
env.getConfig.setAutoWatermarkInterval(5000L)

// 股票价格数据流
val stockPriceRawStream: DataStream[StockPrice] = env
// 该数据流由StockPriceSource类随机生成
.addSource(new StockPriceSource)
// 设置 Timestamp 和 Watermark
.assignTimestampsAndWatermarks(new StockPriceTimeAssigner)

val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream
.keyBy(_.symbol)
// 设置5秒的时间窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 取5秒内某一只股票的最大值
.max("price")

// 打印结果
stockPriceStream.print()

// 执行程序
env.execute("Compute max stock price")
}

class StockPriceSource extends RichSourceFunction[StockPrice]{

var isRunning: Boolean = true

val rand = new Random()
// 初始化股票价格
var priceList: List[Double] = List(100.0d, 200.0d, 300.0d, 400.0d, 500.0d)
var stockId = 0
var curPrice = 0.0d

override def run(srcCtx: SourceContext[StockPrice]): Unit = {

while (isRunning) {
// 每次从列表中随机选择一只股票
stockId = rand.nextInt(priceList.size)

val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
priceList = priceList.updated(stockId, curPrice)
val curTime = Calendar.getInstance.getTimeInMillis

// 将数据源收集写入SourceContext
srcCtx.collect(StockPrice("symbol_" + stockId.toString, curTime, curPrice))
Thread.sleep(rand.nextInt(10))
}
}

override def cancel(): Unit = {
isRunning = false
}
}

class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) {
override def extractTimestamp(t: StockPrice): Long = t.timestamp
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.flink.tutorials.demos.wikiedits

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
object WikipediaAnalysis {
def main(args: Array[String]) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 需要外网环境
val edits = env.addSource(new WikipediaEditsSource)

val result = edits
.map(line =>(line.isBotEdit,1))
.keyBy(0)
.timeWindow(Time.seconds(1))
.sum(1)

result.print()

env.execute("Wikipedia Edit streaming")
}
}

0 comments on commit ebd1616

Please sign in to comment.