Skip to content

Commit

Permalink
scala examples
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng committed Aug 12, 2020
1 parent 5a5b1ad commit 5ec0aec
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);
DataStream<Tuple2<String, Integer>> countStream = env.addSource(new SimpleSource());
System.out.println("parallelism: " + env.getParallelism());

countStream.print();
env.execute("source");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@ public static void main(String[] args) throws Exception {
TextInputFormat textInputFormat = new TextInputFormat(new org.apache.flink.core.fs.Path(filePath));

// 每隔100毫秒检测一遍
// DataStream<String> inputStream = env.readFile(textInputFormat, filePath,
// FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

DataStream<String> readOnceStream = env.readFile(textInputFormat, filePath,
FileProcessingMode.PROCESS_ONCE, 0);
// DataStream<String> inputStream = env.readFile(
// textInputFormat,
// filePath,
// FileProcessingMode.PROCESS_CONTINUOUSLY,
// 100);

// 只读一次
DataStream<String> readOnceStream = env.readFile(
textInputFormat,
filePath,
FileProcessingMode.PROCESS_ONCE,
0);

StreamingFileSink<String> fileSink = StreamingFileSink
.forRowFormat(new Path(filePath + "output-test"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public String beginTransaction() throws Exception {
// 创建一个存储本次事务的文件
Files.createFile(preCommitFilePath);
transactionWriter = Files.newBufferedWriter(preCommitFilePath);
System.out.println("ransaction File: " + preCommitFilePath);
System.out.println("Transaction File: " + preCommitFilePath);

return fileName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static void main(String[] args) throws Exception {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

tEnv
// 使用connect函数连接外部系统
// 使用connect函数连接外部系统 connect将被废弃
.connect(
new Kafka()
.version("universal") // 必填,合法的参数有"0.8", "0.9", "0.10", "0.11"或"universal"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@ object AssignWatermark {
// 使用下面的方式,部分Intellij需要在设置中添加 -target:jvm-1.8
// Preferences -> Build, Execution, Deployment -> Compiler -> Scala Compiler -> Default / Maven工程
// Additional compiler options 行添加参数: -target:jvm-1.8
val periodWatermark: DataStream[(String, Long)] = input.assignTimestampsAndWatermarks(
WatermarkStrategy.forGenerator[(String, Long)](
new WatermarkGeneratorSupplier[(String, Long)] {
override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] =
new MyPeriodicGenerator
}
).withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
override def extractTimestamp(t: (String, Long), l: Long): Long = t._2
})
)
val periodWatermark: DataStream[(String, Long)] = input
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forGenerator[(String, Long)](
new WatermarkGeneratorSupplier[(String, Long)] {
override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] =
new MyPeriodicGenerator
}
).withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
override def extractTimestamp(t: (String, Long), l: Long): Long = t._2
})
)

periodWatermark.print()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.flink.tutorials.scala.api.chapter5

import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
Expand Down Expand Up @@ -27,11 +30,12 @@ object IntervalJoinExample {
}
}
}.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(1)) {
override def extractTimestamp(element: (String, Long, Int)): Long = {
element._2
}
})
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(new SerializableTimestampAssigner[(String, Long, Int)] {
override def extractTimestamp(t: (String, Long, Int), l: Long): Long = t._2
})
)

val input2: DataStream[(String, Long, Int)] = socketSource2.flatMap {
(line: String, out: Collector[(String, Long, Int)]) => {
Expand All @@ -41,11 +45,12 @@ object IntervalJoinExample {
}
}
}.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(1)) {
override def extractTimestamp(element: (String, Long, Int)): Long = {
element._2
}
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(new SerializableTimestampAssigner[(String, Long, Int)] {
override def extractTimestamp(t: (String, Long, Int), l: Long): Long = t._2
})
)

val intervalJoinResult: DataStream[String] = input1.keyBy(_._1)
.intervalJoin(input2.keyBy(_._1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,6 @@ object KeyedCoProcessFunctionExample {
})
)

