package discord4j.voice;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iwebpp.crypto.TweetNaclFast;
import discord4j.common.JacksonResources;
import discord4j.common.LogUtil;
import discord4j.common.ResettableInterval;
import discord4j.common.close.CloseException;
import discord4j.common.close.CloseStatus;
import discord4j.common.close.DisconnectBehavior;
import discord4j.common.retry.ReconnectContext;
import discord4j.common.retry.ReconnectOptions;
import discord4j.common.util.Snowflake;
import discord4j.voice.VoiceConnection;
import discord4j.voice.json.Heartbeat;
import discord4j.voice.json.Hello;
import discord4j.voice.json.Identify;
import discord4j.voice.json.Ready;
import discord4j.voice.json.Resume;
import discord4j.voice.json.Resumed;
import discord4j.voice.json.SelectProtocol;
import discord4j.voice.json.SentSpeaking;
import discord4j.voice.json.SessionDescription;
import discord4j.voice.json.VoiceGatewayPayload;
import discord4j.voice.retry.VoiceGatewayException;
import discord4j.voice.retry.VoiceGatewayReconnectException;
import discord4j.voice.retry.VoiceGatewayRetrySpec;
import discord4j.voice.retry.VoiceServerUpdateReconnectException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.function.TupleUtils;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.WebsocketClientSpec;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

/* loaded from: input_file:discord4j/voice/DefaultVoiceGatewayClient.class */
public class DefaultVoiceGatewayClient {
    // Class logger plus two dedicated loggers used by logPayload for outbound ("sender")
    // and inbound ("receiver") gateway payloads.
    private static final Logger log = Loggers.getLogger((Class<?>) DefaultVoiceGatewayClient.class);
    private static final Logger senderLog = Loggers.getLogger("discord4j.voice.protocol.sender");
    private static final Logger receiverLog = Loggers.getLogger("discord4j.voice.protocol.receiver");
    // Sink over `receiver` (BUFFER overflow strategy); handed to the websocket handler on each connect.
    private final FluxSink<ByteBuf> receiverSink;
    // Sink over `outbound` (ERROR overflow strategy); every payload sent to the gateway goes through it.
    private final FluxSink<VoiceGatewayPayload<?>> outboundSink;
    // Sink over `events` (LATEST overflow strategy); receives every decoded inbound payload.
    private final FluxSink<VoiceGatewayEvent> eventSink;
    // Identifiers taken from the options; sent in the Identify/Resume payloads.
    private final Snowflake guildId;
    private final Snowflake selfId;
    // Payload <-> ByteBuf conversion backed by the ObjectMapper from the configured JacksonResources.
    private final Function<VoiceGatewayPayload<?>, Mono<ByteBuf>> payloadWriter;
    private final Function<ByteBuf, Mono<? super VoiceGatewayPayload<?>>> payloadReader;
    private final VoiceReactorResources reactorResources;
    private final ReconnectOptions reconnectOptions;
    // Built from reconnectOptions' first backoff and max backoff interval; reset once a session is established.
    private final ReconnectContext reconnectContext;
    private final AudioProvider audioProvider;
    private final AudioReceiver audioReceiver;
    private final VoiceSendTaskFactory sendTaskFactory;
    private final VoiceReceiveTaskFactory receiveTaskFactory;
    // Invoked from handleClose when the client stops.
    private final VoiceDisconnectTask disconnectTask;
    // Source of voice server updates, subscribed to once a session description arrives.
    private final VoiceServerUpdateTask serverUpdateTask;
    // Used by VoiceConnection#getChannelId while connected.
    private final VoiceChannelRetrieveTask channelRetrieveTask;
    // Timeout and retry policy applied to the UDP setup + IP discovery step.
    private final Duration ipDiscoveryTimeout;
    private final RetrySpec ipDiscoveryRetrySpec;
    private final VoiceSocket voiceSocket;
    // Started when Hello is received, stopped when the session closes or terminates.
    private final ResettableInterval heartbeat;
    // Replays the latest connection state (defaults to CONNECTING); stateChanges is its sink (LATEST).
    private final ReplayProcessor<VoiceConnection.State> state;
    private final FluxSink<VoiceConnection.State> stateChanges;
    // SSRC taken from the Ready payload.
    private volatile int ssrc;
    // Re-created on every connection attempt; signalled from handleClose when the client stops.
    private volatile MonoProcessor<CloseStatus> disconnectNotifier;
    // Reactor context and websocket handler of the current connection attempt.
    private volatile Context currentContext;
    private volatile VoiceWebsocketHandler sessionHandler;
    // Processors backing the sinks above; `false` is the autoCancel argument of EmitterProcessor.create.
    private final EmitterProcessor<ByteBuf> receiver = EmitterProcessor.create(false);
    private final EmitterProcessor<VoiceGatewayPayload<?>> outbound = EmitterProcessor.create(false);
    private final EmitterProcessor<VoiceGatewayEvent> events = EmitterProcessor.create(false);
    // Holds the per-session composite of resources; updated on Ready, disposed when the action is STOP.
    private final Disposable.Swap cleanup = Disposables.swap();
    // Seeded by the first connect (compareAndSet from null); serverOptions is replaced on endpoint change.
    private final AtomicReference<VoiceServerOptions> serverOptions = new AtomicReference<>();
    private final AtomicReference<String> session = new AtomicReference<>();

