/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.confidence;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.spotify.confidence.Clock;
import com.spotify.confidence.ConfidenceValue;
import com.spotify.confidence.EventSenderEngine;
import com.spotify.confidence.EventSenderEngineImpl;
import com.spotify.confidence.EventUploader;
import com.spotify.confidence.FakeClock;
import com.spotify.confidence.FakeUploader;
import com.spotify.confidence.events.v1.Event;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link EventSenderEngineImpl}, driven through a {@link FakeUploader} that records
 * every batch it is handed and a {@link FakeClock} shared by all tests.
 */
public class EventSenderEngineTest {
    /**
     * Value passed as the engine's last constructor argument (1 GiB) by tests that are not
     * exercising that limit. NOTE(review): judging by
     * {@link #testEngineWillRejectEventsIfOverMemoryThreshold()} this is the memory threshold in
     * bytes - confirm against the constructor.
     */
    private static final long LARGE_MEMORY_THRESHOLD = 0x40000000L;

    /** Pause between two checks of the uploader while waiting for an asynchronous upload. */
    private static final long POLL_INTERVAL_MILLIS = 20L;

    /** Upper bound on the number of pauses, i.e. at most 1.2 seconds of waiting in total. */
    private static final int MAX_POLL_ATTEMPTS = 60;

    private final FakeClock clock = new FakeClock();

    /** Emits one event, closes the engine, emits again and expects exactly one upload call. */
    @Test
    public void testEngineRejectsEventsAfterClosed() throws IOException {
        FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
        int maxBatchSize = 6;
        EventSenderEngineImpl engine = newEngine(maxBatchSize, alwaysSucceedUploader);

        emitNavigateEvent(engine);
        engine.close();
        emitNavigateEvent(engine);

        Assertions.assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(1);
    }

    /**
     * Emits 14 events with a batch size of 6 and expects, after close, two full batches plus one
     * partial batch that together carry all 14 events.
     */
    @Test
    public void testEngineUploads() throws IOException {
        FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
        int maxBatchSize = 6;
        int numEvents = 14;
        EventSenderEngineImpl engine = newEngine(maxBatchSize, alwaysSucceedUploader);

        for (int i = 0; i < numEvents; i++) {
            emitNavigateEvent(engine);
        }
        engine.close();

        // 14 is not a multiple of 6, so exactly one batch is expected to be smaller than the limit.
        int partialBatches = numEvents % maxBatchSize > 0 ? 1 : 0;
        int uploadCallsCount = alwaysSucceedUploader.uploadCalls.size();
        int fullBatchCount = (int) alwaysSucceedUploader.uploadCalls.stream()
                .filter(batch -> batch.size() == maxBatchSize)
                .count();

        Assertions.assertThat(uploadCallsCount).isEqualTo(expectedBatchCount(numEvents, maxBatchSize));
        Assertions.assertThat(totalUploadedEvents(alwaysSucceedUploader)).isEqualTo(numEvents);
        Assertions.assertThat(uploadCallsCount - fullBatchCount).isEqualTo(partialBatches);
    }

    /**
     * Flushes a single event manually, waits for its upload, then checks that an event emitted
     * afterwards results in a second upload call once the engine is closed.
     */
    @Test
    public void testSendingEventsAfterManualFlush() throws Exception {
        FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
        int maxBatchSize = 6;
        EventSenderEngineImpl engine = newEngine(maxBatchSize, alwaysSucceedUploader);

        emitNavigateEvent(engine);
        engine.flush();
        // Returns as soon as the upload is observed instead of always sleeping the full timeout.
        awaitUploadCalls(alwaysSucceedUploader, 1);
        Assertions.assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(1);

        emitNavigateEvent(engine);
        engine.close();
        Assertions.assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(2);
    }

    /**
     * Emits an event whose two struct arguments both contain the keys {@code a} and
     * {@code message} and checks the uploaded payload: the values of the third argument appear at
     * the top level, the values of the second argument are nested under {@code context}.
     */
    @Test
    public void testOverlappingKeysInPayload() throws IOException, InterruptedException {
        FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
        EventSenderEngineImpl engine = newEngine(1, alwaysSucceedUploader);
        try {
            engine.emit(
                    "my_event",
                    ConfidenceValue.of(ImmutableMap.of("a", ConfidenceValue.of(2), "message", ConfidenceValue.of(3))),
                    Optional.of(ConfidenceValue.Struct.of(
                            Map.of("a", ConfidenceValue.of(0), "message", ConfidenceValue.of(1)))));
            awaitUploadCalls(alwaysSucceedUploader, 1);

            Struct expectedContext = Struct.newBuilder()
                    .putAllFields(Map.of(
                            "a", Value.newBuilder().setNumberValue(2.0).build(),
                            "message", Value.newBuilder().setNumberValue(3.0).build()))
                    .build();
            Struct expectedPayload = Struct.newBuilder()
                    .putAllFields(Map.of(
                            "a", Value.newBuilder().setNumberValue(0.0).build(),
                            "message", Value.newBuilder().setNumberValue(1.0).build(),
                            "context", Value.newBuilder().setStructValue(expectedContext).build()))
                    .build();

            Assertions.assertThat(alwaysSucceedUploader.uploadCalls.peek().get(0).getPayload())
                    .isEqualTo(expectedPayload);
        } finally {
            // Close even when an assertion fails so the engine is not left open after the test.
            engine.close();
        }
    }

