RxBus升级篇

之前写了一篇《用RxJava实现事件总线RxBus并实现同类型事件的区分》,用RxJava实现了事件总线RxBus,并实现了通过code对相同类型的事件进行区分。但是发现使用起来还是不怎么方便,没有EventBus那样好用。

我们来看一下之前是怎么用的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Register an observer: subscribe to String events posted with code 100
Subscription subscription =RxBus.getDefault()
.toObservable(100,String.class)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
// Log every received String under the "CM" tag
Log.e("CM", s);
}
});

// Send an event or message:
RxBus.getDefault().post(100, "123456");

// Unsubscribe from the event when the component is destroyed
@Override
protected void onDestroy() {
super.onDestroy();
// Only unsubscribe while the subscription is still active
if(!subscription .isUnsubscribed()) {
subscription .unsubscribe();
}
}

确实没有EventBus用起来方便。于是对之前的RxBus又进行了改造,参考了EventBus的使用方法。

添加了注解类@Subscribe对方法进行注解:

1
2
3
4
5
6
7
/**
 * Marks a method as an event handler that {@code RxBus.register} should wire up.
 *
 * <p>The annotation is retained at runtime so it can be discovered through reflection.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface Subscribe {

    /**
     * Code used to tell apart events of the same type.
     *
     * @return the event code; defaults to {@code -1}
     */
    int code() default -1;

    /**
     * Thread on which the annotated handler should be executed.
     *
     * @return the thread mode; defaults to {@link ThreadMode#CURRENT_THREAD}
     */
    ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;
}

code用来区分相同类型的事件,threadMode用来设置事件处理所在的线程。

ThreadMode代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Thread on which a subscribed handler method is executed.
 *
 * <p>The declaration order of the constants is significant for {@code ordinal()}
 * and must not be changed.
 */
public enum ThreadMode {
    /** Run the handler on the current thread. */
    CURRENT_THREAD,
    /** Run the handler on the Android main thread. */
    MAIN,
    /** Run the handler on a new thread. */
    NEW_THREAD
}

另外添加了一个SubscriberMethod类,用于保存@Subscribe注解的方法的相关信息(参考EventBus的SubscriberMethod).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Holds everything needed to call one {@code @Subscribe}-annotated method:
 * the target object, the reflected method, the event type it accepts, and the
 * code and thread mode taken from the annotation.
 */
public class SubscriberMethod {
    /** Object on which {@link #method} is invoked. */
    public Object subscriber;
    /** Reflected handler method. */
    public Method method;
    /** Type of the handler's single parameter. */
    public Class<?> eventType;
    /** Event code the handler is bound to. */
    public int code;
    /** Thread mode requested for the handler. */
    public ThreadMode threadMode;

    /**
     * Creates a descriptor for one handler method.
     *
     * @param subscriber object that owns the handler
     * @param method     reflected handler method
     * @param eventType  type of the event parameter
     * @param code       event code
     * @param threadMode thread mode for the handler
     */
    public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode) {
        this.subscriber = subscriber;
        this.method = method;
        this.eventType = eventType;
        this.code = code;
        this.threadMode = threadMode;
    }

    /**
     * Invokes the handler reflectively on the subscriber.
     *
     * <p>Reflection failures ({@link IllegalAccessException} and
     * {@link InvocationTargetException}) are not propagated; their stack trace is printed.
     *
     * @param o the event passed to the handler as its only argument
     */
    public void invoke(Object o){
        try {
            method.invoke(subscriber, o);
        } catch (IllegalAccessException | InvocationTargetException failure) {
            failure.printStackTrace();
        }
    }

}

