Commit 36ba402

broadcast state

luweizheng committed Feb 19, 2020
1 parent e51fbad commit 36ba402

Showing 50 changed files with 1,276 additions and 326 deletions.
28 changes: 7 additions & 21 deletions pom.xml
@@ -27,7 +27,6 @@ under the License.
 <packaging>jar</packaging>

 <name>Flink Tutorials</name>
-<url>http://www.myorganization.org</url>

 <repositories>
 <repository>
@@ -45,7 +44,7 @@ under the License.

 <properties>
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-<flink.version>1.9.1</flink.version>
+<flink.version>1.10.0</flink.version>
 <scala.binary.version>2.11</scala.binary.version>
 <scala.version>2.11.12</scala.version>
 </properties>
@@ -61,18 +60,18 @@ under the License.
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
-<artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
+<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 <scope>provided</scope>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
-<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 <scope>provided</scope>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
-<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 </dependency>

@@ -86,14 +85,12 @@ under the License.

 <!-- Add connector dependencies here. They must be in the default scope (compile). -->

-<!-- Example:

 <dependency>
 <groupId>org.apache.flink</groupId>
-<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+<artifactId>flink-connector-kafka_2.11</artifactId>
 <version>${flink.version}</version>
 </dependency>
--->

 <!-- Add logging framework, to produce console output when running in the IDE. -->
 <!-- These dependencies are excluded from the application JAR by default. -->
@@ -127,11 +124,6 @@ under the License.
 <goal>shade</goal>
 </goals>
 <configuration>
-<artifactSet>
-<includes>
-<include>org.apache.flink:flink-connector-wikiedits_2.11</include>
-</includes>
-</artifactSet>
 <filters>
 <filter>
 <!-- Do not copy the signatures in the META-INF folder.
@@ -146,7 +138,7 @@ under the License.
 </filters>
 <transformers>
 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-<mainClass>com.flink.tutorials.demos.stock.StockPriceDemo</mainClass>
+<!-- <mainClass>com.flink.tutorials.java.api.projects.wordcount.WordCountKafkaInStdOut</mainClass>-->
 </transformer>
 </transformers>
 </configuration>
@@ -276,12 +268,6 @@ under the License.
 <version>${scala.version}</version>
 <scope>compile</scope>
 </dependency>
-<dependency>
-<groupId>org.apache.flink</groupId>
-<artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
-<version>${flink.version}</version>
-<scope>compile</scope>
-</dependency>
 </dependencies>
 </profile>
 </profiles>
28 changes: 28 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/basic/Overloading.java
@@ -0,0 +1,28 @@
package com.flink.tutorials.java.api.basic;

public class Overloading {
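// Overloaded methods share a name but must differ in their parameter lists; the return type alone cannot distinguish them.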

// No parameters; returns an int
public int test(){
System.out.println("test");
return 1;
}

// One parameter
public void test(int a){
System.out.println("test " + a);
}

// Two parameters and a return value
public String test(int a, String s){
System.out.println("test " + a + " " + s);
return a + " " + s;
}

public static void main(String[] args) {
Overloading o = new Overloading();
System.out.println(o.test());
o.test(1);
System.out.println(o.test(1,"test3"));
}
}
46 changes: 46 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/basic/generics/MyArrayList.java
@@ -0,0 +1,46 @@
package com.flink.tutorials.java.api.basic.generics;

public class MyArrayList<T> {

private int size;
T[] elements;

public MyArrayList(int capacity) {
this.size = capacity;
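// Type erasure makes "new T[capacity]" illegal, so allocate an Object[] and cast it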
this.elements = (T[]) new Object[capacity];
}

public void set(T element, int position) {
elements[position] = element;
}

@Override
public String toString() {
String result = "";
for (int i = 0; i < size; i++) {
result += elements[i].toString();
}
return result;
}

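// A generic method: the type parameter E belongs to the method and is independent of the class-level T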
public <E> void printInfo(E element) {
System.out.println(element.toString());
}

public static void main(String[] args){

MyArrayList<String> strList = new MyArrayList<String>(2);
strList.set("first", 0);
strList.set("second", 1);

MyArrayList<Integer> intList = new MyArrayList<Integer>(2);
intList.set(11, 0);
intList.set(22, 1);

System.out.println(strList.toString());
System.out.println(intList.toString());

intList.printInfo("function");
}

}
16 changes: 16 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/basic/generics/TypeErasure.java
@@ -0,0 +1,16 @@
package com.flink.tutorials.java.api.basic.generics;

import java.util.ArrayList;

public class TypeErasure {
public static void main(String[] args) {
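// Type arguments exist only at compile time; after erasure both lists share the same runtime Class object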
Class<?> strListClass = new ArrayList<String>().getClass();
Class<?> intListClass = new ArrayList<Integer>().getClass();
// Output: class java.util.ArrayList
System.out.println(strListClass);
// Output: class java.util.ArrayList
System.out.println(intListClass);
// Output: true
System.out.println(strListClass.equals(intListClass));
}
}
43 changes: 43 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/basic/lambda/FunctionalInterfaceExample.java
@@ -0,0 +1,43 @@
package com.flink.tutorials.java.api.basic.lambda;

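// A functional interface declares exactly one abstract method, so it can be implemented with a lambda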
@FunctionalInterface
interface AddInterface<T> {
T add(T a, T b);
}

public class FunctionalInterfaceExample {

public static void main( String[] args ) {

AddInterface<Integer> addInt = (Integer a, Integer b) -> a + b;
AddInterface<Double> addDouble = (Double a, Double b) -> a + b;

int intResult;
double doubleResult;

intResult = addInt.add(1, 2);
System.out.println("Lambda int add = " + intResult);

doubleResult = addDouble.add(1.1d, 2.2d);
System.out.println("Lambda double add = " + doubleResult);

doubleResult = new MyAdd().add(1.1d, 2.2d);
System.out.println("Class implementation add = " + doubleResult);

doubleResult = new AddInterface<Double>(){
@Override
public Double add(Double a, Double b) {
return a + b;
}
}.add(1d, 2d);

System.out.println("Anonymous function add = " + doubleResult);
}

public static class MyAdd implements AddInterface<Double> {
@Override
public Double add(Double a, Double b) {
return a + b;
}
}
}
23 changes: 23 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/basic/lambda/JavaStreamExample.java
@@ -0,0 +1,23 @@
package com.flink.tutorials.java.api.basic.lambda;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JavaStreamExample {

public static void main( String[] args ) {
List<String> strings = Arrays.asList(
"abc", "", "bc", "12345",
"efg", "abcd","", "jkl");

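// Pipeline: drop empty strings, map each remaining string to its length, and collect the results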
List<Integer> lengths = strings
.stream()
.filter(string -> !string.isEmpty())
.map(s -> s.length())
.collect(Collectors.toList());

lengths.forEach((s) -> System.out.println(s));
}

}
20 changes: 20 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/projects/stock/StockPriceDemo.java
@@ -0,0 +1,20 @@
package com.flink.tutorials.java.api.projects.stock;

import com.flink.tutorials.java.api.utils.stock.StockPrice;
import com.flink.tutorials.java.api.utils.stock.StockSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StockPriceDemo {

public static void main(String[] args) throws Exception {

// Create the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<StockPrice> stream = env.addSource(new StockSource("stock/stock-test.csv"));
stream.print();

env.execute("stock price");
}
}
82 changes: 82 additions & 0 deletions src/main/java/com/flink/tutorials/java/api/projects/wordcount/WordCountKafkaInKafkaOut.java
@@ -0,0 +1,82 @@
package com.flink.tutorials.java.api.projects.wordcount;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class WordCountKafkaInKafkaOut {

public static void main(String[] args) throws Exception {

// Create the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka parameters
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");
String inputTopic = "Shakespeare";
String outputTopic = "WordCount";

// Source
FlinkKafkaConsumer<String> consumer =
new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);

// Transformations
// Use Flink operators to transform the input text stream:
// split on whitespace, count each word, key by word, apply a time window, and aggregate
DataStream<Tuple2<String, Integer>> wordCount = stream
.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
String[] tokens = line.split("\\s");
// Emit a (word, 1) tuple for each token
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<>(token, 1));
}
}
})
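// Java erases the lambda's generic types, so the output type must be declared explicitly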
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);

// Sink
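// EXACTLY_ONCE is implemented with Kafka transactions and requires Kafka 0.11 or later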
FlinkKafkaProducer<Tuple2<String, Integer>> producer = new FlinkKafkaProducer<Tuple2<String, Integer>> (
outputTopic,
new WordCountKafkaInKafkaOut.KafkaWordCountSerializationSchema(outputTopic),
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
wordCount.addSink(producer);

// execute
env.execute("kafka streaming word count");

}

public static class KafkaWordCountSerializationSchema implements KafkaSerializationSchema<Tuple2<String, Integer>> {

private String topic;

public KafkaWordCountSerializationSchema(String topic) {
super();
this.topic = topic;
}

@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> element, Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, (element.f0 + ": " + element.f1).getBytes(StandardCharsets.UTF_8));
}
}
}