    /**
     * Creates a voice gateway client from the given options.
     *
     * <p>Every collaborator read through {@link Objects#requireNonNull(Object)} must be present in the
     * options, otherwise a {@link NullPointerException} is thrown. No connection is opened here; see
     * {@link #start(VoiceServerOptions, String)}.
     *
     * @param voiceGatewayOptions the options supplying identifiers, resources and task factories
     */
    public DefaultVoiceGatewayClient(VoiceGatewayOptions voiceGatewayOptions) {
        this.guildId = voiceGatewayOptions.getGuildId();
        this.selfId = voiceGatewayOptions.getSelfId();

        // A single mapper instance backs both the encoder and the decoder.
        ObjectMapper mapper = Objects.requireNonNull(voiceGatewayOptions.getJacksonResources()).getObjectMapper();
        this.payloadWriter = payload ->
                Mono.fromCallable(() -> Unpooled.wrappedBuffer(mapper.writeValueAsBytes(payload)));
        this.payloadReader = buf ->
                Mono.fromCallable(() -> mapper.readValue(new ByteBufInputStream(buf),
                        new TypeReference<VoiceGatewayPayload<?>>() {
                        }));

        this.reactorResources = Objects.requireNonNull(voiceGatewayOptions.getReactorResources());
        this.reconnectOptions = Objects.requireNonNull(voiceGatewayOptions.getReconnectOptions());
        this.reconnectContext = new ReconnectContext(
                this.reconnectOptions.getFirstBackoff(),
                this.reconnectOptions.getMaxBackoffInterval());
        this.audioProvider = Objects.requireNonNull(voiceGatewayOptions.getAudioProvider());
        this.audioReceiver = Objects.requireNonNull(voiceGatewayOptions.getAudioReceiver());
        this.sendTaskFactory = Objects.requireNonNull(voiceGatewayOptions.getSendTaskFactory());
        this.receiveTaskFactory = Objects.requireNonNull(voiceGatewayOptions.getReceiveTaskFactory());
        this.disconnectTask = Objects.requireNonNull(voiceGatewayOptions.getDisconnectTask());
        this.serverUpdateTask = Objects.requireNonNull(voiceGatewayOptions.getServerUpdateTask());
        this.channelRetrieveTask = Objects.requireNonNull(voiceGatewayOptions.getChannelRetrieveTask());
        this.ipDiscoveryTimeout = Objects.requireNonNull(voiceGatewayOptions.getIpDiscoveryTimeout());
        this.ipDiscoveryRetrySpec = Objects.requireNonNull(voiceGatewayOptions.getIpDiscoveryRetrySpec());

        // These depend on reactorResources, so they are created after it has been validated.
        this.voiceSocket = new VoiceSocket(this.reactorResources.getUdpClient());
        this.heartbeat = new ResettableInterval(this.reactorResources.getTimerTaskScheduler());

        this.state = ReplayProcessor.cacheLastOrDefault(VoiceConnection.State.CONNECTING);
        this.stateChanges = this.state.sink(FluxSink.OverflowStrategy.LATEST);
        this.receiverSink = this.receiver.sink(FluxSink.OverflowStrategy.BUFFER);
        this.outboundSink = this.outbound.sink(FluxSink.OverflowStrategy.ERROR);
        this.eventSink = this.events.sink(FluxSink.OverflowStrategy.LATEST);
    }

