package me.snow.chat.stompclient;

import defpackage.XA;
import jam.common.util.LidUtils;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import me.snow.chat.ConnectionProvider;
import me.snow.chat.LifecycleEvent;
import me.snow.chat.stomp.StompCommand;
import me.snow.chat.stomp.StompHeader;
import me.snow.chat.stomp.StompMessage;
import me.snow.chat.stompclient.RxNettyClientConnectionProvider;
import me.snow.utils.struct.IsNotEmpty;
import rx.Completable;
import rx.Observable;
import rx.functions.Action1;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;

/* loaded from: classes2.dex */
/**
 * {@link ConnectionProvider} backed by an {@link RxNettyClient}.
 *
 * <p>Incoming STOMP frames are republished through {@link #publishSubject} and
 * lifecycle changes through {@link #lifecycleSubject}. The underlying client is
 * created lazily by {@link #messages()} and torn down by {@link #close()}.
 *
 * <p>{@link #rxNettyClient} is {@code volatile} and may be reset to {@code null}
 * by {@link #b()}; every method therefore reads the field once into a local
 * before null-checking and dereferencing it, so a concurrent reset cannot cause
 * a {@link NullPointerException} between the check and the call.
 *
 * <p>Method names such as {@code a}, {@code b} and {@code c} are obfuscated but
 * public, so they are kept as-is for callers outside this file.
 */
public class RxNettyClientConnectionProvider implements ConnectionProvider {
    /** Seconds to wait for the DISCONNECT send before forcing cleanup. */
    private static final long DISCONNECT_TIMEOUT_SECONDS = 10L;

    /** Unsubscribed by {@link #b()} when the connection is torn down. */
    public CompositeSubscription compositeSubscription;
    /** Host/port/SSL settings, also the source of the cid and episode-id subjects. */
    public final ConnectionConfig config;
    /** Set once {@link #a()} has connected; cleared by {@link #close()}. */
    public boolean haveConnection;
    /** Lifecycle stream, seeded with a {@code CLOSED} event. */
    public BehaviorSubject<LifecycleEvent> lifecycleSubject;
    /** Stream of received STOMP frames (ALIVE frames excluded). */
    public PublishSubject<StompMessage> publishSubject;
    /** Current client, or {@code null} when none has been created or after {@link #b()}. */
    public volatile RxNettyClient rxNettyClient;

    /**
     * Convenience constructor that builds the {@link ConnectionConfig} from its parts.
     *
     * @param str host passed to {@code ConnectionConfig.setHost}
     * @param i   port passed to {@code ConnectionConfig.setPort}
     * @param z   flag passed to {@code ConnectionConfig.setUseSsl}
     */
    public RxNettyClientConnectionProvider(String str, int i, boolean z) {
        this(new ConnectionConfig().setHost(str).setPort(i).setUseSsl(z));
    }

