RxBus升级篇

RxBus升级篇

作者:cmad 时间:2016-04-26 分类:Android 评论:3 条 浏览:808

之前写了一篇《用RxJava实现事件总线RxBus并实现同类型事件的区分》,用RxJava实现了事件总线RxBus,并实现了通过code对相同类型的事件进行区分。但是发现使用起来还是不太方便,不如EventBus那样好用。
我们来看一下之前是怎么用的:

// Register an observer: subscribe to String events that are posted with code 100.
// The returned Subscription must be kept so it can be unsubscribed later.
Subscription subscription =RxBus.getDefault()
.toObservable(100,String.class)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
// Handle the received event payload.
Log.e("CM", s);
}
});


// Post an event or message (code 100, String payload):
RxBus.getDefault().post(100, "123456");

// Cancel the subscription when the component is destroyed.
@Override
protected void onDestroy() {
    super.onDestroy();
    // Nothing to do if the subscription has already been cancelled.
    if (subscription.isUnsubscribed()) {
        return;
    }
    subscription.unsubscribe();
}

确实没有EventBus用起来方便。于是对之前的RxBus又进行了改造,参考了EventBus的使用方法。
添加了注解类@Subscribe对方法进行注解:

/**
 * Marks a method as an event handler.
 *
 * <p>The annotation is retained at runtime so that it can be read through
 * reflection ({@code Method.isAnnotationPresent} / {@code Method.getAnnotation}).
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface Subscribe {

    /**
     * Thread on which the annotated method should handle events.
     * Defaults to {@link ThreadMode#CURRENT_THREAD}.
     */
    ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;

    /**
     * Code used to tell apart events of the same type.
     * Defaults to {@code -1}.
     */
    int code() default -1;
}

code用来对相同类型的事件进行区分,threadMode用来设置事件处理所在的线程。
ThreadMode代码:

/**
 * Thread selection for a {@code @Subscribe}-annotated handler method.
 */
public enum ThreadMode {

    /**
     * current thread
     */
    CURRENT_THREAD,

    /**
     * android main thread
     */
    MAIN,

    /**
     * new thread
     */
    NEW_THREAD

}

另外添加了一个SubscriberMethod类,用于保存@Subscribe注解的方法的相关信息(参考EventBus的SubscriberMethod).

/**
 * Holds the information gathered about one {@code @Subscribe}-annotated method:
 * the object that owns it, the reflected method, the event type it accepts,
 * the event code and the requested thread mode.
 */
public class SubscriberMethod {
    /** Object on which {@link #method} is invoked. */
    public Object subscriber;
    /** The reflected handler method. */
    public Method method;
    /** Type of the event passed to the handler. */
    public Class<?> eventType;
    /** Event code associated with the handler. */
    public int code;
    /** Thread mode requested for the handler. */
    public ThreadMode threadMode;

    /**
     * @param subscriber object that owns the handler method
     * @param method     the handler method
     * @param eventType  event type accepted by the handler
     * @param code       event code of the handler
     * @param threadMode thread mode of the handler
     */
    public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code, ThreadMode threadMode) {
        this.subscriber = subscriber;
        this.method = method;
        this.eventType = eventType;
        this.code = code;
        this.threadMode = threadMode;
    }

    /**
     * Invokes the handler method on the subscriber.
     *
     * <p>Reflection failures are only printed, never rethrown.
     *
     * @param o argument passed to the handler
     */
    public void invoke(Object o) {
        try {
            method.invoke(subscriber, o);
        } catch (IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
        }
    }
}

接下来看看重点RxBus的代码:

