Skip to content

Commit

Permalink
keyed map state
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng committed Jan 28, 2020
1 parent 76cdba2 commit 9ac67db
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 0 deletions.
50 changes: 50 additions & 0 deletions src/main/resources/state/UserBehavior-50.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
1,2268318,2520377,pv,1511544070
1,2333346,2520771,pv,1511561733
1,2576651,149192,pv,1511572885
1,3830808,4181361,pv,1511593493
1,4365585,2520377,pv,1511596146
1,4606018,2735466,pv,1511616481
1,230380,411153,pv,1511644942
1,3827899,2920476,pv,1511713473
1,3745169,2891509,pv,1511725471
1,1531036,2920476,pv,1511733732
100,2518420,3425094,pv,1511551443
100,3763048,3425094,pv,1511551450
100,2518420,3425094,pv,1511551495
100,1953042,3425094,pv,1511551619
100,3763048,3425094,fav,1511551860
100,5100093,2945933,pv,1511552352
100,704268,223690,pv,1511563606
100,4115850,223690,fav,1511563834
100,2379198,4869428,pv,1511564961
100,2971043,4869428,fav,1511565222
1000,1385281,2352202,pv,1511541853
1000,5120034,1051370,cart,1511542034
1000,1953489,238434,pv,1511543045
1000,408860,551706,pv,1511543246
1000,4590199,4338287,pv,1511543742
1000,1976168,4338287,pv,1511543773
1000,4590199,4338287,pv,1511543777
1000,3722942,1051370,pv,1511570091
1000,3059682,2440115,pv,1511571595
1000,2599546,3995452,fav,1511571647
1000001,1554801,2926020,pv,1512263339
1000001,5081961,634390,pv,1512263483
1000001,4210726,2926020,pv,1512263670
1000001,4088463,174239,buy,1512264173
1000001,5095065,1216617,pv,1512264337
1000001,584577,1836480,pv,1512265168
1000004,2156592,3607361,pv,1511539241
1000004,1591982,672001,pv,1511539333
1000004,3854178,4181361,pv,1511571789
1000004,3999536,4756105,cart,1511571820
1000004,4324270,3607361,pv,1511966092
1000004,5125742,3607361,pv,1511966096
1000004,571666,4756105,pv,1511966112
1000004,1975544,2355072,pv,1511966126
1000004,4324270,3607361,pv,1511966135
1000004,1757180,3607361,pv,1511966181
1000004,899145,3607361,pv,1511966192
1000004,2111992,1080785,pv,1511966212
1000004,1317750,1080785,cart,1511966265
1000004,4457235,1080785,pv,1511966306
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.flink.tutorials.scala.api.state

import java.io.InputStream

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object MapStateExample {

/**
* 用户行为
* categoryId为商品类目ID
* behavior包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav)
* */
case class UserBehavior(userId: Long,
itemId: Long,
categoryId: Int,
behavior: String,
timestamp: Long)

class MapStateFunction extends RichFlatMapFunction[UserBehavior, (Long, String, Int)] {

// 指向MapState的句柄
private var behaviorMapState: MapState[String, Int] = _

override def open(parameters: Configuration): Unit = {
// 创建StateDescriptor
val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int])
// 通过StateDescriptor获取运行时上下文中的状态
behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor)
}

override def flatMap(input: UserBehavior, collector: Collector[(Long, String, Int)]): Unit = {
var behaviorCnt = 1
// behavior有可能为pv、cart、fav、buy等
// 判断状态中是否有该behavior
if (behaviorMapState.contains(input.behavior)) {
behaviorCnt = behaviorMapState.get(input.behavior) + 1
}
// 更新状态
behaviorMapState.put(input.behavior, behaviorCnt)
collector.collect((input.userId, input.behavior, behaviorCnt))
}
}

def main(args: Array[String]): Unit = {

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(8)

// 获取数据源
val sourceStream: DataStream[UserBehavior] = env
.addSource(new UserBehaviorSource("state/UserBehavior-50.csv")).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() {
override def extractAscendingTimestamp(userBehavior: UserBehavior): Long = {
// 原始数据单位为秒,乘以1000转换成毫秒
userBehavior.timestamp * 1000
}
}
)

// 生成一个KeyedStream
val keyedStream = sourceStream.keyBy(user => user.userId)

// 在KeyedStream上进行flatMap
val behaviorCountStream = keyedStream.flatMap(new MapStateFunction)

behaviorCountStream.print()

env.execute("state example")
}

class UserBehaviorSource(path: String) extends RichSourceFunction[UserBehavior] {

var isRunning: Boolean = true
// 输入源
var streamSource: InputStream = _

override def run(sourceContext: SourceContext[UserBehavior]): Unit = {
// 从项目的resources目录获取输入
streamSource = MapStateExample.getClass.getClassLoader.getResourceAsStream(path)
val lines: Iterator[String] = scala.io.Source.fromInputStream(streamSource).getLines
while (isRunning && lines.hasNext) {
val line = lines.next()
val itemStrArr = line.split(",")
val userBehavior = UserBehavior(itemStrArr(0).toLong, itemStrArr(1).toLong, itemStrArr(2).toInt, itemStrArr(3), itemStrArr(4).toLong)
sourceContext.collect(userBehavior)
}
}

override def cancel(): Unit = {
streamSource.close()
isRunning = false
}
}

}

0 comments on commit 9ac67db

Please sign in to comment.