Skip to content

Commit

Permalink
connect
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng committed Dec 25, 2019
1 parent a2d2d23 commit ca955e8
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.flink.tutorials.api.transformations

import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.scala._

object SimpleConnectExample {

def main(args: Array[String]): Unit = {

val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val intStream: DataStream[Int] = senv.fromElements(1, 0, 9, 2, 3, 6)
val stringStream: DataStream[String] = senv.fromElements("LOW", "HIGH", "LOW", "LOW")

val connectedStream: ConnectedStreams[Int, String] = intStream.connect(stringStream)

// CoMapFunction三个泛型分别对应第一个流的输入、第二个流的输入,map之后的输出
class MyCoMapFunction extends CoMapFunction[Int, String, String] {

override def map1(input1: Int): String = input1.toString

override def map2(input2: String): String = input2
}

val mapResult = connectedStream.map(new MyCoMapFunction)

senv.execute("simple connect transformation")

}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.flink.tutorials.demos.stock

import java.util.Calendar

import com.flink.tutorials.demos.stock.StockPriceDemo.{StockPrice, StockPriceSource, StockPriceTimeAssigner}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.util.Random

object StockMediaConnectedDemo {

def main(args: Array[String]) {

// 设置执行环境
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 每5秒生成一个Watermark
env.getConfig.setAutoWatermarkInterval(5000L)

// 股票价格数据流
val stockPriceRawStream: DataStream[StockPrice] = env
// 该数据流由StockPriceSource类随机生成
.addSource(new StockPriceSource)
// 设置 Timestamp 和 Watermark
.assignTimestampsAndWatermarks(new StockPriceTimeAssigner)

val mediaStatusStream: DataStream[Media] = env
.addSource(new MediaSource)

// 先将两个流connect,再进行keyBy
val keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream
.connect(mediaStatusStream)
.keyBy(0,0)

// 先keyBy再connect
val keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0)
.connect(mediaStatusStream.keyBy(0))

val alert1 = keyByConnect1.flatMap(new AlertFlatMap).print()

val alerts2 = keyByConnect2.flatMap(new AlertFlatMap).print()

// 执行程序
env.execute("connect stock price with media status")
}

/** 媒体评价
*
* symbol 股票代号
* timestamp 时间戳
* status 评价 正面/一般/负面
*/
case class Media(symbol: String, timestamp: Long, status: String)

class MediaSource extends RichSourceFunction[Media]{

var isRunning: Boolean = true

val rand = new Random()
var stockId = 0

override def run(srcCtx: SourceContext[Media]): Unit = {

while (isRunning) {
// 每次从列表中随机选择一只股票
stockId = rand.nextInt(5)

var status: String = "NORMAL"
if (rand.nextGaussian() > 0.9) {
status = "POSITIVE"
} else if (rand.nextGaussian() < 0.05) {
status = "NEGATIVE"
}

val curTime = Calendar.getInstance.getTimeInMillis

srcCtx.collect(Media(stockId.toString, curTime, status))

Thread.sleep(rand.nextInt(100))
}
}

override def cancel(): Unit = {
isRunning = false
}
}

case class Alert(symbol: String, timestamp: Long, alert: String)

class AlertFlatMap extends RichCoFlatMapFunction[StockPrice, Media, Alert] {

var priceMaxThreshold: List[Double] = List(101.0d, 201.0d, 301.0d, 401.0d, 501.0d)

var mediaLevel: String = "NORMAL"

override def flatMap1(stock: StockPrice, collector: Collector[Alert]) : Unit = {
val stockId = stock.symbol.toInt
if ("POSITIVE".equals(mediaLevel) && stock.price > priceMaxThreshold(stockId)) {
collector.collect(Alert(stock.symbol, stock.timestamp, "POSITIVE"))
}
}

override def flatMap2(media: Media, collector: Collector[Alert]): Unit = {
mediaLevel = media.status
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ object StockPriceDemo {
val curTime = Calendar.getInstance.getTimeInMillis

// 将数据源收集写入SourceContext
srcCtx.collect(StockPrice("symbol_" + stockId.toString, curTime, curPrice))
srcCtx.collect(StockPrice(stockId.toString, curTime, curPrice))
Thread.sleep(rand.nextInt(10))
}
}
Expand Down

0 comments on commit ca955e8

Please sign in to comment.