    /**
     * Starts the voice gateway session when the returned {@link Mono} is requested.
     *
     * <p>The gateway pipeline is subscribed on first request, using the subscriber's context. The
     * returned Mono is signalled through the sink handed to {@code connect}; cancelling it disposes
     * the gateway subscription.
     *
     * @param voiceServerOptions the voice server endpoint and token to connect with
     * @param str the voice session id
     * @return a Mono driven by the gateway pipeline that yields the {@link VoiceConnection}
     */
    public Mono<VoiceConnection> start(VoiceServerOptions voiceServerOptions, String str) {
        return Mono.create(sink -> sink.onRequest(demand -> {
            Disposable gateway = connect(voiceServerOptions, str, sink)
                    .subscriberContext(sink.currentContext())
                    .subscribe(
                            null,
                            error -> log.debug(
                                    LogUtil.format(sink.currentContext(), "Voice gateway error: {}"),
                                    error.toString()),
                            () -> log.debug(LogUtil.format(sink.currentContext(), "Voice gateway completed")));
            // Tie the lifetime of the gateway subscription to the caller's subscription.
            sink.onCancel(gateway);
        }));
    }

    /**
     * Builds the voice gateway session pipeline, re-run on each retry.
     *
     * <p>Each attempt creates a new websocket handler and disconnect notifier, then runs three
     * pipelines together: the websocket session (ending in {@link #handleClose}), the inbound
     * payload handler, and the heartbeat sender. Failures are routed through {@link #retryFactory()}.
     * The returned Mono completes after the disconnect notifier terminates.
     *
     * @param voiceServerOptions initial voice server options; only applied if none are stored yet
     * @param str initial voice session id; only applied if none is stored yet
     * @param monoSink sink signalled with the {@link VoiceConnection} once a session description
     * arrives, or with an error if the UDP socket setup fails
     * @return a Mono completing when the gateway client has stopped
     * @throws IllegalStateException on subscription, if a connection attempt was already made
     */
    private Mono<Void> connect(VoiceServerOptions voiceServerOptions, String str, MonoSink<VoiceConnection> monoSink) {
        return Mono.deferWithContext(context -> {
            // Only the very first attempt seeds these; retries keep whatever is currently stored.
            this.serverOptions.compareAndSet(null, voiceServerOptions);
            this.session.compareAndSet(null, str);
            this.disconnectNotifier = MonoProcessor.create();
            this.currentContext = context;

            Flux<ByteBuf> outFlux = this.outbound.flatMap(this.payloadWriter)
                    .doOnNext(buf -> logPayload(senderLog, context, buf));
            this.sessionHandler = new VoiceWebsocketHandler(this.receiverSink, outFlux, context);

            // Evaluated once the websocket is open: decides between Resume and Identify.
            Mono<VoiceConnection.State> onOpen = this.state.next()
                    .doOnNext(current -> sendHandshake(context, current));

            Disposable.Composite innerCleanup = Disposables.composite();

            Mono<Void> receiverFuture = this.receiver
                    .doOnNext(buf -> logPayload(receiverLog, context, buf))
                    .flatMap(this.payloadReader)
                    .doOnNext(payload -> handlePayload(payload, context, innerCleanup, monoSink))
                    .then();

            Mono<Void> heartbeatFuture = this.heartbeat.ticks()
                    .map(Heartbeat::new)
                    .doOnNext(this.outboundSink::next)
                    .then();

            Mono<Void> httpFuture = this.reactorResources.getHttpClient()
                    .headers(headers -> headers.add(HttpHeaderNames.USER_AGENT, "DiscordBot(https://discord4j.com, 3)"))
                    .observe(getObserver(context))
                    .websocket(WebsocketClientSpec.builder().maxFramePayloadLength(Integer.MAX_VALUE).build())
                    .uri(this.serverOptions.get().getEndpoint() + "?v=4")
                    .handle((in, out) -> onOpen.then(this.sessionHandler.handle(in, out)))
                    .subscriberContext(LogUtil.clearContext())
                    .flatMap(closeTuple -> handleClose(closeTuple.getT1(), closeTuple.getT2()))
                    .then();

            return Mono.zip(httpFuture, receiverFuture, heartbeatFuture)
                    .doOnError(t -> log.error(LogUtil.format(context, "{}"), t.toString()))
                    .doOnTerminate(this.heartbeat::stop)
                    .doOnCancel(() -> this.sessionHandler.close())
                    .then();
        })
                .subscriberContext(ctx -> ctx.put(LogUtil.KEY_GUILD_ID, this.guildId.asString()))
                .retryWhen(retryFactory())
                .then(Mono.defer(() -> this.disconnectNotifier.then()))
                .doOnSubscribe(subscription -> {
                    // The notifier is only ever assigned by a previous subscription's attempt.
                    if (this.disconnectNotifier != null) {
                        throw new IllegalStateException("connect can only be subscribed once");
                    }
                });
    }

