image.png

观察者模式 Observer Design Pattern / 发布订阅模式 Publish-Subscribe Design Pattern

Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.——GoF
在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。

一般情况下,被依赖的对象叫作被观察者(Observable),依赖的对象叫作观察者(Observer)。不过,在实际的项目开发中两种对象的称呼是比较灵活的,有各种不同的叫法,比如:Subject-Observer、Publisher-Subscriber、Producer-Consumer、EventEmitter-EventListener、Dispatcher-Listener。不管怎么称呼,只要应用场景符合刚刚给出的定义,都可以看作观察者模式。

Observer 本来的意思是「观察者」,但实际上 Observer 角色并非主动地去观察,而是被动地接受来自 Subject 角色的通知。因此,观察者模式也被称为 Publish-Subscribe(发布 - 订阅)模式。《图解设计模式》的作者也认为 Publish(发布) 和 Subscribe(订阅) 这个名字可能更加合适。

登场角色:

  • Subject(被观察对象):Subject 角色表示被观察对象。Subject 角色定义了注册观察者和删除观察者的方法。此外,它还声明了「获取现在的状态」的方法。
  • ConcreteSubject(具体的观察对象):ConcreteSubject 角色表示具体的被观察对象。当自身状态发生变化后,它会通知所有已经注册的 Observer 角色。
  • Observer(观察者):Observer 角色负责接收来自 Subject 角色的状态变化的通知。为此,它声明了 update 方法。
  • ConcreteObserver(具体的观察者):ConcreteObserver 角色表示具体的 Observer。当它的 update 方法被调用后,会去获取要观察的对象的最新状态。

image.png

方法说明:

  • getSubjectStatus 是给 Observer 获取状态用的。
  • notifyObserver 调用 Observerupdate 方法时,是把自己 this 传了过去。比如示例程序中 notifyObserver 是这样写的:

一种经典的实现方式(同步阻塞实现方式):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public interface Subject {
void registerObserver(Observer observer);
void removeObserver(Observer observer);
void notifyObservers(Message message);
}

public interface Observer {
void update(Message message);
}

public class ConcreteSubject implements Subject {
private List<Observer> observers = new ArrayList<Observer>();

@Override
public void registerObserver(Observer observer) {
observers.add(observer);
}

@Override
public void removeObserver(Observer observer) {
observers.remove(observer);
}

@Override
public void notifyObservers(Message message) {
for (Observer observer : observers) {
observer.update(message);
}
}
}

public class ConcreteObserverOne implements Observer {
@Override
public void update(Message message) {
//TODO: 获取消息通知,执行自己的逻辑...
System.out.println("ConcreteObserverOne is notified.");
}
}

public class ConcreteObserverTwo implements Observer {
@Override
public void update(Message message) {
//TODO: 获取消息通知,执行自己的逻辑...
System.out.println("ConcreteObserverTwo is notified.");
}
}

public class Demo {
public static void main(String[] args) {
ConcreteSubject subject = new ConcreteSubject();
subject.registerObserver(new ConcreteObserverOne());
subject.registerObserver(new ConcreteObserverTwo());
subject.notifyObservers(new Message());
}
}

观察者模式的实现方法各式各样,函数、类的命名等会根据业务场景的不同有很大的差别,比如 register 函数还可以叫作 attachremove 函数还可以叫作 detach 等等。

拓展思路:

  • 可替换性。具体的被观察者不需要知道正在观察自己的是哪一个具体观察者。可替换性的设计思想:
    • 利用抽象类和接口从具体类中抽出抽象方法
    • 在将实例作为参数传递至类中,或者在类的字段中保存实例时,不使用具体类型,而是使用抽象类型和接口
  • 注意 Observer 的调用顺序。必须要保证 update 方法调用顺序改变时不会发生问题。
  • Observer 角色也有可能会触发 Subject 角色调用 update 方法,所以要注意不要导致循环调用。观察者不要改变被观察者。
  • MVC 中的 Model 和 View 的关系与 Subject 角色和 Observer 角色的关系相对应。Model 是指操作「不依赖于显示形式的内部模型」的部分,View 则是管理 Model「怎样显示」的部分。通常情况下,一个 Model 对应多个 View。