// // 读入股票数据流
// val stockStream: DataStream[StockPrice] = env
// .addSource(new StockSource("stock/stock-tick-20200108.csv"))
// .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[StockPrice]() {
// override def extractAscendingTimestamp(stock: StockPrice): Long = {
// stock.ts
// }
// })

// 读入媒体评价数据流
val mediaStream: DataStream[Media] = env
.addSource(new MediaSource)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.flink.tutorials.scala.chapter7

import org.apache.flink.streaming.api.scala._
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.functions.source.SourceFunction

object SimpleSourceExample {

  /**
   * Runs a local Flink job that prints the tuples emitted by [[SimpleSource]].
   * Visit http://localhost:8082 to see the Flink Web UI while it runs.
   */
  def main(args: Array[String]): Unit = {
    val conf = new Configuration
    // Expose the Flink Web UI on port 8082
    conf.setInteger(RestOptions.PORT, 8082)
    // Create a local execution environment with parallelism 2
    val env = StreamExecutionEnvironment.createLocalEnvironment(2, conf)
    val countStream = env.addSource(new SimpleSource)
    println("parallelism: " + env.getParallelism)

    countStream.print()
    env.execute("source")
  }

  /**
   * A demo source that emits ("0", 0), ("1", 1), ... one element every
   * 500 ms, stopping after 1000 elements or when the job is cancelled.
   */
  class SimpleSource extends SourceFunction[(String, Integer)] {
    private var offset = 0
    // @volatile: cancel() is invoked from a different thread than run(),
    // so the flag needs a visibility guarantee for the loop to observe it.
    @volatile private var isRunning = true

    override def run(ctx: SourceFunction.SourceContext[(String, Integer)]): Unit = {
      while (isRunning) {
        Thread.sleep(500)
        // Tuple literal instead of the Java-translated `new (String, Integer)(...)`
        ctx.collect((offset.toString, Int.box(offset)))
        offset += 1
        if (offset == 1000) {
          isRunning = false
        }
      }
    }

    override def cancel(): Unit = {
      isRunning = false
    }
  }

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.flink.tutorials.scala.api.chapter7

import com.flink.tutorials.java.chapter7.TextFileExample
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
Expand All @@ -12,15 +11,26 @@ object TextFileExample {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// 文件路径
val filePath = classOf[TextFileExample].getClassLoader.getResource("taobao/UserBehavior-20171201.csv").getPath
val filePath = getClass.getClassLoader.getResource("taobao/UserBehavior-20171201.csv").getPath

// 文件为纯文本格式
val textInputFormat = new TextInputFormat(new org.apache.flink.core.fs.Path(filePath))

// 每隔100毫秒检测一遍
val inputStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100)

inputStream.print
// val inputStream = env.readFile(
// textInputFormat,
// filePath,
// FileProcessingMode.PROCESS_CONTINUOUSLY,
// 100)

// 只读一次
val readOnceStream = env.readFile(
textInputFormat,
filePath,
FileProcessingMode.PROCESS_ONCE,
0)

readOnceStream.print()
env.execute("read file from path")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.flink.tutorials.scala.chapter7

import java.io.BufferedWriter
import java.nio.file.{Files, Path, Paths}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.typeutils.base.{StringSerializer, VoidSerializer}
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}

object TransactionWriteSinkExample {