    /** Sends Resume when the last known state is RESUMING, otherwise announces CONNECTING and sends Identify. */
    private void sendHandshake(Context context, VoiceConnection.State current) {
        if (current == VoiceConnection.State.RESUMING) {
            log.info(LogUtil.format(context, "Attempting to resume"));
            this.outboundSink.next(new Resume(this.guildId.asString(), this.session.get(),
                    this.serverOptions.get().getToken()));
        } else {
            this.stateChanges.next(VoiceConnection.State.CONNECTING);
            log.info(LogUtil.format(context, "Identifying"));
            this.outboundSink.next(new Identify(this.guildId.asString(), this.selfId.asString(),
                    this.session.get(), this.serverOptions.get().getToken()));
        }
    }

    /** Reacts to one decoded inbound payload and then republishes it on the event sink. */
    private void handlePayload(Object payload, Context context, Disposable.Composite innerCleanup,
                               MonoSink<VoiceConnection> connectionSink) {
        if (payload instanceof Hello) {
            Duration interval = Duration.ofMillis(((Hello) payload).getData().getHeartbeatInterval());
            this.heartbeat.start(interval, interval);
        } else if (payload instanceof Ready) {
            log.info(LogUtil.format(context, "Waiting for session description"));
            setupVoiceSocket((Ready) payload, context, innerCleanup, connectionSink);
        } else if (payload instanceof SessionDescription) {
            log.info(LogUtil.format(context, "Receiving events"));
            startVoiceTasks((SessionDescription) payload, context, innerCleanup, connectionSink);
        } else if (payload instanceof Resumed) {
            log.info(LogUtil.format(context, "Resumed"));
            this.stateChanges.next(VoiceConnection.State.CONNECTED);
            this.reconnectContext.reset();
        }
        this.eventSink.next((VoiceGatewayEvent) payload);
    }