接下来看看重点RxBus的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package com.cm.rxbus;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
* RxBus
* Created by CM on 2016-4-22 19:30:48.
*/
public class RxBus {
private static volatile RxBus defaultInstance;

private Map<Class,List<Subscription>> subscriptionsByEventType = new HashMap<>();

private Map<Object,List<Class>> eventTypesBySubscriber = new HashMap<>();

private Map<Class,List<SubscriberMethod>> subscriberMethodByEventType = new HashMap<>();

// 主题
private final Subject bus;
// PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
// 单例RxBus
public static RxBus getDefault() {
RxBus rxBus = defaultInstance;
if (defaultInstance == null) {
synchronized (RxBus.class) {
rxBus = defaultInstance;
if (defaultInstance == null) {
rxBus = new RxBus();
defaultInstance = rxBus;
}
}
}
return rxBus;
}

/**
* 提供了一个新的事件,单一类型
* @param o 事件数据
*/
public void post (Object o) {
bus.onNext(o);
}

/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
* @param eventType 事件类型
* @param <T>
* @return
*/
public <T> Observable<T> toObservable(Class<T> eventType) {
return bus.ofType(eventType);
}

/**
* 提供了一个新的事件,根据code进行分发
* @param code 事件code
* @param o
*/
public void post(int code, Object o){
bus.onNext(new Message(code,o));

}

/**
* 根据传递的code和 eventType 类型返回特定类型(eventType)的 被观察者
* @param code 事件code
* @param eventType 事件类型
* @param <T>
* @return
*/
public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
return bus.ofType(Message.class)
.filter(new Func1<Message,Boolean>() {
@Override
public Boolean call(Message o) {
//过滤code和eventType都相同的事件
return o.getCode() == code &amp;&amp; eventType.isInstance(o.getObject());
}
}).map(new Func1<Message,Object>() {
@Override
public Object call(Message o) {
return o.getObject();
}
}).cast(eventType);
}

/**
* 注册
* @param subscriber 订阅者
*/
public void register(Object subscriber){
Class<?> subClass = subscriber.getClass();
Method[] methods = subClass.getDeclaredMethods();
for(Method method : methods){
if(method.isAnnotationPresent(Subscribe.class)){
//获得参数类型
Class[] parameterType = method.getParameterTypes();
//参数不为空 且参数个数为1
if(parameterType != null &amp;&amp; parameterType.length == 1){

Class eventType = parameterType[0];

addEventTypeToMap(subscriber,eventType);
Subscribe sub = method.getAnnotation(Subscribe.class);
int code = sub.code();
ThreadMode threadMode = sub.threadMode();

SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber,method,eventType, code,threadMode);
addSubscriberToMap(eventType, subscriberMethod);

addSubscriber(subscriberMethod);
}
}
}
}

/**
* 将event的类型以订阅中subscriber为key保存到map里
* @param subscriber 订阅者
* @param eventType event类型
*/
private void addEventTypeToMap(Object subscriber, Class eventType){
List<Class> eventTypes = eventTypesBySubscriber.get(subscriber);
if(eventTypes == null){
eventTypes = new ArrayList<>();
eventTypesBySubscriber.put(subscriber,eventTypes);
}

if(!eventTypes.contains(eventType)){
eventTypes.add(eventType);
}
}

/**
* 将注解方法信息以event类型为key保存到map中
* @param eventType event类型
* @param subscriberMethod 注解方法信息
*/
private void addSubscriberToMap(Class eventType, SubscriberMethod subscriberMethod){
List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType);
if(subscriberMethods == null){
subscriberMethods = new ArrayList<>();
subscriberMethodByEventType.put(eventType,subscriberMethods);
}

if(!subscriberMethods.contains(subscriberMethod)){
subscriberMethods.add(subscriberMethod);
}
}

/**
*将订阅事件以event类型为key保存到map,用于取消订阅时用
* @param eventType event类型
* @param subscription 订阅事件
*/
private void addSubscriptionToMap(Class eventType, Subscription subscription){
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if(subscriptions == null){
subscriptions = new ArrayList<>();
subscriptionsByEventType.put(eventType,subscriptions);
}

if(!subscriptions.contains(subscription)){
subscriptions.add(subscription);
}
}

/**
* 用RxJava添加订阅者
* @param subscriberMethod
*/
public void addSubscriber(final SubscriberMethod subscriberMethod){
Observable observable ;
if(subscriberMethod.code == -1){
observable = toObservable(subscriberMethod.eventType);
}else{
observable = toObservable(subscriberMethod.code,subscriberMethod.eventType);
}

Subscription subscription = postToObservable(observable,subscriberMethod)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
callEvent(subscriberMethod.code,o);
}
});
addSubscriptionToMap(subscriberMethod.eventType ,subscription);
}

/**
* 用于处理订阅事件在那个线程中执行
* @param observable
* @param subscriberMethod
* @return
*/
private Observable postToObservable(Observable observable, SubscriberMethod subscriberMethod) {

switch (subscriberMethod.threadMode) {
case MAIN:
observable.observeOn(AndroidSchedulers.mainThread());
break;

case NEW_THREAD:
observable.observeOn(Schedulers.newThread());
break;
case CURRENT_THREAD:
observable.observeOn(Schedulers.immediate());
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
}
return observable;
}

