码迷,mamicode.com
首页 > 编程语言 > 详细

自定义RxJava

时间:2016-07-07 06:22:40      阅读:477      评论:0      收藏:0      [点我收藏+]

标签:

0-任务

我们有个 Web API,获取指定标签的所有新闻列表,每条新闻包含时间和内容等。
我们的任务就是下载新闻列表,选择最新的新闻,然后保存在本地。

假设第三方提供的jar里面提供了Api和ApiImpl,不可再更改:
假设getNewsList耗时1.5秒,save耗时0.5秒
1.接口

public interface Api {

    //同步方式
    List<News> getNewsList(String tag);//获取新闻列表

    Uri save(News news);//保存新闻到本地

    //异步方式
    void getNewsList(String tag, IGetNewsList callback);//获取新闻列表

    void save(News news, ISave callback);//保存新闻到本地

    interface IGetNewsList {
        void onSuccess(List<News> newsList);

        void onFailure(Exception e);
    }

    interface ISave {
        void onSuccess(Uri uri);

        void onFailure(Exception e);
    }
}

2.接口实现类

public class ApiImpl implements Api {

    private Random random = new Random();
    //线程池:核心有两个线程,最大线程数量可无限,存活时间60s
    private ExecutorService threadPoolExecutor =
            new ThreadPoolExecutor(2, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    @Override
    public List<News> getNewsList(String tag) {
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        List<News> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add(new News(random.nextInt(100), "这是" + tag + "标签的新闻" + i));
        }
        return list;
    }

    @Override
    public Uri save(News news) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new Uri("地址" + news.hashCode());
    }

    @Override
    public void getNewsList(String tag, IGetNewsList callback) {
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    callback.onSuccess(getNewsList(tag));
                } catch (Exception e) {
                    callback.onFailure(e);
                }
            }
        });
    }

    @Override
    public void save(News news, ISave callback) {
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    callback.onSuccess(save(news));
                } catch (Exception e) {
                    callback.onFailure(e);
                }
            }
        });
    }

}

3.一些实体类:

public class News implements Comparable<News> {
    private int time;//发布时间
    private String content;//内容

    public News(int time, String content) {
        this.time = time;
        this.content = content;
    }

    public int getTime() {
        return time;
    }

    public void setTime(int time) {
        this.time = time;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public int compareTo(News another) {
        return Integer.compare(time, another.time);
    }

    @Override
    public String toString() {
        return "News{" +
                "time=" + time +
                ", content=‘" + content + ‘\‘‘ +
                ‘}‘;
    }
}
public class Uri {
    String path;

    public Uri(String path) {
        this.path = path;
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    @Override
    public String toString() {
        return "Uri{" +
                "path=‘" + path + ‘\‘‘ +
                ‘}‘;
    }
}

1-同步方式

    //1.同步方式
    public Uri getLatestUri(String tag) {
        Api api = new ApiImpl();//原始接口
        List<News> newsList = api.getNewsList(tag);
        LogUtil.print("获取新闻列表:" + newsList.toString());
        News latestNews = getLatestNews(newsList);
        LogUtil.print("获取最新的新闻:" + latestNews.toString());
        Uri uri = api.save(latestNews);
        LogUtil.print("保存到本地:" + uri.toString());
        return uri;
    }

    private void testSync() {
        LogUtil.print("START-同步方式");
        Uri uri = client.getLatestUri("编程");
        LogUtil.print("END-同步方式:" + uri.toString());
    }

技术分享

这里的getNewsList和save都是耗时操作,会阻塞主线程,显然同步方式不适用!

2-异步方式

    //2.异步方式
    public void getLatestUriAsync(String tag, UpdateNewsCallback callback) {

        Api api = new ApiImpl();//原始接口
        api.getNewsList(tag, new Api.IGetNewsList() {

            @Override
            public void onSuccess(List<News> newsList) {
                LogUtil.print("获取新闻列表:" + newsList.toString());
                News latestNews = getLatestNews(newsList);
                LogUtil.print("获取最新的新闻:" + latestNews.toString());
                api.save(latestNews, new Api.ISave() {
                    @Override
                    public void onSuccess(Uri uri) {
                        LogUtil.print("保存到本地:" + uri.toString());
                        callback.onSuccess(uri);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        callback.onFailure(e);
                    }
                });
            }

            @Override
            public void onFailure(Exception e) {
                callback.onFailure(e);
            }
        });
    }

    private void testAysnc() {
        LogUtil.print("START-异步方式");
        client.getLatestUriAsync("干货", new UpdateNewsCallback() {
            @Override
            public void onSuccess(Uri uri) {
                LogUtil.print("END-异步方式:" + uri.toString());
            }

            @Override
            public void onFailure(Exception e) {

            }
        });
    }

技术分享

