重学设计模式——线程安全的观察者模式

栏目: 后端 · 发布时间: 5年前

内容简介:先来说下观察者模式,其实在Android开发中,我们使用观察者模式的时候还是非常多的,无论是广播使用的发布-订阅模式,还是listview的notifyDataSetChanged,或者是RxJava的使用,都是观察者模式的运用。今天就来重新看一下观察者模式。订阅模式,又称观察者模式。定义对象间一种一对多的依赖关系(注册),使得当一个对象改变状态后,则所有依赖它的对象都会得到通知并被自动更新(通知)。说白了就是个注册,通知的过程。先来定下一种场景,我每个月都会在微信上,收到房东的房租单(跪舔有房的大佬们),

先来说下观察者模式,其实在Android开发中,我们使用观察者模式的时候还是非常多的,无论是广播使用的发布-订阅模式,还是listview的notifyDataSetChanged,或者是RxJava的使用,都是观察者模式的运用。今天就来重新看一下观察者模式。

概念

订阅模式,又称观察者模式。定义对象间一种一对多的依赖关系(注册),使得当一个对象改变状态后,则所有依赖它的对象都会得到通知并被自动更新(通知)。说白了就是个注册,通知的过程。

观察者模式的UML图

重学设计模式——线程安全的观察者模式
  • Subject主题,也就是被观察者Observable

简单使用观察者模式

先来定下一种场景,我每个月都会在微信上,收到房东的房租单(跪舔有房的大佬们),假设房东是被观察者,我是观察者,租客我把联系方式留给房东这就是一个注册过程,每个月月初,房东大佬都会通知我们交租(被观察者变化通知观察者)。

JDK为我们提供了Observer与Observable内置接口,我们可以很方便的使用它们。

public class TheOppressed implements Observer {
    public String name;
    public TheOppressed(String name){
        this.name = name;
    }
    @Override
    public void update(Observable o, Object arg) {
        System.out.println(name +" 收到要交租信息");
    }
}
public class Exploiter extends Observable {

    public void postChange(String content){
        setChanged();

        notifyObservers(content);
    }
}

    public static void main(String[] args) {
        Exploiter exploiter = new Exploiter();

        TheOppressed theOppressed1 = new TheOppressed("打工仔1");
        TheOppressed theOppressed2 = new TheOppressed("打工仔2");
        TheOppressed theOppressed3 = new TheOppressed("打工仔3");

        exploiter.addObserver(theOppressed1);
        exploiter.addObserver(theOppressed2);
        exploiter.addObserver(theOppressed3);

        exploiter.postChange("打工仔们快来交房租啦~~");

    }
复制代码

其实往往观察者模式我们自己也可以来写,而不用系统提供的方式

/**
 * 观察者
 */
public interface MyObserver {
    /**
     * 找我
     * @param content 找我啥事
     */
    void callMe(String content);
}
/**
 * 被观察者
 */
public interface MySubject {
    /**
     * 观察者注册
     */
    void registerObserver(MyObserver observer);

    /**
     * 删除观察者
     */
    void removeObserver(MyObserver observer);

    /**
     * 主题有变化时通知观察者
     */
    void notifyObserver();
}
复制代码

先定义两个接口,实现了观察者模式的基本方法

/**
 * 房东
 */
public class Exploiter implements MySubject {
    private String name;
    //存放观察者的集合
    private List<MyObserver> observers;
    private String content;

    public Exploiter(String name) {
        this.name = name;
        observers = new ArrayList<>();
        content = "";
    }

    @Override
    public void registerObserver(MyObserver observer) {
        this.observers.add(observer);
    }

    @Override
    public void removeObserver(MyObserver observer) {
        int index = observers.indexOf(observer);
        if (index > 0) {
            observers.remove(observer);
        }
    }

    @Override
    public void notifyObserver() {
        for (int i = 0; i < observers.size(); i++) {
            //遍历所有房客,并调用他们的通知方法
            MyObserver observer = observers.get(i);
            observer.callMe(content);
        }
    }

   /**
    * 房东要发信息啦
    */
    public void postMessage(String content) {
        if (null != content && !content.trim().equals("")) {
            this.content = content;
            this.notifyObserver();
        }
    }
}

/**
 * 劳动人民
 */
public class TheOppressed implements MyObserver {
    public String name;

    public TheOppressed(String name) {
        this.name = name;
    }

    @Override
    public void callMe(String content) {
        System.out.println(name + " " + content);
    }
}
复制代码

调用代码用下

public class Test {
    public static void main(String[] args) {
        Exploiter exploiter = new Exploiter("房东");

        TheOppressed theOppressed1 = new TheOppressed("打工仔1");
        TheOppressed theOppressed2 = new TheOppressed("打工仔2");
        TheOppressed theOppressed3 = new TheOppressed("打工仔3");

        exploiter.registerObserver(theOppressed1);
        exploiter.registerObserver(theOppressed2);
        exploiter.registerObserver(theOppressed3);

        exploiter.postMessage("打工仔们快来交房租啦~~");
    }
}
//结果
打工仔1 打工仔们快来交房租啦~~
打工仔2 打工仔们快来交房租啦~~
打工仔3 打工仔们快来交房租啦~~
复制代码

其实在JDK9之后,Observer 这些都已经被废弃了,主要因为它

  • 不可序列化,Observable是个类,而不是一个接口,没有实现Serializable,所以,不能序列化和它的子类
  • 没有线程安全,方法可以被其子类覆盖,并且事件通知可以以不同的顺序并且可能在不同的线程上发生。
  • 可以使用PropertyChangeEvent和PropertyChangeListener,它是java.beans包下的类

手写线程安全的观察者模式

其实我们从上面很容易看出来,多观察者需要串行调用,被观察者发生动作,观察者要作出回应,如果观察霆太多,而且处理时间长怎么办?用异步,也许你会脱口而出,那么,异步的处理就要考虑到线程安全和队列的问题。

就是刚刚同样的场景,如果房东先发了一条涨租200,后来又发了一条,收房租(当然要多准备200),假设延迟性是非常的大的情况下,我们不可能单线程串行一直等,太费性能了,开了多线程的情况下,那么就会出现问题,可能某人会先收到交房租,这样就乱了。

现实中有好多这种并发场景,一个或者多个线程,要等待另一组线程执行完成后,才能继续执行的问题,jdk已的com.util.concurrent下为我们提供了很多多线程的类,以后有机会再细讲,今天先用一个, CountDownLatch

CountDownLatch

CountDownLatch就一个线程同步工具,它相当于一个倒序计数器,用来协调多个线程的执行。多个线程通过调用它们所共享的计数器CountDownLatch的countDown方法来让计数器减1。通过await方法来阻塞当前线程,直到计数器变成0。达到线程阻塞直至其他线程执行完成被重新唤醒。主要有三个方法:

  • 构造函数,初始化state的值,state等于同步线程数
  • await(),让线程阻塞
  • countDown(),计数器(state)减1的方法。

概念抄完,直接下手

先来定义一下Observer与Subject的接口

/**
 * 被观察者
 */
public interface MySubject {
    /**
     * 观察者注册
     */
    void registerObserver(MyObserver observer);

    /**
     * 删除观察者
     */
    void removeObserver(MyObserver observer);

    /**
     * 主题有变化时通知观察者
     */
    void notifyObserver();
}

/**
 * 观察者
 */
public interface MyObserver {
    /**
     * 找我
     * @param content 找我啥事
     */
    void callMe(String content);

    /**
     * 观察者名字
     */
    String getName();
}
复制代码

然后可以实现租客与房东的实现类

/**
 * 房东
 */
public class Exploiter implements MySubject {
    private String name;
    //    private List<MyObserver> observers;
    private ConcurrentMap<String, MyObserver> observers;
    private String content;

    public Exploiter(String name) {
        this.name = name;
//        observers = new ArrayList<>();
        observers = new ConcurrentHashMap<>();
        content = "";
    }

    @Override
    public void registerObserver(MyObserver observer) {
//        this.observers.add(observer);
        this.observers.put(observer.getName(), observer);
    }

    @Override
    public void removeObserver(MyObserver observer) {
        observers.remove(observer.getName());
//        int index = observers.indexOf(observer);
//        if (index > 0) {
//            observers.remove(observer);
//        }
    }

    @Override
    public void notifyObserver() {
        try {
            long beginTime = System.currentTimeMillis();
            CountDownLatch latch = new CountDownLatch(observers.size());
            int i = 0;
            for (MyObserver observer : observers.values()) {
                MessageSending messageSending = new MessageSending(latch, observer, content);
                messageSending.start();
                i++;
            }
//            for (int i = 0; i < observers.size(); i++) {
//            MyObserver observer = observers.get(i);
//            observer.callMe(content);
//            }
            latch.await();
            long endTime = System.currentTimeMillis();
            System.out.println(name + "消息发送完毕,耗时:" + (endTime - beginTime));
            System.out.println();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 房东要发信息啦
     */
    public void postMessage(String content) {
        if (null != content && !content.trim().equals("")) {
            this.content = content;
            System.out.println(name + "发布信息:" + content);
            this.notifyObserver();
        }
    }
}
/**
 * 劳动人民
 */
public class TheOppressed implements MyObserver {
    public String name;

    public TheOppressed(String name) {
        this.name = name;
    }

    @Override
    public void callMe(String content) {
        System.out.println(name + "接收到通知:" + content);
    }

    @Override
    public String getName() {
        return name;
    }
}
复制代码

最后在main方法调用如下:

public class Test {
    public static void main(String[] args) {
        Exploiter exploiter = new Exploiter("房东");

        TheOppressed theOppressed1 = new TheOppressed("打工仔1");
        TheOppressed theOppressed2 = new TheOppressed("打工仔2");
        TheOppressed theOppressed3 = new TheOppressed("打工仔3");

        exploiter.registerObserver(theOppressed1);
        exploiter.registerObserver(theOppressed2);
        exploiter.registerObserver(theOppressed3);

        exploiter.postMessage("这个月房租加200!!!");
        exploiter.postMessage("打工仔们快来交房租啦~~");
    }
}
复制代码

可以看到输出结果为:

房东发布信息:这个月房租加200!!!
打工仔2接收到通知:这个月房租加200!!!
打工仔1接收到通知:这个月房租加200!!!
往打工仔1消息发送完毕
往打工仔2消息发送完毕
打工仔3接收到通知:这个月房租加200!!!
往打工仔3消息发送完毕
房东消息发送完毕,耗时:2004

房东发布信息:打工仔们快来交房租啦~~
打工仔1接收到通知:打工仔们快来交房租啦~~
往打工仔1消息发送完毕
打工仔2接收到通知:打工仔们快来交房租啦~~
往打工仔2消息发送完毕
打工仔3接收到通知:打工仔们快来交房租啦~~
往打工仔3消息发送完毕
房东消息发送完毕,耗时:2001
复制代码

可以看到,使用多线程往观察者发送信息,观察者都可以很迅速的接收到信息,这是并行的,但是又保证了被观察者的多个消息之间是有先后顺序的。

总结

观察者模式主要是对象的解耦,将观察者与被观察者之间完全隔离。jdk提供的默认观察者Observer/Observable在多线程下有安全性问题,需要自己手写,JDK9之后已经废弃了。

PS:今天看的比较简单,却也挺重要吧,以前从来没注意过关于观察者模式的 多线程安全性问题,也算是查漏补缺吧。

参考

《Android源码设计模式》

深入理解设计模式(八):观察者模式

观察者模式(二)——多线程与CountDownLatch浅析

我的CSDN

下面是我的公众号,欢迎大家关注我

重学设计模式——线程安全的观察者模式

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Types and Programming Languages

Types and Programming Languages

Benjamin C. Pierce / The MIT Press / 2002-2-1 / USD 95.00

A type system is a syntactic method for automatically checking the absence of certain erroneous behaviors by classifying program phrases according to the kinds of values they compute. The study of typ......一起来看看 《Types and Programming Languages》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换