    /** Stores the SSRC, opens the UDP socket, performs IP discovery and sends SelectProtocol on success. */
    private void setupVoiceSocket(Ready ready, Context context, Disposable.Composite innerCleanup,
                                  MonoSink<VoiceConnection> connectionSink) {
        this.ssrc = ready.getData().getSsrc();
        this.cleanup.update(innerCleanup);
        Disposable socketSetup = Mono.defer(() ->
                        this.voiceSocket.setup(ready.getData().getIp(), ready.getData().getPort()))
                .zipWith(this.voiceSocket.performIpDiscovery(ready.getData().getSsrc()))
                .timeout(this.ipDiscoveryTimeout)
                .retryWhen(this.ipDiscoveryRetrySpec)
                .subscriberContext(context)
                .onErrorMap(cause -> new VoiceGatewayException(context, "UDP socket setup error", cause))
                .subscribe(
                        TupleUtils.consumer((connection, address) -> {
                            innerCleanup.add(connection);
                            this.outboundSink.next(new SelectProtocol("udp", address.getHostName(),
                                    address.getPort(), "xsalsa20_poly1305"));
                        }),
                        error -> {
                            // Without a UDP socket the connection is unusable: fail the caller and stop.
                            connectionSink.error(error);
                            this.sessionHandler.close(DisconnectBehavior.stop(error));
                        },
                        () -> log.debug(LogUtil.format(context, "Voice socket setup complete")));
        innerCleanup.add(socketSetup);
    }

    /** Marks the session CONNECTED, starts the send/receive/server-update tasks and emits the connection. */
    private void startVoiceTasks(SessionDescription description, Context context,
                                 Disposable.Composite innerCleanup, MonoSink<VoiceConnection> connectionSink) {
        this.stateChanges.next(VoiceConnection.State.CONNECTED);
        this.reconnectContext.reset();

        TweetNaclFast.SecretBox secretBox = new TweetNaclFast.SecretBox(description.getData().getSecretKey());
        PacketTransformer transformer = new PacketTransformer(this.ssrc, secretBox);
        Consumer<Boolean> speakingSender =
                speaking -> this.outboundSink.next(new SentSpeaking(speaking, 0, this.ssrc));

        innerCleanup.add(() -> log.debug(LogUtil.format(context, "Disposing voice tasks")));
        innerCleanup.add(this.sendTaskFactory.create(this.reactorResources.getSendTaskScheduler(),
                speakingSender, this.voiceSocket::send, this.audioProvider, transformer));
        innerCleanup.add(this.receiveTaskFactory.create(this.reactorResources.getReceiveTaskScheduler(),
                this.voiceSocket.getInbound(), transformer, this.audioReceiver));
        innerCleanup.add(this.serverUpdateTask.onVoiceServerUpdate(this.guildId)
                .subscribe(updated -> applyServerUpdate(updated, context)));

        connectionSink.success(acquireConnection());
    }

    /** Stores new voice server options and forces a reconnect, but only when the endpoint changed. */
    private void applyServerUpdate(VoiceServerOptions updated, Context context) {
        VoiceServerOptions current = this.serverOptions.get();
        if (current.getEndpoint().equals(updated.getEndpoint())) {
            return;
        }
        log.debug(LogUtil.format(context, "Voice server endpoint change: {} -> {}"),
                current.getEndpoint(), updated.getEndpoint());
        this.serverOptions.set(updated);
        this.sessionHandler.close(DisconnectBehavior.retryAbruptly(new VoiceServerUpdateReconnectException(context)));
    }

    /**
     * Creates a connection observer that logs every state change at debug level.
     *
     * @param context the context used to format the log line
     * @return an observer logging the new state followed by the connection
     */
    private ConnectionObserver getObserver(Context context) {
        return (conn, newState) -> log.debug(LogUtil.format(context, "{} {}"), newState, conn);
    }