相关的设计模式:站内文章中介者模式

  • 在中介者模式中,有时会使用 Observer 模式来实现 Mediator 角色与 Colleague 角色之间的通信。
  • 就「发送状态变化通知」这一点而言,Mediator 模式与 Observer 模式是类似的。不过,两种模式中,通知的目的和视角不同
    • 在中介者模式中,虽然也会发送通知,不过那不过是为了对 Colleague 角色进行仲裁而已。
    • 而在观察者模式中,将 Subject 角色的状态变化通知给 Observer 角色的目的则主要是为了使 Subject 角色和 Observer 角色同步

观察者模式的实现方式

观察者模式的应用场景非常广泛,小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子,比如,邮件订阅、RSS Feeds,本质上都是观察者模式。

根据应用场景的不同,观察者模式会对应不同的代码实现方式:有同步阻塞的实现方式,也有异步非阻塞的实现方式;有进程内的实现方式,也有跨进程的实现方式。

异步非阻塞的实现方式

上一个章节介绍的示例模板代码是同步阻塞实现方式,这也是最经典的实现方式,他的作用是代码解耦。观察者和被观察者代码在同一个线程内执行,被观察者一直阻塞,直到所有的观察者代码都执行完成之后,才执行后续的代码。

我们可以改造 notifyObservers(Message message),使其成为异步非阻塞的实现方式:

  1. 多线程实现方式
  2. 引入框架,如 Google Guava EventBus

异步非阻塞除了可以实现代码解耦外,还能提高代码的执行效率。

多线程实现方式

多线程实现方式 1:观察者使用多线程处理消息。这种方式的缺点为频繁地创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出。

1
2
3
4
5
6
7
8
9
10
11
12
public class ConcreteObserverX implements Observer {
@Override
public void update(Message message) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("ConcreteObserverOne is notified.");
}
});
thread.start();
}
}

多线程实现方式 2:使用线程池通知观察者。尽管利用了线程池解决了第一种实现方式的问题,但线程池、异步执行逻辑都耦合在了 notifyObservers() 函数中,增加了这部分业务代码的维护成本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ConcreteSubject implements Subject {
private List<Observer> observers = new ArrayList<Observer>();
private Executor executor; // 可通过构造函数初始化线程池
@Override
public void notifyObservers(Message message) {
for (Observer observer : observers) {
executor.execute(new Runnable() {
@Override
public void run() {
observer.update(message);
}
});
}
}
}

如果我们的业务更加复杂,需要在同步阻塞和异步非阻塞之间灵活切换,上面的代码显然无法满足这个需求。我们可以使用 Google Guava EventBus 框架解决这个问题。使用新的框架可以隐藏实现细节,降低开发难度,实现代码的复用,解耦业务与非业务代码。

使用 Google Guava EventBus

站内文章EventBus 框架的简易实现 这篇文章中,我们可以模仿 Google Guava EventBus,动手实现一个简单的框架,加深对观察者模式的理解。

其他实现方式

在上面讲的场景中,不管是同步阻塞的实现方式还是异步非阻塞的实现方式,都是进程内的实现方式。对于跨进程、跨系统的观察者模式,我们可以使用 站内文章RPC 接口执行通知操作。

或者使用更优雅、更常用的方式——引入消息队列。消息队列将观察者和被观察者解耦得更加彻底,被观察者完全不感知观察者,同理,观察者也完全不感知被观察者。被观察者只管发送消息到消息队列,观察者只管从消息队列中读取消息来执行相应的逻辑。不过这样会增加维护成本。

java.util.Observer 接口

Java 类库中的 java.util.Observer 接口和 java.util.Observable 类就是一种观察者模式。

java.util.Observer 接口中定义了以下方法。

1
public void update (Observable obj, Object arg)

update 方法的参数则接收到了如下内容:

  • Observable 类的实例是被观察的 Subject 角色。
  • Object 类的实例是附加信息

我们一般不直接用这些接口实现 Observer 模式,因为 java.util.Observer 接口和 java.util.observable 类并不好用。理由很简单,传递给 java.util.Observer 接口的 Subject 角色必须是 java.util.Observable 类型(或者它的子类型)的。但 Java 只能单一继承,也就说如果 Subject 角色已经是某个类的子类了,那么它将无法继承 java.util.observable 类。

Coad 书讲解了这个问题的解决办法。在该书介绍的观察者模式中,Subject 角色和 Observer 接口都被定义为 Java 的接口,这种观察者模式更容易使用。

本文参考