EventBus3.0源码分析(一)--订阅流程

前言

在上一篇博客中介绍了EventBus3.0的基本使用,感觉使用起来非常方便,本篇我们通过源码去了解一下EventBus的工作流程,就按照我们的使用过程来,先来看订阅流程。

EventBus#getDefault()

在使用过程中,首先我们要获取到EventBus的实例,点击该方法进入源码:

public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}

public EventBus() {
    this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
    subscriptionsByEventType = new HashMap<Class<?>, CopyOnWriteArrayList<Subscription>>();
    // 此处省略
    ...
}

在这里通过单例模式获取到一个EventBus的实例,通过getDefault()方法获取到的实例是默认的配置,这里需要注意的一点是,不同的EventBus的实例对象中的数据是不能共享的,通过一个EventBus的实例发布的事件,只能由同一个EventBus实例对象订阅该事件时,才能接收到该事件。

EventBus#register()

public void register(Object subscriber) {
    // 获取订阅者类型
    Class<?> subscriberClass = subscriber.getClass();
    // @Subscribe in anonymous classes is invisible to annotation processing, always fall back to reflection
    // 判断该类是否为匿名类
    boolean forceReflection = subscriberClass.isAnonymousClass();
    // 获取到该订阅者的全部响应函数
    List<SubscriberMethod> subscriberMethods =
            subscriberMethodFinder.findSubscriberMethods(subscriberClass, forceReflection);
    for (SubscriberMethod subscriberMethod : subscriberMethods) {
        subscribe(subscriber, subscriberMethod);
    }
}

获取到EventBus实例之后,我们需要注册订阅,以上代码中,首先我们获取到订阅者类的类型,判断该类是否为匿名类,获取该订阅类的所有响应函数,即我们使用注解@Subscribe定义的接收事件的方法(如果是匿名类的话,需要使用反射去获取该类中的响应函数),然后再调用subscribe()。

SubscriberMethodFinder#findSubscriberMethods()

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass, boolean forceReflection) {
    String key = subscriberClass.getName();
    List<SubscriberMethod> subscriberMethods;
    synchronized (METHOD_CACHE) {
        subscriberMethods = METHOD_CACHE.get(key);
    }
    if (subscriberMethods != null) {
        return subscriberMethods;
    }
    if (INDEX != null && !forceReflection) {
        subscriberMethods = findSubscriberMethodsWithIndex(subscriberClass);
        if (subscriberMethods.isEmpty()) {
            subscriberMethods = findSubscriberMethodsWithReflection(subscriberClass);
        }
    } else {
        subscriberMethods = findSubscriberMethodsWithReflection(subscriberClass);
    }
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
             + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        synchronized (METHOD_CACHE) {
            METHOD_CACHE.put(key, subscriberMethods);
        }
        return subscriberMethods;
    }
}

该方法用于获取订阅者中的所有响应事件方法,该方法中首先在Cache中去查找,如果缓存中有则直接获取,如果没有,继续往下查找。

如果索引不为空并且该类不是匿名类,则调用findSubscriberMethodsWithIndex()方法去获取响应函数,否则调用findSubscriberMethodsWithReflection()方法去查找。

最终如果没有查找到对应的方法,则抛出异常,该类及其父类没有通过@Subscribe定义的public方法。

最终将查找到的方法集合放入缓存中。

SubscriberMethodFinder#findSubscriberMethodsWithIndex()

private List<SubscriberMethod> findSubscriberMethodsWithIndex(Class<?> subscriberClass) {
    Class<?> clazz = subscriberClass;
    while (clazz != null) {
        SubscriberMethod[] array = INDEX.getSubscribersFor(clazz);
        if (array != null && array.length > 0) {
            List<SubscriberMethod> subscriberMethods = new ArrayList<SubscriberMethod>();
            for (SubscriberMethod subscriberMethod : array) {
                subscriberMethods.add(subscriberMethod);
            }
            return subscriberMethods;
        } else {
            String name = clazz.getName();
            if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {
                // Skip system classes, this just degrades performance
                break;
            }
            clazz = clazz.getSuperclass();
        }
    }
    return Collections.EMPTY_LIST;
}

通过索引找到该订阅者类的响应函数放入一个数组中,如果该数组不为空,即该订阅者类中声明了响应事件函数,遍历该数组,将每个订阅方法放入订阅方法的集合中并返回。

SubscriberMethodFinder#findSubscriberMethodsWithReflection()

这个方法比较长,我们将其截取一段一段来看。

private List<SubscriberMethod> findSubscriberMethodsWithReflection(Class<?> subscriberClass) {
     ...
    while (clazz != null) {
        String name = clazz.getName();
        // 判断该类是否为public等
        ...         
        Method[] methods = clazz.getDeclaredMethods();
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            ...
        }
        clazz = clazz.getSuperclass();            
    }
    return subscriberMethods;
}

首先根据该类的类名获取到该类的所有方法,包括该类的public、protected、default (package) access还有private方法,但不包括继承的父类方法,然后遍历这些方法。

if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
    Class<?>[] parameterTypes = method.getParameterTypes();
    // 获取该类的参数信息
    ...
} else if (strictMethodVerification) {
    if (method.isAnnotationPresent(Subscribe.class)) {
        String methodName = name + "." + method.getName();
        throw new EventBusException(methodName + " is a illegal  @Subscribe method: must be public, non-static, and non- abstract");
    }
}

