-
Notifications
You must be signed in to change notification settings - Fork 42
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
haiji.yang
committed
Dec 4, 2020
1 parent
7585187
commit a518c9c
Showing
6 changed files
with
380 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
## Balking模式:再谈线程安全的单例模式 | ||
|
||
上一篇文章中,我们提到可以用“多线程版本的 if”来理解 Guarded Suspension 模式,不同于单线程中的 if,这个“多线程版本的 if”是需要等待的,而且还很执着,必须要等到条件为真。但很显然这个世界,不是所有场景都需要这么执着,有时候我们还需要快速放弃。 | ||
|
||
需要快速放弃的一个最常见的例子是各种编辑器提供的自动保存功能。自动保存功能的实现逻辑一般都是隔一定时间自动执行存盘操作,存盘操作的前提是文件做过修改,如果文件没有执行过修改操作,就需要快速放弃存盘操作。下面的示例代码将自动保存功能代码化了,很显然 AutoSaveEditor 这个类不是线程安全的,因为对共享变量 changed 的读写没有使用同步,那如何保证 AutoSaveEditor 的线程安全性呢? | ||
|
||
```java | ||
|
||
class AutoSaveEditor{ | ||
//文件是否被修改过 | ||
boolean changed=false; | ||
//定时任务线程池 | ||
ScheduledExecutorService ses = | ||
Executors.newSingleThreadScheduledExecutor(); | ||
//定时执行自动保存 | ||
void startAutoSave(){ | ||
ses.scheduleWithFixedDelay(()->{ | ||
autoSave(); | ||
}, 5, 5, TimeUnit.SECONDS); | ||
} | ||
//自动存盘操作 | ||
void autoSave(){ | ||
if (!changed) { | ||
return; | ||
} | ||
changed = false; | ||
//执行存盘操作 | ||
//省略且实现 | ||
this.execSave(); | ||
} | ||
//编辑操作 | ||
void edit(){ | ||
//省略编辑逻辑 | ||
...... | ||
changed = true; | ||
} | ||
} | ||
``` | ||
### 用 volatile 实现 Balking 模式 | ||
|
||
前面我们用 synchronized 实现了 Balking 模式,这种实现方式最为稳妥,建议你实际工作中也使用这个方案。不过在某些特定场景下,也可以使用 volatile 来实现,但使用 volatile 的前提是对原子性没有要求。 | ||
|
||
在 [Copy-on-Write 模式:不是延时策略的 COW](CopyOnWrite.md)中,有一个 RPC 框架路由表的案例,在 RPC 框架中,本地路由表是要和注册中心进行信息同步的,应用启动的时候,会将应用依赖服务的路由表从注册中心同步到本地路由表中,如果应用重启的时候注册中心宕机,那么会导致该应用依赖的服务均不可用,因为找不到依赖服务的路由表。为了防止这种极端情况出现,RPC 框架可以将本地路由表自动保存到本地文件中,如果重启的时候注册中心宕机,那么就从本地文件中恢复重启前的路由表。这其实也是一种降级的方案。 | ||
|
||
自动保存路由表和前面介绍的编辑器自动保存原理是一样的,也可以用 Balking 模式实现,不过我们这里采用 volatile 来实现,实现的代码如下所示。之所以可以采用 volatile 来实现,是因为对共享变量 changed 和 rt 的写操作不存在原子性的要求,而且采用 scheduleWithFixedDelay() 这种调度方式能保证同一时刻只有一个线程执行 autoSave() 方法。 | ||
|
||
```java | ||
|
||
//路由表信息 | ||
public class RouterTable { | ||
//Key:接口名 | ||
//Value:路由集合 | ||
ConcurrentHashMap<String, CopyOnWriteArraySet<Router>> | ||
rt = new ConcurrentHashMap<>(); | ||
//路由表是否发生变化 | ||
volatile boolean changed; | ||
//将路由表写入本地文件的线程池 | ||
ScheduledExecutorService ses= | ||
Executors.newSingleThreadScheduledExecutor(); | ||
//启动定时任务 | ||
//将变更后的路由表写入本地文件 | ||
public void startLocalSaver(){ | ||
ses.scheduleWithFixedDelay(()->{ | ||
autoSave(); | ||
}, 1, 1, MINUTES); | ||
} | ||
//保存路由表到本地文件 | ||
void autoSave() { | ||
if (!changed) { | ||
return; | ||
} | ||
changed = false; | ||
//将路由表写入本地文件 | ||
//省略其方法实现 | ||
this.save2Local(); | ||
} | ||
//删除路由 | ||
public void remove(Router router) { | ||
Set<Router> set=rt.get(router.iface); | ||
if (set != null) { | ||
set.remove(router); | ||
//路由表已发生变化 | ||
changed = true; | ||
} | ||
} | ||
//增加路由 | ||
public void add(Router router) { | ||
Set<Router> set = rt.computeIfAbsent( | ||
route.iface, r -> | ||
new CopyOnWriteArraySet<>()); | ||
set.add(router); | ||
//路由表已发生变化 | ||
changed = true; | ||
} | ||
} | ||
``` | ||
|
||
Balking 模式有一个非常典型的应用场景就是单次初始化,下面的示例代码是它的实现。这个实现方案中,我们将 init() 声明为一个同步方法,这样同一个时刻就只有一个线程能够执行 init() 方法;init() 方法在第一次执行完时会将 inited 设置为 true,这样后续执行 init() 方法的线程就不会再执行 doInit() 了。 | ||
|
||
```java | ||
|
||
class InitTest{ | ||
boolean inited = false; | ||
synchronized void init(){ | ||
if(inited){ | ||
return; | ||
} | ||
//省略doInit的实现 | ||
doInit(); | ||
inited=true; | ||
} | ||
} | ||
``` | ||
线程安全的单例模式本质上其实也是单次初始化,所以可以用 Balking 模式来实现线程安全的单例模式,下面的示例代码是其实现。这个实现虽然功能上没有问题,但是性能却很差,因为互斥锁 synchronized 将 getInstance() 方法串行化了,那有没有办法可以优化一下它的性能呢? | ||
|
||
```java | ||
|
||
class Singleton{ | ||
private static | ||
Singleton singleton; | ||
//构造方法私有化 | ||
private Singleton(){} | ||
//获取实例(单例) | ||
public synchronized static | ||
Singleton getInstance(){ | ||
if(singleton == null){ | ||
singleton=new Singleton(); | ||
} | ||
return singleton; | ||
} | ||
} | ||
``` | ||
办法当然是有的,那就是经典的双重检查(Double Check)方案,下面的示例代码是其详细实现。在双重检查方案中,一旦 Singleton 对象被成功创建之后,就不会执行 synchronized(Singleton.class){}相关的代码,也就是说,此时 getInstance() 方法的执行路径是无锁的,从而解决了性能问题。 | ||
|
||
```java | ||
|
||
class Singleton{ | ||
private static volatile | ||
Singleton singleton; | ||
//构造方法私有化 | ||
private Singleton() {} | ||
//获取实例(单例) | ||
public static Singleton | ||
getInstance() { | ||
//第一次检查 | ||
if(singleton==null){ | ||
synchronize(Singleton.class){ | ||
//获取锁后二次检查 | ||
if(singleton==null){ | ||
singleton=new Singleton(); | ||
} | ||
} | ||
} | ||
return singleton; | ||
} | ||
} | ||
``` | ||
|
||
### 总结 | ||
|
||
Balking 模式和 Guarded Suspension 模式从实现上看似乎没有多大的关系,Balking 模式只需要用互斥锁就能解决,而 Guarded Suspension 模式则要用到管程这种高级的并发原语;但是从应用的角度来看,它们解决的都是“线程安全的 if”语义,不同之处在于,Guarded Suspension 模式会等待 if 条件为真,而 Balking 模式不会等待。 | ||
|
||
Balking 模式的经典实现是使用互斥锁,你可以使用 Java 语言内置 synchronized,也可以使用 SDK 提供 Lock;如果你对互斥锁的性能不满意,可以尝试采用 volatile 方案,不过使用 volatile 方案需要你更加谨慎。当然你也可以尝试使用双重检查方案来优化性能,双重检查中的第一次检查,完全是出于对性能的考量:避免执行加锁操作,因为加锁操作很耗时。而加锁之后的二次检查,则是出于对安全性负责。 | ||
|
||
|
||
|
211 changes: 211 additions & 0 deletions
211
note/java/concurrency/designpattern/GuardedSuspension.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
## Guarded Suspension模式:等待唤醒机制的规范实现 | ||
|
||
前不久,同事小灰工作中遇到一个问题,他开发了一个 Web 项目:Web 版的文件浏览器,通过它用户可以在浏览器里查看服务器上的目录和文件。这个项目依赖运维部门提供的文件浏览服务,而这个文件浏览服务只支持消息队列(MQ)方式接入。消息队列在互联网大厂中用的非常多,主要用作流量削峰和系统解耦。在这种接入方式中,发送消息和消费结果这两个操作之间是异步的,你可以参考下面的示意图来理解。 | ||
|
||
![MQ 示意图](img/d1ad5ce1df66d85698308c41e4e93a21.png) | ||
|
||
在小灰的这个 Web 项目中,用户通过浏览器发过来一个请求,会被转换成一个异步消息发送给 MQ,等 MQ 返回结果后,再将这个结果返回至浏览器。小灰同学的问题是:给 MQ 发送消息的线程是处理 Web 请求的线程 T1,但消费 MQ 结果的线程并不是线程 T1,那线程 T1 如何等待 MQ 的返回结果呢?为了便于你理解这个场景,我将其代码化了,示例代码如下。 | ||
|
||
```java | ||
|
||
class Message{ | ||
String id; | ||
String content; | ||
} | ||
//该方法可以发送消息 | ||
void send(Message msg){ | ||
//省略相关代码 | ||
} | ||
//MQ消息返回后会调用该方法 | ||
//该方法的执行线程不同于 | ||
//发送消息的线程 | ||
void onMessage(Message msg){ | ||
//省略相关代码 | ||
} | ||
//处理浏览器发来的请求 | ||
Respond handleWebReq(){ | ||
//创建一消息 | ||
Message msg1 = new | ||
Message("1","{...}"); | ||
//发送消息 | ||
send(msg1); | ||
//如何等待MQ返回的消息呢? | ||
String result = ...; | ||
} | ||
``` | ||
### Guarded Suspension 模式 | ||
|
||
上面小灰遇到的问题,在现实世界里比比皆是,只是我们一不小心就忽略了。比如,项目组团建要外出聚餐,我们提前预订了一个包间,然后兴冲冲地奔过去,到那儿后大堂经理看了一眼包间,发现服务员正在收拾,就会告诉我们:“您预订的包间服务员正在收拾,请您稍等片刻。”过了一会,大堂经理发现包间已经收拾完了,于是马上带我们去包间就餐。 | ||
|
||
我们等待包间收拾完的这个过程和小灰遇到的等待 MQ 返回消息本质上是一样的,都是等待一个条件满足:就餐需要等待包间收拾完,小灰的程序里要等待 MQ 返回消息。 | ||
|
||
那我们来看看现实世界里是如何解决这类问题的呢?现实世界里大堂经理这个角色很重要,我们是否等待,完全是由他来协调的。通过类比,相信你也一定有思路了:我们的程序里,也需要这样一个大堂经理。的确是这样,那程序世界里的大堂经理该如何设计呢?其实设计方案前人早就搞定了,而且还将其总结成了一个设计模式:Guarded Suspension。所谓 Guarded Suspension,直译过来就是“保护性地暂停”。那下面我们就来看看,Guarded Suspension 模式是如何模拟大堂经理进行保护性地暂停的。 | ||
|
||
下图就是 Guarded Suspension 模式的结构图,非常简单,一个对象 GuardedObject,内部有一个成员变量——受保护的对象,以及两个成员方法——get(Predicate p)和onChanged(T obj)方法。其中,对象 GuardedObject 就是我们前面提到的大堂经理,受保护对象就是餐厅里面的包间;受保护对象的 get() 方法对应的是我们的就餐,就餐的前提条件是包间已经收拾好了,参数 p 就是用来描述这个前提条件的;受保护对象的 onChanged() 方法对应的是服务员把包间收拾好了,通过 onChanged() 方法可以 fire 一个事件,而这个事件往往能改变前提条件 p 的计算结果。下图中,左侧的绿色线程就是需要就餐的顾客,而右侧的蓝色线程就是收拾包间的服务员。 | ||
|
||
![Guarded Suspension 示意图](img/630f3eda98a0e6a436953153c68464dc.png) | ||
|
||
GuardedObject 的内部实现非常简单,是管程的一个经典用法,你可以参考下面的示例代码,核心是:get() 方法通过条件变量的 await() 方法实现等待,onChanged() 方法通过条件变量的 signalAll() 方法实现唤醒功能。逻辑还是很简单的,所以这里就不再详细介绍了。 | ||
|
||
```java | ||
|
||
class GuardedObject<T>{ | ||
//受保护的对象 | ||
T obj; | ||
final Lock lock = | ||
new ReentrantLock(); | ||
final Condition done = | ||
lock.newCondition(); | ||
final int timeout=1; | ||
//获取受保护对象 | ||
T get(Predicate<T> p) { | ||
lock.lock(); | ||
try { | ||
//MESA管程推荐写法 | ||
while(!p.test(obj)){ | ||
done.await(timeout, | ||
TimeUnit.SECONDS); | ||
} | ||
}catch(InterruptedException e){ | ||
throw new RuntimeException(e); | ||
}finally{ | ||
lock.unlock(); | ||
} | ||
//返回非空的受保护对象 | ||
return obj; | ||
} | ||
//事件通知方法 | ||
void onChanged(T obj) { | ||
lock.lock(); | ||
try { | ||
this.obj = obj; | ||
done.signalAll(); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
} | ||
``` | ||
|
||
### 扩展 Guarded Suspension 模式 | ||
|
||
上面我们介绍了 Guarded Suspension 模式及其实现,这个模式能够模拟现实世界里大堂经理的角色,那现在我们再来看看这个“大堂经理”能否解决小灰同学遇到的问题。 | ||
|
||
Guarded Suspension 模式里 GuardedObject 有两个核心方法,一个是 get() 方法,一个是 onChanged() 方法。很显然,在处理 Web 请求的方法 handleWebReq() 中,可以调用 GuardedObject 的 get() 方法来实现等待;在 MQ 消息的消费方法 onMessage() 中,可以调用 GuardedObject 的 onChanged() 方法来实现唤醒。 | ||
|
||
```java | ||
|
||
//处理浏览器发来的请求 | ||
Respond handleWebReq(){ | ||
//创建一消息 | ||
Message msg1 = new | ||
Message("1","{...}"); | ||
//发送消息 | ||
send(msg1); | ||
//利用GuardedObject实现等待 | ||
GuardedObject<Message> go | ||
=new GuardObjec<>(); | ||
Message r = go.get( | ||
t->t != null); | ||
} | ||
void onMessage(Message msg){ | ||
//如何找到匹配的go? | ||
GuardedObject<Message> go=??? | ||
go.onChanged(msg); | ||
} | ||
``` | ||
|
||
但是在实现的时候会遇到一个问题,handleWebReq() 里面创建了 GuardedObject 对象的实例 go,并调用其 get() 方等待结果,那在 onMessage() 方法中,如何才能够找到匹配的 GuardedObject 对象呢?这个过程类似服务员告诉大堂经理某某包间已经收拾好了,大堂经理如何根据包间找到就餐的人。现实世界里,大堂经理的头脑中,有包间和就餐人之间的关系图,所以服务员说完之后大堂经理立刻就能把就餐人找出来。 | ||
|
||
我们可以参考大堂经理识别就餐人的办法,来扩展一下 Guarded Suspension 模式,从而使它能够很方便地解决小灰同学的问题。在小灰的程序中,每个发送到 MQ 的消息,都有一个唯一性的属性 id,所以我们可以维护一个 MQ 消息 id 和 GuardedObject 对象实例的关系,这个关系可以类比大堂经理大脑里维护的包间和就餐人的关系。 | ||
|
||
有了这个关系,我们来看看具体如何实现。下面的示例代码是扩展 Guarded Suspension 模式的实现,扩展后的 GuardedObject 内部维护了一个 Map,其 Key 是 MQ 消息 id,而 Value 是 GuardedObject 对象实例,同时增加了静态方法 create() 和 fireEvent();create() 方法用来创建一个 GuardedObject 对象实例,并根据 key 值将其加入到 Map 中,而 fireEvent() 方法则是模拟的大堂经理根据包间找就餐人的逻辑。 | ||
|
||
```java | ||
|
||
class GuardedObject<T>{ | ||
//受保护的对象 | ||
T obj; | ||
final Lock lock = | ||
new ReentrantLock(); | ||
final Condition done = | ||
lock.newCondition(); | ||
final int timeout=2; | ||
//保存所有GuardedObject | ||
final static Map<Object, GuardedObject> | ||
gos=new ConcurrentHashMap<>(); | ||
//静态方法创建GuardedObject | ||
static <K> GuardedObject | ||
create(K key){ | ||
GuardedObject go=new GuardedObject(); | ||
gos.put(key, go); | ||
return go; | ||
} | ||
static <K, T> void | ||
fireEvent(K key, T obj){ | ||
GuardedObject go=gos.remove(key); | ||
if (go != null){ | ||
go.onChanged(obj); | ||
} | ||
} | ||
//获取受保护对象 | ||
T get(Predicate<T> p) { | ||
lock.lock(); | ||
try { | ||
//MESA管程推荐写法 | ||
while(!p.test(obj)){ | ||
done.await(timeout, | ||
TimeUnit.SECONDS); | ||
} | ||
}catch(InterruptedException e){ | ||
throw new RuntimeException(e); | ||
}finally{ | ||
lock.unlock(); | ||
} | ||
//返回非空的受保护对象 | ||
return obj; | ||
} | ||
//事件通知方法 | ||
void onChanged(T obj) { | ||
lock.lock(); | ||
try { | ||
this.obj = obj; | ||
done.signalAll(); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
} | ||
``` | ||
|
||
这样利用扩展后的 GuardedObject 来解决小灰同学的问题就很简单了,具体代码如下所示。 | ||
|
||
```java | ||
|
||
//处理浏览器发来的请求 | ||
Respond handleWebReq(){ | ||
int id=序号生成器.get(); | ||
//创建一消息 | ||
Message msg1 = new | ||
Message(id,"{...}"); | ||
//创建GuardedObject实例 | ||
GuardedObject<Message> go= | ||
GuardedObject.create(id); | ||
//发送消息 | ||
send(msg1); | ||
//等待MQ消息 | ||
Message r = go.get( | ||
t->t != null); | ||
} | ||
void onMessage(Message msg){ | ||
//唤醒等待的线程 | ||
GuardedObject.fireEvent( | ||
msg.id, msg); | ||
} | ||
``` | ||
|
||
### 总结 | ||
|
||
Guarded Suspension 模式本质上是一种等待唤醒机制的实现,只不过 Guarded Suspension 模式将其规范化了。规范化的好处是你无需重头思考如何实现,也无需担心实现程序的可理解性问题,同时也能避免一不小心写出个 Bug 来。但 Guarded Suspension 模式在解决实际问题的时候,往往还是需要扩展的,扩展的方式有很多,本篇文章就直接对 GuardedObject 的功能进行了增强,Dubbo 中 DefaultFuture 这个类也是采用的这种方式,你可以对比着来看,相信对 DefaultFuture 的实现原理会理解得更透彻。当然,你也可以创建新的类来实现对 Guarded Suspension 模式的扩展。 | ||
|
||
Guarded Suspension 模式也常被称作 Guarded Wait 模式、Spin Lock 模式(因为使用了 while 循环去等待),这些名字都很形象,不过它还有一个更形象的非官方名字:多线程版本的 if。单线程场景中,if 语句是不需要等待的,因为在只有一个线程的条件下,如果这个线程被阻塞,那就没有其他活动线程了,这意味着 if 判断条件的结果也不会发生变化了。但是多线程场景中,等待就变得有意义了,这种场景下,if 判断条件的结果是可能发生变化的。所以,用“多线程版本的 if”来理解这个模式会更简单。课后思考 |
Binary file added
BIN
+70.8 KB
note/java/concurrency/designpattern/img/630f3eda98a0e6a436953153c68464dc.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added
BIN
+68.5 KB
note/java/concurrency/designpattern/img/d1ad5ce1df66d85698308c41e4e93a21.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters