Android RxJava系列二: 常用拓展操作符

栏目: Android · 发布时间: 5年前

内容简介:本篇文章主要介绍Rxjava 2.x的一些常用的操作符,对Rxjava不熟悉的朋友可以先去看下我之前的两篇介绍使用just()方法创建Observable对象,Observable会将事件逐个发送flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable。现在用一个map()的例子和flatMap()的例子来对比说明 flatMap() 的用法。

前言

本篇文章主要介绍Rxjava 2.x的一些常用的操作符,对Rxjava不熟悉的朋友可以先去看下我之前的两篇介绍

创建操作符

  • create() 创建一个被观察者
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("This is Observer"); //通过 ObservableEmitter 发射器向观察者发送事件。
        e.onComplete();
    }
});
  • just() 创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。
public static <T> Observable<T> just(T item) 
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "-------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "-------onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "-------onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "-------onComplete ");
    }
});

使用just()方法创建Observable对象,Observable会将事件逐个发送

  • From 操作符

    • fromArray() 这个方法和 just() 类似,只不过 fromArray 可以传入一个数组
    • fromCallable() Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值
    • fromIterable() 直接发送一个 List 集合数据给观察者
public static <T> Observable<T> fromArray(T... items)

Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "--------------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "--------------onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "--------------onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "--------------onComplete ");
    }
});
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

Observable.fromCallable(new Callable < Integer > () {

    @Override
    public Integer call() throws Exception {
        return 1;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "--------------accept " + integer);
    }
});
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)


List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "--------------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "--------------onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "--------------onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "--------------onComplete ");
    }
});
  • empty() 直接发送 onComplete() 事件
Observable.empty()
.subscribe(new Observer < Object > () {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "---------------------onSubscribe");
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "---------------------onNext");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "---------------------onError " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete");
    }
});

转换操作符

  • map() map 可以将被观察者发送的数据类型转变成其他的类型,是一对一的转换
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)


//将 Integer 类型的数据转换成 String。
Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
    @Override
    public String apply(Integer integer) throws Exception {
        return  integer+"rxjava";
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "----------------------onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.e(TAG, "----------------------onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
      Log.d(TAG, "---------------------onError " + e);
    }

    @Override
    public void onComplete() {
 Log.d(TAG, "---------------------onComplete" );
    }
});
  • flatMap() 这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable。现在用一个map()的例子和flatMap()的例子来对比说明 flatMap() 的用法。

需求:我们现在需要通过学校拿到院系列表,然后在每个院系中拿到学生的信息.

传统的实现方式有很多种,我就不举例了,直接使用Rxjava实现:

//学校
 class School{
        private String name;
        private List<Department> departments;

        public School(){}
        public School(String name, List<Department> departments) {
            this.name = name;
            this.departments = departments;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public List<Department> getDepartments() {
            return departments;
        }

        public void setDepartments(List<Department> departments) {
            this.departments = departments;
        }
    }
//院系
 class Department{
        private String name;
        private List<Student> students;

        public Department(){}
        public Department(String name, List<Student> students) {
            this.name = name;
            this.students = students;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public List<Student> getStudents() {
            return students;
        }

        public void setStudents(List<Student> students) {
            this.students = students;
        }
    }
//学生
class Student {
        private String name;
        private String school;

        public Student(){}
        public Student(String name, String school) {
            this.name = name;
            this.school = school;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getSchool() {
            return school;
        }

        public void setSchool(String school) {
            this.school = school;
        }
    }

使用map()方法实现:

//使用map()实现方式
        Observable.fromIterable(departments)
                .map(new Function<Department, List<Student>>() {
                    @Override
                    public List<Student> apply(Department department) throws Exception {
                        return department.getStudents();
                    }
                })
                .subscribe(new Observer<List<Student>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List<Student> students) {
                       for (Student student : students){
                           Log.d("----------", student.getName()+student.getSchool() );
                        //如果还需要获取学生所有课程以及成绩
                       ......................
                       }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
//使用flatMap()实现
        Observable.fromIterable(departments)
                .flatMap(new Function<Department, ObservableSource<Student>>() {
                    @Override
                    public ObservableSource<Student> apply(Department department) throws Exception {
                        return Observable.fromIterable(department.getStudents());
                    }
                })
                .flatMap() //如果还需要获取学生所有课程以及成绩操作
                .subscribe(new Observer<Student>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        
                    }

                    @Override
                    public void onNext(Student student) {
                       Log.d("---------",student.getName()+student.getSchool());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

以上代码中map()方法实现中,可以看到我们在onNext()方法中使用了for循环.如果代码逻辑在复杂一些,就可能需要嵌套for循环来实现,那就真的迷之缩进了,而使用flatMap()方法实现,只需要实现一个flatMap()转换一下就好了,随着代码逻辑增加,代码依然清晰,这就是flatMap()的强大之处,也是很多人喜欢使用Rxjava的原因所在.

  • concatMap()

    concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。

public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)


  Observable.fromIterable(departments)
                .concatMap(new Function<Department, ObservableSource<Student>>() {
                    @Override
                    public ObservableSource<Student> apply(Department department) throws Exception {
                        return Observable.fromIterable(department.getStudents());
                    }
                })

功能操作符

  • delay() 延迟一段事件发送事件。
相当于handler的延迟发送事件

handler.sendEmptyMessageDelayed(0,2000);
public final Observable<T> delay(long delay, TimeUnit unit)

Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS) //延迟两秒再发送事件
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d("------------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d("------------"+integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d(TAG, "----------------onComplete");
    }
});
  • doOnSubscribe() Observable 每发送 onSubscribe() 之前都会回调这个方法。

    此方法通常用来做准备工作,例如弹一个ProgressDialog提示用户, But,这里有一个小坑,特别拿出来说明一下:

前方有坑,请集中注意力

Observable.doOnSubscribe()方法是在subscribe() 调用后而且在事件发送前执行。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();
            }
        })
          .subscribeOn(Schedulers.io()) //在io执行上述操作
          .doOnSubscribe(new Consumer<Disposable>() {
              @Override
              public void accept(Disposable disposable) throws Exception {
                  dialog.show(); //显示dialog
              }
          })
          .subscribeOn(AndroidSchedulers.mainThread()) //在UI线程执行上述准备操作
          .observeOn(AndroidSchedulers.mainThread())//在UI线程执行下面操作
          .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("----","开始了");
            }

            @Override
            public void onNext(String s) {
                Log.d("----", s);
                dialog.dismiss();
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.d("----", "complete");
            }
        });
  • subscribeOn() 指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效。
public final Observable<T> subscribeOn(Scheduler scheduler)
  • observeOn() 指定观察者的线程,每指定一次就会生效一次。
public final Observable<T> observeOn(Scheduler scheduler)

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

以上就是Rxjava常用的一些操作符介绍和使用方法实例了

关于Rxjava系列二就到此结束啦,后面有时间我还会写写与retrofit2的结合使用,欢迎关注订阅!

欢迎关注作者 darryrzhong ,更多干货等你来拿哟.

请赏个小红心!因为你的鼓励是我写作的最大动力!

更多精彩文章请关注


以上所述就是小编给大家介绍的《Android RxJava系列二: 常用拓展操作符》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Masterminds of Programming

Masterminds of Programming

Federico Biancuzzi、Chromatic / O'Reilly Media / 2009-03-27 / USD 39.99

Description Masterminds of Programming features exclusive interviews with the creators of several historic and highly influential programming languages. Think along with Adin D. Falkoff (APL), Jame......一起来看看 《Masterminds of Programming》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器