用RxJava实现事件总线RxBus并实现同类型事件的区分

用RxJava实现事件总线RxBus并实现同类型事件的区分

作者:cmad 时间:2016-04-25 分类:Android 评论:2 条 浏览:1046

  之前事件总线都是用EventBus或者otto来做,现在RxJava越来越火了,用RxJava实现事件总线也是很方便的.不多说直接看代码:

/**
* RxBus
*/

public class RxBus {
private static volatile RxBus defaultInstance;
// 主题
private final Subject bus;
// PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
// 单例RxBus
public static RxBus getDefault() {
RxBus rxBus = defaultInstance;
if (defaultInstance == null) {
synchronized (RxBus.class) {
rxBus = defaultInstance;
if (defaultInstance == null) {
rxBus = new RxBus();
defaultInstance = rxBus;
}
}
}
return rxBus;
}
// 提供了一个新的事件
public void post (Object o) {
bus.onNext(o);
}
// 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
public <T> Observable<T> toObserverable (Class<T> eventType) {
return bus.ofType(eventType);
}
}

使用方法:

// rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)
.subscribe(new Action1<UserEvent>() {
@Override
public void call(UserEvent userEvent) {
long id = userEvent.getId();
String name = userEvent.getName();
...
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// TODO: 处理异常
}
});

上面RxBus实现代码引用自:

用RxJava实现事件总线(Event Bus) 更详细信息可以点击查看

  发现上面RxJava实现的RxBus确实很方便,代码也挺简洁.但是有个问题是我在使用EventBus以及otto的时候一直存在的疑问,就是当发送相同类型的事件或者消息的时候接收的时候怎么去区分?

  基于这个问题我对上面的RxBus进行了改造.代码如下:

package com.cm.rxbus;

import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
* RxBus
* Created by CM on 2016-4-22 19:30:48.
*/

public class RxBus {
private static volatile RxBus defaultInstance;
// 主题
private final Subject bus;
// PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
// 单例RxBus
public static RxBus getDefault() {
RxBus rxBus = defaultInstance;
if (defaultInstance == null) {
synchronized (RxBus.class) {
rxBus = defaultInstance;
if (defaultInstance == null) {
rxBus = new RxBus();
defaultInstance = rxBus;
}
}
}
return rxBus;
}

/**
* 提供了一个新的事件,单一类型
* @param o 事件数据
*/

public void post (Object o) {
bus.onNext(o);
}

/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
* @param eventType 事件类型
* @param <T>
* @return
*/

public <T> Observable<T> toObservable(Class<T> eventType) {
return bus.ofType(eventType);
}

/**
* 提供了一个新的事件,根据code进行分发
* @param code 事件code
* @param o
*/

public void post(int code, Object o){
bus.onNext(new Message(code,o));

}


/**
* 根据传递的code和 eventType 类型返回特定类型(eventType)的 被观察者
* @param code 事件code
* @param eventType 事件类型
* @param <T>
* @return
*/

public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
return bus.ofType(Message.class)
.filter(new Func1<Message,Boolean>() {
@Override
public Boolean call(Message o) {
//过滤code和eventType都相同的事件
return o.getCode() == code && eventType.isInstance(o.getObject());
}
}).map(new Func1<Message,Object>() {
@Override
public Object call(Message o) {
return o.getObject();
}
}).cast(eventType);
}
}

Message代码:

package com.cm.rxbus;

/**
* Created by CM on 2016/4/22.
*/

public class Message {
private int code;
private Object object;

public Message() {}

public Message(int code, Object o) {
this.code = code;
this.object = o;
}
//getter and setter
}

  RxBus里添加了一个post(int code, Object o)方法,里面调用了bus.onNext(new Message(code,o));将code跟object用Message类进行了封装.

  toObservable(final int code, final Class<T> eventType)对传入code的事件进行分发,先调用bus.ofType(Message.class)返回Message类的观察者,然后通过filter操作符返回Message里code跟Object类型跟传入的类型都匹配的观察者,再通过map操作符返回Message里的object对象,最后通过cast转化为特定类的观察者.

使用方法:

//注册观察者
Subscription subscription =RxBus.getDefault()
.toObservable(100,String.class)
.subscribe(new Action1<String>() {
@Override
public void call(String s)
{
Log.e("CM", s);
}
});


//发送事件或消息:
RxBus.getDefault().post(100, "123456");

这样如果有多个消息是相同类型的话就可以通过不同的code进行区分了.

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

@Override
protected void onDestroy() {
super.onDestroy();
if(!subscription .isUnsubscribed()) {
subscription .unsubscribe();
}
}


标签:

相关推荐
更多

  1. 我大叔 2016-4-25 15:23 #1 回复TA

    不多说那你还说了这么多!

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>