获取到该类的标识,判断是否为public方法,并且该类不是static、abstract方法,然后获取该方法中的参数信息,最终添加到响应方法集合中。否则抛出异常,该方法必须是public、并且不是static和abstract。

if (parameterTypes.length == 1) {
    Subscribe subscribeAnnotation = method.getAnnotatio(Subscribe.class);
    if (subscribeAnnotation != null) {
        String methodName = method.getName();
        Class<?> eventType = parameterTypes[0];
        methodKeyBuilder.setLength(0);
        methodKeyBuilder.append(methodName);
        methodKeyBuilder.append('>').append(eventType.getName());

        String methodKey = methodKeyBuilder.toString();
        if (eventTypesFound.add(methodKey)) {
            // Only add if not already found in a sub class
            ThreadMode threadMode = subscribeAnnotation.threadMode();
            subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,subscribeAnnotation.priority(),subscribeAnnotation.sticky()));
        }
    }
} else if (strictMethodVerification) {
    if (method.isAnnotationPresent(Subscribe.class)) {
        String methodName = name + "." + method.getName();
        throw new EventBusException("@Subscribe method " + methodName +  "must have exactly 1 parameter but has " +  parameterTypes.length);
    }
}

获取到通过注解@Subscribe定义的方法,然后获取该方法的方法名、参数,并将该方法定义为字符串”方法名 > 参数类名”保存。

接着获取该方法的线程模型,将该方法封装为SubscriberMethod添加到响应函数集合中,通过上面的代码可以看出,SubscriberMethod类对象中包括:

  • 方法
  • 事件类型(即接收什么事件对象)
  • 线程模型
  • 优先级
  • 是否为粘性事件。

如果该函数中没有参数,则会抛出异常,@Subscribe定义的方法必须有一个参数。

EventBus#subscribe()

这个方法也比较长,我们还是将其分成三部分来看。

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<Subscription>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
        }
    }

    // Got to synchronize to avoid shifted positions when adding/removing concurrently
    synchronized (subscriptions) {
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
    }

    ...
}

首先通过subscriptionsByEventType获取该事件订阅者集合,将通过register()方法订阅的订阅者封装成Subscription对象,添加到subscriptionsByEventType中,如果该集合为空,则创建一个,如果该集合中已经包含该订阅者,则抛出异常,不能重复订阅。

然后将新的订阅者按照优先级插入到订阅者集合中。

List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<Class<?>>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
subscribedEvents.add(eventType);

第二步根据订阅者获取到该订阅者订阅的事件类型的集合,如果事件集合为空,则创建一个,并将新订阅的事件类型加入,如果不为空,则直接加入。

if (subscriberMethod.sticky) {
    if (eventInheritance) {
        Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
        for (Map.Entry<Class<?>, Object> entry : entries) {
            Class<?> candidateEventType = entry.getKey();
            if (eventType.isAssignableFrom(candidateEventType)) {
                Object stickyEvent = entry.getValue();
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    } else {
        Object stickyEvent = stickyEvents.get(eventType);
        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
    }
}

最后判断该事件是否为粘性事件,如果是粘性事件,则从stickyEvents事件保存队列中取出该事件类型的最后一个事件发送给当前订阅者。这里我们在前面博客中提到粘性事件中只能接收到最后一条发布的事件。

EventBus#checkPostStickyEventToSubscription()

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    if (stickyEvent != null) {
        // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
        // --> Strange corner case, which we don't take care of here.
        postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
    }
}

该方法中判断如果stickyEvent队列不为空,则调用postToSubscription()该方法去发送事件。将粘性事件的最后一条发送给订阅者。这样就保证在粘性事件的情况下,该订阅者即使在事件发送之后再订阅的,也能收到最后一条事件。

EventBus#postToSubscription()

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case PostThread:
            invokeSubscriber(subscription, event);
            break;
        case MainThread:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case BackgroundThread:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case Async:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

从stickyEvents中取出Event,根据他的ThreadMode去决定直接反射调用方法,还是将其插入队列。

EventBus#invokeSubscriber()

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

在这个方法中,直接反射调用订阅方法来发送事件。

HandlerPoster#enqueue()

void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}

@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();

            ...

            eventBus.invokeSubscriber(pendingPost);

            ...

        }
    } finally {
        handlerActive = rescheduled;
    }
}

利用HandlerPoster的enqueue()方法,插入事件,在HandlerPoster#handleMessage()方法中,会去调用EventBus的invokeSubscriber()方法,如下:

void invokeSubscriber(PendingPost pendingPost) {
    Object event = pendingPost.event;
    Subscription subscription = pendingPost.subscription;
    PendingPost.releasePendingPost(pendingPost);
    if (subscription.active) {
        invokeSubscriber(subscription, event);
    }
}

到此为止,就完成了订阅,接收事件,这个订阅流程就算基本完事了,但是还有一点需要注意的,在销毁该类时,记得取消注册,如下:

EventBus#unregister()

public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            unubscribeByEventType(subscriber, eventType);
        }
        typesBySubscriber.remove(subscriber);
    } else {
        Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}

private void unubscribeByEventType(Object subscriber, Class<?> eventType) {
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions != null) {
        // Got to synchronize to avoid shifted positions when adding/removing concurrently
        synchronized (subscriptions) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }
}

这俩方法内容比较简单,就不再介绍,到此为止,整个订阅流程就算结束了。