Skip to content

Commit

Permalink
state refactor java scala
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng committed Feb 21, 2020
1 parent 2a72c37 commit a72580f
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 110 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.flink.tutorials.java.api.state;

import com.flink.tutorials.java.utils.taobao.BehaviorPattern;
import com.flink.tutorials.java.utils.taobao.UserBehavior;
import com.flink.tutorials.java.utils.taobao.UserBehaviorSource;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;

public class BroadcastStateExample {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 主数据流
DataStream<UserBehavior> userBehaviorStream = env.addSource(new UserBehaviorSource("taobao/UserBehavior-20171201.csv"))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior userBehavior) {
// 原始数据单位为秒,乘以1000转换成毫秒
return userBehavior.timestamp * 1000;
}
});

// BehaviorPattern数据流
DataStream<BehaviorPattern> patternStream = env.fromElements(new BehaviorPattern("pv", "buy"));

// Broadcast State只能使用 Key->Value 结构,基于MapStateDescriptor
MapStateDescriptor<Void, BehaviorPattern> broadcastStateDescriptor = new MapStateDescriptor<>("behaviorPattern", Types.VOID, Types.POJO(BehaviorPattern.class));
BroadcastStream<BehaviorPattern> broadcastStream = patternStream.broadcast(broadcastStateDescriptor);

// 生成一个KeyedStream
KeyedStream<UserBehavior, Long> keyedStream = userBehaviorStream.keyBy(user -> user.userId);

// 在KeyedStream上进行connect和process
DataStream<Tuple2<Long, BehaviorPattern>> matchedStream = keyedStream
.connect(broadcastStream)
.process(new BroadcastPatternFunction());

matchedStream.print();

env.execute("broadcast taobao example");
}

/**
* 四个泛型分别为:
* 1. KeyedStream中Key的数据类型
* 2. 主数据流的数据类型
* 3. 广播流的数据类型
* 4. 输出类型
* */
public static class BroadcastPatternFunction
extends KeyedBroadcastProcessFunction<Long, UserBehavior, BehaviorPattern, Tuple2<Long, BehaviorPattern>> {

// 用户上次行为状态句柄,每个用户存储一个状态
private ValueState<String> lastBehaviorState;
// Broadcast State Descriptor
private MapStateDescriptor<Void, BehaviorPattern> bcPatternDesc;

@Override
public void open(Configuration configuration) {
lastBehaviorState = getRuntimeContext().getState(
new ValueStateDescriptor<String>("lastBehaviorState", Types.STRING));
bcPatternDesc = new MapStateDescriptor<Void, BehaviorPattern>("behaviorPattern", Types.VOID, Types.POJO(BehaviorPattern.class));
}

@Override
public void processBroadcastElement(BehaviorPattern pattern,
Context context,
Collector<Tuple2<Long, BehaviorPattern>> collector) throws Exception {
BroadcastState<Void, BehaviorPattern> bcPatternState = context.getBroadcastState(bcPatternDesc);
// 将新数据更新至Broadcast State,这里使用一个null作为Key
// 在本场景中所有数据都共享一个Pattern,因此这里伪造了一个Key
bcPatternState.put(null, pattern);
}

@Override
public void processElement(UserBehavior userBehavior,
ReadOnlyContext context,
Collector<Tuple2<Long, BehaviorPattern>> collector) throws Exception {

// 获取最新的Broadcast State
BehaviorPattern pattern = context.getBroadcastState(bcPatternDesc).get(null);
String lastBehavior = lastBehaviorState.value();
if (pattern != null && lastBehavior != null) {
// 用户之前有过行为,检查是否符合给定的模式
if (pattern.firstBehavior.equals(lastBehavior) &&
pattern.secondBehavior.equals(userBehavior.behavior)) {
// 当前用户行为符合模式
collector.collect(Tuple2.of(userBehavior.userId, pattern));
}
}
lastBehaviorState.update(userBehavior.behavior);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.flink.tutorials.java.api.state;

import com.flink.tutorials.java.utils.taobao.UserBehavior;
import com.flink.tutorials.java.utils.taobao.UserBehaviorSource;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;

public class MapStateExample {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<UserBehavior> userBehaviorStream = env.addSource(new UserBehaviorSource("taobao/UserBehavior-20171201.csv"))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior userBehavior) {
// 原始数据单位为秒,乘以1000转换成毫秒
return userBehavior.timestamp * 1000;
}
});