package com.cm.rxbus;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * RxBus: an event bus backed by an RxJava {@code PublishSubject}.
 *
 * <p>Events can be posted either as plain objects ({@link #post(Object)}) or
 * wrapped together with an integer code ({@link #post(int, Object)}), and
 * consumed either as Observables ({@code toObservable}) or through
 * {@code @Subscribe}-annotated methods ({@link #register(Object)}).
 *
 * <p>The registration bookkeeping uses a plain {@code HashMap} without any
 * synchronization, so {@code register}, {@code addSubscriber} and
 * {@code unRegister} must not be called concurrently.
 *
 * Created by CM on 2016-4-22 19:30:48.
 */
public class RxBus {
    private static volatile RxBus defaultInstance;

    /**
     * Live subscriptions grouped by the subscriber that owns them, so that
     * unregistering one subscriber never touches another subscriber's subscriptions.
     */
    private final Map<Object, List<Subscription>> subscriptionsBySubscriber = new HashMap<>();

    /** The subject every event is pushed through. */
    private final Subject<Object, Object> bus;

    /**
     * Creates a bus. A PublishSubject only emits to an observer the items that
     * are posted after that observer subscribed.
     */
    public RxBus() {
        bus = new SerializedSubject<>(PublishSubject.<Object>create());
    }

    /**
     * Returns the shared bus instance, creating it on first use.
     *
     * @return the default bus
     */
    public static RxBus getDefault() {
        RxBus rxBus = defaultInstance;
        if (rxBus == null) {
            synchronized (RxBus.class) {
                rxBus = defaultInstance;
                if (rxBus == null) {
                    rxBus = new RxBus();
                    defaultInstance = rxBus;
                }
            }
        }
        return rxBus;
    }

    /**
     * Posts an event without a code.
     *
     * @param o event data
     */
    public void post(Object o) {
        bus.onNext(o);
    }

    /**
     * Returns an Observable that emits only the posted events of the given type.
     *
     * @param eventType event type
     * @param <T>       event type
     * @return Observable of events that are instances of {@code eventType}
     */
    public <T> Observable<T> toObservable(Class<T> eventType) {
        return bus.ofType(eventType);
    }

    /**
     * Posts an event together with a code; it is delivered wrapped in a {@code Message}.
     *
     * @param code event code
     * @param o    event data
     */
    public void post(int code, Object o) {
        bus.onNext(new Message(code, o));
    }

    /**
     * Returns an Observable that emits only the payloads of events posted with
     * the given code whose payload is an instance of the given type.
     *
     * @param code      event code
     * @param eventType event type
     * @param <T>       event type
     * @return Observable of matching payloads
     */
    public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
        return bus.ofType(Message.class)
                .filter(new Func1<Message, Boolean>() {
                    @Override
                    public Boolean call(Message message) {
                        // Keep only messages whose code AND payload type both match.
                        return message.getCode() == code && eventType.isInstance(message.getObject());
                    }
                })
                .map(new Func1<Message, Object>() {
                    @Override
                    public Object call(Message message) {
                        return message.getObject();
                    }
                })
                .cast(eventType);
    }

    /**
     * Registers every {@code @Subscribe}-annotated method declared directly on
     * the subscriber's class. Annotated methods that do not take exactly one
     * parameter are skipped.
     *
     * <p>Registering a subscriber that already has live subscriptions is a
     * no-op, so repeated calls do not cause duplicate deliveries.
     *
     * @param subscriber the subscriber
     */
    public void register(Object subscriber) {
        if (subscriptionsBySubscriber.containsKey(subscriber)) {
            return;
        }
        for (Method method : subscriber.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(Subscribe.class)) {
                continue;
            }
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length != 1) {
                continue;
            }
            Subscribe annotation = method.getAnnotation(Subscribe.class);
            addSubscriber(new SubscriberMethod(
                    subscriber, method, parameterTypes[0], annotation.code(), annotation.threadMode()));
        }
    }

    /**
     * Subscribes one handler method to the bus.
     *
     * <p>A handler with code {@code -1} receives events posted without a code;
     * any other code receives events posted with exactly that code. Each
     * subscription invokes only its own handler, so every matching event
     * reaches every handler exactly once, on the thread selected by the
     * handler's thread mode.
     *
     * @param subscriberMethod handler description
     */
    public void addSubscriber(final SubscriberMethod subscriberMethod) {
        Observable<?> observable;
        if (subscriberMethod.code == -1) {
            observable = toObservable(subscriberMethod.eventType);
        } else {
            observable = toObservable(subscriberMethod.code, subscriberMethod.eventType);
        }

        Subscription subscription = postToObservable(observable, subscriberMethod)
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object event) {
                        subscriberMethod.invoke(event);
                    }
                });

        // Remember the subscription under its owner so unRegister can cancel it.
        List<Subscription> subscriptions = subscriptionsBySubscriber.get(subscriberMethod.subscriber);
        if (subscriptions == null) {
            subscriptions = new ArrayList<>();
            subscriptionsBySubscriber.put(subscriberMethod.subscriber, subscriptions);
        }
        subscriptions.add(subscription);
    }

    /**
     * Applies the handler's thread mode to the Observable.
     *
     * <p>{@code observeOn} returns a new Observable instead of modifying the
     * receiver, so its result has to be returned for the scheduler to take effect.
     *
     * @param observable       source Observable
     * @param subscriberMethod handler description
     * @return Observable that delivers on the requested thread
     */
    private <T> Observable<T> postToObservable(Observable<T> observable, SubscriberMethod subscriberMethod) {
        switch (subscriberMethod.threadMode) {
            case MAIN:
                return observable.observeOn(AndroidSchedulers.mainThread());
            case NEW_THREAD:
                return observable.observeOn(Schedulers.newThread());
            case CURRENT_THREAD:
                // No scheduler: the handler runs on whichever thread emits the event.
                return observable;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
        }
    }

    /**
     * Unregisters a subscriber: cancels all of its subscriptions and forgets it.
     * Other subscribers are not affected. Unknown subscribers are ignored.
     *
     * @param subscriber the subscriber
     */
    public void unRegister(Object subscriber) {
        List<Subscription> subscriptions = subscriptionsBySubscriber.remove(subscriber);
        if (subscriptions == null) {
            return;
        }
        for (Subscription subscription : subscriptions) {
            if (subscription != null && !subscription.isUnsubscribed()) {
                subscription.unsubscribe();
            }
        }
    }

}

