package com.networknt.eventuate.server.test.util;

import com.networknt.eventuate.common.Int128;
import com.networknt.eventuate.common.impl.EntityIdVersionAndEventIds;
import com.networknt.eventuate.jdbc.EventuateJdbcAccess;
import com.networknt.eventuate.server.common.CdcProcessor;
import com.networknt.eventuate.server.common.PublishedEvent;
import com.networknt.eventuate.server.jdbckafkastore.EventuateLocalAggregateCrud;
import com.networknt.service.SingletonServiceFactory;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:com/networknt/eventuate/server/test/util/CdcProcessorTest.class */
public abstract class CdcProcessorTest extends AbstractCdcTest {
    protected EventuateJdbcAccess eventuateJdbcAccess = (EventuateJdbcAccess) SingletonServiceFactory.getBean(EventuateJdbcAccess.class);
    protected EventuateLocalAggregateCrud localAggregateCrud;

    @Before
    public void init() {
        this.localAggregateCrud = new EventuateLocalAggregateCrud(this.eventuateJdbcAccess);
    }

    @Test
    public void shouldReadNewEventsOnly() throws InterruptedException {
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();
        CdcProcessor<PublishedEvent> createCdcProcessor = createCdcProcessor();
        createCdcProcessor.start(publishedEvent -> {
            linkedBlockingDeque.add(publishedEvent);
            onEventSent(publishedEvent);
        });
        String generateAccountCreatedEvent = generateAccountCreatedEvent();
        EntityIdVersionAndEventIds saveEvent = saveEvent(this.localAggregateCrud, generateAccountCreatedEvent);
        waitForEvent(linkedBlockingDeque, saveEvent.getEntityVersion(), LocalDateTime.now().plusSeconds(10L), generateAccountCreatedEvent);
        createCdcProcessor.stop();
        Thread.sleep(10000L);
        linkedBlockingDeque.clear();
        createCdcProcessor.start(publishedEvent2 -> {
            linkedBlockingDeque.add(publishedEvent2);
            onEventSent(publishedEvent2);
        });
        List<String> list = (List) saveEvent.getEventIds().stream().map((v0) -> {
            return v0.asString();
        }).collect(Collectors.toList());
        String generateAccountCreatedEvent2 = generateAccountCreatedEvent();
        waitForEventExcluding(linkedBlockingDeque, saveEvent(this.localAggregateCrud, generateAccountCreatedEvent2).getEntityVersion(), LocalDateTime.now().plusSeconds(10L), generateAccountCreatedEvent2, list);
        createCdcProcessor.stop();
    }

    @Test
    public void shouldReadUnprocessedEventsAfterStartup() throws InterruptedException {
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();
        String generateAccountCreatedEvent = generateAccountCreatedEvent();
        EntityIdVersionAndEventIds saveEvent = saveEvent(this.localAggregateCrud, generateAccountCreatedEvent);
        CdcProcessor<PublishedEvent> createCdcProcessor = createCdcProcessor();
        Objects.requireNonNull(linkedBlockingDeque);
        createCdcProcessor.start((v1) -> {
            r1.add(v1);
        });
        waitForEvent(linkedBlockingDeque, saveEvent.getEntityVersion(), LocalDateTime.now().plusSeconds(20L), generateAccountCreatedEvent);
        createCdcProcessor.stop();
    }

    private PublishedEvent waitForEventExcluding(BlockingQueue<PublishedEvent> blockingQueue, Int128 int128, LocalDateTime localDateTime, String str, List<String> list) throws InterruptedException {
        PublishedEvent publishedEvent = null;
        while (LocalDateTime.now().isBefore(localDateTime)) {
            PublishedEvent poll = blockingQueue.poll(ChronoUnit.MILLIS.between(localDateTime, LocalDateTime.now()), TimeUnit.MILLISECONDS);
            if (poll != null) {
                if (poll.getId().equals(int128.asString()) && str.equals(poll.getEventData())) {
                    publishedEvent = poll;
                }
                if (list.contains(poll.getId())) {
                    throw new RuntimeException("Wrong event found in the queue");
                }
            }
        }
        if (publishedEvent != null) {
            return publishedEvent;
        }
        throw new RuntimeException("event not found: " + int128);
    }

    protected abstract CdcProcessor<PublishedEvent> createCdcProcessor();

    protected void onEventSent(PublishedEvent publishedEvent) {
    }
}