/**
* 回调到订阅者的方法中
* @param code code
* @param object obj
*/
private void callEvent(int code, Object object){
Class eventClass = object.getClass();
List<SubscriberMethod> methods = subscriberMethodByEventType.get(eventClass);
if(methods != null &amp;&amp; methods.size() > 0){
for(SubscriberMethod subscriberMethod : methods){

Subscribe sub = subscriberMethod.method.getAnnotation(Subscribe.class);
int c = sub.code();
if(c == code){
subscriberMethod.invoke(object);
}

}
}
}

/**
* 取消注册
* @param subscriber
*/
public void unRegister(Object subscriber){
List<Class> subscribedTypes = eventTypesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unSubscribeByEventType(eventType);
unSubscribeMethodByEventType(subscriber,eventType);
}
eventTypesBySubscriber.remove(subscriber);
}
}

/**
* subscriptions unsubscribe
* @param eventType
*/
private void unSubscribeByEventType(Class eventType){
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
Iterator<Subscription> iterator = subscriptions.iterator();
while(iterator.hasNext()){
Subscription subscription = iterator.next();
if(subscription !=null &amp;&amp; !subscription.isUnsubscribed()){
subscription.unsubscribe();
iterator.remove();
}
}
}
}

/**
* 移除subscriber对应的subscriberMethods
* @param subscriber
* @param eventType
*/
private void unSubscribeMethodByEventType(Object subscriber, Class eventType){
List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType);
if(subscriberMethods != null){
Iterator<SubscriberMethod> iterator = subscriberMethods.iterator();
while (iterator.hasNext()){
SubscriberMethod subscriberMethod = iterator.next();
if(subscriberMethod.subscriber.equals(subscriber)){
iterator.remove();
}
}
}
}

}

register的时候获取@Subscribe注解的方法的相关信息保存到map,post事件触发的时候调用@Subscribe注解的方法并传入参数.

unRegister的时候移除保存的subscriber、subscriberMethod,以及通过Subscription取消订阅事件。

再来看使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.cm.simple;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;

import com.cm.rxbus.RxBus;
import com.cm.rxbus.Subscribe;
import com.cm.rxbus.ThreadMode;

import butterknife.ButterKnife;
import butterknife.OnClick;

public class MainActivity extends AppCompatActivity {

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
ButterKnife.bind(this);
RxBus.getDefault().register(this);
}

@Subscribe
public void onEvent(String s) {
Log.e("CMAD", "------>" + s);
}

@Subscribe
public void onEvent(EventA eventA) {
Log.e("CMAD", "---onEvent EventA-->" + eventA.text);
}

@Subscribe(code = 102)
public void onEventWithCode(EventA eventA) {
Log.e("CMAD", "---onEvent EventA 102-->" + eventA.text);
}

@Subscribe(code = 103, threadMode = ThreadMode.MAIN)
public void onEventWithCodeAndThreadMode(EventA eventA) {
Log.e("CMAD", "---onEvent EventB 103--->" + eventA.text);
}

@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getDefault().unRegister(this);
}

@OnClick({R.id.btn1, R.id.btn2, R.id.btn3, R.id.btn4})
public void onClick(View view) {
switch (view.getId()) {
case R.id.btn1:
RxBus.getDefault().post("123456");
break;
case R.id.btn2:
RxBus.getDefault().post(new EventA());
break;
case R.id.btn3:
RxBus.getDefault().post(102, new EventA("event code 102"));
break;
case R.id.btn4:
RxBus.getDefault().post(103, new EventA("event code 103"));
break;
}
}

class EventA {
public String text = "BeanA";

public EventA() {

}

public EventA(String text) {
this.text = text;
}

@Override
public String toString() {
return "EventA{" +
"text='" + text + ''' +
'}';
}
}
}

使用方法是不是跟EventBus很像?同样使用@Subscribe注解,用threadMode设置在什么线程执行,并添加了code对相同类型的事件进行区分。

整个工程可以移步GitHub查看:RxBus

项目已上传到jcenter,在Gradle中可以直接使用compile 'com.cm:rxbus:1.0'

坚持原创技术分享,您的支持将鼓励我继续创作!