diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 0000000..4b56e17 --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,42 @@ +name: Deploy + +on: + push: + branches: + - 'main' + +jobs: + build: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.11"] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python envs + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + cd doc + python -m pip install --upgrade pip + python -m pip install -r requirements.txt + - name: Build website + run: | + cd doc + sphinx-build -b html ./ ./_build/html + - name: Install SSH Key + uses: shimataro/ssh-key-action@v2 + with: + key: ${{ secrets.SSH_PRIVATE_KEY }} + known_hosts: unnecessary + - name: Adding Known Hosts + run: ssh-keyscan -p ${{ secrets.REMOTE_PORT }} -H ${{ secrets.REMOTE_HOST }} >> ~/.ssh/known_hosts + + - name: Deploy with rsync + run: | + cd doc + rsync -avz _build/html/* ${{ secrets.REMOTE_USER }}@${{ secrets.REMOTE_HOST }}:/var/www/flink-book/ diff --git a/.gitignore b/.gitignore index cde467a..7526228 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,17 @@ dependency-reduced-pom.xml *iml .idea derby.log -.DS_Store \ No newline at end of file +.DS_Store + +**/.ipynb_checkpoints +**/__pycache__ +data/ +*.json +*.bkp +*.params +*.csv +*egg-info* +doc/_build/* +test*.md +.idea +env \ No newline at end of file diff --git a/.gitignore copy b/.gitignore copy new file mode 100644 index 0000000..f35cbdf --- /dev/null +++ b/.gitignore copy @@ -0,0 +1,13 @@ +**/.ipynb_checkpoints +**/__pycache__ +data/ +*.json +*.bkp +*.params +*.DS_Store +*.csv +*egg-info* +_build/* +test*.md +.idea +env \ No newline at end of file diff --git a/README copy.md b/README copy.md new file mode 100644 index 0000000..0f77b8d --- /dev/null +++ b/README copy.md @@ -0,0 +1,15 @@ +# Python 数据科学加速 + +开源的、面向下一代数据科学和人工智能应用的 Python 并行加速编程书籍。 + +## 内容介绍 + +Python 已经成为数据科学和人工智能引领性的编程语言,数据科学家常常使用 Python 完成一系列任务。本书主要针对 Python 无法高效并行所设计,重点以数据科学领域为应用场景。 + +## 参与贡献 + +如果想参与贡献,请参考下面的两个文件。 + +* [构建指南](./contribute/info.md) 页面详细介绍了本书是如何撰写的,包括如何克隆代码仓库,如何创建开发环境,如何部署到 GitHub Pages。 + +* [样式规范](./contribute/style.md) 页面详细介绍了文件命名方式,文字风格,代码规范,画图工具等。 \ No newline at end of file diff --git a/doc/_static/custom.css b/doc/_static/custom.css new file mode 100644 index 0000000..194630d --- /dev/null +++ b/doc/_static/custom.css @@ -0,0 +1,3 @@ +html[data-theme="light"] { + --sbt-color-announcement: rgb(125, 125, 125); +} \ No newline at end of file diff --git a/doc/_static/logo.ico b/doc/_static/logo.ico new file mode 100644 index 0000000..5169b6d Binary files /dev/null and b/doc/_static/logo.ico differ diff --git a/doc/_toc.yml b/doc/_toc.yml new file mode 100644 index 0000000..968c062 --- /dev/null +++ b/doc/_toc.yml @@ -0,0 +1,10 @@ +root: index +subtrees: +- numbered: 2 + entries: + - file: ch-system-design/index + entries: + - file: ch-system-design/dataflow + - file: ch-system-design/flink-core + - file: ch-system-design/task-resource + - file: ch-system-design/exercise-wordcount diff --git a/doc/build.sh b/doc/build.sh new file mode 100644 index 0000000..7882f65 --- /dev/null +++ b/doc/build.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +set -e + +sphinx-build -b html ./ ./_build/html \ No newline at end of file diff --git a/doc/ch-system-design/dataflow.md b/doc/ch-system-design/dataflow.md new file mode 100644 index 0000000..307609b --- /dev/null +++ b/doc/ch-system-design/dataflow.md @@ -0,0 +1,85 @@ +(flink-dataflow) +# Flink 数据流图 + +:::{note} + +本教程已出版为《Flink原理与实践》,感兴趣的读者请在各大电商平台购买! + + ![](https://img.shields.io/badge/JD-%E8%B4%AD%E4%B9%B0%E9%93%BE%E6%8E%A5-red) + +![](https://img.shields.io/badge/GitHub-%E9%85%8D%E5%A5%97%E6%BA%90%E7%A0%81-blue) + + +::: + +## WordCount程序和数据流图 + +上一章的案例中,我们尝试构建了一个文本数据流管道,这个Flink程序可以计算数据流中单词出现的频次。如果输入数据流是“Hello Flink Hello World“,这个程序将统计出“Hello”的频次为2,“Flink”和“World”的频次为1。在大数据领域,WordCount程序就像是一个编程语言的HelloWorld程序,它展示了一个大数据引擎的基本规范。麻雀虽小,五脏俱全,从这个样例中,我们可以一窥Flink设计和运行原理。 + +如下图所示,程序分为三大部分,第一部分读取数据源(Source),第二部分对数据做转换操作(Transformation),最后将转换结果输出到一个目的地(Sink)。 + +![Flink样例程序示意图](./img/code.png) + +代码中的方法被称为函数(Function),是Flink提供给程序员的接口,程序员需要调用并实现这些函数,对数据进行操作,进而完成特定的业务逻辑。通常一到多个函数会组成一个算子(Operator), 算子执行对数据的操作(Operation)。在WordCount的例子中,有三类算子:Source算子读取数据源中的数据,数据源可以是数据流、也可以存储在文件系统中的文件。Transformation算子对数据进行必要的计算处理。Sink算子将处理结果输出,数据一般被输出到数据库、文件系统或下一个数据流程序。 + +我们可以把算子理解为1 + 2 运算中的加号,加号(+)是这个算子的一个符号表示,它表示对数字1和数字2做加法运算。同样,在Flink或Spark这样的大数据引擎中,算子对数据进行某种操作,程序员可以根据自己的需求调用合适的算子,完成所需计算任务。Flink常用的算子有`map()`、`flatMap()`、`keyBy()`、`timeWindow()`等,它们分别对数据流执行不同类型的操作。 + +我们先对这个样例程序中各个算子做一个简单的介绍,关于这些算子的具体使用方式将在后续章节中详细说明。 + +* flatMap + +`flatMap()`对输入进行处理,生成零到多个输出。本例中它执行一个简单的分词过程,对一行字符串按照空格切分,生成一个(word, 1)的二元组。 + +* keyBy + +`keyBy()`根据某个Key对数据重新分组。本例中是将二元组(word, 1)中第一项作为Key进行分组,相同的单词会被分到同一组。 + +* timeWindow + +`timeWindow()`是时间窗口函数,用来界定对多长时间之内的数据做统计。 + +* sum + +`sum()`为求和函数。`sum(1)`表示对二元组中第二个元素求和,因为经过前面的keyBy,所有相同的单词都被分到了一组,因此,在这个分组内,将单词出现次数做加和,就得到出现的总次数。 + +在程序实际运行前,Flink会将用户编写的代码做一个简单处理,生成一个如下图所示的逻辑视图。下图展示了WordCount程序中,数据从不同算子间流动的情况。图中,圆圈代表算子,圆圈间的箭头代表数据流,数据流在Flink程序中经过不同算子的计算,最终生成结果。其中,`keyBy()`、`timeWindow()`和`sum()`共同组成了一个时间窗口上的聚合操作,被归结为一个算子。我们可以在Flink的Web UI中,点击一个作业,查看这个作业的逻辑视图。 + +![WordCont程序的逻辑视图](./img/logical-view.png) + +对于词频统计这个案例,逻辑上来讲无非是对数据流中的单词做提取,然后使用一个Key-Value结构对单词做词频计数,最后输出结果即可,这样的逻辑本可以用几行代码完成,改成使用算子形式,反而让新人看着一头雾水,为什么一定要用算子的形式来写程序呢?实际上,算子进化成当前这个形态,就像人类从石块计数,到手指计数,到算盘计数,再到计算机计数这样的进化过程一样,尽管更低级的方式可以完成一定的计算任务,但是随着计算规模的增长,古老的计数方式存在着低效的弊端,无法完成更高级别和更大规模的计算需求。试想,如果我们不使用大数据框架提供的算子,而是自己实现一套上述的计算逻辑,尽管我们可以快速完成当前的词频统计的任务,但是当面临一个新计算任务时,我们需要重新编写程序,完成一整套计算任务。我们自己编写代码的横向扩展性可能很低,当输入数据暴增时,我们需要做很大改动,以部署在更多机器上。 + +大数据引擎的算子对计算做了一些抽象,对于新人来说有一定学习成本,而一旦掌握这门技术,人们所能处理的数据规模将成倍增加。算子的出现,正是针对大数据场景下,人们需要一种统一的计算描述语言来对数据做计算而进化出的新计算形态。基于Flink的算子,我们可以定义一个数据流的逻辑视图,以此完成对大数据的计算。剩下那些数据交换、横向扩展、故障恢复等问题全交由大数据引擎来解决。 + +## 从逻辑视图到物理执行 + +在绝大多数的大数据处理场景下,一台机器节点无法处理所有数据,数据被切分到多台节点上。在大数据领域,当数据量大到超过单台机器处理能力时,需要将一份数据切分到多个分区(Partition)上,每个分区分布在一台虚拟机或物理机上。 + +前一小节已经提到,大数据引擎的算子提供了编程接口,我们可以使用算子构建数据流的逻辑视图。考虑到数据分布在多个节点的情况,逻辑视图只是一种抽象,需要将逻辑视图转化为物理执行图,才能在分布式环境下执行。 + +下图为WordCount程序的物理执行示意图,数据流分布在2个分区上。箭头部分表示数据流分区,圆圈部分表示算子在分区上的算子子任务(Operator Subtask)。从逻辑视图变为物理执行图后,FlatMap算子在每个分区都有一个算子子任务,以处理该分区上的数据:FlatMap[1/2]算子子任务处理第一个数据流分区上的数据,以此类推。 + +![WordCount程序物理执行示意图](./img/physical-execution.png) + +在分布式计算环境下,执行计算的单个节点(物理机或虚拟机)被称为实例,一个算子在并行执行时,算子子任务会分布到多个节点上,所以算子子任务又被称为算子实例(Instance)。即使输入数据增多,我们也可以通过部署更多的算子实例来进行横向扩展。从图 3‑3中可以看到,除去Sink外的算子都被分成了2个算子实例,他们的并行度(Parallelism)为2,Sink算子的并行度为1。并行度是可以被设置的,当设置某个算子的并行度为2时,也就意味着这个算子有2个算子子任务(或者说2个算子实例)并行执行。实际应用中一般根据输入数据量的大小,计算资源的多少等多方面的因素来设置并行度。 + +:::{tip} +在本例中,为了演示,我们把所有算子的并行度设置为了2:`env.setParallelism(2);`,把Sink的并行度设置成了1:`wordCount.print().setParallelism(1);`。如果不单独设置print的并行度的话,它的并行度也是2。 +::: + +算子子任务是Flink物理执行的基本单元,算子子任务之间是相互独立的,某个算子子任务有自己的线程,不同算子子任务可能分布在不同的机器节点上。后文在Flink的资源分配部分我们还会重点介绍算子子任务。 + +在本书后文的描述中,算子子任务、分区、实例都是指对算子的并行切分。 + +## 数据交换策略 + +下图中出现了数据流动的现象,即数据在不同的算子子任务上进行着数据交换。无论是Hadoop、Spark还是Flink,都会涉及到数据交换策略。常见的据交换策略有4种,如下图所示。 + +![Flink数据交换策略](./img/data-exchange.png) + +* 前向传播(Forward):前一个算子子任务将数据直接传递给后一个算子子任务,数据不存在跨分区的交换,也避免了因数据交换产生的各类开销,图 3‑3中Source和和FlatMap之间就是这样的情形。 + +* 按Key分组(Key-Based):数据以(Key, Value)形式存在,该策略将所有数据按照Key进行分组,相同Key的数据会被分到一组,发送到同一个分区上。WordCount程序中,keyBy将单词作为Key,把相同单词都发送到同一分区,以方便后续算子的聚合统计。 + +* 广播(Broadcast):将某份数据发送到所有分区上,这种策略涉及到了数据在全局的拷贝,因此非常消耗资源。 + +* 随机策略(Random):该策略将所有数据随机均匀地发送到多个分区上,以保证数据平均分配到不同分区上。该策略通常为了防止数据倾斜到某些分区,导致部分分区数据稀疏,另外一些分区数据拥堵。 \ No newline at end of file diff --git a/doc/ch-system-design/exercise-wordcount.md b/doc/ch-system-design/exercise-wordcount.md new file mode 100644 index 0000000..d597bae --- /dev/null +++ b/doc/ch-system-design/exercise-wordcount.md @@ -0,0 +1,64 @@ +--- +title: 练习:WordCount +order: 4 +head: + - - meta + - name: keywords + content: Flink, DataStream, 数据流图, dataflow, wordcount, 物理执行图 +description: "本节将主要介绍Flink的数据流图,包括逻辑视图和物理执行图。" +category: [Flink] +article: false +--- + +:::{note} + +本教程已出版为《Flink原理与实践》,感兴趣的读者请在各大电商平台购买! + + ![](https://img.shields.io/badge/JD-%E8%B4%AD%E4%B9%B0%E9%93%BE%E6%8E%A5-red) + +![](https://img.shields.io/badge/GitHub-%E9%85%8D%E5%A5%97%E6%BA%90%E7%A0%81-blue) + + +::: + +## 实验目的 + +熟悉Flink开发环境,尝试对Flink WordCount程序进行简单的修改。 + +## 实验内容 + +对Flink WordCount程序做一些修改。 + +## 实验要求 + +2.4章节展示了如何使用Flink的WordCount样例程序,如何调试和提交作业。在这基础上,你可以开始尝试: + +1. 修改WordCount程序:2.4章节中的样例程序将输入按照空格隔开,真实世界中的文本通常包含各类标点符号,请修改代码,使你的程序不仅可以切分空格,也可以切分包括逗号、句号、冒号在内的标点符号(非单词字符),并统计词频。提示:可以使用正则表达式来判断哪些为非单词字符。 + +2. 使用Flink命令行工具提交该作业,并在集群监控看板上查看作业运行状态,Task Slot的数量。 + + 你可以使用《哈姆雷特》中的经典台词作为输入数据: + +To be, or not to be: that is the question: + +Whether it’s nobler in the mind to suffer + +The slings and arrows of outrageous fortune, + +Or to take arms against a sea of troubles, + +:::{tip} +我们可以在IntelliJ Idea的本地环境上进行本实验,无需启动一个Flink集群,执行环境的配置如下所示,打开相应链接也可以查看Flink Web UI。如果输入依赖了Kafka,需要提前启动Kafka集群,并向对应Topic里填充数据。 +::: + +```java +Configuration conf = new Configuration(); +// 访问http://localhost:8082 可以看到Flink Web UI +conf.setInteger(RestOptions.PORT, 8082); +// 创建本地执行环境,并行度为2 +StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf); +``` + +## 实验报告 + +将思路和代码整理成实验报告。实验报告中包括你的代码,不同输入数据集时的输出内容,Flink集群看板的运行截屏。 \ No newline at end of file diff --git a/doc/ch-system-design/flink-core.md b/doc/ch-system-design/flink-core.md new file mode 100644 index 0000000..b752068 --- /dev/null +++ b/doc/ch-system-design/flink-core.md @@ -0,0 +1,118 @@ +(flink-core)= +# 架构与核心组件 + +:::{note} + +本教程已出版为《Flink原理与实践》,感兴趣的读者请在各大电商平台购买! + + ![](https://img.shields.io/badge/JD-%E8%B4%AD%E4%B9%B0%E9%93%BE%E6%8E%A5-red) + +![](https://img.shields.io/badge/GitHub-%E9%85%8D%E5%A5%97%E6%BA%90%E7%A0%81-blue) + + +::: + +为了支持分布式运行,Flink跟其他大数据引擎一样,采用了主从(Master-Worker)架构。Flink运行时主要包括两个组件: + +• Master是一个Flink作业的主进程。它起到了协调管理的作用。 + +• TaskManager,又被称为Worker或Slave,是执行计算任务的进程。它拥有CPU、内存等计算资源。Flink作业需要将计算任务分发到多个TaskManager上并行执行。 + +下面将从作业执行层面来分析Flink各个模块如何工作。 + +## Flink作业提交过程 + +Flink为适应不同的基础环境(Standalone集群、YARN、Kubernetes),在不断的迭代开发过程中已经逐渐形成了一个兼容性很强的架构。不同的基础环境对计算资源的管理方式略有不同,不过都大同小异,下图以Standalone集群为例,分析作业的分布式执行流程。Standalone模式指Flink独占该集群,集群上无其他任务。 + +![Standalone模式下,Flink作业提交流程](./img/standalone-arch.png) + +在一个作业提交前,Master和TaskManager等进程需要先被启动。我们可以在Flink主目录中执行脚本来启动这些进程:`bin/start-cluster.sh`。Master和TaskManager被启动后,TaskManager需要将自己注册给Master中的ResourceManager。这个初始化和资源注册过程发生在单个作业提交前,我们称之为第0步。 + +接下来我们根据上图,逐步分析一个Flink作业如何被提交: + +1. 用户编写应用程序代码,并通过Flink客户端(Client)提交作业。程序一般为Java或Scala语言,调用Flink API,构建逻辑视角数据流图。代码和相关配置文件被编译打包,被提交到Master的Dispatcher,形成一个应用作业(Application)。 + +2. Dispatcher接收到这个作业,启动JobManager,这个JobManager会负责本次作业的各项协调工作。 + +3. JobManager向ResourceManager申请本次作业所需资源。 + +4. 由于在第0步中TaskManager已经向ResourceManager中注册了资源,这时闲置的TaskManager会被反馈给JobManager。 + +5. JobManager将用户作业中的逻辑视图转化为图所示的并行化的物理执行图,将计算任务分发部署到多个TaskManager上。至此,一个Flink作业就开始执行了。 + +TaskManager在执行计算任务过程中可能会与其他TaskManager交换数据,会使用图中的一些数据交换策略。同时,TaskManager也会将一些任务状态信息会反馈给JobManager,这些信息包括任务启动、运行或终止的状态,快照的元数据等。 + +## Flink核心组件 + +有了这个作业提交流程,我们对各组件的功能应该有了更全面的认识,接下来我们再对涉及到的各个组件进行更为详细的介绍。 + +### Client + +用户一般使用客户端(Client)提交作业,比如Flink主目录下的`bin`目录中提供的命令行工具。Client会对用户提交的Flink程序进行预处理,并把作业提交到Flink集群上。Client提交作业时需要配置一些必要的参数,比如使用Standalone集群还是YARN集群等。整个作业被打成了Jar包,DataStream API被转换成了`JobGraph`,`JobGraph`是一种类似逻辑视图。 + +### Dispatcher + +Dispatcher可以接收多个作业,每接收一个作业,Dispatcher都会为这个作业分配一个JobManager。Dispatcher对外提供一个REST式的接口,以HTTP的形式来对外提供服务。 + +### JobManager + +JobManager是单个Flink作业的协调者,一个作业会有一个JobManager来负责。JobManager会将Client提交的JobGraph转化为ExceutionGraph,ExecutionGraph是类似并行的物理执行图。JobManager会向ResourceManager申请必要的资源,当获取足够的资源后,JobManager将ExecutionGraph以及具体的计算任务分发部署到多个TaskManager上。同时,JobManager还负责管理多个TaskManager,这包括:收集作业的状态信息,生成检查点,必要时进行故障恢复等问题。 +早期,Flink Master被命名为JobManager,负责绝大多数Master进程的工作。随着迭代和开发,出现了名为JobMaster的组件,JobMaster负责单个作业的执行。本书中,我们仍然使用JobManager的概念,表示负责单个作业的组件。一些Flink文档也可能使用JobMaster的概念,读者可以将JobMaster等同于JobManager看待。 + +### ResourceManager + +如前文所说,Flink现在可以部署在Standalone、YARN或Kubernetes等环境上,不同环境中对计算资源的管理模式略有不同,Flink使用一个名为ResourceManager的模块来统一处理资源分配上的问题。在Flink中,计算资源的基本单位是TaskManager上的任务槽位(Task Slot,简称槽位Slot)。ResourceManager的职责主要是从YARN等资源提供方获取计算资源,当JobManager有计算需求时,将空闲的Slot分配给JobManager。当计算任务结束时,ResourceManager还会重新收回这些Slot。 + +### TaskManager + +TaskManager是实际负责执行计算的节点。一般地,一个Flink作业是分布在多个TaskManager上执行的,单个TaskManager上提供一定量的Slot。一个TaskManager启动后,相关Slot信息会被注册到ResourceManager中。当某个Flink作业提交后,ResourceManager会将空闲的Slot提供给JobManager。JobManager获取到空闲Slot信息后会将具体的计算任务部署到该Slot之上,任务开始在这些Slot上执行。在执行过程,由于要进行数据交换,TaskManager还要和其他TaskManager进行必要的数据通信。 + +总之,TaskManager负责具体计算任务的执行,启动时它会将Slot资源向ResourceManager注册。 + +## Flink组件栈 + +了解Flink的主从架构、作业提交以及核心组件等知识后,我们再从更宏观的角度来对Flink的组件栈分层剖析。如下图所示,Flink的组件栈分为四层:部署层、运行时层、API层和上层工具。 + +![Flink组件栈](./img/flink-component.png) + +### 部署层 + +Flink支持多种部署方式,可以部署在单机(Local)、集群(Cluster),以及云(Cloud)上。 + +* Local模式 + +Local模式有两种不同的方式,一种是单节点(SingleNode),一种是单虚拟机(SingleJVM)。 + +Local-SingleJVM模式大多是开发和测试时使用的部署方式,该模式下JobManager和TaskManager都在同一个JVM里。 + +Local-SingleNode模式下,JobManager和TaskManager等所有角色都运行在一台机器上,虽然是按照分布式集群架构进行部署,但是集群的节点只有1个。该模式大多是在测试或者IoT设备上进行部署时使用。 + +* Cluster模式 + +Flink作业投入到生产环境下一般使用Cluster模式,可以是Standalone的独立集群,也可以是YARN或Kubernetes集群。 + +对于一个Standalone集群,我们需要在配置文件中配置好JobManager和TaskManager对应的机器,然后使用Flink主目录下的脚本启动一个Standalone集群。我们将在9.1.1详细介绍如何部署一个Flink Standalone集群。Standalone集群上只运行Flink作业。除了Flink,绝大多数企业的生产环境运行着包括MapReduce、Spark等各种各样的计算任务,一般都会使用YARN或Kubernetes等方式对计算资源进行管理和调度。Flink目前已经支持了YARN、Mesos以及Kubernetes,开发者提交作业的方式变得越来越简单。 + +* Cloud模式 + +Flink也可以部署在各大云平台上,包括Amazon、Google和阿里云。 + +### 运行时层 + +运行时(Runtime)层为Flink各类计算提供了实现。这一层读本章提到的分布式执行进行了支持。Flink Runtime层是Flink最底层也是最核心的组件。 + +### API层 + +API层主要实现了流处理DataStream API和批处理DataSet API。目前,DataStream API针对有界和无界数据流,DataSet API针对有界数据集。用户可以使用这两大API进行数据处理,包括转换(Transformation)、连接(Join)、聚合(Aggregation)、窗口(Window)以及状态(State)的计算。 + +### 上层工具 + +在DataStream和DataSet两大API之上,Flink还提供了更丰富的工具,包括: + +* 面向流处理的:复杂事件处理(Complex Event Process,CEP)。 + +* 面向批处理的:机器学习计算库(Machine Learning, ML)、图计算库(Graph Processing, Gelly)。 + +* 面向SQL用户的Table API和SQL。数据被转换成了关系型数据库式的表,每个表拥有一个表模式(Schema),用户可以像操作表那样操作流式数据,例如可以使用SELECT、JOIN、GROUP BY等操作。 + +* 针对Python用户推出的PyFlink,方便Python社区使用Flink。目前,PyFlink主要基于Java的Table API之上。 \ No newline at end of file diff --git a/doc/ch-system-design/img/code.png b/doc/ch-system-design/img/code.png new file mode 100644 index 0000000..4018d82 Binary files /dev/null and b/doc/ch-system-design/img/code.png differ diff --git a/doc/ch-system-design/img/data-exchange.png b/doc/ch-system-design/img/data-exchange.png new file mode 100644 index 0000000..455d178 Binary files /dev/null and b/doc/ch-system-design/img/data-exchange.png differ diff --git a/doc/ch-system-design/img/flink-component.png b/doc/ch-system-design/img/flink-component.png new file mode 100644 index 0000000..9534625 Binary files /dev/null and b/doc/ch-system-design/img/flink-component.png differ diff --git a/doc/ch-system-design/img/graph.png b/doc/ch-system-design/img/graph.png new file mode 100644 index 0000000..1829d4b Binary files /dev/null and b/doc/ch-system-design/img/graph.png differ diff --git a/doc/ch-system-design/img/logical-view.png b/doc/ch-system-design/img/logical-view.png new file mode 100644 index 0000000..750b6cc Binary files /dev/null and b/doc/ch-system-design/img/logical-view.png differ diff --git a/doc/ch-system-design/img/operator-chain.png b/doc/ch-system-design/img/operator-chain.png new file mode 100644 index 0000000..91e896a Binary files /dev/null and b/doc/ch-system-design/img/operator-chain.png differ diff --git a/doc/ch-system-design/img/physical-execution.png b/doc/ch-system-design/img/physical-execution.png new file mode 100644 index 0000000..082f2e6 Binary files /dev/null and b/doc/ch-system-design/img/physical-execution.png differ diff --git a/doc/ch-system-design/img/slot-parallelism.png b/doc/ch-system-design/img/slot-parallelism.png new file mode 100644 index 0000000..7259a9b Binary files /dev/null and b/doc/ch-system-design/img/slot-parallelism.png differ diff --git a/doc/ch-system-design/img/slot-sharing.png b/doc/ch-system-design/img/slot-sharing.png new file mode 100644 index 0000000..6d17afe Binary files /dev/null and b/doc/ch-system-design/img/slot-sharing.png differ diff --git a/doc/ch-system-design/img/standalone-arch.png b/doc/ch-system-design/img/standalone-arch.png new file mode 100644 index 0000000..766b380 Binary files /dev/null and b/doc/ch-system-design/img/standalone-arch.png differ diff --git a/doc/ch-system-design/img/task-slot.png b/doc/ch-system-design/img/task-slot.png new file mode 100644 index 0000000..fb4306f Binary files /dev/null and b/doc/ch-system-design/img/task-slot.png differ diff --git a/doc/ch-system-design/index.md b/doc/ch-system-design/index.md new file mode 100644 index 0000000..10725fd --- /dev/null +++ b/doc/ch-system-design/index.md @@ -0,0 +1,4 @@ +# Flink的设计与运行原理 + +```{tableofcontents} +``` \ No newline at end of file diff --git a/doc/ch-system-design/task-resource.md b/doc/ch-system-design/task-resource.md new file mode 100644 index 0000000..bc0636c --- /dev/null +++ b/doc/ch-system-design/task-resource.md @@ -0,0 +1,81 @@ +(task-resource)= +# 任务执行与资源划分 + +:::{note} + +本教程已出版为《Flink原理与实践》,感兴趣的读者请在各大电商平台购买! + + ![](https://img.shields.io/badge/JD-%E8%B4%AD%E4%B9%B0%E9%93%BE%E6%8E%A5-red) + +![](https://img.shields.io/badge/GitHub-%E9%85%8D%E5%A5%97%E6%BA%90%E7%A0%81-blue) + + +::: + +## 再谈逻辑视图到物理执行图 + +了解了Flink的分布式架构和核心组件,这里我们从更细粒度上来介绍从逻辑视图转化为物理执行图过程,该过程可以分成四层:`StreamGraph` -> `JobGraph` -> `ExecutionGraph` -> 物理执行图。我们根据下图来大致了解一些这些图的功能。 + +* `StreamGraph`:根据用户编写的代码生成的最初的图,用来表示一个Flink流处理作业的拓扑结构。在`StreamGraph`中,节点`StreamNode`就是算子。 + +* `JobGraph`:`JobGraph`是提交给 JobManager 的数据结构。`StreamGraph`经过优化后生成了`JobGraph`,主要的优化为,将多个符合条件的节点链接在一起作为一个`JobVertex`节点,这样可以减少数据交换所需要的传输开销。这个链接的过程叫做算子链(Operator Chain),我们会在下一小节继续介绍算子链。`JobVertex`经过算子链后,会包含一到多个算子,它的输出是`IntermediateDataSet`,这是经过算子处理产生的数据集。 + +* `ExecutionGraph`:JobManager将`JobGraph`转化为`ExecutionGraph`。`ExecutionGraph`是`JobGraph`的并行化版本:假如某个`JobVertex`的并行度是2,那么它将被划分为2个`ExecutionVertex`,`ExecutionVertex`表示一个算子子任务,它监控着单个子任务的执行情况。每个`ExecutionVertex`会输出一个`IntermediateResultPartition`,这是单个子任务的输出,再经过`ExecutionEdge`输出到下游节点。`ExecutionJobVertex`是这些并行子任务的合集,它监控着整个算子的运行情况。`ExecutionGraph`是调度层非常核心的数据结构。 + +* 物理执行图:JobManager根据`ExecutionGraph`对作业进行调度后,在各个TaskManager上部署具体的任务,物理执行图并不是一个具体的数据结构。 + +![数据流图的转化过程](./img/graph.png) + +可以看到,Flink在数据流图上可谓煞费苦心,仅各类图就有四种之多。对于新人来说,可以不用太关心这些非常细节的底层实现,只需要了解以下几个核心概念: + +* Flink采用主从架构,Master起着管理协调作用,TaskManager负责物理执行,在执行过程中会发生一些数据交换、生命周期管理等事情。 + +* 用户调用Flink API,构造逻辑视图,Flink会对逻辑视图优化,并转化为并行化的物理执行图,最后被执行的是物理执行图。 + +## 任务、算子子任务与算子链 + +在构造物理执行图的过程中,Flink会将一些算子子任务链接在一起,组成算子链。链接后以任务(Task)的形式被TaskManager调度执行。使用算子链是一个非常有效的优化,它可以有效降低算子子任务之间的传输开销。链接之后形成的Task是TaskManager中的一个线程。下图展示了任务、子任务和算子链之间的关系。 + +![任务、子任务与算子链](./img/operator-chain.png) + +例如,数据从Source前向传播到FlatMap,这中间没有发生跨分区的数据交换,因此,我们完全可以将Source、FlatMap这两个子任务组合在一起,形成一个Task。数据经过`keyBy()`发生了数据交换,数据会跨越分区,因此无法将`keyBy()`以及其后面的窗口聚合链接到一起。由于WindowAggregation的并行度是2,Sink的并行度为1,数据再次发生了交换,我们不能把WindowAggregation和Sink两部分链接到一起。本章第一节中提到,Sink的并行度被人为设置为1,如果我们把Sink的并行度也设置为2,那么是可以让这两个算子链接到一起的。 + +默认情况下,Flink会尽量将更多的子任务链接在一起,这样能减少一些不必要的数据传输开销。但一个子任务有超过一个输入或发生数据交换时,链接就无法建立。两个算子能够链接到一起是有一些规则的,感兴趣的读者可以阅读Flink源码中`org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator`中的`isChainable`方法。`StreamingJobGraphGenerator`类的作用是将`StreamGraph`转换为`JobGraph`。 + +尽管将算子链接到一起会降低一些传输开销,但是也有一些情况并不需要太多链接。比如,有时候我们需要将一个非常长的算子链拆开,这样我们就可以将原来集中在一个线程中的计算拆分到多个线程中来并行计算。Flink允许开发者手动配置是否启用算子链,或者对哪些算子使用算子链。我们也将在9.3.1讨论算子链的具体使用方法。 + +## 任务槽位与计算资源 + +### Task Slot + +根据前文的介绍,我们已经了解到TaskManager负责具体的任务执行。在程序执行之前,经过优化,部分子任务被链接在一起,组成一个Task。TaskManager是一个JVM进程,在TaskManager中可以并行运行一到多个Task。每个Task是一个线程,需要TaskManager为其分配相应的资源,TaskManager使用Task Slot给Task分配资源。 + +在解释Flink的Task Slot的概念前,我们先回顾一下进程与线程的概念。在操作系统层面,进程(Process)是进行资源分配和调度的一个独立单位,线程(Thread)是CPU调度的基本单位。比如,我们常用的Office Word软件,在启动后就占用操作系统的一个进程。Windows上可以使用任务管理器来查看当前活跃的进程,Linux上可以使用`top`命令来查看。线程是进程的一个子集,一个线程一般专注于处理一些特定任务,不独立拥有系统资源,只拥有一些运行中必要的资源,如程序计数器。一个进程至少有一个线程,也可以有多个线程。多线程场景下,每个线程都处理一小个任务,多个线程以高并发的方式同时处理多个小任务,可以提高处理能力。 + +回到Flink的槽位分配机制上,一个TaskManager是一个进程,TaskManager可以管理一至多个Task,每个Task是一个线程,占用一个Slot。每个Slot的资源是整个TaskManager资源的子集,比如下图里的TaskManager下有3个Slot,每个Slot占用TaskManager 1/3的内存,第一个Slot中的Task不会与第二个Slot中的Task互相争抢内存资源。 + +:::{note} +在分配资源时,Flink并没有将CPU资源明确分配给各个槽位。 +::: + +![Task Slot与Task Manager](./img/task-slot.png) + +假设我们给WordCount程序分配两个TaskManager,每个TaskManager又分配3个槽位,所以共有6个槽位。结合之前图中对这个作业的并行度设置,整个作业被划分为5个Task,使用5个线程,这5个线程可以按照上图所示的方式分配到6个槽位中。 + +Flink允许用户设置TaskManager中槽位的数目,这样用户就可以确定以怎样的粒度将任务做相互隔离。如果每个TaskManager只包含一个槽位,那么运行在该槽位内的任务将独享JVM。如果TaskManager包含多个槽位,那么多个槽位内的任务可以共享JVM资源,比如共享TCP连接、心跳信息、部分数据结构等。官方建议将槽位数目设置为TaskManager下可用的CPU核心数,那么平均下来,每个槽位都能平均获得1个CPU核心。 + +### 槽位共享 + +之前的图展示了任务的一种资源分配方式,默认情况下, Flink还提供了一种槽位共享(Slot Sharing)的优化机制,进一步优化数据传输开销,充分利用计算资源。将之前图中的任务做槽位共享优化后,结果如下图所示。 + +![槽位共享示意图](./img/slot-sharing.png) + +开启槽位共享后,Flink允许多个Task共享一个槽位。如上图中最左侧的数据流,一个作业从Source到Sink的所有子任务都可以放置在一个槽位中,这样数据交换成本更低。而且,对于一个数据流图来说,Source、FlatMap等算子的计算量相对不大,WindowAggregation算子的计算量比较大,计算量较大的算子子任务与计算量较小的算子子任务可以互补,腾出更多的槽位,分配给更多Task,这样可以更好地利用资源。如果不开启槽位共享,如之前图所示,计算量小的Source、FlatMap算子子任务独占槽位,造成一定的资源浪费。 + +![槽位共享后,增大并行度,可以部署更多算子实例](./img/slot-parallelism.png) + +最初图中的方式共占用5个槽位,支持槽位共享后,上上图只占用2个槽位。为了充分利用空槽位,剩余的4个空槽位可以分配给别的作业,也可以通过修改并行度来分配给这个作业。例如,这个作业的输入数据量非常大,我们可以把并行度设为6,更多的算子实例会将这些槽位填充,如上图所示。 + +综上,Flink的一个槽位中可能运行一个算子子任务、也可以是被链接的多个子任务组成的Task,或者是共享槽位的多个Task,具体这个槽位上运行哪些计算由算子链和槽位共享两个优化措施决定。我们将在9.3节再次讨论算子链和槽位共享这两个优化选项。 + +并行度和槽位数目的概念可能容易让人混淆,这里再次阐明一下。用户使用Flink提供的API算子可以构建一个逻辑视图,需要将任务并行才能被物理执行。一个算子将被切分为多个子任务,每个子任务处理整个作业输入数据的一部分。如果输入数据过大,增大并行度可以让算子切分为更多的子任务,加快数据处理速度。可见,并行度是Flink对任务并行切分的一种描述。槽位数目是在资源设置时,对单个TaskManager的资源切分粒度。关于并行度、槽位数目等配置,将在9.2.2中详细说明。 \ No newline at end of file diff --git a/doc/conf.py b/doc/conf.py new file mode 100644 index 0000000..e57ef95 --- /dev/null +++ b/doc/conf.py @@ -0,0 +1,65 @@ +author = 'Weizheng Lu' +bibtex_bibfiles = ['references.bib'] +bibtex_reference_style = 'author_year' +comments_config = {'hypothesis': False, 'utterances': False} +copyright = '2023-2024' +exclude_patterns = ['**.ipynb_checkpoints', '.DS_Store', 'Thumbs.db', '_build'] +extensions = ['sphinx_togglebutton', 'sphinx_copybutton', 'myst_nb', 'jupyter_book', 'sphinx_thebe', 'sphinx_comments', 'sphinx_external_toc', 'sphinx.ext.intersphinx', 'sphinx_design', 'sphinx_book_theme', 'sphinxcontrib.bibtex', 'sphinx_jupyterbook_latex'] +external_toc_exclude_missing = True +external_toc_path = '_toc.yml' +html_baseurl = '' +html_favicon = "_static/logo.ico" +html_logo = 'logo.svg' +html_sourcelink_suffix = '' +html_theme = 'sphinx_book_theme' +html_theme_options = { + 'search_bar_text': '搜索...', + 'launch_buttons': { + 'notebook_interface': 'classic', + 'binderhub_url': '', + 'jupyterhub_url': '', + 'thebe': False, + 'colab_url': 'https://colab.research.google.com' + }, + 'path_to_docs': 'docs', + 'repository_url': 'https://github.com/godaai/flink-book-zh', + 'repository_branch': 'main', + 'extra_footer': '', + 'home_page_in_toc': True, + 'icon_links': [ + { + "name": "GitHub", + "url": "https://github.com/godaai/flink-book-zh", + "icon": "https://img.shields.io/github/stars/godaai/flink-book-zh?style=for-the-badge", + "type": "url", + }, + ], + 'announcement': "如果你觉得内容对你有帮助,请在 GitHub 上点个 star 吧!", + 'analytics': {'google_analytics_id': ''}, + 'use_repository_button': True, + 'use_edit_page_button': False, + 'use_issues_button': False, + "toc_title": "本节目录", +} +html_static_path = ["_static"] +html_css_files = ["custom.css"] +html_js_files = [ + "https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.6/require.min.js", +] +html_title = 'Flink 原理与实践' +latex_engine = 'pdflatex' +myst_enable_extensions = ['colon_fence', 'dollarmath', 'linkify', 'substitution', 'tasklist'] +myst_url_schemes = ['mailto', 'http', 'https'] +nb_execution_allow_errors = False +nb_execution_cache_path = '' +nb_execution_excludepatterns = [] +nb_execution_in_temp = False +nb_execution_mode = 'off' +nb_execution_timeout = 30 +nb_output_stderr = 'show' +numfig = True +numfig_format = {'figure': '图 %s', 'table': '表 %s', 'code-block': '代码片段 %s', 'section': '章节 %s'} +pygments_style = 'sphinx' +suppress_warnings = ['myst.domains'] +use_jupyterbook_latex = True +use_multitoc_numbering = True diff --git a/doc/img/donate/alipay.png b/doc/img/donate/alipay.png new file mode 100644 index 0000000..9f98e7a Binary files /dev/null and b/doc/img/donate/alipay.png differ diff --git a/doc/img/donate/wechat.png b/doc/img/donate/wechat.png new file mode 100644 index 0000000..a0bda1b Binary files /dev/null and b/doc/img/donate/wechat.png differ diff --git a/doc/index.md b/doc/index.md new file mode 100644 index 0000000..0feb167 --- /dev/null +++ b/doc/index.md @@ -0,0 +1,75 @@ +# Flink 原理与实践 + +::::{grid} 2 +:reverse: + +:::{grid-item} +:columns: 3 +:class: sd-m-auto + + +::: + +:::{grid-item} +:columns: 9 +:class: sd-fs-3 + +开源的 Flink 流处理教程。 + +% The SVG rendering breaks latex builds for the GitHub badge, so only include in HTML +```{only} html +[![](https://img.shields.io/github/stars/godaai/flink-book-zh?style=for-the-badge)](https://github.com/godaai/flink-book-zh) +``` + +::: + +:::: + +## 主要作者 + +::::{grid} +:class-container: text-center +:gutter: 3 + +:::{grid-item-card} +:link-type: doc +:class-header: bg-light + +鲁蔚征 +^^^ + +现就职于中国人民大学,CCF高性能计算专业委员会执行委员,主要研究大数据与机器学习系统。 +::: + +:::: + +## 赞赏与支持 + +如果您觉得本书有价值,您可通过支付宝或微信赞赏。 + +::::{card-carousel} 2 + +:::{card} +:margin: 3 +:class-body: text-center +:class-header: bg-light text-center +**支付宝** +^^^ +```{image} ./img/donate/alipay.png +:height: 100 +``` +::: + +:::{card} +:margin: 3 +:class-body: text-center +:class-header: bg-light text-center + +**微信** +^^^ +```{image} ./img/donate/wechat.png +:height: 100 +``` +::: + +:::: \ No newline at end of file diff --git a/doc/logo.svg b/doc/logo.svg new file mode 100644 index 0000000..507361c --- /dev/null +++ b/doc/logo.svg @@ -0,0 +1,145 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/doc/ref.md b/doc/ref.md new file mode 100644 index 0000000..7ae1269 --- /dev/null +++ b/doc/ref.md @@ -0,0 +1,4 @@ +# 参考文献 + +```{bibliography} +``` \ No newline at end of file diff --git a/doc/references.bib b/doc/references.bib new file mode 100644 index 0000000..e69de29 diff --git a/doc/requirements.txt b/doc/requirements.txt new file mode 100644 index 0000000..f8bbe4a --- /dev/null +++ b/doc/requirements.txt @@ -0,0 +1,11 @@ +jupyter-book +sphinx_togglebutton +sphinx_copybutton +myst_nb +sphinx_comments +sphinx_external_toc +sphinx_design +sphinx_book_theme +sphinxcontrib-bibtex +sphinx-jupyterbook-latex +sphinxcontrib-jsmath \ No newline at end of file