    /**
     * Creates the {@link VoiceConnection} view handed to callers once the session is established.
     *
     * <p>The returned object delegates to this client's processors, tasks and session handler.
     * {@code disconnect}, {@code getChannelId} and {@code reconnect} first consult
     * {@code onConnectOrDisconnect()} and only act when the resulting state is CONNECTED.
     *
     * @return a connection backed by this gateway client
     */
    private VoiceConnection acquireConnection() {
        return new VoiceConnection() {
            @Override
            public Flux<VoiceGatewayEvent> events() {
                return DefaultVoiceGatewayClient.this.events;
            }

            @Override
            public Flux<VoiceConnection.State> stateEvents() {
                return DefaultVoiceGatewayClient.this.state;
            }

            @Override
            public Mono<Void> disconnect() {
                return onConnectOrDisconnect().flatMap(current -> {
                    if (current.equals(VoiceConnection.State.CONNECTED)) {
                        return DefaultVoiceGatewayClient.this.stop();
                    }
                    // Not connected: nothing to tear down.
                    return Mono.empty();
                }).then();
            }

            @Override
            public Snowflake getGuildId() {
                return DefaultVoiceGatewayClient.this.guildId;
            }

            @Override
            public Mono<Snowflake> getChannelId() {
                return onConnectOrDisconnect().flatMap(current -> {
                    if (current.equals(VoiceConnection.State.CONNECTED)) {
                        return DefaultVoiceGatewayClient.this.channelRetrieveTask.onRequest();
                    }
                    return Mono.empty();
                });
            }

            @Override
            public Mono<Void> reconnect() {
                return reconnect(VoiceGatewayReconnectException::new);
            }

            @Override
            public Mono<Void> reconnect(Function<Context, Throwable> function) {
                return onConnectOrDisconnect().flatMap(current -> {
                    if (!current.equals(VoiceConnection.State.CONNECTED)) {
                        return Mono.error(new IllegalStateException("Voice connection has already disconnected"));
                    }
                    // Close the session abruptly with the supplied cause, then wait for the next CONNECTED state.
                    return Mono.fromRunnable(() -> DefaultVoiceGatewayClient.this.sessionHandler.close(
                                    DisconnectBehavior.retryAbruptly(
                                            function.apply(DefaultVoiceGatewayClient.this.currentContext))))
                            .then(stateEvents()
                                    .filter(next -> next.equals(VoiceConnection.State.CONNECTED))
                                    .next());
                }).then();
            }
        };
    }

    /**
     * Requests the gateway client to stop and waits for the disconnect notifier to terminate.
     *
     * <p>If the notifier has already terminated, no close request is issued and its outcome is
     * returned directly.
     *
     * @return a Mono completing when the client has disconnected, or failing with
     * {@link IllegalStateException} if the client was never started
     */
    public Mono<Void> stop() {
        return Mono.defer(() -> {
            boolean active = this.sessionHandler != null && this.disconnectNotifier != null;
            if (active) {
                boolean alreadyTerminated = this.disconnectNotifier.isTerminated();
                if (!alreadyTerminated) {
                    this.sessionHandler.close(DisconnectBehavior.stop(null));
                }
                return this.disconnectNotifier.then();
            }
            return Mono.error(new IllegalStateException("Gateway client is not active!"));
        });
    }

    /**
     * Logs a raw gateway payload at trace level with the value of any {@code "token"} field masked.
     *
     * <p>Decoding the buffer and running the masking regex are skipped unless trace logging is
     * enabled on the given logger, as this method is invoked for every payload sent or received.
     *
     * @param logger the logger to write to
     * @param context the context used to format the log line
     * @param byteBuf the payload bytes, decoded as UTF-8 for logging
     */
    private void logPayload(Logger logger, Context context, ByteBuf byteBuf) {
        if (!logger.isTraceEnabled()) {
            return;
        }
        String masked = byteBuf.toString(StandardCharsets.UTF_8)
                .replaceAll("(\"token\": ?\")([A-Za-z0-9._-]*)(\")", "$1hunter2$3");
        logger.trace(LogUtil.format(context, masked));
    }

