package com.networknt.eventuate.server.test.util;

import com.networknt.config.Config;
import com.networknt.eventuate.common.impl.EntityIdVersionAndEventIds;
import com.networknt.eventuate.jdbc.EventuateJdbcAccess;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.server.common.CdcKafkaPublisher;
import com.networknt.eventuate.server.common.CdcProcessor;
import com.networknt.eventuate.server.common.PublishedEvent;
import com.networknt.eventuate.server.common.PublishingStrategy;
import com.networknt.eventuate.server.jdbckafkastore.EventuateLocalAggregateCrud;
import com.networknt.service.SingletonServiceFactory;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Objects;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:com/networknt/eventuate/server/test/util/CdcKafkaPublisherTest.class */
public abstract class CdcKafkaPublisherTest extends AbstractCdcTest {
    private static final String CONFIG_NAME = "kafka";
    private static final KafkaConfig config = (KafkaConfig) Config.getInstance().getJsonObjectConfig(CONFIG_NAME, KafkaConfig.class);
    protected EventuateJdbcAccess eventuateJdbcAccess = (EventuateJdbcAccess) SingletonServiceFactory.getBean(EventuateJdbcAccess.class);
    protected CdcProcessor<PublishedEvent> cdcProcessor = (CdcProcessor) SingletonServiceFactory.getBean(CdcProcessor.class);
    protected PublishingStrategy<PublishedEvent> publishingStrategy = (PublishingStrategy) SingletonServiceFactory.getBean(PublishingStrategy.class);
    protected EventuateLocalAggregateCrud localAggregateCrud;

    @Before
    public void init() {
        this.localAggregateCrud = new EventuateLocalAggregateCrud(this.eventuateJdbcAccess);
    }

    @Test
    public void shouldSendPublishedEventsToKafka() throws InterruptedException {
        CdcKafkaPublisher<PublishedEvent> createCdcKafkaPublisher = createCdcKafkaPublisher();
        createCdcKafkaPublisher.start();
        CdcProcessor<PublishedEvent> cdcProcessor = this.cdcProcessor;
        Objects.requireNonNull(createCdcKafkaPublisher);
        cdcProcessor.start((v1) -> {
            r1.handleEvent(v1);
        });
        EntityIdVersionAndEventIds saveEvent = saveEvent(this.localAggregateCrud, generateAccountCreatedEvent());
        KafkaConsumer<String, String> createConsumer = createConsumer(config.getBootstrapServers());
        createConsumer.partitionsFor(getEventTopicName());
        createConsumer.subscribe(Collections.singletonList(getEventTopicName()));
        waitForEventInKafka(createConsumer, saveEvent.getEntityId(), LocalDateTime.now().plusSeconds(20L));
        createCdcKafkaPublisher.stop();
    }

    protected abstract CdcKafkaPublisher<PublishedEvent> createCdcKafkaPublisher();
}