    /**
     * Creates a provider with fresh subjects; no connection is opened yet.
     *
     * @param connectionConfig configuration handed to the client on connect
     */
    public RxNettyClientConnectionProvider(ConnectionConfig connectionConfig) {
        this.publishSubject = PublishSubject.create();
        this.lifecycleSubject = BehaviorSubject.create(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
        this.compositeSubscription = new CompositeSubscription();
        this.config = connectionConfig;
    }

    /**
     * Builds a DISCONNECT frame.
     *
     * @param str receipt id; a receipt header is added only when
     *            {@code IsNotEmpty.string(str)} reports true
     * @return the DISCONNECT message
     */
    public final StompMessage a(String str) {
        ArrayList<StompHeader> headers = new ArrayList<>();
        if (IsNotEmpty.string(str)) {
            headers.add(new StompHeader(StompHeader.RECEIPT, str));
        }
        return new StompMessage(StompCommand.DISCONNECT, headers);
    }

    /**
     * Creates a new client and connects it, closing any previous client first.
     *
     * @throws IllegalStateException if {@link #haveConnection} is already set
     * @throws RuntimeException      wrapping any exception raised while closing the
     *                               old client or creating/connecting the new one
     */
    public final void a() {
        if (this.haveConnection) {
            throw new IllegalStateException("Already have connection");
        }
        try {
            // Snapshot the volatile field so the null check and close() hit the same instance.
            RxNettyClient previous = this.rxNettyClient;
            if (previous != null) {
                previous.close();
            }
            // XA is declared elsewhere; presumably it routes client callbacks back to this provider.
            RxNettyClient client = new RxNettyClient(this.config, new XA(this));
            this.rxNettyClient = client;
            // Connect through the local: the field may be nulled concurrently by b().
            client.connect();
            this.haveConnection = true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Error callback of the disconnect subscription in {@link #c()}; releases resources. */
    public /* synthetic */ void a(Throwable th) {
        b();
    }

    /** Item callback of the disconnect subscription in {@link #c()}; releases resources. */
    public /* synthetic */ void a(Void r1) {
        b();
    }

    /**
     * Publishes a lifecycle event; ignored when the event or the subject is {@code null}.
     *
     * @param lifecycleEvent event to publish
     */
    public final void a(LifecycleEvent lifecycleEvent) {
        BehaviorSubject<LifecycleEvent> subject = this.lifecycleSubject;
        if (lifecycleEvent == null || subject == null) {
            return;
        }
        subject.onNext(lifecycleEvent);
    }

    /**
     * Forwards a received frame to {@link #publishSubject}. ALIVE frames are dropped,
     * and a CONNECTED frame additionally emits a {@code CONNECTED} lifecycle event.
     *
     * @param stompMessage received frame; {@code null} is ignored, matching
     *                     {@link #a(LifecycleEvent)}
     */
    public final void a(StompMessage stompMessage) {
        if (stompMessage == null) {
            return;
        }
        if (StompCommand.ALIVE.equals(stompMessage.getStompCommand())) {
            return;
        }
        this.publishSubject.onNext(stompMessage);
        if (StompCommand.CONNECTED.equals(stompMessage.getStompCommand())) {
            a(new LifecycleEvent(LifecycleEvent.Type.CONNECTED));
        }
    }

    /**
     * Releases resources: unsubscribes {@link #compositeSubscription} and closes and
     * clears the current client, if any.
     */
    public final void b() {
        if (!this.compositeSubscription.isUnsubscribed()) {
            this.compositeSubscription.unsubscribe();
        }
        // Single read of the volatile field; a second read could observe null.
        RxNettyClient client = this.rxNettyClient;
        if (client != null) {
            client.close();
            this.rxNettyClient = null;
        }
    }

    /**
     * Sends a DISCONNECT frame with a generated receipt id and calls {@link #b()} when
     * the send emits an item or the subscription errors (for example on timeout).
     * Does nothing when there is no client.
     */
    public final void c() {
        if (this.rxNettyClient == null) {
            return;
        }
        // Send errors are converted to an empty completion before the timeout is applied,
        // so only the timeout (or a later failure) reaches the error callback.
        // NOTE(review): there is no onCompleted handler, so if send() completes without
        // emitting, b() is not invoked from here — confirm cleanup is driven elsewhere.
        send(a(LidUtils.generateMessageId()))
                .onErrorResumeNext(Observable.<Void>empty())
                .timeout(DISCONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .subscribe(new Action1<Void>() { // from class: UA
                    @Override // rx.functions.Action1
                    public final void call(Void item) {
                        RxNettyClientConnectionProvider.this.a(item);
                    }
                }, new Action1<Throwable>() { // from class: TA
                    @Override // rx.functions.Action1
                    public final void call(Throwable error) {
                        RxNettyClientConnectionProvider.this.a(error);
                    }
                });
    }

    /** Clears {@link #haveConnection} and starts the disconnect sequence in {@link #c()}. */
    @Override // me.snow.chat.ConnectionProvider
    public void close() {
        this.haveConnection = false;
        c();
    }

    /** @return the lifecycle event stream */
    @Override // me.snow.chat.ConnectionProvider
    public Observable<LifecycleEvent> getLifecycleReceiver() {
        return this.lifecycleSubject;
    }

    /** @return {@code true} when a client exists and reports itself connected */
    @Override // me.snow.chat.ConnectionProvider
    public boolean isConnected() {
        RxNettyClient client = this.rxNettyClient;
        return client != null && client.isConnected();
    }

    /**
     * Opens the connection via {@link #a()} and returns the stream of received frames.
     *
     * @return the frame stream
     * @throws IllegalStateException if a connection is already open
     */
    @Override // me.snow.chat.ConnectionProvider
    public Observable<StompMessage> messages() {
        a();
        return this.publishSubject;
    }

    /** Pushes {@code j} to the config's cid subject. */
    @Override // me.snow.chat.ConnectionProvider
    public void publishCid(long j) {
        this.config.getCidObservable().onNext(Long.valueOf(j));
    }

    /** Pushes {@code j} to the config's episode-id subject. */
    @Override // me.snow.chat.ConnectionProvider
    public void publishEpisodeId(long j) {
        this.config.getEpisodeIdObservable().onNext(Long.valueOf(j));
    }

    /** Currently performs no action in this implementation. */
    @Override // me.snow.chat.ConnectionProvider
    public void reconnect() {
        // No-op: the original body only read the client field into an unused local.
    }

    /**
     * Sends a frame through the current client.
     *
     * @param stompMessage frame to send
     * @return the client's send result, or an observable that errors with
     *         {@link IllegalStateException} when there is no client
     */
    @Override // me.snow.chat.ConnectionProvider
    public Observable<Void> send(StompMessage stompMessage) {
        RxNettyClient client = this.rxNettyClient;
        if (client != null) {
            return client.send(stompMessage);
        }
        return Observable.error(new IllegalStateException("Not connected yet message : " + stompMessage));
    }

    /**
     * Delegates to the client's {@code writeKeep()}.
     *
     * @return the client's result, or a completable that errors with
     *         {@link IllegalStateException} when there is no client
     */
    @Override // me.snow.chat.ConnectionProvider
    public Completable writeKeep() {
        RxNettyClient client = this.rxNettyClient;
        if (client == null) {
            return Completable.error(new IllegalStateException("Channel is inactivated"));
        }
        return client.writeKeep();
    }
}