    /** Closes an engine that never received an event and expects no upload call. */
    @Test
    public void testEngineCloseSuccessfullyWithoutEventsQueued() throws IOException {
        FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
        int maxBatchSize = 6;
        EventSenderEngineImpl engine = newEngine(maxBatchSize, alwaysSucceedUploader);

        engine.close();

        Assertions.assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(0);
    }

    /**
     * Emits one event (below the batch size of 6) with a 100 ms flush interval and expects a
     * single upload of a single event without an explicit flush or close.
     */
    @Test
    public void testEngineUploadsTriggeredByFlushTimeout() throws IOException, InterruptedException {
        FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
        int maxBatchSize = 6;
        EventSenderEngineImpl engine = newEngine(maxBatchSize, alwaysSucceedUploader, Duration.ofMillis(100L));
        try {
            emitNavigateEvent(engine);
            awaitUploadCalls(alwaysSucceedUploader, 1);

            Assertions.assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(1);
            Assertions.assertThat(alwaysSucceedUploader.uploadCalls.peek().size()).isEqualTo(1);
        } finally {
            engine.close();
        }
    }

    /**
     * With a batch size of 10000 and a flush interval of 10 seconds, expects an explicit
     * {@code flush()} to upload the single queued event.
     */
    @Test
    public void testFlushForcesUploadsDespiteBatchLimits() throws IOException, InterruptedException {
        FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
        EventSenderEngineImpl engine = newEngine(10000, alwaysSucceedUploader, Duration.ofMillis(10000L));
        try {
            emitNavigateEvent(engine);
            engine.flush();
            awaitUploadCalls(alwaysSucceedUploader, 1);

            Assertions.assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(1);
            Assertions.assertThat(alwaysSucceedUploader.uploadCalls.peek().size()).isEqualTo(1);
        } finally {
            engine.close();
        }
    }

    /**
     * Configures the uploader with the indexes 2 and 5 (named {@code failAtUploadWithIndex}) and
     * expects one extra upload call per configured index, with all 14 distinct event ids seen.
     */
    @Test
    public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException {
        int maxBatchSize = 3;
        int numEvents = 14;
        List<Integer> failAtUploadWithIndex = List.of(2, 5);
        FakeUploader fakeUploader = new FakeUploader(failAtUploadWithIndex);
        EventSenderEngineImpl engine = newEngine(maxBatchSize, fakeUploader);

        for (int i = 0; i < numEvents; i++) {
            engine.emit(
                    "test",
                    ConfidenceValue.Struct.EMPTY,
                    Optional.of(ConfidenceValue.of(ImmutableMap.of("id", ConfidenceValue.of(i)))));
        }
        engine.close();

        Assertions.assertThat(fakeUploader.uploadCalls.size())
                .isEqualTo(expectedBatchCount(numEvents, maxBatchSize) + failAtUploadWithIndex.size());

        Set<Value> uniqueEventIds = fakeUploader.uploadCalls.stream()
                .flatMap(Collection::stream)
                .map(event -> event.getPayload().getFieldsMap().get("id"))
                .collect(Collectors.toSet());
        Assertions.assertThat(uniqueEventIds.size()).isEqualTo(numEvents);
    }

    /**
     * Emits 100000 events from asynchronous tasks and expects every event to be uploaded in the
     * minimal number of batches of at most 30 events.
     */
    @Test
    public void multiThreadTest() throws IOException {
        int numberOfEvents = 100000;
        int maxBatchSize = 30;
        FakeUploader alwaysSucceedUploader = new FakeUploader();
        EventSenderEngineImpl engine = newEngine(maxBatchSize, alwaysSucceedUploader);
        CompletableFuture<?>[] eventTasks = new CompletableFuture<?>[numberOfEvents];

        Stopwatch timer = Stopwatch.createStarted();
        for (int i = 0; i < numberOfEvents; i++) {
            eventTasks[i] = CompletableFuture.runAsync(() -> emitNavigateEvent(engine));
        }
        CompletableFuture.allOf(eventTasks).join();
        engine.close();
        System.out.println("Finished in (ms): " + timer.elapsed(TimeUnit.MILLISECONDS));

        Assertions.assertThat(totalUploadedEvents(alwaysSucceedUploader)).isEqualTo(numberOfEvents);
        Assertions.assertThat(alwaysSucceedUploader.uploadCalls.size())
                .isEqualTo(expectedBatchCount(numberOfEvents, maxBatchSize));
    }