这里我没有将最后得到的Uri切换回主线程,这涉及到线程间通讯的问题,详情可查看我的另一篇博文:自定义消息传递机制

由日志可知,主线程阻塞的问题解决了,获取新闻列表是在thread-1执行的,保存到本地是在thread-2执行的,但是,新的问题又来了:回调嵌套
这还好只有两层回调,那要是来个四五层回调的嵌套,那代码,想想也是醉了,相信没有人想看这样的代码,那么,怎么办呢,且听下文分解!

3-异步包装方式

首先引入两个辅助类:
1. AsyncWork:异步任务的模板类
2. CallBack:泛型回调类
3. ApiWrapper:接口的包装类

public abstract class AsyncWork<T> {
    public abstract void start(Callback<T> callback);
}
public interface Callback<T> {
    void onResult(T result);

    void onError(Exception e);
}
public class ApiWrapper {
    private Api api=new ApiImpl();

    //包装异步操作
    public AsyncWork<List<News>> getNewsList(String tag) {
        return new AsyncWork<List<News>>() {
            @Override
            public void start(Callback<List<News>> callback) {
                api.getNewsList(tag, new Api.IGetNewsList() {
                    @Override
                    public void onSuccess(List<News> newsList) {
                        callback.onResult(newsList);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }

    public AsyncWork<Uri> save(News news) {
        return new AsyncWork<Uri>() {
            @Override
            public void start(Callback<Uri> callback) {
                api.save(news, new Api.ISave() {
                    @Override
                    public void onSuccess(Uri uri) {
                        callback.onResult(uri);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }

}

好了,辅助类已经搞定了,下面正式开始!

    //3.异步包装方式
    public AsyncWork<Uri> getLatestUriWrapper(String tag) {
        ApiWrapper apiWrapper = new ApiWrapper();//包装后的接口
        //先根据命令,得到异步操作,然后执行异步操作
        return new AsyncWork<Uri>() {
            @Override
            public void start(Callback<Uri> callback) {
                apiWrapper.getNewsList(tag)
                        .start(new Callback<List<News>>() {
                            @Override
                            public void onResult(List<News> newsList) {
                                LogUtil.print("获取新闻列表:" + newsList.toString());
                                News latestNews = getLatestNews(newsList);
                                LogUtil.print("获取最新的新闻:" + latestNews.toString());
                                apiWrapper.save(latestNews)
                                        .start(new Callback<Uri>() {
                                            @Override
                                            public void onResult(Uri result) {
                                                LogUtil.print("保存到本地:" + result.toString());
                                                callback.onResult(result);
                                            }

                                            @Override
                                            public void onError(Exception e) {
                                                callback.onError(e);
                                            }
                                        });
                            }

                            @Override
                            public void onError(Exception e) {
                                callback.onError(e);
                            }
                        });
            }
        };
    }

    private void testWrapper() {
        LogUtil.print("START-异步包装方式");
        AsyncWork<Uri> asyncWork = client.getLatestUriWrapper("Java");
        asyncWork.start(new Callback<Uri>() {
            @Override
            public void onResult(Uri result) {
                LogUtil.print("END-异步包装方式:" + result.toString());
            }

            @Override
            public void onError(Exception e) {

            }
        });
    }

技术分享

这里相比第2种异步方式,无非就是多了:
1. 组装:将所有的异步操作都使用异步模板类包装了一下
2. 卸装:AsyncWork.start即可执行异步任务,得到包装的数据

可是,这似乎并没有什么卵用啊,回调还是有两层嵌套,而且还多绕了几个弯,还不如以前好懂了呢,这不是吃饱了撑着么?别急,接下来见分晓!

4-异步拆分方式

    //4.异步包装方式-拆分
    public AsyncWork<Uri> getLatestUriSplit(String tag) {
        ApiWrapper apiWrapper = new ApiWrapper();//包装后的接口
        AsyncWork<List<News>> newsListWork = apiWrapper.getNewsList(tag);
        AsyncWork<News> latestNewsWork = new AsyncWork<News>() {
            @Override
            public void start(Callback<News> callback) {
                newsListWork.start(new Callback<List<News>>() {
                    @Override
                    public void onResult(List<News> result) {
                        LogUtil.print("获取新闻列表:" + result.toString());
                        News latestNews=getLatestNews(result);
                        LogUtil.print("获取最新的新闻:" + latestNews.toString());
                        callback.onResult(latestNews);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };

        AsyncWork<Uri> uriWork = new AsyncWork<Uri>() {
            @Override
            public void start(Callback<Uri> callback) {
                latestNewsWork.start(new Callback<News>() {
                    @Override
                    public void onResult(News cutest) {
                        apiWrapper.save(cutest)
                                .start(new Callback<Uri>() {
                                    @Override
                                    public void onResult(Uri result) {
                                        LogUtil.print("保存到本地:" + result.toString());
                                        callback.onResult(result);
                                    }

                                    @Override
                                    public void onError(Exception e) {
                                        callback.onError(e);
                                    }
                                });
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
        return uriWork;
    }

    private void testSplit() {
        LogUtil.print("START-异步拆分方式");
        AsyncWork<Uri> asyncWork = client.getLatestUriSplit("Java");
        asyncWork.start(new Callback<Uri>() {
            @Override
            public void onResult(Uri result) {
                LogUtil.print("END-异步拆分方式:" + result.toString());
            }

            @Override
            public void onError(Exception e) {

            }
        });
    }

技术分享
这里,我们将三步操作,分成了三个异步任务,开始有了一点链式调用的影子了,不过,在每个任务里面,还有着大量的业务无关的模板代码,显然还存在着很大的优化空间,是时候干掉这些模板代码了!

5-异步映射方式

首先需要拓展一些我们的辅助类:AsyncWork
给它添加了两个方法:map和flatMap
通过这两个方法,我们可以将AsyncWork < T > 转换成 AsyncWork< R >,实现任务的转换!
具体怎么用,等下看了测试类你就知道了!

public abstract class AsyncWork<T> {
    public abstract void start(Callback<T> callback);

    /**
     * map
     * 目标:AsyncWork<T> --> AsyncWork<R>
     * 要求:call中实现(T -> R)
     */
    public <R> AsyncWork<R> map(Func<T, R> func) {
        final AsyncWork<T> source = this;
        return new AsyncWork<R>() {
            @Override
            public void start(Callback<R> callback) {
                source.start(new Callback<T>() {
                    @Override
                    public void onResult(T result) {
                        R mapped = func.call(result);
                        callback.onResult(mapped);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }

    /**
     * flatMap
     * 目标:AsyncWork<T> -> AsyncWork<R>
     * 要求:call中实现(T -> AsyncWork<R>)
     */
    public <R> AsyncWork<R> flatMap(Func<T, AsyncWork<R>> func) {
        final AsyncWork<T> source = this;
        return new AsyncWork<R>() {
            @Override
            public void start(Callback<R> callback) {
                source.start(new Callback<T>() {
                    @Override
                    public void onResult(T result) {
                        AsyncWork<R> mapped = func.call(result);
                        mapped.start(new Callback<R>() {
                            @Override
                            public void onResult(R result) {
                                callback.onResult(result);
                            }

                            @Override
                            public void onError(Exception e) {
                                callback.onError(e);
                            }
                        });
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
}

辅助类拓展好了,下面开始分析!参考上面的拆分方式,转换流程应该是:
1. 得到AsyncWork< List< News>>:新闻列表的任务包装类
2. 转换成AsyncWork< News>:最新新闻的任务包装类
3. 转换成AsyncWork< Uri>:最新新闻的URI的任务包装类

    //5.异步包装方式-映射
    public AsyncWork<Uri> getLatestUriMap(String tag) {
        ApiWrapper apiWrapper = new ApiWrapper();//包装后的接口
        AsyncWork<List<News>> newsListWork = apiWrapper.getNewsList(tag);
        //1.使用map
        //将AsyncWork<List<News>> 转 AsyncWork<News>
        //因为有接口:List<News> -> News
        AsyncWork<News> latestNewsWork = newsListWork.map(new Func<List<News>, News>() {
            @Override
            public News call(List<News> newsList) {
                LogUtil.print("获取新闻列表:" + newsList.toString());
                News latestNews = getLatestNews(newsList);
                LogUtil.print("获取最新的新闻:" + latestNews.toString());
                return latestNews;
            }
        });

//        //2.这里不能使用map了!!!
//        //因为没有接口:News -> Uri
//        //只有接口:News -> AsyncWork<Uri>
//        AsyncWork<Uri> uriWork = latestNewsWork.map(new Func<News, Uri>() {
//            @Override
//            public Uri call(News news) {
//                return apiWrapper.save(news);
//            }
//        });

        //3.使用flatMap
        //将AsyncWork<News> 转 AsyncWork<Uri>
        //只为有接口:News -> AsyncWork<Uri>
        AsyncWork<Uri> uriWork = latestNewsWork.flatMap(new Func<News, AsyncWork<Uri>>() {
            @Override
            public AsyncWork<Uri> call(News result) {
                LogUtil.print("保存到本地:" + result.toString());
                return apiWrapper.save(result);
            }
        });
        return uriWork;
    }

    private void testMap() {
        LogUtil.print("START-异步映射方式");
        AsyncWork<Uri> asyncWork = client.getLatestUriMap("Java");
        asyncWork.start(new Callback<Uri>() {
            @Override
            public void onResult(Uri result) {
                LogUtil.print("END-异步映射方式:" + result.toString());
            }

            @Override
            public void onError(Exception e) {

            }
        });
    }

现在我们再看,每个异步任务里面都没有嵌套的回调了,终于摆脱回调地狱了!
相信看了上面的代码之后,大家也都了解了map和flatMap的作用了:

想将AsyncWork< T>转成AsyncWork< R>:

  1. 如果已有方法:T -> R,则使用map
  2. 如果已有方法:T -> AsynWork,则使用flatMap

但是,现在的代码显然还是没有最初的同步方式简洁,所以,是时候玩Lambda表达式!

6-异步Lambda方式

Lambda表达式说明:

  1. Lambda表达式可以认为是匿名方法,左边是形参,右边是方法体,一般用于接口回调的实现
  2. 特别注意:这里接口中不要有相同入参、相同出参的方法,哪怕方法名不同也不行,因为Lambda表达式是在编译时自动根据入参和出参来寻找方法的
    //6.异步包装方式-Lambda
    public AsyncWork<Uri> getLatestUriLambda(String tag) {
        ApiWrapper apiWrapper = new ApiWrapper();//包装后的接口
        AsyncWork<List<News>> catsListAsyncWork = apiWrapper.getNewsList(tag);
        AsyncWork<News> cutestCatAsyncWork = catsListAsyncWork.map(cats -> getLatestNews(cats));
        AsyncWork<Uri> storedUriAsyncWork = cutestCatAsyncWork.flatMap(cat -> apiWrapper.save(cat));
        return storedUriAsyncWork;
    }

    private void testLambda() {
        LogUtil.print("START-异步Lambda方式");
        AsyncWork<Uri> asyncWork = client.getLatestUriLambda("Java");
        asyncWork.start(new Callback<Uri>() {
            @Override
            public void onResult(Uri result) {
                LogUtil.print("END-异步Lambda方式:" + result.toString());
            }

            @Override
            public void onError(Exception e) {

            }
        });
    }

技术分享

这里我就没打印中间步骤了,因为不忍心破坏Lambda的简洁的方法体,而又不知道log打印插到别的什么地方好,大家觉得在哪里打印中间步骤的数据好呢?

现在的异步方式,是不是和最开始的同步方式,看起来很相似了啊,这就是RxJava的好处,像写同步的代码一样去写异步的代码!好了,自定义的伪RxJava这里就完结了!

下面我们来看看真正的RxJava是怎么玩这个例子的吧!

7-目标:RxJava方式

同样的,首先定义一个辅助类:
1. ApiRx,将Api的接口转成Rx方式的接口

public class ApiRx {
    private Api api=new ApiImpl();

    public Observable<List<News>> queryCats(final String query) {
        return Observable.create(new Observable.OnSubscribe<List<News>>() {
            @Override
            public void call(final Subscriber<? super List<News>> subscriber) {
                api.getNewsList(query, new Api.IGetNewsList() {
                    @Override
                    public void onSuccess(List<News> aNewses) {
                        subscriber.onNext(aNewses);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        subscriber.onError(e);
                    }
                });
            }
        });
    }

    public Observable<Uri> store(final News aNews) {
        return Observable.create(new Observable.OnSubscribe<Uri>() {
            @Override
            public void call(final Subscriber<? super Uri> subscriber) {
                api.save(aNews, new Api.ISave() {
                    @Override
                    public void onSuccess(Uri uri) {
                        subscriber.onNext(uri);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        subscriber.onError(e);
                    }
                });
            }
        });
    }
}

辅助类搞定,下面正式开始!

    //7.目的地-RxJava方式
    public Observable<Uri> getLatestUriRxJava(String tag) {
        ApiRx apiRx=new ApiRx();//Rxjava的接口
        Observable<List<News>> catsListObservable = apiRx.queryCats(tag);
        Observable<News> cutestCatObservable = catsListObservable.map(new Func1<List<News>, News>() {
            @Override
            public News call(List<News> cats) {
                return getLatestNews(cats);
            }
        });
        Observable<Uri> storedUriObservable = cutestCatObservable.flatMap(new Func1<News, Observable<? extends Uri>>() {
            @Override
            public Observable<? extends Uri> call(News cat) {
                return apiRx.store(cat);
            }
        });
        return storedUriObservable;
    }

    private void testRxJava() {
        Observer<Uri> observer = new Observer<Uri>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(Uri uri) {
                LogUtil.print("END-RxJava方式:" + uri.toString());
            }
        };
        LogUtil.print("START-RxJava方式");
        Observable<Uri> uriObservable = client.getLatestUriRxJava("RxJava");
        uriObservable.subscribe(observer);
    }

技术分享

自定义RxJava

标签:

原文地址:http://blog.csdn.net/fisher0113/article/details/51843380

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!