  /**
   * Demonstrates an exactly-once file sink built on Flink's two-phase-commit
   * protocol: records are buffered into a per-transaction file under
   * `preCommitPath` and atomically moved to `commitedPath` on commit.
   */
  def main(args: Array[String]): Unit = {
    val conf = new Configuration
    // Flink Web UI is served at http://localhost:8082
    conf.setInteger(RestOptions.PORT, 8082)
    // Local execution environment with parallelism 1
    val env = StreamExecutionEnvironment.createLocalEnvironment(1, conf)
    // Trigger a checkpoint every 5 seconds — this drives preCommit/commit
    env.getCheckpointConfig.setCheckpointInterval(5 * 1000)
    val countStream = env.addSource(new CheckpointedSourceExample.CheckpointedSource)
    // Periodically simulates a failure (every 20 elements) to exercise recovery
    val result = countStream.map(new CheckpointedSourceExample.FailingMapper(20))
    // Unix-like systems keep temporary files under /tmp;
    // Windows users must change these directories
    val preCommitPath = "/tmp/flink-sink-precommit"
    val commitedPath = "/tmp/flink-sink-commited"
    if (!Files.exists(Paths.get(preCommitPath))) Files.createDirectory(Paths.get(preCommitPath))
    if (!Files.exists(Paths.get(commitedPath))) Files.createDirectory(Paths.get(commitedPath))
    // Exactly-once sink: inspect the output directories while the job runs
    result.addSink(new TwoPhaseFileSink(preCommitPath, commitedPath))
    // Printing to stdout has no exactly-once guarantee; duplicates will appear
    result.print()
    env.execute("two file sink")
  }


  /**
   * Two-phase-commit sink writing each transaction to its own file.
   *
   * Transaction identifier type is `String` (the file name); there is no
   * per-checkpoint context, hence `Void`.
   *
   * @param preCommitPath directory holding in-flight (pre-committed) files
   * @param commitedPath  directory holding committed files
   */
  class TwoPhaseFileSink(var preCommitPath: String, var commitedPath: String) extends TwoPhaseCommitSinkFunction[(String, Int), String, Void](StringSerializer.INSTANCE, VoidSerializer.INSTANCE) {
    // Writer for the currently open transaction's pre-commit file
    // (null until beginTransaction runs, e.g. right after a restart)
    var transactionWriter: BufferedWriter = _

    // Append one record to the current transaction's file.
    override def invoke(transaction: String, in: (String, Int), context: SinkFunction.Context[_]): Unit = {
      transactionWriter.write(in._1 + " " + in._2 + "\n")
    }

    // Open a new transaction: create a uniquely named file and a writer for it.
    // NOTE(review): ISO_LOCAL_DATE_TIME contains ':' characters, which are
    // illegal in Windows file names — confirm if Windows support is needed.
    override def beginTransaction: String = {
      val time = LocalDateTime.now.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
      val subTaskIdx = getRuntimeContext.getIndexOfThisSubtask
      val fileName = time + "-" + subTaskIdx
      val preCommitFilePath = Paths.get(preCommitPath + "/" + fileName)
      // Create the file that stores this transaction's records
      Files.createFile(preCommitFilePath)
      transactionWriter = Files.newBufferedWriter(preCommitFilePath)
      System.out.println("Transaction File: " + preCommitFilePath)
      fileName
    }

    // Flush buffered records from memory to disk before the checkpoint completes.
    override def preCommit(transaction: String): Unit = {
      transactionWriter.flush()
      transactionWriter.close()
    }

    // Checkpoint completed: atomically move the file to the committed directory.
    override def commit(transaction: String): Unit = {
      val preCommitFilePath = Paths.get(preCommitPath + "/" + transaction)
      if (Files.exists(preCommitFilePath)) {
        val commitedFilePath = Paths.get(commitedPath + "/" + transaction)
        try
          Files.move(preCommitFilePath, commitedFilePath)
        catch {
          case e: Exception =>
            System.out.println(e)
        }
      }
    }

    // Transaction failed: discard its pre-commit file.
    override def abort(transaction: String): Unit = {
      // Close the open writer first — otherwise the writer leaks and, on
      // Windows, deleting a still-open file fails. The null guard covers
      // aborts of transactions recovered after a restart, when no writer
      // has been opened in this instance yet.
      if (transactionWriter != null) {
        try transactionWriter.close()
        catch {
          case e: Exception =>
            System.out.println(e)
        }
      }
      val preCommitFilePath = Paths.get(preCommitPath + "/" + transaction)
      if (Files.exists(preCommitFilePath)) try
        Files.delete(preCommitFilePath)
      catch {
        case e: Exception =>
          System.out.println(e)
      }
    }
  }
}

This file was deleted.

Loading

0 comments on commit 5ec0aec

Please sign in to comment.