diff --git a/doc/ch-table-sql/sql-join.md b/doc/ch-table-sql/sql-join.md
new file mode 100644
index 0000000..765557c
--- /dev/null
+++ b/doc/ch-table-sql/sql-join.md
@@ -0,0 +1,229 @@
+(sql-join)=
+# Join
+
+:::{note}
+
+本教程已出版为《Flink原理与实践》,感兴趣的读者请在各大电商平台购买!
+
+ ![](https://img.shields.io/badge/JD-%E8%B4%AD%E4%B9%B0%E9%93%BE%E6%8E%A5-red)
+
+
+:::
+
+Join是SQL中最常用的数据处理机制,它可以将两个数据源中的相关行相互连接起来。常用的Join方式有:`INNER JOIN`、`LEFT/RIGHT/FULL OUTER JOIN`。不同的Join决定了两个数据源连接方式的不同。在批处理上,对静态的数据上进行Join已经比较成熟,常用的算法有:嵌套循环(Nested Join)、排序合并(Sort Merge)、哈希合并(Hash Merge)等。这里以嵌套循环为例解释一下Join的实现原理。
+
+假设我们有这样一个批处理查询:
+
+```sql
+SELECT
+ orders.order_id,
+ customers.customer_name,
+ orders.order_date
+FROM orders INNER JOIN customers
+ON orders.customer_id = customers.customer_id;
+```
+
+这个语句在两个确定的数据集上进行计算,它翻译成伪代码:
+
+```
+// 循环遍历orders的每个元素
+for row_order in orders:
+ // 循环遍历customers的每个元素
+ for row_customer in customers:
+ if row_order.customer_id = row_customer.customer_id
+ return (row_order.order_id, row_customer.customer_mame, row_order.order_date)
+ end
+end
+```
+
+嵌套循环的基本原理是使用两层循环,遍历表中的每个元素,当两个表中的数据相匹配时,返回结果。我们知道,一旦数据量增大,嵌套循环算法会产生非常大的计算压力。之前多次提到,流处理场景下数据是不断生成的,一旦数据源有更新,相应的Dynamic Table也要随之更新,进而重新进行一次上述的循环算法,这对流处理来说是一个不小的挑战。
+
+目前,Flink提供了三种基于Dynamic Table的Join:时间窗口Join(Time-windowed Join)、临时表Join(Temporal Table Join)和传统意义上的Join(Regular Join)。这里我们先介绍前两种流处理中所特有的Join,了解前两种流处理的特例可以让我们更好地理解传统意义上的Join。
+
+## Time-windowed Join
+
+在电商平台,我们作为客户一般会先和卖家聊天沟通,经过一些对话之后才会下单购买。这里我们做一个简单的数据模型,假设一个关于聊天对话的数据流`chat`表记录了买家首次和卖家的聊天信息,它包括以下字段:买家ID(buyer_id),商品ID(item_id),时间戳(ts)。如果买家从开启聊天到最后下单的速度比较快,说明这个商品的转化率比较高,非常值得进一步分析。我们想统计这些具有较高转化率的商品,比如统计从首次聊天到用户下单购买的时间小于1分钟。
+
+![Time-windowed Join:聊天对话和用户行为的Join](./img/time-window-join.png)
+
+图中,左侧为记录用户首次聊天的数据流`chat`表,它有两个字段:`buyer_id`、`item_id`和`ts`,右侧为我们之前一直使用的`user_behavior`。我们以`item_id`字段来对两个数据流进行Join,同时还增加一个时间窗口的限制,即首次聊天发生之后1分钟内用户有购买行为。相应的SQL语句如下:
+
+```sql
+SELECT
+ user_behavior.item_id,
+ user_behavior.ts AS buy_ts
+FROM chat, user_behavior
+WHERE chat.item_id = user_behavior.item_id
+ AND user_behavior.behavior = 'buy'
+ AND user_behavior.ts BETWEEN chat.ts AND chat.ts + INTERVAL '1' MINUTE;
+```
+
+Time-windowed Join其实和第五章中的Interval Join比较相似,可以用下图来解释其原理。我们对A和B两个表做Join,需要对B设置一个上下限,A表中所有界限内的数据都要与B表中的数据做连接。
+
+![Time-windowed Join时间线](./img/time-window-join-timeline.png)
+
+一个更加通用的模板为:
+
+```sql
+SELECT
+ *
+FROM A, B
+WHERE A.id = B.id
+ AND A.ts BETWEEN B.ts - lowBound AND B.ts + upperBound;
+```
+
+从语法中可以读出,`BETWEEN ... AND ...`设置了一个时间窗口,B表中某个元素的窗口为:[B.ts - lowBound, B.ts + upperBound]的闭区间,如果A表元素恰好落在这个区间,则该元素与B中这个元素连接。其中,A和B都使用时间属性进行上述窗口操作。此外,我们还需要等于谓词来进行匹配,比如`A.id = B.id`,否则大量数据会被连接到一起。
+
+除了使用`BETWEEN ... AND ...`来确定窗口起始结束点外,Flink也支持比较符号 `>, <, >=, <=`,所以,一个时间窗口也可以被写为`A.ts >= B.ts - lowBound AND A.ts <= B.ts + upperBound`这样的语法。
+
+:::info
+A表和B表必须是Append-only模式的表,即只可以追加,不可以更新。
+:::
+
+在实现上,Flink使用状态来存储一些时间窗口相关数据。时间一般接近单调递增(Event Time模式不可能保证百分百的单调递增)。过期后,这些状态数据会被清除。当然,使用Event Time意味着窗口要等待更长的时间才能关闭,状态数据会更大。
+
+## Temporal Table Join
+
+电商平台的商品价格有可能发生变化,假如我们有一个商品数据源,里面有各个商品的价格变动,它由`item_id`、`price`和`version_ts`组成,其中,`price`为当前的价格,`version_ts`为价格改动的时间戳。一旦一件商品的价格有改动,数据都会追加到这个表中,这个表保存了价格变动的日志。如果我们想获取一件被购买的商品最近的价格,需要从这个表中找到最新的数据。这个表可以根据时间戳拆分为临时表(Temporal Table),Temporal Table如下图所示:
+
+![将item_log拆解为临时表](./img/temporal-table.png)
+
+从图中可以看到,由于商品价格在更新,不同时间点的各商品价格不同。假如我们想获取00:00:07时刻各商品价格,得到的结果为右侧上表,如果获取00:01:00时刻各商品的价格,得到的结果为右侧下表。从图中拆解的过程可以看到,Temporal Table可以让我们获得某个时间点的信息,就像整个数据的一个子版本,版本之间通过时间属性来区分。
+
+对于Temporal Table来说,数据源必须是一个Append-only的追加表,表中有一个Key作为唯一标识,数据追加到这个表后,我们可以根据Key来更新Temporal Table。上图中的表使用`item_id`作为Key。每行数据都有一个时间属性,用来标记不同的版本,时间属性可以是Event Time也可以是Processing Time。上图中的表使用`version_ts`作为时间属性字段。
+
+总结下来,定义一个Temporal Time需要注意以下几点:
+
+* 数据源是一个Append-only的追加表
+* 定义Key,Key用来做唯一标识
+* 数据源中有时间属性字段,根据时间的先后来区分不同的版本
+
+下面的代码生成Temporal Table,其中`registerFunction`方法对这个Temporal Table进行了注册,它定义了Key并指定了时间属性字段,我们将在用户自定义方法的章节中专门介绍`registerFunction`使用方法。
+
+```java
+DataStream> itemStream = ...
+
+// 获取 Table
+Table itemTable = tEnv.fromDataStream(itemStream, "item_id, price, version_ts.rowtime");
+// 注册 Temporal Table Function,指定时间属性和Key
+tEnv.registerFunction("item", itemTable.createTemporalTableFunction("version_ts", "item_id"));
+```
+
+注册后,我们拥有了一个名为`item`的Temporal Table,接下来可以在SQL中对这个表进行Join:
+
+```sql
+SELECT
+ user_behavior.item_id,
+ latest_item.price,
+ user_behavior.ts
+FROM
+ user_behavior, LATERAL TABLE(item(user_behavior.ts)) AS latest_item
+WHERE user_behavior.item_id = latest_item.item_id
+ AND user_behavior.behavior = 'buy'
+```
+
+这个SQL语句筛选购买行为:`user_behavior.behavior = 'buy'`,Temporal Table `item(user_behavior.ts)`按照`user_behavior`表中的时间`ts`来获取该时间点上对应的`item`的版本,将这个表重命名为`latest_item`。这个SQL语句的计算过程如下图所示:
+
+![Temporal Table Join:对user_behavior和item_log进行Join](./img/temporal-table-join.png)
+
+整个程序的Java实现如下:
+
+```java
+// userBehavior
+DataStream> userBehaviorStream = env
+ .fromCollection(userBehaviorData)
+ // 使用Event Time必须设置时间戳和Watermark
+ .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() {
+ @Override
+ public long extractAscendingTimestamp(Tuple4 element) {
+ return element.f3.getTime();
+ }
+ });
+
+// 获取Table
+Table userBehaviorTable = tEnv.fromDataStream(userBehaviorStream, "user_id, item_id, behavior,ts.rowtime");
+tEnv.createTemporaryView("user_behavior", userBehaviorTable);
+
+// item
+DataStream> itemStream = env
+ .fromCollection(itemData)
+ .assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() {
+ @Override
+ public long extractAscendingTimestamp(Tuple3 element) {
+ return element.f2.getTime();
+ }
+ });
+Table itemTable = tEnv.fromDataStream(itemStream, "item_id, price, version_ts.rowtime");
+
+// 注册 Temporal Table Function,指定时间戳和Key
+tEnv.registerFunction(
+ "item",
+ itemTable.createTemporalTableFunction("version_ts", "item_id"));
+
+String sqlQuery = "SELECT \n" +
+ " user_behavior.item_id," +
+ " latest_item.price,\n" +
+ " user_behavior.ts\n" +
+ "FROM " +
+ " user_behavior, LATERAL TABLE(item(user_behavior.ts)) AS latest_item\n" +
+ "WHERE user_behavior.item_id = latest_item.item_id" +
+ " AND user_behavior.behavior = 'buy'";
+
+// 执行SQL语句
+Table joinResult = tEnv.sqlQuery(sqlQuery);
+DataStream result = tEnv.toAppendStream(joinResult, Row.class);
+```
+
+从时间维度上来看,Temporal Table Join的效果如下图所示。
+
+![Append-only Table与Temporal Table进行Join操作](./img/temporal-join-timeline.png)
+
+将这个场景推广,如果想在其他地方使用Temporal Table Join,需要按照下面的模板编写SQL:
+
+```sql
+SELECT *
+FROM A, LATERAL TABLE(B(A.ts))
+WHERE A.id = B.id
+```
+
+使用时,要注意:
+
+* A表必须是一个Append-only的追加表。
+* B表的数据源必须是一个Append-only的追加表,且必须使用`registerFunction`将该表注册到Catalog中。注册时需要指定Key和时间属性。
+* A表和B表通过Key进行等于谓词匹配:`A.id = B.id`。
+
+在具体实现Temporal Table时,Flink维护了一个类似Keyed State的状态,某个Key值下会保存对应的数据。Event Time下,为了等待一些迟到数据,状态数据会更大一些。
+
+## Regular Join
+
+基于前面列举的两种时间维度上的Join,我们可以更好地理解传统意义上的Regular Join。对于刚刚的例子,如果商品表不是把所有改动历史都记录下来,而是只保存了某一时刻的最新值,那么我们应该使用Regular Join。如下图所示,`item`表用来存储当前最新的商品信息数据,00:02:00时刻,`item`表有了改动,Join结果如图中的`result`表。
+
+![Regular Join](./img/regular-join.png)
+
+实际上,大部分数据库都如图中左侧所示,只保存数据的最新值,而数据库的改动历史日志不会呈现给用户,仅用来做故障恢复。那么,对于这种类型的表,具体的SQL语句为:
+
+```sql
+SELECT
+ user_behavior.item_id,
+ item.price
+FROM
+ user_behavior, item
+WHERE user_behavior.item_id = item.item_id
+ AND user_behavior.behavior = 'buy'
+```
+
+Regular Join是最常规的Join,它不像Time-windowed Join和Temporal Table Join那样需要在SQL语句中考虑太多时间。它的SQL语法也和批处理中的Join一样,一般符合下面的模板:
+
+```sql
+SELECT *
+FROM A INNER JOIN B
+ON A.id = B.id
+```
+
+A和B可以是Append-only的追加表,也可以是可更新的Update表,A、B两个表中的数据可以插入、删除和更新。A、B表对应的元素都会被连接起来。在具体实现上,Flink需要将两个表都放在状态中存储。任何一个表有新数据加入,都会和另外表中所有对应行进行连接。因此,Regular Join适合输入源不太大或源数据增长量非常小的场景。我们可以配置一定的过期时间,超过这个时间后,数据会被清除。我们在[Dynamic Table](dynamic-table.md)节提到的使用下面的方法来设置过期时间:`tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))`。
+
+目前,Flink可以支持`INNER JOIN`、`LEFT JOIN`、`RIGHT JOIN`和`FULL OUTER JOIN`,只支持等于谓词匹配:`ON A.id = B.id`。使用Regular Join时,尽量避免出现笛卡尔积式的连接。
+
+:::info
+在Regular Join中,我们无法`SELECT`时间属性,因为Flink SQL无法严格保证数据按照时间属性排序。如果我们想要`SELECT`时间字段,一个办法是在定义Schema时,不明确指定该字段为时间属性,比如使用SQL DDL定义时,不设置`WATERMARK FOR rowtime_column AS watermark_strategy_expression`。
+:::
\ No newline at end of file
diff --git a/doc/ch-table-sql/sql-window.md b/doc/ch-table-sql/sql-window.md
new file mode 100644
index 0000000..d255d81
--- /dev/null
+++ b/doc/ch-table-sql/sql-window.md
@@ -0,0 +1,387 @@
+(sql-window)=
+# 时间和窗口
+
+:::{note}
+
+本教程已出版为《Flink原理与实践》,感兴趣的读者请在各大电商平台购买!
+
+ ![](https://img.shields.io/badge/JD-%E8%B4%AD%E4%B9%B0%E9%93%BE%E6%8E%A5-red)
+
+
+:::
+
+本节主要讨论如何在Flink SQL上使用窗口。
+
+## 时间属性
+
+Table API & SQL支持时间维度上的处理。时间属性(Time Attribute)用一个`TIMESTAMP(int precision)`数据类型来表示,这个类型与SQL标准中的时间戳类型相对应,是Table API& SQL中专门用来表征时间属性的数据类型。`precision`为精度,表示秒以下保留几位小数点,可以是0到9的一个数字。具体而言,时间的格式为:
+
+```
+year-month-day hour:minute:second[.fractional]
+```
+
+假如我们想要使用一个纳秒精度的时间,应该声明类型为`TIMESTAMP(9)`,套用上面的时间格式的话,可以表征从`0000-01-01 00:00:00.000000000`到`9999-12-31 23:59:59.999999999`。绝大多数情况下,我们使用毫秒精度即可,即`TIMESTAMP(3)`。
+
+当涉及到时间窗口,往往就要涉及到窗口的长度单位,现有的时间单位有`MILLISECOND`、`SECOND`、`MINUTE`、`HOUR`、`DAY`、`MONTH`和`YEAR`。
+
+在第五章中,我们曾介绍,Flink提供了三种时间语义:Processing Time、Ingestion Time和Event Time。Processing Time是数据被机器处理时的系统时间,Ingestion Time是数据流入Flink的时间,Event Time是数据实际发生的时间。我们在之前章节曾详细探讨这几种时间语义的使用方法,这里我们主要介绍一下在Table API & SQL中Processing Time和Event Time两种时间语义的使用方法。
+
+如果想在Table API & SQL中使用时间相关的计算,我们必须在Java或Scala代码中设置使用哪种时间语义:
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// 默认使用Processing Time
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+// 使用IngestionTime
+env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+// 使用EventTime
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+```
+
+同时,我们必须要在Schema中指定一个字段为时间属性,否则Flink无法知道具体哪个字段与时间相关。
+
+指定时间属性时可以有下面几种方式:使用SQL DDL或者由`DataStream`转为`Table`时定义一个时间属性。
+
+### Processing Time
+
+#### SQL DDL
+
+Processing Time使用当机器的系统时间作为时间,在Table API & SQL中这个字段被称为`proctime`。它不需要配置Watermark。使用时,我们在原本的Schema上添加一个虚拟的时间戳列,时间戳列由`PROCTIME()`函数计算产生。
+
+```sql
+CREATE TABLE user_behavior (
+ user_id BIGINT,
+ item_id BIGINT,
+ category_id BIGINT,
+ behavior STRING,
+ ts TIMESTAMP(3),
+ -- 在原有Schema基础上添加一列proctime
+ proctime as PROCTIME()
+) WITH (
+ ...
+);
+```
+
+后续过程中,我们可以在`proctime`这个时间属性上进行相关计算:
+
+```sql
+SELECT
+ user_id,
+ COUNT(behavior) AS behavior_cnt,
+ TUMBLE_END(proctime, INTERVAL '1' MINUTE) AS end_ts
+FROM user_behavior
+GROUP BY user_id, TUMBLE(proctime, INTERVAL '1' MINUTE)
+```
+
+#### 由DataStream转化
+
+将`DataStream`转为`Table`:
+
+```java
+DataStream userBehaviorDataStream = ...
+
+// 定义了Schema中各字段的名字,其中proctime使用了.proctime属性,这个属性帮我们生成一个Processing Time
+tEnv.createTemporaryView("user_behavior", userBehaviorDataStream,
+ "userId as user_id, itemId as item_id, categoryId as category_id, behavior, proctime.proctime");
+```
+
+可以看到,`proctime`这个属性追加到了其他字段之后,是在原有Schema基础上增加的一个字段。Flink帮我们自动生成了Processing Time的时间属性。
+
+### Event Time
+
+Event Time时间语义使用一条数据实际发生的时间作为时间属性,在Table API & SQL中这个字段通常被称为`rowtime`。这种模式下多次重复计算时,计算结果是确定的。这意味着,Event Time时间语义可以保证流处理和批处理的统一。Event Time时间语义下,我们需要设置每条数据发生时的时间戳,并提供一个Watermark。Watermark表示迟于该时间的数据都作为迟到数据对待。
+
+#### SQL DDL
+
+我们需要在SQL DDL中使用`WATERMARK`关键字,用来表明某个字段是Event Time时间属性,并且设置一个Watermark等待策略。
+
+```sql
+CREATE TABLE user_behavior (
+ user_id BIGINT,
+ item_id BIGINT,
+ category_id BIGINT,
+ behavior STRING,
+ ts TIMESTAMP(3),
+ -- 定义ts字段为Event Time时间戳,Watermark比监测到的最晚时间还晚5秒
+ WATERMARK FOR ts as ts - INTERVAL '5' SECOND
+) WITH (
+ ...
+);
+```
+
+在上面的DDL中,`WATERMARK`起到了定义Event Time时间属性的作用,它的基本语法规则为:`WATERMARK FOR rowtime_column AS watermark_strategy_expression`。
+
+`rowtime_column`为时间戳字段,可以是数据中的自带字段,也可以是类似`PROCTIME()`函数计算出的虚拟时间戳字段,这个字段必须是`TIMESTAMP(3)`类型。
+
+`watermark_strategy_expression`定义了Watermark的生成策略,返回值必须是`TIMESTAMP(3)`类型。Flink提供了几种常用的策略:
+
+* 数据自身的时间戳严格按照单调递增的形式出现,即晚到达的时间戳总比早到达的时间戳大,可以使用`WATERMARK FOR rowtime_column AS rowtime_column`或`WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND`生成Watermark。这个策略的原理是:监测所有数据时间戳,并记录时间戳最大值,在最大值基础上添加一个1毫秒的延迟作为Watermark时间。
+
+* 数据本身是乱序到达的,Watermark在时间戳最大值的基础上延迟一定时间间隔,如果数据仍比这个时间还晚,则被定为迟到数据。我们可以使用`WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'duration' timeUnit`生成Watermark。例如,`WATERMARK FOR ts as ts - INTERVAL '5' SECOND`定义的Watermark比时间戳最大值还延迟了5秒。这里`timeUnit`可以是`SECOND`、`MINUTE`或`HOUR`等时间单位。
+
+#### 由DataStream转化
+
+如果由`DataStream`转化为一个`Table`,那么需要在`DataStream`上设置好时间戳和Watermark。我们曾在第五章中讲解如何对数据流设置时间戳和Watermark。设置好后,再将`DataStream`转为`Table`:
+
+```java
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+DataStream userBehaviorDataStream = env
+ .addSource(...)
+ // 在DataStream里设置时间戳和Watermark
+ .assignTimestampsAndWatermarks(...);
+
+// 创建一个user_behavior表
+// ts.rowtime表示该列使用EventTime Timestamp
+tEnv.createTemporaryView("user_behavior", userBehaviorDataStream, "userId as user_id, itemId as item_id, categoryId as category_id, behavior, ts.rowtime");
+```
+
+## 窗口聚合
+
+基于上述的时间属性,我们可以在时间维度上进行一些分组和聚合操作。SQL用户经常使用聚合操作,比如`GROUP BY`和`OVER WINDOW`,目前Flink已经在流处理中支持了这两种SQL语法。
+
+### GROUP BY
+
+`GROUP BY`是很多SQL用户经常使用的窗口聚合函数,在流处理的一个时间窗口上进行`GROUP BY`与批处理中的非常相似,在之前的例子中我们已经开始使用了`GROUP BY`。以`GROUP BY field1, time_attr_window`语句为例,所有含有相同`field1 + time_attr_window`的行会被分到一组中,再对这组数据中的其他字段`field2`进行聚合操作,常见的聚合操作有`COUNT`、`SUM`、`AVG`、`MAX`等。可见,时间窗口`time_attr_window`被作当做整个表的一个字段,用来做分组,下图展示了这个过程。
+
+![时间窗口上的GROUP BY](./img/group-by.png)
+
+下面的SQL语句是我们之前使用的例子:
+
+```sql
+SELECT
+ user_id,
+ COUNT(behavior) AS behavior_cnt,
+ TUMBLE_END(proctime, INTERVAL '1' MINUTE) AS end_ts
+FROM user_behavior
+GROUP BY user_id, TUMBLE(proctime, INTERVAL '1' MINUTE)
+```
+
+这里再次对这个SQL语句进行分析解释。我们定义一个1分钟的滚动窗口,滚动窗口函数定义为:`TUMBLE(proctime, INTERVAL '1' MINUTE)`,窗口以`proctime`这个Processing Time为时间属性。这里我们使用了一个窗口分组函数,这是一个滚动窗口,它形如:`TUMBLE(time_attr, interval)`,它将某个时间段内的数据都分到一组上。我们可以在`SELECT`中添加字段`TUMBLE_START(proctime, INTERVAL '1' MINUTE)`查看窗口的的起始时间。接下来我们将介绍几种常见的窗口分组函数。
+
+#### 三种窗口分组函数
+
+我们在第5章中曾详细分析几种窗口的区别,下表展示了Flink SQL中窗口分组函数和相对应的使用方法。
+
+| 窗口分组函数 | 使用介绍 |
+| :-------------------------------------------: | :----------------------------------------------------------: |
+| TUMBLE(time_attr, interval) | 定义一个滚动窗口,窗口是定长的,长度为interval,窗口之间互不重叠,滚动向前。比如我们刚才定义了一个1分钟的滚动窗口,所有属于该分钟的数据都会被归到该窗口中。 |
+| HOP(time_attr, slide_interval, size_interval) | 定义一个滑动窗口,窗口长度是定长的,长度为size_interval,窗口以slide_interval的速度向前滑动。如果slide_interval比size_interval小,那么窗口之间会重叠。这意味着一条数据可能被划分到多个窗口中。比如,窗口长度size_interval为3分钟,滑动速度slide_interval为1分钟,那么每1分钟都产生一个窗口,一条数据应该会被分到3个窗口中。如果slide_interval等于size_interval,这就是一个滚动窗口。如果slide_interval大于size_interval,那么窗口之间有间隙。 |
+| SESSION(time_attr, interval) | 定义一个会话窗口,窗口长度是变长的,当两条数据之间的Session Gap超过了interval,这两条数据被分到两个窗口上。或者说,一个窗口等待超过interval后仍无数据进入,该窗口关闭。比如,我们定义Session Gap为3分钟,一个窗口最后一条数据之后的三分钟内没有新数据出现,则该窗口关闭,再之后的数据被归为下一个窗口。 |
+
+在这些函数中,时间间隔应该按照`INTERVAL 'duration' timeUnit`的格式来写。比如,1分钟可以写为:`INTERVAL '1' MINUTE`。
+
+Flink的流处理和批处理都支持上述三种窗口函数的。批处理没有时间语义之说,直接使用数据集中的时间字段;流处理中,如上一小节所示,时间语义可以选择为Event Time或Processing Time。当然,时间窗口必须基于上一节所提到的时间属性。
+
+#### 窗口的起始结束时间
+
+如果想查看窗口的起始结束时间,需要使用一个起始时间函数或结束时间函数。如下表所示,我们以滚动窗口为例,列出常用的函数:
+
+| 函数 | 使用介绍 |
+| :---------------------------------------: | :----------------------------------------------------------: |
+| TUMBLE_START(time_attr, interval) | 返回当前窗口的起始时间(包含边界),如`[00:10, 00:20) `的窗口,返回 `00:10` 。 |
+| TUMBLE_END(time_attr, interval) | 返回当前窗口的结束时间(包含边界),如`[00:00, 00:20)` 的窗口,返回 `00:20`。 |
+| TUMBLE_ROWTIME(time_attr, interval) | 返回窗口的结束时间(不包含边界)。如 `[00:00, 00:20]` 的窗口,返回 `00:19:59.999` 。返回值是一个rowtime,可以基于该字段做时间属性的操作,如内联视图子查询或时间窗口上的JOIN。只能用在Event Time时间语义的作业上。 |
+| TUMBLE_PROCTIME(time-attr, size-interval) | 返回窗口的结束时间(不包含边界)。如 `[00:00, 00:20]` 的窗口,返回 `00:19:59.999` 。返回值是一个proctime,可以基于该字段做时间属性的操作,如内联视图子查询或时间窗口上的JOIN。 只能用在Processing Time时间语义的作业上。 |
+
+:::note
+同一个SQL查询中,`TUMBLE(time_attr, interval)`函数中的`interval`和`TUMBLE_START(time_attr, interval)`函数中的`interval`要保持一致。确切地说,`INTERVAL 'duration' timeUnit`中的`duration`时间长度和`timeUnit`时间单位都要前后保持一致。
+:::
+
+我们已经在前面的例子中展示了`TUMBLE_END`的例子,这里不再过多解释。`TUMBLE_START`或`TUMBLE_END`返回的是展示的结果,已经不再是一个时间属性,无法被后续其他查询用来作为时间属性做进一步查询。假如我们想基于窗口时间戳做进一步的查询,比如内联视图子查询或Join等操作,我们需要使用`TUMBLE_ROWTIME`和`TUMBLE_PROCTIME`。比如下面的例子:
+
+```java
+SELECT
+ TUMBLE_END(rowtime, INTERVAL '20' MINUTE),
+ user_id,
+ SUM(cnt)
+FROM (
+ SELECT
+ user_id,
+ COUNT(behavior) AS cnt,
+ TUMBLE_ROWTIME(ts, INTERVAL '10' SECOND) AS rowtime
+ FROM user_behavior
+ GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)
+ )
+GROUP BY TUMBLE(rowtime, INTERVAL '20' MINUTE), user_id
+```
+
+这是一个嵌套的内联视图查询,我们先做一个10秒钟的视图,再在此基础上进行20分钟的聚合。子查询使用了`TUMBLE_ROWTIME`,这个字段仍然是一个时间属性,后续其他操作可以在此基础上继续使用各种时间相关计算。
+
+前面详细分析了滚动窗口的各个函数,对于滑动窗口,Flink提供有`HOP_START()`、`HOP_END()`、`HOP_ROWTIME()`、`HOP_PROCTIME()`这些函数;对于会话窗口,有`SESSION_START()`、`SESSION_END()`、`SESSION_ROWTIME()`和`SESSION_PROCTIME()`。这些函数的使用方法比较相似,这里不再赘述。
+
+综上,`GROUP BY`将多行数据分到一组,然后对一组的数据进行聚合,聚合结果为一行数据。或者说,`GROUP BY`一般是多行变一行。
+
+### OVER WINDOW
+
+传统SQL中专门进行窗口处理的函数为`OVER WINDOW`。`OVER WINDOW`与`GROUP BY`有些不同,它对每一行数据都生成窗口,在窗口上进行聚合,聚合的结果会生成一个新字段。或者说,`OVER WINDOW`一般是一行变一行。
+
+![OVER WINDOW示意图](./img/over-window.png)
+
+上图展示了`OVER WINDOW`的工作示意图,窗口确定的方式为:先对`field1`做分组,相同`field1`的数据被分到一起,按照时间属性排序,即上图中的`PARTITION BY`和`ORDER BY`部分;然后每行数据都建立一个窗口,窗口起始点是`field1`分组的第一行数据,结束点是当前行;窗口划分好后,再对窗口内的`field2`字段做各类聚合操作,生成`field2_agg`的新字段,常见的聚合操作有`COUNT`、`SUM`、`AVG`或`MAX`等。从图中可以看出,每一行都有一个窗口,当前行是这个窗口的最后一行,窗口的聚合结果生成一个新的字段。具体的实现逻辑上,Flink为每一个元素维护一个窗口,为每一个元素执行一次窗口计算,完成计算后会清除过期数据。
+
+Flink SQL中对`OVER WINDOW`的定义遵循了标准的SQL语法,我们先来看一下`OVER WINDOW`的语法结构:
+
+```sql
+SELECT
+ AGG_FUNCTION(field2) OVER (windowDefinition2) AS field2_agg,
+ ...
+ AGG_FUNCTION(fieldN) OVER (windowDefinitionN) AS fieldN_agg
+FROM tab1
+```
+
+其中,`windowDefinition2`是定义窗口的规则,包括根据哪些字段进行`PARTITION BY`等,在定义好的窗口上,我们使用`AGG_FUNCTION(field2)`对`field2`字段进行聚合计算。或者我们可以使用别名来定义窗口`WINDOW w AS ...`:
+
+```sql
+SELECT
+ AGG_FUNCTION(field2) OVER w AS field2_agg,
+ ...
+FROM tab1
+WINDOW w AS (windowDefinition)
+```
+
+那么具体应该如何划分窗口,如何写`windowDefinition`呢?上图中只演示了一种窗口划分的方式,常用的窗口划分方式可以基于行,也可以基于时间段,接下来我们通过一些例子来展示窗口的划分。
+
+#### ROWS OVER WINDOW
+
+我们首先演示基于行来划分窗口,这里仍然以用户行为数据流来演示:
+
+```sql
+SELECT
+ user_id,
+ behavior,
+ COUNT(*) OVER w AS behavior_count,
+ ts
+FROM user_behavior
+WINDOW w AS (
+ PARTITION BY user_id
+ ORDER BY ts
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+)
+```
+
+上面的SQL语句中,`WINDOW w AS (...)`定义了一个名为`w`的窗口,它根据用户的`user_id`来分组,并按照`ts`来排序。原始数据并不是基于用户ID来分组的,`PARTITION BY user_id`起到了分组的作用,相同`user_id`的用户被分到了一组,组内按照时间戳`ts`来排序。这里完成了上图中最左侧表到中间表的转化。
+
+`ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`定义了窗口的起始和结束,窗口的起始点为`UNBOUNDED PRECEDING`,这两个SQL关键词组合在一起表示窗口起始点是数据流的最开始的行,`CURRENT ROW`表示结束点是当前行。`ROWS BETWEEN ... AND ...`这样的语句定义了窗口的起始和结束。结合分组和排序策略,这就意味着,这个窗口从数据流的第一行开始到当前行结束,按照`user_id`分组,按照`ts`排序。
+
+:::note
+目前`OVER WINDOW`上,Flink只支持基于时间属性的`ORDER BY`排序,无法基于其他字段进行排序。
+:::
+
+![ROWS:按行划分窗口](./img/rows-over-window.png)
+
+上图展示了按行划分窗口的基本原理,图中上半部分使用`UNBOUNDED PRECEDING`表示起始位置,那么窗口是从数据流的第一个元素开始一直到当前元素;下半部分使用`1 PRECEDING`表示起始位置,窗口的起始点是本元素的前一个元素,我们可以把1换成其他我们想要的数字。
+
+:::note
+图中最后两行数据从时间上虽然同时到达,但由于窗口是按行划分的,这两行数据被划分为两个窗口,这与后文提到的按时间段划分有所区别。
+:::
+
+如果输入数据流如下表:
+
+| user_id | pv_count | ts |
+| :-----: | :------: | :-----------------: |
+| 1 | pv | 2017-12-01 00:00:00 |
+| 2 | fav | 2017-12-01 00:00:00 |
+| 1 | pv | 2017-12-01 00:00:02 |
+| 2 | cart | 2017-12-01 00:00:03 |
+
+那么对于之前的SQL语句,一个查询的结果将产生下面的数据:
+
+| user_id | behavior | behavior_cnt | ts |
+| :-----: | :------: | :----------: | :-----------------: |
+| 1 | pv | 1 | 2017-12-01 00:00:00 |
+| 2 | fav | 1 | 2017-12-01 00:00:00 |
+| 1 | pv | 2 | 2017-12-01 00:00:02 |
+| 2 | cart | 2 | 2017-12-01 00:00:03 |
+
+可以看到,对于输入的每一行数据,都有一行输出。
+
+总结下来,`ROWS OVER WINDOW`的模式应该按照下面的模式来编写SQL:
+
+```sql
+SELECT
+ field1,
+ AGG_FUNCTION(field2) OVER (
+ [PARTITION BY (value_expression1,..., value_expressionN)]
+ ORDER BY timeAttr
+ ROWS
+ BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS fieldName
+FROM tab1
+
+-- 使用AS
+SELECT
+ field1,
+ AGG_FUNCTION(field2) OVER w AS fieldName
+FROM tab1
+WINDOW w AS (
+ [PARTITION BY (value_expression1,..., value_expressionN)]
+ ORDER BY timeAttr
+ ROWS
+ BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW
+)
+```
+
+需要注意:
+
+* `PARTITION BY`是可选的,可以根据一到多个字段来对数据进行分组。
+* `ORDER BY`之后必须是一个时间属性,用于对数据进行排序。
+* `ROWS BETWEEN ... AND ...`用来界定窗口的起始结束点。`UNBOUNDED PRECEDING`表示整个数据流的开始作为起始点,也可以使用`rowCount PRECEDING`来表示当前行之前的某个元素作为起始点,`rowCount`是一个数字;`CURRENT ROW`表示当前行作为结束点。
+
+#### RANGES OVER WINDOW
+
+第二种划分的方式是按照时间段来划分窗口,SQL中关键字为`RANGE`。这种窗口的结束点也是当前行,起始点是当前行之前的某个时间点。我们仍然以用户行为为例,SQL语句改为:
+
+```sql
+SELECT
+ user_id,
+ COUNT(*) OVER w AS behavior_count,
+ ts
+FROM user_behavior
+WINDOW w AS (
+ PARTITION BY user_id
+ ORDER BY ts
+ RANGE BETWEEN INTERVAL '2' SECOND PRECEDING AND CURRENT ROW
+)
+```
+
+可以看到,与`ROWS`的区别在于,`RANGE`后面使用的是一个时间段,根据当前行的时间减去这个时间段,可以得到起始时间。
+
+![RANGE:按时间段划分窗口](./img/range-over-window.png)
+
+上图展示了按时间段划分窗口的基本原理,图中上半部分使用`UNBOUNDED PRECEDING`表示起始位置,与`ROWS`按行划分不同的是,最后两个元素虽然同时到达,但是他们被划分为一个窗口(图上半部分中的w4);下半部分使用`INTERVAL '2' SECOND`表示起始位置,窗口的起始点是当前元素减去2秒,最后两个元素也被划分到了一个窗口(图下半部分中的w4)。
+
+总结下来,`RANGE OVER WINDOW`的格式应该按照下面的模式来编写SQL:
+
+```sql
+SELECT
+ field1,
+ AGG_FUNCTION(field2) OVER (
+ [PARTITION BY (value_expression1,..., value_expressionN)]
+ ORDER BY timeAttr
+ RANGE
+ BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS fieldName
+FROM tab1
+
+-- 使用AS
+SELECT
+ field1,
+ AGG_FUNCTION(field2) OVER w AS fieldName
+FROM tab1
+WINDOW w AS (
+ [PARTITION BY (value_expression1,..., value_expressionN)]
+ ORDER BY timeAttr
+ RANGE
+ BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW
+)
+```
+
+需要注意:
+
+* `PARTITION BY`是可选的,可以根据一个一到多个字段来对数据进行分组。
+* `ORDER BY`之后必须是一个时间属性,用于对数据进行排序。
+* `RANGE BETWEEN ... AND ...`用来界定窗口的起始结束点。我们可以使用`UNBOUNDED PRECEDING`表示数据流的开始作为起始点,也可以使用一个`timeInterval PRECEDING`来表示当前行之前的某个时间点作为起始点。
+
+综上,`OVER WINDOW`下,每行数据都生成一个窗口,窗口内的数据聚合后生成一个新字段。窗口的划分可以按行`ROWS`,也可以按时间段`RANGE`。
\ No newline at end of file