package com.threshold.rxbus2;

import com.jakewharton.rxrelay2.PublishRelay;
import com.threshold.rxbus2.BaseBus;
import com.threshold.rxbus2.annotation.RxSubscribe;
import com.threshold.rxbus2.util.EventThread;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.schedulers.Schedulers;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: classes.dex */
public class RxBus extends BaseBus {
    private static volatile RxBus defaultBus;
    private final Map<Integer, List<Object>> stickyEventMap;
    private Map<Object, CompositeDisposable> subscriptions;

    public RxBus() {
        this(PublishRelay.create());
    }

    public RxBus(PublishRelay<Object> publishRelay) {
        super(publishRelay);
        this.subscriptions = new HashMap();
        this.stickyEventMap = new ConcurrentHashMap();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addSubscriptionMethod(final Object obj, final Method method) {
        Disposable subscribe = Observable.just(method.getParameterTypes()[0]).doOnNext(new Consumer<Class<?>>() { // from class: com.threshold.rxbus2.RxBus.16
            @Override // io.reactivex.functions.Consumer
            public void accept(Class<?> cls) throws Exception {
                BaseBus.LoggerUtil.debug("Origin: [method: %s ] , param[0] type: %s", method, cls);
            }
        }).map(new Function<Class<?>, Class<?>>() { // from class: com.threshold.rxbus2.RxBus.15
            @Override // io.reactivex.functions.Function
            public Class<?> apply(Class<?> cls) throws Exception {
                Class<?> eventType = RxBus.this.getEventType(cls);
                BaseBus.LoggerUtil.debug("Listen event type: %s", eventType);
                return eventType;
            }
        }).flatMap(new Function<Class<?>, ObservableSource<?>>() { // from class: com.threshold.rxbus2.RxBus.14
            @Override // io.reactivex.functions.Function
            public ObservableSource<?> apply(Class<?> cls) throws Exception {
                RxSubscribe rxSubscribe = (RxSubscribe) method.getAnnotation(RxSubscribe.class);
                BaseBus.LoggerUtil.debug("%s @RxSubscribe Annotation: %s", method, rxSubscribe.observeOnThread());
                return (rxSubscribe.isSticky() ? RxBus.this.ofStickyType(cls) : RxBus.this.ofType(cls)).observeOn(EventThread.getScheduler(rxSubscribe.observeOnThread()));
            }
        }).subscribe(new Consumer<Object>() { // from class: com.threshold.rxbus2.RxBus.12
            @Override // io.reactivex.functions.Consumer
            public void accept(Object obj2) throws Exception {
                BaseBus.LoggerUtil.debug("Subscriber:%s invoke Method:%s", obj, method);
                method.setAccessible(true);
                method.invoke(obj, obj2);
            }
        }, new Consumer<Throwable>() { // from class: com.threshold.rxbus2.RxBus.13
            @Override // io.reactivex.functions.Consumer
            public void accept(Throwable th) throws Exception {
                BaseBus.LoggerUtil.error(th, "[%s] invoke method:[%s] failed", obj, method);
                throw new RuntimeException(th);
            }
        });
        CompositeDisposable compositeDisposable = this.subscriptions.get(Integer.valueOf(obj.hashCode()));
        if (compositeDisposable == null) {
            compositeDisposable = new CompositeDisposable();
        }
        compositeDisposable.add(subscribe);
        this.subscriptions.put(Integer.valueOf(obj.hashCode()), compositeDisposable);
        BaseBus.LoggerUtil.debug("Registered method %s complete", method);
    }

    public static RxBus getDefault() {
        if (defaultBus == null) {
            synchronized (RxBus.class) {
                if (defaultBus == null) {
                    defaultBus = new RxBus();
                }
            }
        }
        return defaultBus;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Class<?> getEventType(Class<?> cls) {
        String name = cls.getName();
        return name.equals(Integer.TYPE.getName()) ? Integer.class : name.equals(Double.TYPE.getName()) ? Double.class : name.equals(Float.TYPE.getName()) ? Float.class : name.equals(Long.TYPE.getName()) ? Long.class : name.equals(Byte.TYPE.getName()) ? Byte.class : name.equals(Short.TYPE.getName()) ? Short.class : name.equals(Boolean.TYPE.getName()) ? Boolean.class : name.equals(Character.TYPE.getName()) ? Character.class : cls;
    }

    public void clearAllSticky() {
        synchronized (this.stickyEventMap) {
            this.stickyEventMap.clear();
        }
    }

    @Nullable
    public <T> List<T> getSticky(@NonNull Class<T> cls) {
        List<T> unmodifiableList;
        ObjectHelper.requireNonNull(cls, "eventType == null");
        synchronized (this.stickyEventMap) {
            List<Object> list = this.stickyEventMap.get(Integer.valueOf(cls.hashCode()));
            unmodifiableList = list == null ? null : Collections.unmodifiableList(list);
        }
        return unmodifiableList;
    }

    public synchronized boolean isRegistered(@NonNull Object obj) {
        ObjectHelper.requireNonNull(obj, "subscriber == null");
        return this.subscriptions.containsKey(Integer.valueOf(obj.hashCode()));
    }

    public <T> Observable<T> ofStickyType(@NonNull Class<T> cls) {
        synchronized (this.stickyEventMap) {
            List<Object> list = this.stickyEventMap.get(Integer.valueOf(cls.hashCode()));
            if (list == null || list.size() <= 0) {
                return ofType(cls);
            }
            return Observable.fromIterable(list).mergeWith(ofType(cls));
        }
    }

    public void postSticky(@NonNull Object obj) {
        ObjectHelper.requireNonNull(obj, "event == null");
        synchronized (this.stickyEventMap) {
            List<Object> list = this.stickyEventMap.get(Integer.valueOf(obj.getClass().hashCode()));
            boolean z = true;
            if (list == null) {
                list = new LinkedList<>();
                z = false;
            }
            list.add(obj);
            if (!z) {
                this.stickyEventMap.put(Integer.valueOf(obj.getClass().hashCode()), list);
            }
        }
        post(obj);
    }

    public void register(@NonNull final Object obj) {
        ObjectHelper.requireNonNull(obj, "subscriber == null");
        Observable.just(obj).filter(new Predicate<Object>() { // from class: com.threshold.rxbus2.RxBus.11
            @Override // io.reactivex.functions.Predicate
            public boolean test(Object obj2) throws Exception {
                boolean isRegistered = RxBus.this.isRegistered(obj2);
                if (isRegistered) {
                    BaseBus.LoggerUtil.warning("%s has already registered", obj2);
                }
                return !isRegistered;
            }
        }).flatMap(new Function<Object, ObservableSource<Method>>() { // from class: com.threshold.rxbus2.RxBus.10
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // io.reactivex.functions.Function
            public ObservableSource<Method> apply(Object obj2) throws Exception {
                BaseBus.LoggerUtil.debug("start to analyze subscriber: %s", obj2);
                return Observable.fromArray(obj2.getClass().getDeclaredMethods());
            }
        }).map(new Function<Method, Method>() { // from class: com.threshold.rxbus2.RxBus.9
            @Override // io.reactivex.functions.Function
            public Method apply(Method method) throws Exception {
                BaseBus.LoggerUtil.debug("set method can accessible: %s ", method);
                method.setAccessible(true);
                return method;
            }
        }).filter(new Predicate<Method>() { // from class: com.threshold.rxbus2.RxBus.8
            @Override // io.reactivex.functions.Predicate
            public boolean test(Method method) throws Exception {
                if (!method.isAnnotationPresent(RxSubscribe.class)) {
                    return false;
                }
                BaseBus.LoggerUtil.debug("%s present @RxSubscribe annotation", method.getName());
                if (method.getParameterTypes() != null && method.getParameterTypes().length == 1) {
                    return true;
                }
                throw new RuntimeException("Although [" + method + "] present @RxSubscribe annotation. But we expect ONLY ONE param in method.");
            }
        }).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(new Consumer<Method>() { // from class: com.threshold.rxbus2.RxBus.5
            @Override // io.reactivex.functions.Consumer
            public void accept(Method method) throws Exception {
                BaseBus.LoggerUtil.debug("now start to add subscription method: %s", method);
                RxBus.this.addSubscriptionMethod(obj, method);
            }
        }, new Consumer<Throwable>() { // from class: com.threshold.rxbus2.RxBus.6
            @Override // io.reactivex.functions.Consumer
            public void accept(Throwable th) throws Exception {
                BaseBus.LoggerUtil.error(th, "%s failed on register method", obj);
                throw new RuntimeException(obj + " failed on register method", th);
            }
        }, new Action() { // from class: com.threshold.rxbus2.RxBus.7
            @Override // io.reactivex.functions.Action
            public void run() throws Exception {
                BaseBus.LoggerUtil.debug("%s registered complete", obj);
            }
        });
    }

    public void removeStickyEvent(@NonNull Object obj) {
        ObjectHelper.requireNonNull(obj, "event == null");
        synchronized (this.stickyEventMap) {
            List<Object> list = this.stickyEventMap.get(Integer.valueOf(obj.getClass().hashCode()));
            if (list != null) {
                list.remove(obj);
            }
        }
    }

    public void removeStickyEventAt(@NonNull Class<?> cls, int i) {
        ObjectHelper.requireNonNull(cls, "eventType == null");
        synchronized (this.stickyEventMap) {
            List<Object> list = this.stickyEventMap.get(Integer.valueOf(cls.hashCode()));
            if (list != null) {
                list.remove(i);
            }
        }
    }

    public void removeStickyEventType(@NonNull Class<?> cls) {
        ObjectHelper.requireNonNull(cls, "eventType == null");
        synchronized (this.stickyEventMap) {
            this.stickyEventMap.remove(Integer.valueOf(cls.hashCode()));
        }
    }

    public void reset() {
        Observable.fromIterable(this.subscriptions.values()).filter(new Predicate<CompositeDisposable>() { // from class: com.threshold.rxbus2.RxBus.4
            @Override // io.reactivex.functions.Predicate
            public boolean test(CompositeDisposable compositeDisposable) throws Exception {
                return (compositeDisposable == null || compositeDisposable.isDisposed()) ? false : true;
            }
        }).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(new Consumer<CompositeDisposable>() { // from class: com.threshold.rxbus2.RxBus.1
            @Override // io.reactivex.functions.Consumer
            public void accept(CompositeDisposable compositeDisposable) throws Exception {
                compositeDisposable.clear();
            }
        }, new Consumer<Throwable>() { // from class: com.threshold.rxbus2.RxBus.2
            @Override // io.reactivex.functions.Consumer
            public void accept(Throwable th) throws Exception {
                BaseBus.LoggerUtil.error(th, "Dispose subscription", new Object[0]);
            }
        }, new Action() { // from class: com.threshold.rxbus2.RxBus.3
            @Override // io.reactivex.functions.Action
            public void run() throws Exception {
                RxBus.this.stickyEventMap.clear();
                RxBus.this.subscriptions.clear();
            }
        });
    }

    public void unregister(@NonNull final Object obj) {
        ObjectHelper.requireNonNull(obj, "subscriber == null");
        Flowable.just(obj).map(new Function<Object, CompositeDisposable>() { // from class: com.threshold.rxbus2.RxBus.19
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // io.reactivex.functions.Function
            public CompositeDisposable apply(Object obj2) throws Exception {
                return (CompositeDisposable) RxBus.this.subscriptions.get(Integer.valueOf(obj2.hashCode()));
            }
        }).filter(new Predicate<CompositeDisposable>() { // from class: com.threshold.rxbus2.RxBus.18
            @Override // io.reactivex.functions.Predicate
            public boolean test(CompositeDisposable compositeDisposable) throws Exception {
                return (compositeDisposable == null || compositeDisposable.isDisposed()) ? false : true;
            }
        }).subscribe(new Subscriber<CompositeDisposable>() { // from class: com.threshold.rxbus2.RxBus.17
            @Override // org.reactivestreams.Subscriber
            public void onComplete() {
                BaseBus.LoggerUtil.debug("%s unregister RxBus completed!", obj);
            }

            @Override // org.reactivestreams.Subscriber
            public void onError(Throwable th) {
                BaseBus.LoggerUtil.error(th, "%s unregister RxBus", obj);
            }

            @Override // org.reactivestreams.Subscriber
            public void onNext(CompositeDisposable compositeDisposable) {
                compositeDisposable.dispose();
                RxBus.this.subscriptions.remove(Integer.valueOf(obj.hashCode()));
                BaseBus.LoggerUtil.debug("remove subscription of %s", obj);
            }

            @Override // org.reactivestreams.Subscriber
            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }
        });
    }
}
