diff --git a/src/main/java/com/flink/tutorials/java/api/state/BroadcastStateExample.java b/src/main/java/com/flink/tutorials/java/api/state/BroadcastStateExample.java new file mode 100644 index 0000000..354908d --- /dev/null +++ b/src/main/java/com/flink/tutorials/java/api/state/BroadcastStateExample.java @@ -0,0 +1,112 @@ +package com.flink.tutorials.java.api.state; + +import com.flink.tutorials.java.utils.taobao.BehaviorPattern; +import com.flink.tutorials.java.utils.taobao.UserBehavior; +import com.flink.tutorials.java.utils.taobao.UserBehaviorSource; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; +import org.apache.flink.util.Collector; + +public class BroadcastStateExample { + + public static void main(String[] args) throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // 主数据流 + DataStream userBehaviorStream = env.addSource(new UserBehaviorSource("taobao/UserBehavior-20171201.csv")) + .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { + @Override + public long extractAscendingTimestamp(UserBehavior userBehavior) { + // 原始数据单位为秒,乘以1000转换成毫秒 + return userBehavior.timestamp * 1000; + } + }); + + // BehaviorPattern数据流 + DataStream patternStream = env.fromElements(new BehaviorPattern("pv", "buy")); + + // Broadcast State只能使用 Key->Value 结构,基于MapStateDescriptor + MapStateDescriptor broadcastStateDescriptor = new MapStateDescriptor<>("behaviorPattern", Types.VOID, Types.POJO(BehaviorPattern.class)); + BroadcastStream broadcastStream = patternStream.broadcast(broadcastStateDescriptor); + + // 生成一个KeyedStream + KeyedStream keyedStream = userBehaviorStream.keyBy(user -> user.userId); + + // 在KeyedStream上进行connect和process + DataStream> matchedStream = keyedStream + .connect(broadcastStream) + .process(new BroadcastPatternFunction()); + + matchedStream.print(); + + env.execute("broadcast taobao example"); + } + + /** + * 四个泛型分别为: + * 1. KeyedStream中Key的数据类型 + * 2. 主数据流的数据类型 + * 3. 广播流的数据类型 + * 4. 输出类型 + * */ + public static class BroadcastPatternFunction + extends KeyedBroadcastProcessFunction> { + + // 用户上次行为状态句柄,每个用户存储一个状态 + private ValueState lastBehaviorState; + // Broadcast State Descriptor + private MapStateDescriptor bcPatternDesc; + + @Override + public void open(Configuration configuration) { + lastBehaviorState = getRuntimeContext().getState( + new ValueStateDescriptor("lastBehaviorState", Types.STRING)); + bcPatternDesc = new MapStateDescriptor("behaviorPattern", Types.VOID, Types.POJO(BehaviorPattern.class)); + } + + @Override + public void processBroadcastElement(BehaviorPattern pattern, + Context context, + Collector> collector) throws Exception { + BroadcastState bcPatternState = context.getBroadcastState(bcPatternDesc); + // 将新数据更新至Broadcast State,这里使用一个null作为Key + // 在本场景中所有数据都共享一个Pattern,因此这里伪造了一个Key + bcPatternState.put(null, pattern); + } + + @Override + public void processElement(UserBehavior userBehavior, + ReadOnlyContext context, + Collector> collector) throws Exception { + + // 获取最新的Broadcast State + BehaviorPattern pattern = context.getBroadcastState(bcPatternDesc).get(null); + String lastBehavior = lastBehaviorState.value(); + if (pattern != null && lastBehavior != null) { + // 用户之前有过行为,检查是否符合给定的模式 + if (pattern.firstBehavior.equals(lastBehavior) && + pattern.secondBehavior.equals(userBehavior.behavior)) { + // 当前用户行为符合模式 + collector.collect(Tuple2.of(userBehavior.userId, pattern)); + } + } + lastBehaviorState.update(userBehavior.behavior); + } + } +} diff --git a/src/main/java/com/flink/tutorials/java/api/state/MapStateExample.java b/src/main/java/com/flink/tutorials/java/api/state/MapStateExample.java new file mode 100644 index 0000000..82a2265 --- /dev/null +++ b/src/main/java/com/flink/tutorials/java/api/state/MapStateExample.java @@ -0,0 +1,71 @@ +package com.flink.tutorials.java.api.state; + +import com.flink.tutorials.java.utils.taobao.UserBehavior; +import com.flink.tutorials.java.utils.taobao.UserBehaviorSource; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; +import org.apache.flink.util.Collector; + +public class MapStateExample { + + public static void main(String[] args) throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + DataStream userBehaviorStream = env.addSource(new UserBehaviorSource("taobao/UserBehavior-20171201.csv")) + .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { + @Override + public long extractAscendingTimestamp(UserBehavior userBehavior) { + // 原始数据单位为秒,乘以1000转换成毫秒 + return userBehavior.timestamp * 1000; + } + }); + + // 生成一个KeyedStream + KeyedStream keyedStream = userBehaviorStream.keyBy(user -> user.userId); + + // 在KeyedStream上进行flatMap + DataStream> behaviorCountStream= keyedStream.flatMap(new MapStateFunction()); + + behaviorCountStream.print(); + + env.execute("taobao map state example"); + } + + public static class MapStateFunction extends RichFlatMapFunction> { + + // 指向MapState的句柄 + private MapState behaviorMapState; + + @Override + public void open(Configuration configuration) { + // 创建StateDescriptor + MapStateDescriptor behaviorMapStateDescriptor = new MapStateDescriptor("behaviorMap", Types.STRING, Types.INT); + // 通过StateDescriptor获取运行时上下文中的状态 + behaviorMapState = getRuntimeContext().getMapState(behaviorMapStateDescriptor); + } + + @Override + public void flatMap(UserBehavior input, Collector> out) throws Exception { + int behaviorCnt = 1; + // behavior有可能为pv、cart、fav、buy等 + // 判断状态中是否有该behavior + if (behaviorMapState.contains(input.behavior)) { + behaviorCnt = behaviorMapState.get(input.behavior) + 1; + } + // 更新状态 + behaviorMapState.put(input.behavior, behaviorCnt); + out.collect(Tuple3.of(input.userId, input.behavior, behaviorCnt)); + } + } +} diff --git a/src/main/java/com/flink/tutorials/java/utils/taobao/BehaviorPattern.java b/src/main/java/com/flink/tutorials/java/utils/taobao/BehaviorPattern.java new file mode 100644 index 0000000..5f3ca05 --- /dev/null +++ b/src/main/java/com/flink/tutorials/java/utils/taobao/BehaviorPattern.java @@ -0,0 +1,23 @@ +package com.flink.tutorials.java.utils.taobao; + +/** + * 行为模式 + * 整个模式简化为两个行为 + * */ +public class BehaviorPattern { + + public String firstBehavior; + public String secondBehavior; + + public BehaviorPattern() {} + + public BehaviorPattern(String firstBehavior, String secondBehavior) { + this.firstBehavior = firstBehavior; + this.secondBehavior = secondBehavior; + } + + @Override + public String toString() { + return "first: " + firstBehavior + ", second: " + secondBehavior; + } +} diff --git a/src/main/java/com/flink/tutorials/java/utils/taobao/UserBehavior.java b/src/main/java/com/flink/tutorials/java/utils/taobao/UserBehavior.java new file mode 100644 index 0000000..91f9ecd --- /dev/null +++ b/src/main/java/com/flink/tutorials/java/utils/taobao/UserBehavior.java @@ -0,0 +1,29 @@ +package com.flink.tutorials.java.utils.taobao; + +public class UserBehavior { + public long userId; + public long itemId; + public int categoryId; + public String behavior; + public long timestamp; + + public UserBehavior() {} + + public UserBehavior(long userId, long itemId, int categoryId, String behavior, long timestamp) { + this.userId = userId; + this.itemId = itemId; + this.categoryId = categoryId; + this.behavior = behavior; + this.timestamp = timestamp; + } + + public static UserBehavior of(long userId, long itemId, int categoryId, String behavior, long timestamp) { + return new UserBehavior(userId, itemId, categoryId, behavior, timestamp); + } + + @Override + public String toString() { + return "(" + userId + "," + itemId + "," + categoryId + "," + + behavior + "," + timestamp + ")"; + } +} diff --git a/src/main/java/com/flink/tutorials/java/utils/taobao/UserBehaviorSource.java b/src/main/java/com/flink/tutorials/java/utils/taobao/UserBehaviorSource.java new file mode 100644 index 0000000..36ee542 --- /dev/null +++ b/src/main/java/com/flink/tutorials/java/utils/taobao/UserBehaviorSource.java @@ -0,0 +1,60 @@ +package com.flink.tutorials.java.utils.taobao; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +public class UserBehaviorSource implements SourceFunction { + + private boolean isRunning = true; + private String path; + private InputStream streamSource; + + public UserBehaviorSource(String path) { + this.path = path; + } + + @Override + public void run(SourceContext sourceContext) throws Exception { + // 从项目的resources目录获取输入 + streamSource = this.getClass().getClassLoader().getResourceAsStream(path); + BufferedReader br = new BufferedReader(new InputStreamReader(streamSource)); + String line; + boolean isFirstLine = true; + long timeDiff = 0; + long lastEventTs = 0; + while (isRunning && (line = br.readLine()) != null) { + String[] itemStrArr = line.split(","); + long eventTs = Long.parseLong(itemStrArr[4]); + if (isFirstLine) { + // 从第一行数据提取时间戳 + lastEventTs = eventTs; + isFirstLine = false; + } + UserBehavior userBehavior = UserBehavior.of(Long.parseLong(itemStrArr[0]), + Long.parseLong(itemStrArr[1]), Integer.parseInt(itemStrArr[2]), + itemStrArr[3], eventTs); + // 输入文件中的时间戳是从小到大排列的 + // 新读入的行如果比上一行大,sleep,这样来模拟一个有时间间隔的输入流 + timeDiff = eventTs - lastEventTs; + if (timeDiff > 0) + Thread.sleep(timeDiff * 1000); + sourceContext.collect(userBehavior); + lastEventTs = eventTs; + } + } + + // 停止发送数据 + @Override + public void cancel() { + try { + streamSource.close(); + } catch (Exception e) { + System.out.println(e.toString()); + } + isRunning = false; + } + +} diff --git a/src/main/resources/state/UserBehavior-20171201.csv b/src/main/resources/taobao/UserBehavior-20171201.csv similarity index 100% rename from src/main/resources/state/UserBehavior-20171201.csv rename to src/main/resources/taobao/UserBehavior-20171201.csv diff --git a/src/main/scala/com/flink/tutorials/scala/api/state/BroadcastStateExample.scala b/src/main/scala/com/flink/tutorials/scala/api/state/BroadcastStateExample.scala index ad93e9d..fe91ac7 100644 --- a/src/main/scala/com/flink/tutorials/scala/api/state/BroadcastStateExample.scala +++ b/src/main/scala/com/flink/tutorials/scala/api/state/BroadcastStateExample.scala @@ -1,14 +1,12 @@ package com.flink.tutorials.scala.api.state -import java.io.InputStream -import org.apache.flink.api.common.state.{BroadcastState, MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import com.flink.tutorials.scala.utils.taobao.{BehaviorPattern, UserBehavior, UserBehaviorSource} +import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.BroadcastStream import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction -import org.apache.flink.streaming.api.functions.source.RichSourceFunction -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector @@ -23,7 +21,7 @@ object BroadcastStateExample { // 获取数据源 val userBehaviorStream: DataStream[UserBehavior] = env - .addSource(new UserBehaviorSource("state/UserBehavior-20171201.csv")).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() { + .addSource(new UserBehaviorSource("taobao/UserBehavior-20171201.csv")).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() { override def extractAscendingTimestamp(userBehavior: UserBehavior): Long = { // 原始数据单位为秒,乘以1000转换成毫秒 userBehavior.timestamp * 1000 @@ -48,26 +46,9 @@ object BroadcastStateExample { matchedStream.print() - env.execute("broadcast state example") + env.execute("broadcast taobao example") } - /** - * 用户行为 - * categoryId为商品类目ID - * behavior包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav) - * */ - case class UserBehavior(userId: Long, - itemId: Long, - categoryId: Int, - behavior: String, - timestamp: Long) - - /** - * 行为模式 - * 整个模式简化为两个行为 - * */ - case class BehaviorPattern(firstBehavior: String, secondBehavior: String) - /** * 四个泛型分别为: * 1. KeyedStream中Key的数据类型 @@ -78,7 +59,7 @@ object BroadcastStateExample { class BroadcastPatternFunction extends KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)] { - // 用户上次性能状态句柄,每个用户存储一个状态 + // 用户上次行为状态句柄,每个用户存储一个状态 private var lastBehaviorState: ValueState[String] = _ // Broadcast State Descriptor private var bcPatternDesc: MapStateDescriptor[Void, BehaviorPattern] = _ @@ -116,49 +97,9 @@ object BroadcastStateExample { if (pattern.firstBehavior.equals(lastBehavior) && pattern.secondBehavior.equals(userBehavior.behavior)) // 当前用户行为符合模式 - collector.collect((userBehavior.userId, pattern)) + collector.collect((userBehavior.userId, pattern)) } lastBehaviorState.update(userBehavior.behavior) } } - - class UserBehaviorSource(path: String) extends RichSourceFunction[UserBehavior] { - - var isRunning: Boolean = true - // 输入源 - var streamSource: InputStream = _ - - override def run(sourceContext: SourceContext[UserBehavior]): Unit = { - // 从项目的resources目录获取输入 - streamSource = BroadcastStateExample.getClass.getClassLoader.getResourceAsStream(path) - val lines: Iterator[String] = scala.io.Source.fromInputStream(streamSource).getLines - var isFirstLine: Boolean = true - var timeDiff: Long = 0 - var lastEventTs: Long = 0 - while (isRunning && lines.hasNext) { - val line = lines.next() - val itemStrArr = line.split(",") - val eventTs: Long = itemStrArr(4).toLong - if (isFirstLine) { - // 从第一行数据提取时间戳 - lastEventTs = eventTs - isFirstLine = false - } - val userBehavior = UserBehavior(itemStrArr(0).toLong, itemStrArr(1).toLong, itemStrArr(2).toInt, itemStrArr(3), eventTs) - // 输入文件中的时间戳是从小到大排列的 - // 新读入的行如果比上一行大,sleep,这样来模拟一个有时间间隔的输入流 - timeDiff = eventTs - lastEventTs - if (timeDiff > 0) - Thread.sleep(timeDiff * 1000) - sourceContext.collect(userBehavior) - lastEventTs = eventTs - } - } - - override def cancel(): Unit = { - streamSource.close() - isRunning = false - } - } - } diff --git a/src/main/scala/com/flink/tutorials/scala/api/state/MapStateExample.scala b/src/main/scala/com/flink/tutorials/scala/api/state/MapStateExample.scala index c2256b1..d40c527 100644 --- a/src/main/scala/com/flink/tutorials/scala/api/state/MapStateExample.scala +++ b/src/main/scala/com/flink/tutorials/scala/api/state/MapStateExample.scala @@ -1,13 +1,10 @@ package com.flink.tutorials.scala.api.state -import java.io.InputStream - +import com.flink.tutorials.scala.utils.taobao.{UserBehavior, UserBehaviorSource} import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{MapState, MapStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.functions.source.RichSourceFunction -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector @@ -22,36 +19,25 @@ object MapStateExample { // 获取数据源 val sourceStream: DataStream[UserBehavior] = env - .addSource(new UserBehaviorSource("state/UserBehavior-20171201.csv")).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() { + .addSource(new UserBehaviorSource("taobao/UserBehavior-20171201.csv")) + .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() { override def extractAscendingTimestamp(userBehavior: UserBehavior): Long = { // 原始数据单位为秒,乘以1000转换成毫秒 userBehavior.timestamp * 1000 } - } - ) + }) // 生成一个KeyedStream val keyedStream = sourceStream.keyBy(user => user.userId) // 在KeyedStream上进行flatMap - val behaviorCountStream = keyedStream.flatMap(new MapStateFunction) + val behaviorCountStream: DataStream[(Long, String, Int)] = keyedStream.flatMap(new MapStateFunction) behaviorCountStream.print() - env.execute("state example") + env.execute("taobao map state example") } - /** - * 用户行为 - * categoryId为商品类目ID - * behavior包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav) - * */ - case class UserBehavior(userId: Long, - itemId: Long, - categoryId: Int, - behavior: String, - timestamp: Long) - class MapStateFunction extends RichFlatMapFunction[UserBehavior, (Long, String, Int)] { // 指向MapState的句柄 @@ -76,29 +62,4 @@ object MapStateExample { collector.collect((input.userId, input.behavior, behaviorCnt)) } } - - class UserBehaviorSource(path: String) extends RichSourceFunction[UserBehavior] { - - var isRunning: Boolean = true - // 输入源 - var streamSource: InputStream = _ - - override def run(sourceContext: SourceContext[UserBehavior]): Unit = { - // 从项目的resources目录获取输入 - streamSource = MapStateExample.getClass.getClassLoader.getResourceAsStream(path) - val lines: Iterator[String] = scala.io.Source.fromInputStream(streamSource).getLines - while (isRunning && lines.hasNext) { - val line = lines.next() - val itemStrArr = line.split(",") - val userBehavior = UserBehavior(itemStrArr(0).toLong, itemStrArr(1).toLong, itemStrArr(2).toInt, itemStrArr(3), itemStrArr(4).toLong) - sourceContext.collect(userBehavior) - } - } - - override def cancel(): Unit = { - streamSource.close() - isRunning = false - } - } - } diff --git a/src/main/scala/com/flink/tutorials/scala/utils/taobao/BehaviorPattern.scala b/src/main/scala/com/flink/tutorials/scala/utils/taobao/BehaviorPattern.scala new file mode 100644 index 0000000..99cb320 --- /dev/null +++ b/src/main/scala/com/flink/tutorials/scala/utils/taobao/BehaviorPattern.scala @@ -0,0 +1,7 @@ +package com.flink.tutorials.scala.utils.taobao + +/** + * 行为模式 + * 整个模式简化为两个行为 + * */ +case class BehaviorPattern(firstBehavior: String, secondBehavior: String) diff --git a/src/main/scala/com/flink/tutorials/scala/utils/taobao/UserBehavior.scala b/src/main/scala/com/flink/tutorials/scala/utils/taobao/UserBehavior.scala new file mode 100644 index 0000000..ea0a551 --- /dev/null +++ b/src/main/scala/com/flink/tutorials/scala/utils/taobao/UserBehavior.scala @@ -0,0 +1,12 @@ +package com.flink.tutorials.scala.utils.taobao + +/** + * 用户行为 + * categoryId为商品类目ID + * behavior包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav) + * */ +case class UserBehavior(userId: Long, + itemId: Long, + categoryId: Int, + behavior: String, + timestamp: Long) diff --git a/src/main/scala/com/flink/tutorials/scala/utils/taobao/UserBehaviorSource.scala b/src/main/scala/com/flink/tutorials/scala/utils/taobao/UserBehaviorSource.scala new file mode 100644 index 0000000..d77ec32 --- /dev/null +++ b/src/main/scala/com/flink/tutorials/scala/utils/taobao/UserBehaviorSource.scala @@ -0,0 +1,45 @@ +package com.flink.tutorials.scala.utils.taobao + +import java.io.InputStream + +import org.apache.flink.streaming.api.functions.source.RichSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext + +class UserBehaviorSource(path: String) extends RichSourceFunction[UserBehavior] { + + var isRunning: Boolean = true + // 输入源 + var streamSource: InputStream = _ + + override def run(sourceContext: SourceContext[UserBehavior]): Unit = { + // 从项目的resources目录获取输入 + streamSource = this.getClass.getClassLoader.getResourceAsStream(path) + val lines: Iterator[String] = scala.io.Source.fromInputStream(streamSource).getLines + var isFirstLine: Boolean = true + var timeDiff: Long = 0 + var lastEventTs: Long = 0 + while (isRunning && lines.hasNext) { + val line = lines.next() + val itemStrArr = line.split(",") + val eventTs: Long = itemStrArr(4).toLong + if (isFirstLine) { + // 从第一行数据提取时间戳 + lastEventTs = eventTs + isFirstLine = false + } + val userBehavior = UserBehavior(itemStrArr(0).toLong, itemStrArr(1).toLong, itemStrArr(2).toInt, itemStrArr(3), eventTs) + // 输入文件中的时间戳是从小到大排列的 + // 新读入的行如果比上一行大,sleep,这样来模拟一个有时间间隔的输入流 + timeDiff = eventTs - lastEventTs + if (timeDiff > 0) + Thread.sleep(timeDiff * 1000) + sourceContext.collect(userBehavior) + lastEventTs = eventTs + } + } + + override def cancel(): Unit = { + streamSource.close() + isRunning = false + } +}