package com.hivemq.mqtt.handler.publish;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.configuration.service.InternalConfigurations;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.publish.PublishWithFuture;
import com.hivemq.mqtt.message.publish.PubrelWithFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hivemq/mqtt/handler/publish/OrderedTopicService.class */
public class OrderedTopicService {

    @NotNull
    private static final Logger log = LoggerFactory.getLogger(OrderedTopicService.class);

    @NotNull
    private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();

    @NotNull
    private final Map<Integer, SettableFuture<PublishStatus>> messageIdToFutureMap = new ConcurrentHashMap();

    @VisibleForTesting
    @NotNull
    final Queue<QueuedMessage> queue = new ArrayDeque();

    @NotNull
    private final AtomicBoolean closedAlready = new AtomicBoolean(false);

    @NotNull
    private final Set<Integer> unacknowledgedMessages = ConcurrentHashMap.newKeySet();

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:com/hivemq/mqtt/handler/publish/OrderedTopicService$QueuedMessage.class */
    public static class QueuedMessage {

        @NotNull
        private final PUBLISH publish;

        @NotNull
        private final ChannelPromise promise;

        QueuedMessage(@NotNull PUBLISH publish, @NotNull ChannelPromise channelPromise) {
            this.publish = publish;
            this.promise = channelPromise;
        }

        @NotNull
        public PUBLISH getPublish() {
            return this.publish;
        }

        @NotNull
        public ChannelPromise getPromise() {
            return this.promise;
        }
    }

    public void messageFlowComplete(@NotNull ChannelHandlerContext channelHandlerContext, int i) {
        SettableFuture<PublishStatus> settableFuture = this.messageIdToFutureMap.get(Integer.valueOf(i));
        if (settableFuture != null) {
            this.messageIdToFutureMap.remove(Integer.valueOf(i));
            settableFuture.set(PublishStatus.DELIVERED);
        }
        if (this.unacknowledgedMessages.remove(Integer.valueOf(i)) && !this.queue.isEmpty()) {
            ClientConnection of = ClientConnection.of(channelHandlerContext.channel());
            int maxInflightWindow = of == null ? InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES : of.getMaxInflightWindow(InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES);
            do {
                QueuedMessage poll = this.queue.poll();
                if (poll == null) {
                    return;
                }
                this.unacknowledgedMessages.add(Integer.valueOf(poll.publish.getPacketIdentifier()));
                channelHandlerContext.writeAndFlush(poll.getPublish(), poll.getPromise());
            } while (this.unacknowledgedMessages.size() < maxInflightWindow);
        }
    }

    public boolean handlePublish(@NotNull Channel channel, @NotNull Object obj, @NotNull ChannelPromise channelPromise) {
        String clientId;
        if (obj instanceof PubrelWithFuture) {
            PubrelWithFuture pubrelWithFuture = (PubrelWithFuture) obj;
            this.messageIdToFutureMap.put(Integer.valueOf(pubrelWithFuture.getPacketIdentifier()), pubrelWithFuture.getFuture());
            return false;
        }
        if (!(obj instanceof PUBLISH)) {
            return false;
        }
        SettableFuture<PublishStatus> settableFuture = null;
        if (obj instanceof PublishWithFuture) {
            settableFuture = ((PublishWithFuture) obj).getFuture();
        }
        ClientConnection of = ClientConnection.of(channel);
        if (of == null || (clientId = of.getClientId()) == null) {
            return false;
        }
        PUBLISH publish = (PUBLISH) obj;
        int qosNumber = publish.getQoS().getQosNumber();
        if (log.isTraceEnabled()) {
            log.trace("Client {}: Sending PUBLISH QoS {} Message with packet id {}", new Object[]{clientId, Integer.valueOf(publish.getQoS().getQosNumber()), Integer.valueOf(publish.getPacketIdentifier())});
        }
        if (qosNumber < 1) {
            if (settableFuture == null) {
                return false;
            }
            settableFuture.set(PublishStatus.DELIVERED);
            return false;
        }
        if (settableFuture != null) {
            this.messageIdToFutureMap.put(Integer.valueOf(publish.getPacketIdentifier()), settableFuture);
        }
        if (this.closedAlready.get()) {
            channelPromise.setFailure(CLOSED_CHANNEL_EXCEPTION);
            return true;
        }
        if (this.unacknowledgedMessages.size() >= of.getMaxInflightWindow(InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES)) {
            queueMessage(channelPromise, publish, clientId);
            return true;
        }
        this.unacknowledgedMessages.add(Integer.valueOf(publish.getPacketIdentifier()));
        return false;
    }

    public void handleInactive() {
        this.closedAlready.set(true);
        for (QueuedMessage queuedMessage : this.queue) {
            if (queuedMessage != null && !queuedMessage.getPromise().isDone()) {
                queuedMessage.getPromise().setFailure(CLOSED_CHANNEL_EXCEPTION);
            }
        }
        Iterator<Map.Entry<Integer, SettableFuture<PublishStatus>>> it = this.messageIdToFutureMap.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().set(PublishStatus.NOT_CONNECTED);
        }
    }

    private void queueMessage(@NotNull ChannelPromise channelPromise, @NotNull PUBLISH publish, @NotNull String str) {
        if (log.isTraceEnabled()) {
            log.trace("Buffered publish message with qos {} packetIdentifier {} and topic {} for client {}, because the receive maximum is exceeded", new Object[]{publish.getQoS().name(), Integer.valueOf(publish.getPacketIdentifier()), publish.getTopic(), str});
        }
        this.queue.add(new QueuedMessage(publish, channelPromise));
    }

    @NotNull
    public Set<Integer> unacknowledgedMessages() {
        return this.unacknowledgedMessages;
    }

    static {
        CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
    }
}
