package com.redis.spring.batch.test;

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.FlushingStepBuilder;
import com.redis.spring.batch.common.JobFactory;
import com.redis.spring.batch.common.PollableItemReader;
import com.redis.spring.batch.gen.GeneratorItemReader;
import com.redis.spring.batch.gen.Item;
import com.redis.spring.batch.gen.ItemToKeyValueFunction;
import com.redis.spring.batch.gen.Range;
import com.redis.spring.batch.gen.StreamOptions;
import com.redis.spring.batch.operation.Operation;
import com.redis.spring.batch.reader.StreamItemReader;
import com.redis.spring.batch.util.Await;
import com.redis.spring.batch.util.BatchUtils;
import com.redis.testcontainers.RedisServer;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.function.BooleanSupplier;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestInstance;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import org.testcontainers.lifecycle.Startable;

/**
 * Base class for Redis-backed Spring Batch tests.
 *
 * <p>One instance is shared by all test methods of a subclass (see
 * {@link TestInstance.Lifecycle#PER_CLASS}). Before the first test it obtains the
 * {@link RedisServer} from {@link #getRedisServer()}, starts it when it is a
 * {@link Startable}, and opens a client, a connection and a {@link JobFactory}.
 * Before each test the database is flushed. After the last test everything is
 * closed again.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractTestBase {

    /** Chunk size applied by {@code step(...)} unless {@link #setChunkSize(int)} is called. */
    public static final int DEFAULT_CHUNK_SIZE = 50;

    /** Idle timeout applied to live readers and flushing steps by default. */
    public static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofMillis(500);

    /** Initial delay used by {@link #awaitUntil(BooleanSupplier)} by default. */
    public static final Duration DEFAULT_POLL_DELAY = Duration.ZERO;

    /** Delay between condition checks in {@link #awaitUntil(BooleanSupplier)} by default. */
    public static final Duration DEFAULT_AWAIT_POLL_INTERVAL = Duration.ofMillis(1);

    /** Timeout used by {@link #awaitUntil(BooleanSupplier)} by default. */
    public static final Duration DEFAULT_AWAIT_TIMEOUT = Duration.ofSeconds(3);

    /** Processor that turns generated {@link Item}s into {@link KeyValue}s for writing. */
    protected static final ItemProcessor<Item, KeyValue<String, Object>> genItemProcessor =
            new FunctionItemProcessor<>(new ItemToKeyValueFunction());

    private int chunkSize = DEFAULT_CHUNK_SIZE;
    private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT;
    private Duration pollDelay = DEFAULT_POLL_DELAY;
    private Duration awaitPollInterval = DEFAULT_AWAIT_POLL_INTERVAL;
    private Duration awaitTimeout = DEFAULT_AWAIT_TIMEOUT;

    protected RedisURI redisURI;
    protected AbstractRedisClient redisClient;
    protected StatefulRedisModulesConnection<String, String> redisConnection;
    protected RedisModulesCommands<String, String> redisCommands;
    protected JobFactory jobFactory;

    /**
     * Supplies the Redis server the tests run against.
     *
     * @return the server; started and stopped by this class when it is a {@link Startable}
     */
    protected abstract RedisServer getRedisServer();

    // ------------------------------------------------------------------
    // Lifecycle
    // ------------------------------------------------------------------

    /**
     * Starts the server (when startable) and creates the client, connection,
     * synchronous commands and job factory shared by all tests of the class.
     *
     * @throws Exception if the job factory cannot be initialised
     */
    @BeforeAll
    void setup() throws Exception {
        RedisServer redisServer = getRedisServer();
        if (redisServer instanceof Startable) {
            ((Startable) redisServer).start();
        }
        this.redisURI = redisURI(redisServer);
        this.redisClient = client(redisServer);
        this.redisConnection = RedisModulesUtils.connection(this.redisClient);
        this.redisCommands = this.redisConnection.sync();
        this.jobFactory = new JobFactory();
        // Random name so that separate test classes do not share a job factory name.
        this.jobFactory.setName(UUID.randomUUID().toString());
        this.jobFactory.afterPropertiesSet();
    }

    /**
     * Closes the connection, shuts the client and its resources down, and stops
     * the server when it is a {@link Startable}.
     */
    @AfterAll
    void teardown() {
        if (this.redisConnection != null) {
            this.redisConnection.close();
        }
        if (this.redisClient != null) {
            this.redisClient.shutdown();
            this.redisClient.getResources().shutdown();
        }
        RedisServer redisServer = getRedisServer();
        if (redisServer instanceof Startable) {
            ((Startable) redisServer).stop();
        }
    }

    /**
     * Empties the database before every test and waits until no pattern
     * subscriptions remain.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @BeforeEach
    void flushAll() throws InterruptedException {
        this.redisCommands.flushall();
        awaitUntil(() -> this.redisCommands.pubsubNumpat().longValue() == 0);
    }

    // ------------------------------------------------------------------
    // Static helpers
    // ------------------------------------------------------------------

    /**
     * Builds a {@link RedisURI} from the URI string reported by the given server.
     *
     * @param redisServer server to read the URI from
     * @return the parsed URI
     */
    public static RedisURI redisURI(RedisServer redisServer) {
        return RedisURI.create(redisServer.getRedisURI());
    }

    /**
     * Creates a cluster client when the server reports cluster mode, otherwise
     * a standalone client.
     *
     * @param redisServer server to connect to
     * @return a new client for the server's URI
     */
    public static AbstractRedisClient client(RedisServer redisServer) {
        String uri = redisServer.getRedisURI();
        if (redisServer.isRedisCluster()) {
            return RedisModulesClusterClient.create(uri);
        }
        return RedisModulesClient.create(uri);
    }

    /**
     * Wraps a {@link TestInfo} together with extra name suffixes.
     *
     * @param testInfo original test info
     * @param suffixes suffixes handed to {@code SimpleTestInfo}
     * @return the wrapping test info
     */
    public static TestInfo testInfo(TestInfo testInfo, String... suffixes) {
        return new SimpleTestInfo(testInfo, suffixes);
    }

    /**
     * Derives a name from a test: the display name with the literal
     * {@code "(TestInfo)"} removed, followed by {@code "-"} and the short class
     * name when the test class is known.
     *
     * @param testInfo test to name
     * @return the derived name
     */
    public static String name(TestInfo testInfo) {
        String displayName = testInfo.getDisplayName().replace("(TestInfo)", "");
        return testInfo.getTestClass()
                .map(testClass -> displayName + "-" + ClassUtils.getShortName(testClass))
                .orElse(displayName);
    }

    /**
     * Opens the reader, drains it with {@link #readAll(ItemReader)} and always closes it.
     *
     * @param itemStreamReader reader to drain
     * @param <T> item type
     * @return every item read, in read order
     * @throws Exception if opening or reading fails
     */
    public static <T> List<T> readAllAndClose(ItemStreamReader<T> itemStreamReader) throws Exception {
        try {
            itemStreamReader.open(new ExecutionContext());
            return readAll(itemStreamReader);
        } finally {
            itemStreamReader.close();
        }
    }

    /**
     * Reads items until the reader returns {@code null}.
     *
     * @param itemReader reader to drain
     * @param <T> item type
     * @return every item read, in read order
     * @throws Exception if reading fails
     */
    public static <T> List<T> readAll(ItemReader<T> itemReader) throws Exception {
        List<T> items = new ArrayList<>();
        T item;
        while ((item = itemReader.read()) != null) {
            items.add(item);
        }
        return items;
    }

    /**
     * Asserts that the database holds at least one key.
     *
     * @param redisModulesCommands commands used to query the database size
     */
    public static void assertDbNotEmpty(RedisModulesCommands<String, String> redisModulesCommands) {
        Assertions.assertTrue(redisModulesCommands.dbsize().longValue() > 0, "Redis database is empty");
    }

    // ------------------------------------------------------------------
    // Configuration accessors
    // ------------------------------------------------------------------

    public int getChunkSize() {
        return this.chunkSize;
    }

    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    public Duration getIdleTimeout() {
        return this.idleTimeout;
    }

    public void setIdleTimeout(Duration idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    public Duration getPollDelay() {
        return this.pollDelay;
    }

    public void setPollDelay(Duration pollDelay) {
        this.pollDelay = pollDelay;
    }

    public Duration getAwaitPollInterval() {
        return this.awaitPollInterval;
    }

    public void setAwaitPollInterval(Duration awaitPollInterval) {
        this.awaitPollInterval = awaitPollInterval;
    }

    public Duration getAwaitTimeout() {
        return this.awaitTimeout;
    }

    public void setAwaitTimeout(Duration awaitTimeout) {
        this.awaitTimeout = awaitTimeout;
    }

    // ------------------------------------------------------------------
    // Readers and writers
    // ------------------------------------------------------------------

    /**
     * Creates a generator limited to {@code maxItemCount} items.
     *
     * @param maxItemCount maximum number of items to generate
     * @param types item types to generate; when empty the generator's own defaults apply
     * @return the configured generator
     */
    protected GeneratorItemReader generator(int maxItemCount, Item.Type... types) {
        GeneratorItemReader reader = new GeneratorItemReader();
        reader.setMaxItemCount(maxItemCount);
        if (!ObjectUtils.isEmpty(types)) {
            reader.setTypes(types);
        }
        return reader;
    }

    /**
     * Switches a reader to live mode using this test's idle timeout.
     *
     * @param redisItemReader reader to reconfigure
     */
    protected void live(RedisItemReader<?, ?, ?> redisItemReader) {
        redisItemReader.setMode(RedisItemReader.ReaderMode.LIVE);
        redisItemReader.setIdleTimeout(this.idleTimeout);
    }

    /**
     * Names a reader after the test (given suffixes plus {@code "reader"}) and
     * wires it to this test's job factory and client.
     *
     * @param testInfo current test
     * @param redisItemReader reader to configure
     * @param suffixes extra name suffixes placed before {@code "reader"}
     */
    public void configure(TestInfo testInfo, RedisItemReader<?, ?, ?> redisItemReader, String... suffixes) {
        List<String> allSuffixes = new ArrayList<>(Arrays.asList(suffixes));
        allSuffixes.add("reader");
        redisItemReader.setName(name(testInfo(testInfo, allSuffixes.toArray(new String[0]))));
        redisItemReader.setJobFactory(this.jobFactory);
        redisItemReader.setClient(this.redisClient);
    }

    /**
     * Creates a configured dump reader.
     *
     * @param testInfo current test
     * @param suffixes extra name suffixes
     * @return the reader
     */
    protected RedisItemReader<byte[], byte[], KeyValue<byte[], byte[]>> dumpReader(TestInfo testInfo, String... suffixes) {
        RedisItemReader<byte[], byte[], KeyValue<byte[], byte[]>> reader = RedisItemReader.dump();
        configure(testInfo, reader, suffixes);
        return reader;
    }

    /**
     * Creates a configured dump reader without extra name suffixes.
     *
     * @param testInfo current test
     * @return the reader
     */
    protected RedisItemReader<byte[], byte[], KeyValue<byte[], byte[]>> dumpReader(TestInfo testInfo) {
        // Explicit empty array selects the varargs overload instead of recursing.
        return dumpReader(testInfo, new String[0]);
    }

    /**
     * Creates a configured struct reader with String keys and values.
     *
     * @param testInfo current test
     * @param suffixes extra name suffixes
     * @return the reader
     */
    protected RedisItemReader<String, String, KeyValue<String, Object>> structReader(TestInfo testInfo, String... suffixes) {
        RedisItemReader<String, String, KeyValue<String, Object>> reader = RedisItemReader.struct();
        configure(testInfo, reader, suffixes);
        return reader;
    }

    /**
     * Creates a configured struct reader for the given codec.
     *
     * @param testInfo current test
     * @param redisCodec codec handed to {@code RedisItemReader.struct}
     * @param <K> key type
     * @param <V> value type
     * @return the reader
     */
    protected <K, V> RedisItemReader<K, V, KeyValue<K, Object>> structReader(TestInfo testInfo, RedisCodec<K, V> redisCodec) {
        RedisItemReader<K, V, KeyValue<K, Object>> reader = RedisItemReader.struct(redisCodec);
        configure(testInfo, reader, new String[0]);
        return reader;
    }

    /**
     * Creates a stream reader bound to this test's client.
     *
     * @param testInfo current test
     * @param stream stream argument handed to the {@link StreamItemReader} constructor
     * @param consumer consumer the reader reads as
     * @return the reader, named after the test with suffix {@code "stream-reader"}
     */
    protected StreamItemReader<String, String> streamReader(TestInfo testInfo, String stream, Consumer<String> consumer) {
        StreamItemReader<String, String> reader =
                new StreamItemReader<>(this.redisClient, StringCodec.UTF8, stream, consumer);
        reader.setName(name(testInfo(testInfo, "stream-reader")));
        return reader;
    }

    /**
     * Creates a writer for the given operation, bound to this test's client.
     *
     * @param operation operation the writer executes
     * @param <T> item type
     * @return the writer
     */
    protected <T> RedisItemWriter<String, String, T> writer(Operation<String, String, T, Object> operation) {
        RedisItemWriter<String, String, T> itemWriter = RedisItemWriter.operation(operation);
        itemWriter.setClient(this.redisClient);
        return itemWriter;
    }

    // ------------------------------------------------------------------
    // Redis helpers
    // ------------------------------------------------------------------

    /**
     * Counts keys matching a pattern via {@code KEYS}.
     *
     * @param pattern pattern passed to {@code keys}
     * @return number of keys returned
     */
    protected int keyCount(String pattern) {
        return this.redisCommands.keys(pattern).size();
    }

    /** Sets {@code notify-keyspace-events} to {@code AKE} on the server. */
    protected void enableKeyspaceNotifications() {
        this.redisCommands.configSet("notify-keyspace-events", "AKE");
    }

    /**
     * Blocks until at least one pattern subscription exists, using the default
     * settings of {@link Await}.
     *
     * @throws RuntimeException if interrupted while waiting (the interrupt flag is restored)
     */
    protected void awaitPubSub() {
        try {
            Await.await().until(() -> this.redisCommands.pubsubNumpat().longValue() > 0);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }
    }

    // ------------------------------------------------------------------
    // Awaiting
    // ------------------------------------------------------------------

    /**
     * Waits until the job execution reports that it is running.
     *
     * @param jobExecution execution to watch; must not be {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    public void awaitRunning(JobExecution jobExecution) throws InterruptedException {
        Objects.requireNonNull(jobExecution);
        awaitUntil(jobExecution::isRunning);
    }

    /**
     * Waits until the job execution no longer reports that it is running.
     *
     * @param jobExecution execution to watch; must not be {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    public void awaitTermination(JobExecution jobExecution) throws InterruptedException {
        Objects.requireNonNull(jobExecution);
        awaitUntilFalse(jobExecution::isRunning);
    }

    /**
     * Waits until the condition evaluates to {@code false}.
     *
     * @param booleanSupplier condition to poll
     * @throws InterruptedException if interrupted while waiting
     */
    protected void awaitUntilFalse(BooleanSupplier booleanSupplier) throws InterruptedException {
        awaitUntil(() -> !booleanSupplier.getAsBoolean());
    }

    /**
     * Waits until the condition evaluates to {@code true}, using this test's
     * poll delay, poll interval and timeout.
     *
     * @param booleanSupplier condition to poll
     * @throws InterruptedException if interrupted while waiting
     */
    protected void awaitUntil(BooleanSupplier booleanSupplier) throws InterruptedException {
        Await.await()
                .initialDelay(this.pollDelay)
                .delay(this.awaitPollInterval)
                .timeout(this.awaitTimeout)
                .until(booleanSupplier);
    }

    // ------------------------------------------------------------------
    // Steps and jobs
    // ------------------------------------------------------------------

    /**
     * Creates a job builder named after the test.
     *
     * @param testInfo current test
     * @return the job builder
     */
    protected JobBuilder job(TestInfo testInfo) {
        return this.jobFactory.jobBuilder(name(testInfo));
    }

    /**
     * Creates a step without a processor, using this test's chunk size.
     *
     * @param testInfo current test
     * @param itemReader step reader
     * @param itemWriter step writer
     * @param <I> input item type
     * @param <O> output item type
     * @return the step builder
     */
    protected <I, O> SimpleStepBuilder<I, O> step(TestInfo testInfo, ItemReader<I> itemReader, ItemWriter<O> itemWriter) {
        return step(testInfo, itemReader, null, itemWriter);
    }

    /**
     * Creates a step using this test's chunk size.
     *
     * @param testInfo current test
     * @param itemReader step reader
     * @param itemProcessor step processor, may be {@code null}
     * @param itemWriter step writer
     * @param <I> input item type
     * @param <O> output item type
     * @return the step builder
     */
    protected <I, O> SimpleStepBuilder<I, O> step(TestInfo testInfo, ItemReader<I> itemReader, ItemProcessor<I, O> itemProcessor, ItemWriter<O> itemWriter) {
        return step(testInfo, this.chunkSize, itemReader, itemProcessor, itemWriter);
    }

    /**
     * Creates a step named after the test with an explicit chunk size.
     *
     * @param testInfo current test
     * @param chunkSize chunk size handed to the job factory
     * @param itemReader step reader
     * @param itemProcessor step processor, may be {@code null}
     * @param itemWriter step writer
     * @param <I> input item type
     * @param <O> output item type
     * @return the step builder
     */
    protected <I, O> SimpleStepBuilder<I, O> step(TestInfo testInfo, int chunkSize, ItemReader<I> itemReader, ItemProcessor<I, O> itemProcessor, ItemWriter<O> itemWriter) {
        SimpleStepBuilder<I, O> builder = this.jobFactory.step(name(testInfo), chunkSize);
        builder.reader(itemReader);
        builder.processor(itemProcessor);
        builder.writer(itemWriter);
        return builder;
    }

    /**
     * Creates a flushing step without a processor, using this test's idle timeout.
     *
     * @param testInfo current test
     * @param pollableItemReader step reader
     * @param itemWriter step writer
     * @param <I> input item type
     * @param <O> output item type
     * @return the flushing step builder
     */
    protected <I, O> FlushingStepBuilder<I, O> flushingStep(TestInfo testInfo, PollableItemReader<I> pollableItemReader, ItemWriter<O> itemWriter) {
        SimpleStepBuilder<I, O> base = step(testInfo, pollableItemReader, itemWriter);
        return new FlushingStepBuilder<>(base).idleTimeout(this.idleTimeout);
    }

    /**
     * Makes a step fault tolerant with a {@link MaxAttemptsRetryPolicy} and
     * registers {@link RedisCommandTimeoutException} as retryable.
     *
     * @param simpleStepBuilder step to decorate
     * @param <I> input item type
     * @param <O> output item type
     * @return the fault-tolerant builder
     */
    protected <I, O> FaultTolerantStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> simpleStepBuilder) {
        return simpleStepBuilder.faultTolerant()
                .retryPolicy(new MaxAttemptsRetryPolicy())
                .retry(RedisCommandTimeoutException.class);
    }

    /**
     * Runs a generator through {@link #genItemProcessor} into the given writer
     * and waits for the job to stop running.
     *
     * @param testInfo current test
     * @param generatorItemReader source of generated items
     * @param itemWriter destination
     * @throws JobExecutionException if the job cannot be run
     * @throws InterruptedException if interrupted while waiting
     */
    protected void run(TestInfo testInfo, GeneratorItemReader generatorItemReader, ItemWriter<KeyValue<String, Object>> itemWriter) throws JobExecutionException, InterruptedException {
        run(testInfo, generatorItemReader, genItemProcessor, itemWriter);
    }

    /**
     * Runs a reader-to-writer job without a processor and waits for it to stop running.
     *
     * @param testInfo current test
     * @param itemReader step reader
     * @param itemWriter step writer
     * @param <T> item type
     * @return the job execution
     * @throws JobExecutionException if the job cannot be run
     * @throws InterruptedException if interrupted while waiting
     */
    protected <T> JobExecution run(TestInfo testInfo, ItemReader<T> itemReader, ItemWriter<T> itemWriter) throws JobExecutionException, InterruptedException {
        return run(testInfo, itemReader, null, itemWriter);
    }

    /**
     * Runs a single-step job and waits for it to stop running.
     *
     * @param testInfo current test
     * @param itemReader step reader
     * @param itemProcessor step processor, may be {@code null}
     * @param itemWriter step writer
     * @param <I> input item type
     * @param <O> output item type
     * @return the job execution
     * @throws JobExecutionException if the job cannot be run
     * @throws InterruptedException if interrupted while waiting
     */
    protected <I, O> JobExecution run(TestInfo testInfo, ItemReader<I> itemReader, ItemProcessor<I, O> itemProcessor, ItemWriter<O> itemWriter) throws JobExecutionException, InterruptedException {
        return run(testInfo, step(testInfo, itemReader, itemProcessor, itemWriter));
    }

    /**
     * Wraps the step with {@link #faultTolerant(SimpleStepBuilder)}, runs it as
     * a job named after the test and waits for it to stop running.
     *
     * @param testInfo current test
     * @param simpleStepBuilder step to run
     * @param <I> input item type
     * @param <O> output item type
     * @return the job execution
     * @throws JobExecutionException if the job cannot be run
     * @throws InterruptedException if interrupted while waiting
     */
    protected <I, O> JobExecution run(TestInfo testInfo, SimpleStepBuilder<I, O> simpleStepBuilder) throws JobExecutionException, InterruptedException {
        Job job = job(testInfo).start(faultTolerant(simpleStepBuilder).build()).build();
        return run(job);
    }

    /**
     * Runs a job through the job factory and waits until it no longer reports running.
     *
     * @param job job to run
     * @return the job execution
     * @throws JobExecutionException if the job cannot be run
     * @throws InterruptedException if interrupted while waiting
     */
    protected JobExecution run(Job job) throws JobExecutionException, InterruptedException {
        JobExecution execution = this.jobFactory.run(job);
        Objects.requireNonNull(execution);
        awaitUntilFalse(execution::isRunning);
        return execution;
    }

    // ------------------------------------------------------------------
    // Data generation
    // ------------------------------------------------------------------

    /**
     * Generates data on a background thread once a pattern subscription exists.
     *
     * <p>The executor is shut down right after the task is submitted: the task
     * still runs to completion, and the worker thread then terminates instead
     * of lingering for the remainder of the JVM's life.
     *
     * @param testInfo current test
     * @param generatorItemReader source of generated items
     * @param <I> unused, kept for source compatibility
     * @param <O> unused, kept for source compatibility
     */
    protected <I, O> void generateAsync(TestInfo testInfo, GeneratorItemReader generatorItemReader) {
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> {
                try {
                    awaitPubSub();
                    generate(testInfo, generatorItemReader);
                } catch (InterruptedException e) {
                    // Restore the flag so the worker thread observes the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Could not run data gen", e);
                } catch (Exception e) {
                    throw new RuntimeException("Could not run data gen", e);
                }
            });
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Generates data into the test's Redis instance.
     *
     * @param testInfo current test
     * @param generatorItemReader source of generated items
     * @throws JobExecutionException if the job cannot be run
     * @throws InterruptedException if interrupted while waiting
     */
    protected void generate(TestInfo testInfo, GeneratorItemReader generatorItemReader) throws JobExecutionException, InterruptedException {
        generate(testInfo, this.redisClient, generatorItemReader);
    }

    /**
     * Generates data through a struct writer bound to the given client. The job
     * is named after the test with suffix {@code "generate"}.
     *
     * @param testInfo current test
     * @param abstractRedisClient client the writer uses
     * @param generatorItemReader source of generated items
     * @throws JobExecutionException if the job cannot be run
     * @throws InterruptedException if interrupted while waiting
     */
    protected void generate(TestInfo testInfo, AbstractRedisClient abstractRedisClient, GeneratorItemReader generatorItemReader) throws JobExecutionException, InterruptedException {
        TestInfo generateInfo = testInfo(testInfo, "generate");
        RedisItemWriter<String, String, KeyValue<String, Object>> structWriter = RedisItemWriter.struct(StringCodec.UTF8);
        structWriter.setClient(abstractRedisClient);
        run(generateInfo, generatorItemReader, structWriter);
    }

    /**
     * Generates stream items with a fixed message count; the generator is
     * capped at three items.
     *
     * @param testInfo current test
     * @param messageCount value handed to {@code Range.of} for the stream message count
     * @throws Exception if generation fails
     */
    protected void generateStreams(TestInfo testInfo, int messageCount) throws Exception {
        GeneratorItemReader streamGenerator = generator(3, Item.Type.STREAM);
        StreamOptions options = new StreamOptions();
        options.setMessageCount(Range.of(messageCount));
        streamGenerator.setStreamOptions(options);
        generate(testInfo, streamGenerator);
    }

    // ------------------------------------------------------------------
    // Assertions and conversions
    // ------------------------------------------------------------------

    /**
     * Asserts that every message body contains the keys {@code field1} and {@code field2}.
     *
     * @param messages messages to check
     */
    protected void assertMessageBody(List<? extends StreamMessage<String, String>> messages) {
        for (StreamMessage<String, String> message : messages) {
            Map<String, String> body = message.getBody();
            Assertions.assertTrue(body.containsKey("field1"));
            Assertions.assertTrue(body.containsKey("field2"));
        }
    }

    /**
     * Asserts the id, body and stream of a message, in that order.
     *
     * @param expectedId expected message id
     * @param expectedBody expected message body
     * @param expectedStream expected stream name
     * @param streamMessage message under test
     */
    protected void assertStreamEquals(String expectedId, Map<String, String> expectedBody, String expectedStream, StreamMessage<String, String> streamMessage) {
        Assertions.assertEquals(expectedId, streamMessage.getId());
        Assertions.assertEquals(expectedBody, streamMessage.getBody());
        Assertions.assertEquals(expectedStream, streamMessage.getStream());
    }

    /**
     * Builds an insertion-ordered map from alternating keys and values.
     *
     * @param keysAndValues {@code key1, value1, key2, value2, ...}; length must be even
     * @return the map
     * @throws IllegalArgumentException if the array is {@code null} or has odd length
     */
    protected Map<String, String> map(String... keysAndValues) {
        Assert.notNull(keysAndValues, "Args cannot be null");
        Assert.isTrue(keysAndValues.length % 2 == 0, "Args length is not a multiple of 2");
        Map<String, String> result = new LinkedHashMap<>();
        for (int index = 0; index < keysAndValues.length; index += 2) {
            result.put(keysAndValues[index], keysAndValues[index + 1]);
        }
        return result;
    }

    /**
     * Converts a String key to bytes via {@code BatchUtils.toByteArrayKeyFunction}
     * with the UTF-8 string codec.
     *
     * @param key key to convert
     * @return the converted key
     */
    protected byte[] toByteArray(String key) {
        return (byte[]) BatchUtils.toByteArrayKeyFunction(StringCodec.UTF8).apply(key);
    }

    /**
     * Converts a byte-array key to a String via {@code BatchUtils.toStringKeyFunction}
     * with the byte-array codec.
     *
     * @param key key to convert
     * @return the converted key
     */
    protected String toString(byte[] key) {
        return (String) BatchUtils.toStringKeyFunction(ByteArrayCodec.INSTANCE).apply(key);
    }
}