register的时候获取@Subscribe注解的方法的相关信息保存到map,post事件触发的时候调用@Subscribe注解的方法并传入参数.
unRegister的时候移除保存的subscriber、subscriberMethod,以及通过Subscription取消订阅事件。

再来看使用方法:

package com.cm.simple;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;

import com.cm.rxbus.RxBus;
import com.cm.rxbus.Subscribe;
import com.cm.rxbus.ThreadMode;

import butterknife.ButterKnife;
import butterknife.OnClick;

public class MainActivity extends AppCompatActivity {


@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
ButterKnife.bind(this);
RxBus.getDefault().register(this);
}


@Subscribe
public void onEvent(String s) {
Log.e("CMAD", "------>" + s);
}

@Subscribe
public void onEvent(EventA eventA) {
Log.e("CMAD", "---onEvent EventA-->" + eventA.text);
}

@Subscribe(code = 102)
public void onEventWithCode(EventA eventA) {
Log.e("CMAD", "---onEvent EventA 102-->" + eventA.text);
}

@Subscribe(code = 103, threadMode = ThreadMode.MAIN)
public void onEventWithCodeAndThreadMode(EventA eventA) {
Log.e("CMAD", "---onEvent EventB 103--->" + eventA.text);
}


@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getDefault().unRegister(this);
}


@OnClick({R.id.btn1, R.id.btn2, R.id.btn3, R.id.btn4})
public void onClick(View view) {
switch (view.getId()) {
case R.id.btn1:
RxBus.getDefault().post("123456");
break;
case R.id.btn2:
RxBus.getDefault().post(new EventA());
break;
case R.id.btn3:
RxBus.getDefault().post(102, new EventA("event code 102"));
break;
case R.id.btn4:
RxBus.getDefault().post(103, new EventA("event code 103"));
break;
}
}


class EventA {
public String text = "BeanA";

public EventA() {

}

public EventA(String text) {
this.text = text;
}

@Override
public String toString() {
return "EventA{" +
"text='" + text + ''' +
'}';
}
}
}

使用方法是不是跟EventBus很像,同样使用@Subscribe注解,threadMode设置在什么线程执行.并添加了code进行相同类型事件的区分.

整个工程可以移步GitHub查看:RxBus
项目已上传到jcenter,在Gradle中可以直接使用compile 'com.cm:rxbus:1.0'

标签:

相关推荐
更多

  1. 陈捷 2016-7-15 17:33 #1 回复TA

    当RxBus.getDefault().register(this); 多个的时候,分发的时候是多次的,这样是不大合理的

  2. 陈捷 2016-7-15 17:39 #2 回复TA

    @Retention(RetentionPolicy.CLASS) 编译时注解会不会比较好

    • 感谢你提出的意见。最近太忙等抽出时间再优化一下

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>