    /**
     * Closes the engine from an interrupted thread while an upload is still pending and expects
     * the pending upload future to end up cancelled.
     */
    @Test
    public void testUnsentEventsAreCancelledOnThreadInterrupted() throws Exception {
        // NOTE(review): left raw because the element type of the future returned by
        // EventUploader is not visible in this file - parameterize once confirmed.
        CompletableFuture batchResult = new CompletableFuture();
        CompletableFuture<Void> isUploadCalled = new CompletableFuture<>();
        EventUploader fakeUploader = events -> {
            isUploadCalled.complete(null);
            return batchResult;
        };
        EventSenderEngineImpl engine = new EventSenderEngineImpl(10, fakeUploader, clock, Duration.ofMillis(10L), 1024L);

        engine.emit("fake", ConfidenceValue.Struct.EMPTY, Optional.empty());
        // Wait until the never-completing upload has been started before interrupting.
        isUploadCalled.join();
        Thread.currentThread().interrupt();
        try {
            engine.close();
        } finally {
            // Clear the flag so a still-pending interrupt cannot leak into whatever runs next on
            // this thread.
            Thread.interrupted();
        }

        Assertions.assertThat(batchResult.isCancelled()).isTrue();
    }

    /**
     * Sizes the engine's last constructor argument to exactly one serialized event, emits two
     * events and expects the estimated memory consumption to equal one event after the first
     * emit and a single upload call after close.
     */
    @Test
    public void testEngineWillRejectEventsIfOverMemoryThreshold() throws IOException {
        Event expectedEvent = EventUploader.event("navigate", ConfidenceValue.Struct.EMPTY, Optional.empty())
                .setEventTime(clock.getTimestamp())
                .build();
        long singleEventSize = expectedEvent.getSerializedSize();
        FakeUploader fakeUploader = new FakeUploader();
        EventSenderEngineImpl engine = new EventSenderEngineImpl(
                10, fakeUploader, clock, EventSenderEngineImpl.DEFAULT_MAX_FLUSH_INTERVAL, singleEventSize);

        engine.emit("navigate", ConfidenceValue.of(Map.of()), Optional.empty());
        long estimatedConsumption = engine.getEstimatedMemoryConsumption();
        Assertions.assertThat(estimatedConsumption).isEqualTo(singleEventSize);

        engine.emit("navigate", ConfidenceValue.of(Map.of()), Optional.empty());
        engine.close();

        Assertions.assertThat(fakeUploader.uploadCalls.size()).isEqualTo(1);
    }

    /** Expects the constructor to reject a zero flush interval with a descriptive message. */
    @Test
    public void testEngineThrowsExceptionWhenMaxFlushIntervalIsZero() {
        Assertions.assertThatThrownBy(
                        () -> new EventSenderEngineImpl(10, new FakeUploader(), clock, Duration.ZERO, LARGE_MEMORY_THRESHOLD))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("maxFlushInterval must be positive");
    }

    /** Creates an engine with the default flush interval and the large threshold. */
    private EventSenderEngineImpl newEngine(int maxBatchSize, EventUploader uploader) {
        return newEngine(maxBatchSize, uploader, EventSenderEngineImpl.DEFAULT_MAX_FLUSH_INTERVAL);
    }

    /** Creates an engine with the given flush interval, the shared clock and the large threshold. */
    private EventSenderEngineImpl newEngine(int maxBatchSize, EventUploader uploader, Duration maxFlushInterval) {
        return new EventSenderEngineImpl(maxBatchSize, uploader, clock, maxFlushInterval, LARGE_MEMORY_THRESHOLD);
    }

    /** Emits the standard {@code navigate} event used by most tests. */
    private static void emitNavigateEvent(EventSenderEngine engine) {
        engine.emit(
                "navigate",
                ConfidenceValue.of(ImmutableMap.of("key", ConfidenceValue.of("size"))),
                Optional.empty());
    }

    /**
     * Polls the uploader until it has recorded at least {@code expectedCalls} upload calls or the
     * bounded wait is exhausted. Never fails by itself; callers assert on the final state.
     *
     * @param uploader the fake uploader whose recorded calls are inspected
     * @param expectedCalls number of upload calls to wait for
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     */
    private static void awaitUploadCalls(FakeUploader uploader, int expectedCalls) throws InterruptedException {
        for (int attempt = 0;
                attempt < MAX_POLL_ATTEMPTS && uploader.uploadCalls.size() < expectedCalls;
                attempt++) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /** Returns the number of batches needed for {@code numEvents} events, rounding up. */
    private static int expectedBatchCount(int numEvents, int maxBatchSize) {
        return (numEvents + maxBatchSize - 1) / maxBatchSize;
    }

    /** Sums the sizes of all batches the uploader has recorded. */
    private static int totalUploadedEvents(FakeUploader uploader) {
        return uploader.uploadCalls.stream().mapToInt(batch -> batch.size()).sum();
    }
}

