首页 > 关于Rxjava的tack操作符?

关于Rxjava的tack操作符?

tack操作符,取出N个结果数,但我发现其还有另外一个:

.take(1 * 1000, TimeUnit.SECONDS, new Scheduler() {
                            @Override
                            public Worker createWorker() {
                                Worker wo = new Worker() {
                                    @Override
                                    public Subscription schedule(Action0 action0) {
                                        Log.e("1","action01");
                                        try {
                                            Thread.sleep(2232);
                                        } catch (InterruptedException e) {
                                            e.printStackTrace();
                                        }
                                        Log.e("1","action02");
                                        return null;
                                    }

                                    @Override
                                    public Subscription schedule(Action0 action0, long l, TimeUnit timeUnit) {
                                        Log.e("2",timeUnit.name());
                                        Log.e("2",String.valueOf(l));
                                        Log.e("2","action0");
                                        return null;
                                    }

                                    @Override
                                    public void unsubscribe() {
                                        Log.e("3","---");
                                    }

                                    @Override
                                    public boolean isUnsubscribed() {
                                        Log.e("4","---");
                                        return false;
                                    }
                                };
                                return wo;
                            }
                        })

这段代码,
在public Subscription schedule(Action0 action0)中我线程休眠了2232毫秒,
结果打印了:

06-19 11:41:37.771 7239-7239/com.example.kid.test_sql E/4: ---
06-19 11:41:37.773 7239-7239/com.example.kid.test_sql E/2: SECONDS
06-19 11:41:37.773 7239-7239/com.example.kid.test_sql E/2: 1000
06-19 11:41:37.773 7239-7239/com.example.kid.test_sql E/2: action0
06-19 11:41:37.836 7239-7583/com.example.kid.test_sql E/结果:: 1
06-19 11:41:37.837 7239-7583/com.example.kid.test_sql E/结果:: 2
06-19 11:41:37.837 7239-7583/com.example.kid.test_sql E/结果:: 3
06-19 11:41:37.837 7239-7583/com.example.kid.test_sql E/3: ---

结果是我的观察者最后打印的。
大家能帮我解释一下吗?网上好像没有这个的解释。


不太懂你构造的这个 Scheduler 的用途,不过可以看一下 take 的源码,三个参数的方法,最终构造出来的 Scheduler 只调用了第二个 schedule 方法,所以不会执行第一个中的代码。

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
    Worker worker = scheduler.createWorker();
    child.add(worker);
    
    TakeSubscriber<T> ts = new TakeSubscriber<T>(new SerializedSubscriber<T>(child));
    worker.schedule(ts, time, unit);
    return ts;
}

take(long,TimeUnit,Scheduler):获取发射的数据序列中,从头部开始某个时间段内的数据。
long : 是时间长度
TimeUnit : 是时间单位
scheduler : take操作运行的调度器,RxJava内置多个调度器, 这些调度器已经满足日常的使用,直接拿过来用。

举个例子吧,我要在工作线程中拿到前5秒发射的数据:

Observable.from(xxx)
          .take(5, TimeUnit.SECONDS, Schedulers.newThread())
          .subscribe(new Subscriber<Object>) {
              
              ...
              
          } 
【热门文章】
【热门文章】