diff --git a/src/main/resources/state/UserBehavior-50.csv b/src/main/resources/state/UserBehavior-50.csv new file mode 100644 index 0000000..6c7e9ce --- /dev/null +++ b/src/main/resources/state/UserBehavior-50.csv @@ -0,0 +1,50 @@ +1,2268318,2520377,pv,1511544070 +1,2333346,2520771,pv,1511561733 +1,2576651,149192,pv,1511572885 +1,3830808,4181361,pv,1511593493 +1,4365585,2520377,pv,1511596146 +1,4606018,2735466,pv,1511616481 +1,230380,411153,pv,1511644942 +1,3827899,2920476,pv,1511713473 +1,3745169,2891509,pv,1511725471 +1,1531036,2920476,pv,1511733732 +100,2518420,3425094,pv,1511551443 +100,3763048,3425094,pv,1511551450 +100,2518420,3425094,pv,1511551495 +100,1953042,3425094,pv,1511551619 +100,3763048,3425094,fav,1511551860 +100,5100093,2945933,pv,1511552352 +100,704268,223690,pv,1511563606 +100,4115850,223690,fav,1511563834 +100,2379198,4869428,pv,1511564961 +100,2971043,4869428,fav,1511565222 +1000,1385281,2352202,pv,1511541853 +1000,5120034,1051370,cart,1511542034 +1000,1953489,238434,pv,1511543045 +1000,408860,551706,pv,1511543246 +1000,4590199,4338287,pv,1511543742 +1000,1976168,4338287,pv,1511543773 +1000,4590199,4338287,pv,1511543777 +1000,3722942,1051370,pv,1511570091 +1000,3059682,2440115,pv,1511571595 +1000,2599546,3995452,fav,1511571647 +1000001,1554801,2926020,pv,1512263339 +1000001,5081961,634390,pv,1512263483 +1000001,4210726,2926020,pv,1512263670 +1000001,4088463,174239,buy,1512264173 +1000001,5095065,1216617,pv,1512264337 +1000001,584577,1836480,pv,1512265168 +1000004,2156592,3607361,pv,1511539241 +1000004,1591982,672001,pv,1511539333 +1000004,3854178,4181361,pv,1511571789 +1000004,3999536,4756105,cart,1511571820 +1000004,4324270,3607361,pv,1511966092 +1000004,5125742,3607361,pv,1511966096 +1000004,571666,4756105,pv,1511966112 +1000004,1975544,2355072,pv,1511966126 +1000004,4324270,3607361,pv,1511966135 +1000004,1757180,3607361,pv,1511966181 +1000004,899145,3607361,pv,1511966192 +1000004,2111992,1080785,pv,1511966212 +1000004,1317750,1080785,cart,1511966265 +1000004,4457235,1080785,pv,1511966306 diff --git a/src/main/scala/com/flink/tutorials/scala/api/state/MapStateExample.scala b/src/main/scala/com/flink/tutorials/scala/api/state/MapStateExample.scala new file mode 100644 index 0000000..7d18919 --- /dev/null +++ b/src/main/scala/com/flink/tutorials/scala/api/state/MapStateExample.scala @@ -0,0 +1,104 @@ +package com.flink.tutorials.scala.api.state + +import java.io.InputStream + +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.RichSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.util.Collector + +object MapStateExample { + + /** + * 用户行为 + * categoryId为商品类目ID + * behavior包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav) + * */ + case class UserBehavior(userId: Long, + itemId: Long, + categoryId: Int, + behavior: String, + timestamp: Long) + + class MapStateFunction extends RichFlatMapFunction[UserBehavior, (Long, String, Int)] { + + // 指向MapState的句柄 + private var behaviorMapState: MapState[String, Int] = _ + + override def open(parameters: Configuration): Unit = { + // 创建StateDescriptor + val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int]) + // 通过StateDescriptor获取运行时上下文中的状态 + behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor) + } + + override def flatMap(input: UserBehavior, collector: Collector[(Long, String, Int)]): Unit = { + var behaviorCnt = 1 + // behavior有可能为pv、cart、fav、buy等 + // 判断状态中是否有该behavior + if (behaviorMapState.contains(input.behavior)) { + behaviorCnt = behaviorMapState.get(input.behavior) + 1 + } + // 更新状态 + behaviorMapState.put(input.behavior, behaviorCnt) + collector.collect((input.userId, input.behavior, behaviorCnt)) + } + } + + def main(args: Array[String]): Unit = { + + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(8) + + // 获取数据源 + val sourceStream: DataStream[UserBehavior] = env + .addSource(new UserBehaviorSource("state/UserBehavior-50.csv")).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() { + override def extractAscendingTimestamp(userBehavior: UserBehavior): Long = { + // 原始数据单位为秒,乘以1000转换成毫秒 + userBehavior.timestamp * 1000 + } + } + ) + + // 生成一个KeyedStream + val keyedStream = sourceStream.keyBy(user => user.userId) + + // 在KeyedStream上进行flatMap + val behaviorCountStream = keyedStream.flatMap(new MapStateFunction) + + behaviorCountStream.print() + + env.execute("state example") + } + + class UserBehaviorSource(path: String) extends RichSourceFunction[UserBehavior] { + + var isRunning: Boolean = true + // 输入源 + var streamSource: InputStream = _ + + override def run(sourceContext: SourceContext[UserBehavior]): Unit = { + // 从项目的resources目录获取输入 + streamSource = MapStateExample.getClass.getClassLoader.getResourceAsStream(path) + val lines: Iterator[String] = scala.io.Source.fromInputStream(streamSource).getLines + while (isRunning && lines.hasNext) { + val line = lines.next() + val itemStrArr = line.split(",") + val userBehavior = UserBehavior(itemStrArr(0).toLong, itemStrArr(1).toLong, itemStrArr(2).toInt, itemStrArr(3), itemStrArr(4).toLong) + sourceContext.collect(userBehavior) + } + } + + override def cancel(): Unit = { + streamSource.close() + isRunning = false + } + } + +}