Skip to content

Commit

Permalink
map filter flatMap
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng committed Dec 17, 2019
1 parent ebd1616 commit eef0975
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.flink.tutorials.api.transformations

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.streaming.api.scala._


object FilterExample {

def main(args: Array[String]): Unit = {
// 创建 Flink 执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)

// 使用 => 构造Lambda表达式
val lambda = dataStream.filter ( input => input > 0 )

// 使用 _ 构造Lambda表达式
val lambda2 = dataStream.map { _ > 0 }

// 继承RichFilterFunction
// limit参数可以从外部传入
class MyFilterFunction(limit: Int) extends RichFilterFunction[Int] {

override def filter(input: Int): Boolean = {
if (input > limit) {
true
} else {
false
}
}

}

val richFunctionDataStream = dataStream.filter(new MyFilterFunction(2))
richFunctionDataStream.print()

senv.execute("basic filter transformation")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.flink.tutorials.api.transformations

import org.apache.flink.streaming.api.scala._

object FlatMapExample {

def main(args: Array[String]): Unit = {
// 创建 Flink 执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val dataStream: DataStream[String] = senv.fromElements("Hello World", "Hello this is Flink")

// split函数的输入为 "Hello World" 输出为 "Hello" 和 "World" 组成的列表 ["Hello", "World"]
// flatMap将列表中每个元素提取出来
// 最后输出为 ["Hello", "World", "Hello", "this", "is", "Flink"]
val words = dataStream.flatMap ( input => input.split(" ") )

val words2 = dataStream.map { _.split(" ") }

// 只对字符串数量大于15的句子进行处理
val longSentenceWords = dataStream.flatMap {
input => {
if (input.size > 15) {
input.split(" ")
} else {
Seq.empty
}
}
}

senv.execute("basic flatMap transformation")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.flink.tutorials.api.transformations

import org.apache.flink.streaming.api.scala._

object KeyByExample {

def main(args: Array[String]): Unit = {
// 创建 Flink 执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val dataStream: DataStream[(Int, Double)] = senv.fromElements((1, 1.0), (2, 3.2), (1, 5.5), (3, 10.0), (3, 12.5))

val keyedStream = dataStream.keyBy(0).sum(1)

keyedStream.print()

senv.execute("basic keyBy transformation")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,36 @@ import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._

object MapExample {

def main(args: Array[String]): Unit = {
// 创建 Flink 执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)

// Lambda函数 =>
val lambda = dataStream.map ( input => input * 2 ).print()
// 使用 => 构造Lambda表达式
val lambda = dataStream.map ( input => ("lambda Input : " + input.toString + ", Output : " + (input * 2).toString) )

// Lambda函数 _
val lambda2 = dataStream.map { _ * 2}.print()
// 使用 _ 构造Lambda表达式
val lambda2 = dataStream.map { _ * 2 }

// 继承RichMapFunction
// 第一个参数是输入,第二个参数是输出
class DoubleMapFunction extends RichMapFunction[Int, Int] {
def map(in: Int):Int = { in * 2 }
};

// 匿名函数
val anonymousFunction = dataStream.map {new RichMapFunction[Int, Int] {
def map(input: Int): Int = {
input * 2
// 第一个泛型是输入类型,第二个泛型是输出类型
class DoubleMapFunction extends RichMapFunction[Int, String] {
override def map(input: Int): String =
("overide map Input : " + input.toString + ", Output : " + (input * 2).toString)
}

val richFunctionDataStream = dataStream.map {new DoubleMapFunction()}

// 匿名类
val anonymousDataStream = dataStream.map {new RichMapFunction[Int, String] {
override def map(input: Int): String = {
("overide map Input : " + input.toString + ", Output : " + (input * 2).toString)
}
}}.print()

val richFunction = dataStream.map {new DoubleMapFunction()}.print()
}}

senv.execute("Basic Map Transformation")
senv.execute("basic map transformation")
}

}

0 comments on commit eef0975

Please sign in to comment.