    /**
     * Creates the retry policy applied to the gateway pipeline.
     *
     * <p>Before each retry the upcoming state is published to the state sink and the attempt is
     * logged at debug level, using the context carried by the failure when one is available.
     *
     * @return the retry specification built from the reconnect options and context
     */
    private Retry retryFactory() {
        return VoiceGatewayRetrySpec.create(this.reconnectOptions, this.reconnectContext)
                .doBeforeRetry(signal -> {
                    this.stateChanges.next(signal.nextState());
                    Context failureContext = getContextFromException(signal.failure());
                    log.debug(LogUtil.format(failureContext, "{} in {} (attempts: {})"),
                            signal.nextState(), signal.nextBackoff(), signal.iteration());
                });
    }

    /**
     * Extracts the Reactor context carried by a gateway failure.
     *
     * @param th the failure to inspect
     * @return the context of a {@link CloseException} or {@link VoiceGatewayException}, otherwise an
     * empty context
     */
    private Context getContextFromException(Throwable th) {
        if (th instanceof CloseException) {
            return ((CloseException) th).getContext();
        }
        if (th instanceof VoiceGatewayException) {
            return ((VoiceGatewayException) th).getContext();
        }
        return Context.empty();
    }

    /**
     * Decides what happens after the websocket session has closed.
     *
     * <p>Close codes listed in {@code VoiceGatewayRetrySpec.NON_RETRYABLE_STATUS_CODES} force a stop
     * regardless of the requested behavior. On stop, the state moves to DISCONNECTED, the disconnect
     * notifier is signalled and the disconnect task runs. Any other action produces a
     * {@link CloseException} error so the retry policy can act on it.
     *
     * @param disconnectBehavior the behavior requested when the session was closed
     * @param closeStatus the websocket close status
     * @return a Mono emitting the close status on a clean stop, or failing with {@link CloseException}
     */
    private Mono<CloseStatus> handleClose(DisconnectBehavior disconnectBehavior, CloseStatus closeStatus) {
        return Mono.deferWithContext(context -> {
            final DisconnectBehavior behavior;
            if (VoiceGatewayRetrySpec.NON_RETRYABLE_STATUS_CODES.contains(closeStatus.getCode())) {
                behavior = DisconnectBehavior.stop(disconnectBehavior.getCause());
            } else {
                behavior = disconnectBehavior;
            }
            log.debug(LogUtil.format(context, "Closing and {} with status {}"), behavior, closeStatus);
            this.heartbeat.stop();

            DisconnectBehavior.Action action = behavior.getAction();
            Throwable cause = behavior.getCause();
            // Only a graceful STOP releases the per-session resources here.
            if (action == DisconnectBehavior.Action.STOP) {
                this.cleanup.dispose();
            }

            switch (action) {
                case STOP_ABRUPTLY:
                case STOP:
                    if (cause == null) {
                        return Mono.just(closeStatus).flatMap(ignored -> {
                            this.stateChanges.next(VoiceConnection.State.DISCONNECTED);
                            this.disconnectNotifier.onNext(closeStatus);
                            return this.disconnectTask.onDisconnect(this.guildId).thenReturn(closeStatus);
                        });
                    }
                    return Mono.just(new CloseException(closeStatus, context, cause)).flatMap(closeException -> {
                        this.stateChanges.next(VoiceConnection.State.DISCONNECTED);
                        this.disconnectNotifier.onError(closeException);
                        // NOTE(review): 4014 is presumably Discord's "disconnected" voice close code,
                        // treated as a normal outcome rather than an error - confirm.
                        Mono<CloseStatus> outcome;
                        if (closeStatus.getCode() == 4014) {
                            outcome = Mono.just(closeStatus);
                        } else {
                            outcome = Mono.error(closeException);
                        }
                        return this.disconnectTask.onDisconnect(this.guildId).then(outcome);
                    });
                case RETRY_ABRUPTLY:
                case RETRY:
                default:
                    return Mono.error(new CloseException(closeStatus, context, cause));
            }
        });
    }
}
