/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.confidence;

import com.google.common.annotations.VisibleForTesting;
import com.spotify.confidence.Clock;
import com.spotify.confidence.ConfidenceValue;
import com.spotify.confidence.EventSenderEngine;
import com.spotify.confidence.EventUploader;
import com.spotify.confidence.GrpcEventUploader;
import com.spotify.confidence.events.v1.Event;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeExecutor;
import dev.failsafe.Policy;
import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventSenderEngine} implementation that buffers emitted events in an in-memory queue and
 * uploads them in batches.
 *
 * <p>A dedicated polling thread, started by the constructor, drains the queue. A batch is handed to
 * the {@link EventUploader} when it reaches {@code maxBatchSize} events, when more than
 * {@code maxFlushInterval} has elapsed since the previous size/time-triggered upload, or when
 * {@link #flush()} is called. Uploads that complete with {@code false} are retried with backoff.
 *
 * <p>{@link #emit} and {@link #flush()} only touch concurrent data structures and may be called
 * from any thread; {@link #close()} is {@code synchronized}.
 */
class EventSenderEngineImpl implements EventSenderEngine {
    static final String EVENT_NAME_PREFIX = "eventDefinitions/";
    static final int DEFAULT_BATCH_SIZE = 25;
    static final Duration DEFAULT_MAX_FLUSH_INTERVAL = Duration.ofSeconds(60L);
    /** Default cap on the summed serialized size of buffered events: 1 GiB. */
    static final long DEFAULT_MAX_MEMORY_CONSUMPTION = 1024L * 1024L * 1024L;

    /** Event definition of the sentinel event that {@link #flush()} puts on the queue. */
    private static final String MANUAL_FLUSH_EVENT = "manual_flash";
    /** How long {@link #close()} waits for in-flight batches before cancelling them. */
    private static final long AWAIT_PENDING_TIMEOUT_SECONDS = 10L;

    private static final Logger log = LoggerFactory.getLogger(EventSenderEngineImpl.class);

    private final EventUploader eventUploader;
    private final Clock clock;
    private final int maxBatchSize;
    private final Duration maxFlushInterval;
    private final FailsafeExecutor<Boolean> uploadExecutor;
    private final ConcurrentLinkedQueue<Event> sendQueue = new ConcurrentLinkedQueue<>();
    private final Set<CompletableFuture<?>> pendingBatches = ConcurrentHashMap.newKeySet();
    private final Thread pollingThread = new Thread(this::pollLoop);
    private final long maxMemoryConsumption;
    private volatile boolean intakeClosed = false;
    private volatile boolean joinWasInterrupted = false;
    private final AtomicLong estimatedMemoryConsumption = new AtomicLong(0L);

    /**
     * Creates the engine and starts its polling thread.
     *
     * @param maxBatchSize number of buffered events that triggers an upload
     * @param eventUploader uploader that receives each batch
     * @param clock source of the event time stamped on every emitted event
     * @param maxFlushInterval maximum time between size/time-triggered uploads; must be positive
     * @param maxMemoryConsumption upper bound, in bytes of serialized size, for buffered events
     * @throws IllegalArgumentException if {@code maxFlushInterval} is zero or negative
     */
    @VisibleForTesting
    EventSenderEngineImpl(int maxBatchSize, EventUploader eventUploader, Clock clock, Duration maxFlushInterval, long maxMemoryConsumption) {
        // A negative interval would make parkUntil() return immediately and turn the polling
        // loop into a busy spin, so reject it together with zero.
        if (maxFlushInterval.isZero() || maxFlushInterval.isNegative()) {
            throw new IllegalArgumentException("maxFlushInterval must be positive");
        }
        this.eventUploader = eventUploader;
        this.clock = clock;
        this.maxBatchSize = maxBatchSize;
        this.maxFlushInterval = maxFlushInterval;
        this.maxMemoryConsumption = maxMemoryConsumption;
        // Retry while the upload reports `false`: 1s..10s backoff with 10% jitter, no cap on the
        // number of attempts (-1), but give up once 30 minutes have passed.
        this.uploadExecutor = Failsafe.with(
                RetryPolicy.<Boolean>builder()
                        .handleResult(false)
                        .withBackoff(1L, 10L, ChronoUnit.SECONDS)
                        .withJitter(0.1)
                        .withMaxAttempts(-1)
                        .withMaxDuration(Duration.ofMinutes(30L))
                        .build());
        // Started last so that every field above is assigned before the thread runs.
        this.pollingThread.start();
    }

    /**
     * Creates an engine that uploads over gRPC using the default batch size, flush interval and
     * memory limit.
     *
     * @param clientSecret client secret passed to the {@link GrpcEventUploader}
     * @param channel gRPC channel passed to the {@link GrpcEventUploader}
     * @param clock source of event timestamps
     * @param deadlineMillis deadline passed to the {@link GrpcEventUploader}
     */
    EventSenderEngineImpl(String clientSecret, ManagedChannel channel, Clock clock, int deadlineMillis) {
        this(DEFAULT_BATCH_SIZE, new GrpcEventUploader(clientSecret, clock, channel, deadlineMillis), clock, DEFAULT_MAX_FLUSH_INTERVAL, DEFAULT_MAX_MEMORY_CONSUMPTION);
    }

    /**
     * Queues an event for upload and wakes the polling thread.
     *
     * <p>The event is dropped (with a warning) if the engine has been closed, or if adding its
     * serialized size to the current estimate would exceed the configured memory limit.
     *
     * @param name event name
     * @param context context attached to the event
     * @param data optional event payload
     */
    @Override
    public void emit(String name, ConfidenceValue.Struct context, Optional<ConfidenceValue.Struct> data) {
        if (intakeClosed) {
            log.warn("EventSenderEngine is closed, dropping event {}", name);
            return;
        }
        final Event event = EventUploader.event(name, context, data).setEventTime(clock.getTimestamp()).build();
        final long eventSize = event.getSerializedSize();
        // Check-then-add is not atomic across threads; the limit is an estimate, not a hard cap.
        if (estimatedMemoryConsumption.get() + eventSize > maxMemoryConsumption) {
            log.warn("EventSenderEngine is overloaded, dropping event {}", name);
            return;
        }
        sendQueue.add(event);
        estimatedMemoryConsumption.addAndGet(eventSize);
        LockSupport.unpark(pollingThread);
    }

    /**
     * Requests an upload of everything queued so far.
     *
     * <p>Enqueues a sentinel event and wakes the polling thread; the upload starts when the
     * polling thread reaches the sentinel. This method does not wait for the upload.
     */
    @Override
    public void flush() {
        sendQueue.add(Event.newBuilder().setEventDefinition(MANUAL_FLUSH_EVENT).build());
        LockSupport.unpark(pollingThread);
    }

    /**
     * Body of the polling thread: drains the queue into batches and uploads them.
     *
     * <p>Exits once the queue is empty and intake has been closed, uploading any remainder first.
     */
    private void pollLoop() {
        Instant latestFlushTime = Instant.now();
        List<Event> events = new ArrayList<>();
        while (true) {
            final Event event = sendQueue.poll();
            if (event != null) {
                if (MANUAL_FLUSH_EVENT.equals(event.getEventDefinition())) {
                    log.debug("Starting events upload due to manual flush");
                    upload(events);
                    // The uploaded list is owned by the in-flight batch; start a fresh one.
                    events = new ArrayList<>();
                    continue;
                }
                events.add(event);
            } else {
                if (intakeClosed) {
                    break;
                }
                // Sleep until woken by emit()/flush()/close() or until the flush interval passes.
                LockSupport.parkUntil(Instant.now().plus(maxFlushInterval).toEpochMilli());
            }
            final boolean passedMaxFlushInterval =
                    Duration.between(latestFlushTime, Instant.now()).compareTo(maxFlushInterval) > 0;
            // `<` rather than `!=` so a non-positive maxBatchSize cannot disable size-based flushing.
            if (events.size() < maxBatchSize && !passedMaxFlushInterval) {
                continue;
            }
            upload(events);
            events = new ArrayList<>();
            latestFlushTime = Instant.now();
        }
        upload(events);
    }

    /**
     * Starts an asynchronous, retried upload of {@code events} and tracks it in
     * {@link #pendingBatches} until it completes. Does nothing for an empty batch.
     *
     * @param events batch to upload; must not be modified by the caller afterwards
     */
    private void upload(List<Event> events) {
        if (events.isEmpty()) {
            return;
        }
        final CompletableFuture<Boolean> batchUploaded = uploadExecutor.getStageAsync(() -> {
            // After an interrupted shutdown, report success so the retry policy stops.
            if (joinWasInterrupted) {
                return CompletableFuture.completedFuture(true);
            }
            return eventUploader.upload(events);
        });
        pendingBatches.add(batchUploaded);
        batchUploaded.whenComplete((res, err) -> {
            pendingBatches.remove(batchUploaded);
            // Release the memory estimate whether the batch succeeded, failed or was cancelled.
            estimatedMemoryConsumption.addAndGet(-events.stream().mapToLong(Event::getSerializedSize).sum());
        });
    }

    /**
     * Wakes the polling thread and waits for it to finish.
     *
     * <p>If the wait is interrupted, the remaining queue is discarded, further upload attempts are
     * short-circuited, and the interrupt flag is restored.
     */
    private void joinPollingThread() {
        try {
            LockSupport.unpark(pollingThread);
            pollingThread.join();
        } catch (InterruptedException e) {
            sendQueue.clear();
            joinWasInterrupted = true;
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits up to {@link #AWAIT_PENDING_TIMEOUT_SECONDS} seconds for all in-flight batches.
     *
     * <p>Failures of individual batches are ignored here; a timeout is logged and otherwise
     * tolerated so that {@link #close()} can proceed to cancel what is left.
     */
    private void awaitPending() {
        try {
            // exceptionally(...) makes allOf() wait for every batch instead of failing fast.
            final CompletableFuture<?>[] pending = pendingBatches.stream()
                    .map(future -> future.exceptionally(throwable -> null))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(pending).get(AWAIT_PENDING_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (TimeoutException e) {
            log.warn("Timed out waiting for pending event batches; {} batch(es) still in flight", pendingBatches.size());
        } catch (ExecutionException e) {
            log.debug("Unexpected failure while waiting for pending event batches", e);
        }
    }

    /** Returns the current estimate of the serialized size, in bytes, of buffered events. */
    @VisibleForTesting
    long getEstimatedMemoryConsumption() {
        return estimatedMemoryConsumption.get();
    }

    /**
     * Stops accepting events, lets the polling thread drain the queue, waits a bounded time for
     * in-flight batches and cancels whatever is still pending. Subsequent calls are no-ops.
     *
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public synchronized void close() throws IOException {
        if (intakeClosed) {
            return;
        }
        intakeClosed = true;
        joinPollingThread();
        awaitPending();
        pendingBatches.forEach(batch -> batch.cancel(true));
    }
}

