package com.networknt.eventuate.server.test.util;

import com.networknt.config.Config;
import com.networknt.eventuate.common.EntityIdAndType;
import com.networknt.eventuate.common.Int128;
import com.networknt.eventuate.common.impl.EntityIdVersionAndEventIds;
import com.networknt.eventuate.common.impl.EventTypeAndData;
import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import com.networknt.eventuate.server.common.CdcConfig;
import com.networknt.eventuate.server.common.PublishedEvent;
import com.networknt.eventuate.server.jdbckafkastore.EventuateLocalAggregateCrud;
import com.networknt.eventuate.test.domain.Account;
import com.networknt.eventuate.test.domain.AccountCreatedEvent;
import com.networknt.eventuate.test.domain.AccountDebitedEvent;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

/* loaded from: input_file:com/networknt/eventuate/server/test/util/AbstractCdcTest.class */
public class AbstractCdcTest {
    public static String CDC_CONFIG_NAME = "cdc";
    public static CdcConfig cdcConfig = (CdcConfig) Config.getInstance().getJsonObjectConfig(CDC_CONFIG_NAME, CdcConfig.class);
    public static String KAFKA_CONFIG_NAME = "kafka";
    public static KafkaConfig kafkaConfig = (KafkaConfig) Config.getInstance().getJsonObjectConfig(KAFKA_CONFIG_NAME, KafkaConfig.class);

    public String generateAccountCreatedEvent() {
        return JSonMapper.toJson(new AccountCreatedEvent(new BigDecimal(System.currentTimeMillis())));
    }

    public String generateAccountDebitedEvent() {
        return JSonMapper.toJson(new AccountDebitedEvent(new BigDecimal(System.currentTimeMillis()), (String) null));
    }

    public BinlogFileOffset generateBinlogFileOffset() {
        long currentTimeMillis = System.currentTimeMillis();
        return new BinlogFileOffset("binlog.filename." + currentTimeMillis, currentTimeMillis);
    }

    public String generateUniqueTopicName() {
        return "test_topic_" + System.currentTimeMillis();
    }

    public String getEventTopicName() {
        return Account.class.getTypeName();
    }

    public EntityIdVersionAndEventIds saveEvent(EventuateLocalAggregateCrud eventuateLocalAggregateCrud, String str) {
        return eventuateLocalAggregateCrud.save(Account.class.getTypeName(), Collections.singletonList(new EventTypeAndData(AccountCreatedEvent.class.getTypeName(), str, Optional.empty())), Optional.empty());
    }

    public EntityIdVersionAndEventIds updateEvent(String str, Int128 int128, EventuateLocalAggregateCrud eventuateLocalAggregateCrud, String str2) {
        return eventuateLocalAggregateCrud.update(new EntityIdAndType(str, Account.class.getTypeName()), int128, Collections.singletonList(new EventTypeAndData(AccountCreatedEvent.class.getTypeName(), str2, Optional.empty())), Optional.empty());
    }

    public KafkaConsumer<String, String> createConsumer(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("auto.offset.reset", "earliest");
        properties.put("group.id", UUID.randomUUID().toString());
        properties.put("enable.auto.commit", "false");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(properties);
    }

    public Producer<String, String> createProducer(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer(properties);
    }

    public PublishedEvent waitForEvent(BlockingQueue<PublishedEvent> blockingQueue, Int128 int128, LocalDateTime localDateTime, String str) throws InterruptedException {
        while (LocalDateTime.now().isBefore(localDateTime)) {
            PublishedEvent poll = blockingQueue.poll(ChronoUnit.MILLIS.between(localDateTime, LocalDateTime.now()), TimeUnit.MILLISECONDS);
            if (poll != null && poll.getId().equals(int128.asString()) && str.equals(poll.getEventData())) {
                return poll;
            }
        }
        throw new RuntimeException("event not found: " + int128);
    }

    public void waitForEventInKafka(KafkaConsumer<String, String> kafkaConsumer, String str, LocalDateTime localDateTime) throws InterruptedException {
        while (LocalDateTime.now().isBefore(localDateTime)) {
            ConsumerRecords poll = kafkaConsumer.poll(ChronoUnit.MILLIS.between(LocalDateTime.now(), localDateTime));
            if (!poll.isEmpty()) {
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    if (((String) ((ConsumerRecord) it.next()).key()).equals(str)) {
                        return;
                    }
                }
            }
        }
        throw new RuntimeException("entity not found: " + str);
    }
}
