Skip to content

Commit

Permalink
time & type
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng committed Jan 19, 2020
1 parent ca955e8 commit 76cdba2
Show file tree
Hide file tree
Showing 28 changed files with 1,028 additions and 46 deletions.
14 changes: 14 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/types/StockPrice.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.flink.tutorials.java.api.types;

public class StockPrice {
public String symbol;
public Long timestamp;
public Double price;

public StockPrice() {}
public StockPrice(String symbol, Long timestamp, Double price){
this.symbol = symbol;
this.timestamp = timestamp;
this.price = price;
}
}
22 changes: 22 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/types/StockPrice1.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.flink.tutorials.java.api.types;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// NOT POJO
public class StockPrice1 {

// LOGGER 无getter和setter
private Logger LOGGER = LoggerFactory.getLogger(StockPrice1.class);

public String symbol;
public Long timestamp;
public Double price;

public StockPrice1() {}
public StockPrice1(String symbol, Long timestamp, Double price){
this.symbol = symbol;
this.timestamp = timestamp;
this.price = price;
}
}
17 changes: 17 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/types/StockPrice2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.flink.tutorials.java.api.types;

// NOT POJO
public class StockPrice2 {

public String symbol;
public Long timestamp;
public Double price;

// 缺少无参数构造函数

public StockPrice2(String symbol, Long timestamp, Double price){
this.symbol = symbol;
this.timestamp = timestamp;
this.price = price;
}
}
27 changes: 27 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/types/TupleExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.flink.tutorials.java.api.types;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TupleExample {

// Java Tuple Example
public static void main(String[] args) throws Exception {

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple3<String, Long, Double>> dataStream = senv.fromElements(
Tuple3.of("0001", 0L, 121.2),
Tuple3.of("0002" ,1L, 201.8),
Tuple3.of("0003", 2L, 10.3),
Tuple3.of("0004", 3L, 99.6)
);

dataStream.filter(item -> item.f2 > 100).print();

dataStream.filter(item -> ((Double)item.getField(2) > 100)).print();

senv.execute("java tuple");
}
}
16 changes: 16 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/types/TypeCheck.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.flink.tutorials.java.api.types;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TypeCheck {

public static void main(String[] args) {

System.out.println(TypeInformation.of(StockPrice.class).createSerializer(new ExecutionConfig()));

System.out.println(TypeInformation.of(StockPrice1.class).createSerializer(new ExecutionConfig()));

System.out.println(TypeInformation.of(StockPrice2.class).createSerializer(new ExecutionConfig()));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.flink.tutorials.scala.api.time

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object AggregateFunctionExample {

case class StockPrice(symbol: String, price: Double)

// IN: StockPrice
// ACC:(String, Double, Int) - (symbol, sum, count)
// OUT: (String, Double) - (symbol, average)
class AverageAggregate extends AggregateFunction[StockPrice, (String, Double, Int), (String, Double)] {

override def createAccumulator() = ("", 0, 0)

override def add(item: StockPrice, accumulator: (String, Double, Int)) =
(item.symbol, accumulator._2 + item.price, accumulator._3 + 1)

override def getResult(accumulator:(String, Double, Int)) = (accumulator._1 ,accumulator._2 / accumulator._3)

override def merge(a: (String, Double, Int), b: (String, Double, Int)) =
(a._1 ,a._2 + b._2, a._3 + b._3)
}

def main(args: Array[String]): Unit = {

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val socketSource = senv.socketTextStream("localhost", 9000)

val input: DataStream[StockPrice] = socketSource.flatMap {
(line: String, out: Collector[StockPrice]) => {
val array = line.split(" ")
if (array.size == 2) {
out.collect(StockPrice(array(0), array(1).toDouble))
}
}
}

val average = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(10))
.aggregate(new AverageAggregate)

average.print()

senv.execute("window aggregate function")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.flink.tutorials.scala.api.time

import java.lang

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import collection.JavaConverters._

object CoGroupExample {

class MyCoGroupFunction extends CoGroupFunction[(String, Int), (String, Int), String] {

// 这里的类型是Java的Iterable,需要引用 collection.JavaConverters._ 并转成Scala
override def coGroup(input1: lang.Iterable[(String, Int)], input2: lang.Iterable[(String, Int)], out: Collector[String]): Unit = {
input1.asScala.foreach(element => out.collect("input1 :" + element.toString()))
input2.asScala.foreach(element => out.collect("input2 :" + element.toString()))
}

}

def main(args: Array[String]): Unit = {

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val socketSource1 = senv.socketTextStream("localhost", 9000)
val socketSource2 = senv.socketTextStream("localhost", 9001)

val input1: DataStream[(String, Int)] = socketSource1.flatMap {
(line: String, out: Collector[(String, Int)]) => {
val array = line.split(" ")
if (array.size == 2) {
out.collect((array(0), array(1).toInt))
}
}
}

val input2: DataStream[(String, Int)] = socketSource2.flatMap {
(line: String, out: Collector[(String, Int)]) => {
val array = line.split(" ")
if (array.size == 2) {
out.collect((array(0), array(1).toInt))
}
}
}


val coGroupResult = input1.coGroup(input2)
.where(i1 => i1._1)
.equalTo(i2 => i2._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.apply(new MyCoGroupFunction)

coGroupResult.print()

senv.execute("window cogroup function")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.flink.tutorials.scala.api.time

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object IncrementalProcessExample {

case class StockPrice(symbol: String, price: Double)

case class MaxMinPrice(symbol: String, max: Double, min: Double, windowEndTs: Long)

class WindowEndProcessFunction extends ProcessWindowFunction[(String, Double, Double), MaxMinPrice, String, TimeWindow] {

override def process(key: String,
context: Context,
elements: Iterable[(String, Double, Double)],
out: Collector[MaxMinPrice]): Unit = {
val maxMinItem = elements.head
val windowEndTs = context.window.getEnd
out.collect(MaxMinPrice(key, maxMinItem._2, maxMinItem._3, windowEndTs))
}

}

def main(args: Array[String]): Unit = {

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val socketSource = senv.socketTextStream("localhost", 9000)

val input: DataStream[StockPrice] = socketSource.flatMap {
(line: String, out: Collector[StockPrice]) => {
val array = line.split(" ")
if (array.size == 2) {
out.collect(StockPrice(array(0), array(1).toDouble))
}
}
}

// reduce的返回类型必须和输入类型相同
// 为此我们将StockPrice拆成一个三元组 (股票代号,最大值、最小值)
val maxMin = input
.map(s => (s.symbol, s.price, s.price))
.keyBy(s => s._1)
.timeWindow(Time.seconds(10))
.reduce(
((s1: (String, Double, Double), s2: (String, Double, Double)) => (s1._1, Math.max(s1._2, s2._2), Math.min(s1._3, s2._3))),
new WindowEndProcessFunction
)

maxMin.print()

senv.execute("combine reduce and process function")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.flink.tutorials.scala.api.time

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object IntervalJoinExample {

class MyProcessFunction extends ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String] {
override def processElement(input1: (String, Long, Int),
input2: (String, Long, Int),
context: ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String]#Context,
out: Collector[String]): Unit = {

out.collect("input 1: " + input1.toString() + ", input 2: " + input2.toString)

}
}

def main(args: Array[String]): Unit = {

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
senv.getConfig.setAutoWatermarkInterval(2000L)

val socketSource1 = senv.socketTextStream("localhost", 9000)
val socketSource2 = senv.socketTextStream("localhost", 9001)

// 数据流有三个字段:(key, 时间戳, 数值)
val input1: DataStream[(String, Long, Int)] = socketSource1.flatMap {
(line: String, out: Collector[(String, Long, Int)]) => {
val array = line.split(" ")
if (array.size == 3) {
out.collect((array(0), array(1).toLong, array(2).toInt))
}
}
}.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(1)) {
override def extractTimestamp(element: (String, Long, Int)): Long = {
element._2
}
})

val input2: DataStream[(String, Long, Int)] = socketSource2.flatMap {
(line: String, out: Collector[(String, Long, Int)]) => {
val array = line.split(" ")
if (array.size == 3) {
out.collect((array(0), array(1).toLong, array(2).toInt))
}
}
}.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(1)) {
override def extractTimestamp(element: (String, Long, Int)): Long = {
element._2
}
})

val intervalJoinResult = input1.keyBy(_._1)
.intervalJoin(input2.keyBy(_._1))
.between(Time.milliseconds(-5), Time.milliseconds(10))
.process(new MyProcessFunction)

intervalJoinResult.print()

senv.execute("interval join function")
}

}
Loading

0 comments on commit 76cdba2

Please sign in to comment.