// 生成一个KeyedStream
KeyedStream<UserBehavior, Long> keyedStream = userBehaviorStream.keyBy(user -> user.userId);

// 在KeyedStream上进行flatMap
DataStream<Tuple3<Long, String, Integer>> behaviorCountStream= keyedStream.flatMap(new MapStateFunction());

behaviorCountStream.print();

env.execute("taobao map state example");
}

public static class MapStateFunction extends RichFlatMapFunction<UserBehavior, Tuple3<Long, String, Integer>> {

// 指向MapState的句柄
private MapState<String, Integer> behaviorMapState;

@Override
public void open(Configuration configuration) {
// 创建StateDescriptor
MapStateDescriptor<String, Integer> behaviorMapStateDescriptor = new MapStateDescriptor<String, Integer>("behaviorMap", Types.STRING, Types.INT);
// 通过StateDescriptor获取运行时上下文中的状态
behaviorMapState = getRuntimeContext().getMapState(behaviorMapStateDescriptor);
}

@Override
public void flatMap(UserBehavior input, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
int behaviorCnt = 1;
// behavior有可能为pv、cart、fav、buy等
// 判断状态中是否有该behavior
if (behaviorMapState.contains(input.behavior)) {
behaviorCnt = behaviorMapState.get(input.behavior) + 1;
}
// 更新状态
behaviorMapState.put(input.behavior, behaviorCnt);
out.collect(Tuple3.of(input.userId, input.behavior, behaviorCnt));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.flink.tutorials.java.utils.taobao;

/**
* 行为模式
* 整个模式简化为两个行为
* */
public class BehaviorPattern {

public String firstBehavior;
public String secondBehavior;

public BehaviorPattern() {}

public BehaviorPattern(String firstBehavior, String secondBehavior) {
this.firstBehavior = firstBehavior;
this.secondBehavior = secondBehavior;
}

@Override
public String toString() {
return "first: " + firstBehavior + ", second: " + secondBehavior;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.flink.tutorials.java.utils.taobao;

public class UserBehavior {
public long userId;
public long itemId;
public int categoryId;
public String behavior;
public long timestamp;

public UserBehavior() {}

public UserBehavior(long userId, long itemId, int categoryId, String behavior, long timestamp) {
this.userId = userId;
this.itemId = itemId;
this.categoryId = categoryId;
this.behavior = behavior;
this.timestamp = timestamp;
}

public static UserBehavior of(long userId, long itemId, int categoryId, String behavior, long timestamp) {
return new UserBehavior(userId, itemId, categoryId, behavior, timestamp);
}

@Override
public String toString() {
return "(" + userId + "," + itemId + "," + categoryId + "," +
behavior + "," + timestamp + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.flink.tutorials.java.utils.taobao;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;

public class UserBehaviorSource implements SourceFunction<UserBehavior> {

private boolean isRunning = true;
private String path;
private InputStream streamSource;

public UserBehaviorSource(String path) {
this.path = path;
}

@Override
public void run(SourceContext<UserBehavior> sourceContext) throws Exception {
// 从项目的resources目录获取输入
streamSource = this.getClass().getClassLoader().getResourceAsStream(path);
BufferedReader br = new BufferedReader(new InputStreamReader(streamSource));
String line;
boolean isFirstLine = true;
long timeDiff = 0;
long lastEventTs = 0;
while (isRunning && (line = br.readLine()) != null) {
String[] itemStrArr = line.split(",");
long eventTs = Long.parseLong(itemStrArr[4]);
if (isFirstLine) {
// 从第一行数据提取时间戳
lastEventTs = eventTs;
isFirstLine = false;
}
UserBehavior userBehavior = UserBehavior.of(Long.parseLong(itemStrArr[0]),
Long.parseLong(itemStrArr[1]), Integer.parseInt(itemStrArr[2]),
itemStrArr[3], eventTs);
// 输入文件中的时间戳是从小到大排列的
// 新读入的行如果比上一行大,sleep,这样来模拟一个有时间间隔的输入流
timeDiff = eventTs - lastEventTs;
if (timeDiff > 0)
Thread.sleep(timeDiff * 1000);
sourceContext.collect(userBehavior);
lastEventTs = eventTs;
}
}

// 停止发送数据
@Override
public void cancel() {
try {
streamSource.close();
} catch (Exception e) {
System.out.println(e.toString());
}
isRunning = false;
}

}
Loading

0 comments on commit a72580f

Please sign in to comment.