package com.ververica.cdc.connectors.postgres.source;

import com.ververica.cdc.connectors.postgres.PostgresTestBase;
import com.ververica.cdc.connectors.postgres.testutils.UniqueDatabase;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConnection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

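/**
 * Integration tests that read Postgres tables through the {@code postgres-cdc} table connector with
 * incremental snapshot enabled. The cases cover one or several captured tables, parallelism 1 and
 * {@code DEFAULT_PARALLELISM}, and task manager / job manager failover in the snapshot and stream
 * phases.
 */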
public class PostgresSourceITCase extends PostgresTestBase {
    // Value used for the 'scan.startup.mode' option; snapshot rows are only verified for this mode.
    private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
    // Parallelism used when a test does not pass one explicitly; also the slot count of the mini cluster.
    protected static final int DEFAULT_PARALLELISM = 4;
    // NOTE(review): not referenced in the code shown here; customDatabase below is given the same literal.
    private static final String DB_NAME_PREFIX = "postgres";
    // Schema that holds the captured tables; used in the DDL and in fully qualified table names.
    private static final String SCHEMA_NAME = "customer";

    // Every test is limited to 300 seconds.
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300);

    // Mini cluster with one task manager, DEFAULT_PARALLELISM slots and HA leadership control enabled
    // (the leadership control is what triggerJobManagerFailover relies on).
    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
    // Database handle built on POSTGRES_CONTAINER; each test calls createAndInitialize() on it first.
    private final UniqueDatabase customDatabase = new UniqueDatabase(POSTGRES_CONTAINER, "postgres", SCHEMA_NAME, POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword());
    // Changelog rows expected per captured table after makeFirstPartStreamEvents has run.
    private final List<String> firstPartStreamEvents = Arrays.asList("-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]");
    // Changelog rows expected per captured table after makeSecondPartStreamEvents has run.
    private final List<String> secondPartStreamEvents = Arrays.asList("-U[1010, user_11, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]", "+U[1010, user_11, Hangzhou, 123567891234]");

    /**
     * Phase of the job in which a test injects a failover.
     *
     * <p>NOTE: decompiler output; the decompiler reports that this type was originally declared
     * {@code private}.
     */
    public enum FailoverPhase {
        /** Failover is triggered by {@code checkSnapshotData} once the first snapshot row is available. */
        SNAPSHOT,
        /** Failover is triggered by the stream checks after the first batch of changes was written. */
        STREAM,
        /** No phase matches, so no failover is triggered. */
        NEVER
    }

    /**
     * Kind of failover that {@code triggerFailover} performs.
     *
     * <p>NOTE: decompiler output; the decompiler reports that this type was originally declared
     * {@code private}.
     */
    public enum FailoverType {
        /** Terminates task manager 0 and starts a new task manager. */
        TM,
        /** Revokes and then re-grants the job master leadership of the job. */
        JM,
        /** Does nothing. */
        NONE
    }

    /** Reads the {@code customers} table with parallelism 1 and without any failover. */
    @Test
    public void testReadSingleTableWithSingleParallelism() throws Exception {
        final String[] capturedTables = {"customers"};
        testPostgresParallelSource(1, FailoverType.NONE, FailoverPhase.NEVER, capturedTables);
    }

    /** Reads the {@code customers} table with {@code DEFAULT_PARALLELISM} and without any failover. */
    @Test
    public void testReadSingleTableWithMultipleParallelism() throws Exception {
        final String[] capturedTables = {"customers"};
        testPostgresParallelSource(DEFAULT_PARALLELISM, FailoverType.NONE, FailoverPhase.NEVER, capturedTables);
    }

    /** Reads {@code customers} and {@code customers_1} with parallelism 1 and without any failover. */
    @Test
    public void testReadMultipleTableWithSingleParallelism() throws Exception {
        final String[] capturedTables = {"customers", "customers_1"};
        testPostgresParallelSource(1, FailoverType.NONE, FailoverPhase.NEVER, capturedTables);
    }

    /** Reads {@code customers} and {@code customers_1} with {@code DEFAULT_PARALLELISM} and no failover. */
    @Test
    public void testReadMultipleTableWithMultipleParallelism() throws Exception {
        final String[] capturedTables = {"customers", "customers_1"};
        testPostgresParallelSource(DEFAULT_PARALLELISM, FailoverType.NONE, FailoverPhase.NEVER, capturedTables);
    }

    /** Restarts the task manager during the snapshot phase while two tables are being read. */
    @Test
    public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
        final String[] capturedTables = {"customers", "customers_1"};
        testPostgresParallelSource(FailoverType.TM, FailoverPhase.SNAPSHOT, capturedTables);
    }

    /** Restarts the task manager during the stream phase while two tables are being read. */
    @Test
    public void testTaskManagerFailoverInStreamPhase() throws Exception {
        final String[] capturedTables = {"customers", "customers_1"};
        testPostgresParallelSource(FailoverType.TM, FailoverPhase.STREAM, capturedTables);
    }

    /** Triggers a job manager failover during the snapshot phase while two tables are being read. */
    @Test
    public void testJobManagerFailoverInSnapshotPhase() throws Exception {
        final String[] capturedTables = {"customers", "customers_1"};
        testPostgresParallelSource(FailoverType.JM, FailoverPhase.SNAPSHOT, capturedTables);
    }

    /** Triggers a job manager failover during the stream phase while two tables are being read. */
    @Test
    public void testJobManagerFailoverInStreamPhase() throws Exception {
        final String[] capturedTables = {"customers", "customers_1"};
        testPostgresParallelSource(FailoverType.JM, FailoverPhase.STREAM, capturedTables);
    }

    /** Restarts the task manager during the snapshot phase with parallelism 1 and a single table. */
    @Test
    public void testTaskManagerFailoverSingleParallelism() throws Exception {
        final String[] capturedTables = {"customers"};
        testPostgresParallelSource(1, FailoverType.TM, FailoverPhase.SNAPSHOT, capturedTables);
    }

    /** Triggers a job manager failover during the snapshot phase with parallelism 1 and a single table. */
    @Test
    public void testJobManagerFailoverSingleParallelism() throws Exception {
        final String[] capturedTables = {"customers"};
        testPostgresParallelSource(1, FailoverType.JM, FailoverPhase.SNAPSHOT, capturedTables);
    }

    /**
     * Verifies that reading {@code customers_no_pk} with incremental snapshot enabled fails with the
     * "requires primary key" error.
     *
     * <p>The job runs with {@code RestartStrategies.noRestart()} so the failure surfaces instead of
     * being retried. If no exception is thrown at all the test now fails explicitly; previously it
     * passed without asserting anything in that case.
     */
    @Test
    public void testConsumingTableWithoutPrimaryKey() {
        final String expectedMessage = String.format("Incremental snapshot for tables requires primary key, but table %s doesn't have primary key", SCHEMA_NAME + ".customers_no_pk");
        try {
            testPostgresParallelSource(1, DEFAULT_SCAN_STARTUP_MODE, FailoverType.NONE, FailoverPhase.NEVER, new String[]{"customers_no_pk"}, RestartStrategies.noRestart());
        } catch (Exception e) {
            Assert.assertTrue("Unexpected failure, expected a cause with message \"" + expectedMessage + "\" but got: " + e, ExceptionUtils.findThrowableWithMessage(e, expectedMessage).isPresent());
            return;
        }
        // Assert.fail throws an AssertionError, which is not caught by the catch (Exception) above.
        Assert.fail("Expected the job to fail with: " + expectedMessage);
    }

    /**
     * Runs the {@code customers} source with {@code 'debezium.slot.drop.on.stop' = 'true'} and
     * parallelism 2. It checks the snapshot rows, then checks the stream rows while a job manager
     * failover happens in the stream phase, and finally cancels the job.
     *
     * <p>The snapshot check receives {@code FailoverPhase.STREAM}, so it does not trigger a failover
     * itself. The test does not inspect the replication slot after the job stops.
     */
    @Test
    public void testDebeziumSlotDropOnStop() throws Exception {
        this.customDatabase.createAndInitialize();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(2);
        env.enableCheckpointing(200L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

        final String sourceDdl = String.format("CREATE TABLE customers ( id BIGINT NOT NULL, name STRING, address STRING, phone_number STRING, primary key (id) not enforced) WITH ( 'connector' = 'postgres-cdc', 'scan.incremental.snapshot.enabled' = 'true', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'schema-name' = '%s', 'table-name' = '%s', 'scan.startup.mode' = '%s', 'scan.incremental.snapshot.chunk.size' = '100', 'slot.name' = '%s',  'debezium.slot.drop.on.stop' = 'true')", this.customDatabase.getHost(), this.customDatabase.getDatabasePort(), this.customDatabase.getUsername(), this.customDatabase.getPassword(), this.customDatabase.getDatabaseName(), SCHEMA_NAME, "customers", DEFAULT_SCAN_STARTUP_MODE, getSlotName());
        tableEnv.executeSql(sourceDdl);

        final TableResult tableResult = tableEnv.executeSql("select * from customers");
        final String[] capturedTables = {"customers"};
        // The startup mode is always DEFAULT_SCAN_STARTUP_MODE here, so the snapshot is always verified.
        checkSnapshotData(tableResult, FailoverType.JM, FailoverPhase.STREAM, capturedTables);
        checkStreamDataWithDDLDuringFailover(tableResult, FailoverType.JM, FailoverPhase.STREAM, capturedTables);

        tableResult.getJobClient().get().cancel().get();
    }

    /**
     * Convenience overload that runs the scenario with {@code DEFAULT_PARALLELISM}.
     *
     * @param failoverType kind of failover to inject
     * @param failoverPhase phase in which the failover is injected
     * @param strArr names of the tables to capture
     */
    private void testPostgresParallelSource(FailoverType failoverType, FailoverPhase failoverPhase, String[] strArr) throws Exception {
        final int parallelism = DEFAULT_PARALLELISM;
        testPostgresParallelSource(parallelism, failoverType, failoverPhase, strArr);
    }

    /**
     * Convenience overload that uses {@code DEFAULT_SCAN_STARTUP_MODE} and a fixed-delay restart
     * strategy with one restart attempt and no delay.
     *
     * @param i job parallelism
     * @param failoverType kind of failover to inject
     * @param failoverPhase phase in which the failover is injected
     * @param strArr names of the tables to capture
     */
    private void testPostgresParallelSource(int i, FailoverType failoverType, FailoverPhase failoverPhase, String[] strArr) throws Exception {
        final RestartStrategies.RestartStrategyConfiguration singleImmediateRestart = RestartStrategies.fixedDelayRestart(1, 0L);
        testPostgresParallelSource(i, DEFAULT_SCAN_STARTUP_MODE, failoverType, failoverPhase, strArr, singleImmediateRestart);
    }

    /**
     * Runs one end-to-end scenario: initializes the database, registers the {@code customers}
     * source table over the given physical tables, starts {@code select * from customers}, verifies
     * the snapshot rows (only for {@code DEFAULT_SCAN_STARTUP_MODE}) and the stream rows, and then
     * cancels the job.
     *
     * @param i job parallelism
     * @param str value for the {@code 'scan.startup.mode'} option
     * @param failoverType kind of failover to inject
     * @param failoverPhase phase in which the failover is injected
     * @param strArr names of the tables to capture
     * @param restartStrategyConfiguration restart strategy applied to the job
     */
    private void testPostgresParallelSource(int i, String str, FailoverType failoverType, FailoverPhase failoverPhase, String[] strArr, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) throws Exception {
        this.customDatabase.createAndInitialize();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(i);
        env.enableCheckpointing(200L);
        env.setRestartStrategy(restartStrategyConfiguration);

        final String sourceDdl = String.format("CREATE TABLE customers ( id BIGINT NOT NULL, name STRING, address STRING, phone_number STRING, primary key (id) not enforced) WITH ( 'connector' = 'postgres-cdc', 'scan.incremental.snapshot.enabled' = 'true', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'schema-name' = '%s', 'table-name' = '%s', 'scan.startup.mode' = '%s', 'scan.incremental.snapshot.chunk.size' = '100', 'slot.name' = '%s')", this.customDatabase.getHost(), this.customDatabase.getDatabasePort(), this.customDatabase.getUsername(), this.customDatabase.getPassword(), this.customDatabase.getDatabaseName(), SCHEMA_NAME, getTableNameRegex(strArr), str, getSlotName());
        tableEnv.executeSql(sourceDdl);

        final TableResult queryResult = tableEnv.executeSql("select * from customers");
        final boolean startsWithSnapshot = DEFAULT_SCAN_STARTUP_MODE.equals(str);
        if (startsWithSnapshot) {
            checkSnapshotData(queryResult, failoverType, failoverPhase, strArr);
        }
        checkStreamData(queryResult, failoverType, failoverPhase, strArr);

        final JobClient jobClient = queryResult.getJobClient().get();
        jobClient.cancel().get();
    }

    /**
     * Verifies that the query emits the expected snapshot rows once per captured table, in any
     * order.
     *
     * <p>If {@code failoverPhase} is {@link FailoverPhase#SNAPSHOT}, a failover of the given type is
     * triggered as soon as the first row is available, with a 3 second pause in the middle of the
     * failover.
     *
     * @param tableResult result of the running query
     * @param failoverType kind of failover to inject
     * @param failoverPhase phase in which the failover is injected
     * @param strArr names of the captured tables; only the count is used here
     */
    private void checkSnapshotData(TableResult tableResult, FailoverType failoverType, FailoverPhase failoverPhase, String[] strArr) throws Exception {
        final String[] snapshotForSingleTable = {
            "+I[101, user_1, Shanghai, 123567891234]",
            "+I[102, user_2, Shanghai, 123567891234]",
            "+I[103, user_3, Shanghai, 123567891234]",
            "+I[109, user_4, Shanghai, 123567891234]",
            "+I[110, user_5, Shanghai, 123567891234]",
            "+I[111, user_6, Shanghai, 123567891234]",
            "+I[118, user_7, Shanghai, 123567891234]",
            "+I[121, user_8, Shanghai, 123567891234]",
            "+I[123, user_9, Shanghai, 123567891234]",
            "+I[1009, user_10, Shanghai, 123567891234]",
            "+I[1010, user_11, Shanghai, 123567891234]",
            "+I[1011, user_12, Shanghai, 123567891234]",
            "+I[1012, user_13, Shanghai, 123567891234]",
            "+I[1013, user_14, Shanghai, 123567891234]",
            "+I[1014, user_15, Shanghai, 123567891234]",
            "+I[1015, user_16, Shanghai, 123567891234]",
            "+I[1016, user_17, Shanghai, 123567891234]",
            "+I[1017, user_18, Shanghai, 123567891234]",
            "+I[1018, user_19, Shanghai, 123567891234]",
            "+I[1019, user_20, Shanghai, 123567891234]",
            "+I[2000, user_21, Shanghai, 123567891234]"
        };

        // Every captured table is expected to contribute the same set of rows.
        final List<String> expectedSnapshotData = new ArrayList<>();
        for (int table = 0; table < strArr.length; table++) {
            expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
        }

        final CloseableIterator<Row> iterator = tableResult.collect();
        final JobID jobId = tableResult.getJobClient().get().getJobID();

        // hasNext() is only consulted when a snapshot-phase failover was requested.
        if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
            triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> sleepMs(3000L));
        }

        assertEqualsInAnyOrder(expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
    }

    /**
     * Writes two batches of changes to every captured table and verifies that the query emits the
     * matching changelog rows, in any order, and nothing after them.
     *
     * <p>If {@code failoverPhase} is {@link FailoverPhase#STREAM}, a failover of the given type is
     * triggered between the two batches and the method waits for the job to be running again.
     *
     * @param tableResult result of the running query
     * @param failoverType kind of failover to inject
     * @param failoverPhase phase in which the failover is injected
     * @param strArr names of the captured tables
     */
    private void checkStreamData(TableResult tableResult, FailoverType failoverType, FailoverPhase failoverPhase, String[] strArr) throws Exception {
        waitUntilJobRunning(tableResult);
        // Typed as Iterator<Row> so it can be passed to fetchRows without an unchecked conversion.
        final CloseableIterator<Row> iterator = tableResult.collect();
        final JobID jobId = tableResult.getJobClient().get().getJobID();
        final String qualifiedPrefix = this.customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.';

        for (String tableName : strArr) {
            makeFirstPartStreamEvents(getConnection(), qualifiedPrefix + tableName);
        }
        // Pause so the first batch can be read before a possible failover.
        Thread.sleep(2000L);

        if (failoverPhase == FailoverPhase.STREAM) {
            triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> sleepMs(200L));
            waitUntilJobRunning(tableResult);
        }

        for (String tableName : strArr) {
            makeSecondPartStreamEvents(getConnection(), qualifiedPrefix + tableName);
        }

        final List<String> expectedStreamData = new ArrayList<>();
        for (int table = 0; table < strArr.length; table++) {
            expectedStreamData.addAll(this.firstPartStreamEvents);
            expectedStreamData.addAll(this.secondPartStreamEvents);
        }

        Thread.sleep(2000L);
        assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
        // No further rows may follow the expected ones.
        Assert.assertFalse(hasNextData(iterator));
    }

    /**
     * Variant of the stream check in which the second batch of changes is written while the failover
     * is in progress, i.e. inside the action handed to {@code triggerFailover}.
     *
     * <p>NOTE(review): the second batch is only written by that action. It is never written when
     * {@code failoverPhase} is not {@link FailoverPhase#STREAM} or when {@code failoverType} is
     * {@link FailoverType#NONE}, yet the rows are still expected. The only caller in this class
     * passes {@code JM}/{@code STREAM}.
     *
     * @param tableResult result of the running query
     * @param failoverType kind of failover to inject
     * @param failoverPhase phase in which the failover is injected
     * @param strArr names of the captured tables
     */
    private void checkStreamDataWithDDLDuringFailover(TableResult tableResult, FailoverType failoverType, FailoverPhase failoverPhase, String[] strArr) throws Exception {
        waitUntilJobRunning(tableResult);
        // Typed as Iterator<Row> so it can be passed to fetchRows without an unchecked conversion.
        final CloseableIterator<Row> iterator = tableResult.collect();
        final JobID jobId = tableResult.getJobClient().get().getJobID();
        final String qualifiedPrefix = this.customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.';

        for (String tableName : strArr) {
            makeFirstPartStreamEvents(getConnection(), qualifiedPrefix + tableName);
        }
        // Pause so the first batch can be read before the failover.
        Thread.sleep(2000L);

        if (failoverPhase == FailoverPhase.STREAM) {
            triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> {
                for (String tableName : strArr) {
                    try {
                        makeSecondPartStreamEvents(getConnection(), qualifiedPrefix + tableName);
                    } catch (SQLException e) {
                        // Runnable cannot throw checked exceptions; keep the cause.
                        throw new RuntimeException(e);
                    }
                }
                sleepMs(200L);
            });
            waitUntilJobRunning(tableResult);
        }

        final List<String> expectedStreamData = new ArrayList<>();
        for (int table = 0; table < strArr.length; table++) {
            expectedStreamData.addAll(this.firstPartStreamEvents);
            expectedStreamData.addAll(this.secondPartStreamEvents);
        }

        Thread.sleep(2000L);
        assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
        // No further rows may follow the expected ones.
        Assert.assertFalse(hasNextData(iterator));
    }

    /**
     * Sleeps for the given number of milliseconds without throwing a checked exception, so it can
     * be used inside a {@link Runnable}.
     *
     * <p>If the thread is interrupted the method returns early and restores the interrupt flag, so
     * later blocking calls on this thread still observe the interruption.
     *
     * @param j time to sleep in milliseconds
     */
    private void sleepMs(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds the value for the {@code 'table-name'} option.
     *
     * @param strArr names of the tables to capture; must not be empty
     * @return the single name as is, or all names joined with {@code |} and wrapped in parentheses
     */
    private String getTableNameRegex(String[] strArr) {
        Preconditions.checkState(strArr.length > 0);
        if (strArr.length == 1) {
            return strArr[0];
        }
        final String alternatives = StringUtils.join(strArr, "|");
        return String.format("(%s)", alternatives);
    }

    /**
     * Takes up to {@code i} rows from the iterator and returns their string form.
     *
     * <p>{@code hasNext()} is not consulted once {@code i} rows have been collected, so the call
     * does not wait for a row beyond the requested count.
     *
     * @param it source of rows
     * @param i maximum number of rows to take
     * @return the {@code toString()} of each row taken, in iteration order
     */
    private static List<String> fetchRows(Iterator<Row> it, int i) {
        final List<String> rows = new ArrayList<>(i);
        while (rows.size() < i && it.hasNext()) {
            rows.add(it.next().toString());
        }
        return rows;
    }

    /**
     * Applies the first batch of changes to the given table and commits: rows 103 and 102 are
     * updated, deleted and re-inserted so that the table ends with the same content. The connection
     * is always closed afterwards.
     *
     * @param jdbcConnection connection to run the statements on; closed by this method
     * @param str table name as it is spliced into the SQL statements
     */
    private void makeFirstPartStreamEvents(JdbcConnection jdbcConnection, String str) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            final String[] statements = {
                "UPDATE " + str + " SET address = 'Hangzhou' where id = 103",
                "DELETE FROM " + str + " where id = 102",
                "INSERT INTO " + str + " VALUES(102, 'user_2','Shanghai','123567891234')",
                "UPDATE " + str + " SET address = 'Shanghai' where id = 103"
            };
            jdbcConnection.execute(statements);
            jdbcConnection.commit();
        } finally {
            jdbcConnection.close();
        }
    }

    /**
     * Applies the second batch of changes to the given table: an update of row 1010 followed by a
     * commit, then an insert of rows 2001-2003 followed by another commit. The connection is always
     * closed afterwards.
     *
     * @param jdbcConnection connection to run the statements on; closed by this method
     * @param str table name as it is spliced into the SQL statements
     */
    private void makeSecondPartStreamEvents(JdbcConnection jdbcConnection, String str) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);

            final String[] updateStatement = {"UPDATE " + str + " SET address = 'Hangzhou' where id = 1010"};
            jdbcConnection.execute(updateStatement);
            jdbcConnection.commit();

            final String[] insertStatement = {"INSERT INTO " + str + " VALUES(2001, 'user_22','Shanghai','123567891234'), (2002, 'user_23','Shanghai','123567891234'),(2003, 'user_24','Shanghai','123567891234')"};
            jdbcConnection.execute(insertStatement);
            jdbcConnection.commit();
        } finally {
            jdbcConnection.close();
        }
    }

    /**
     * Opens a new connection to the test database via {@code createConnection}, using the host,
     * port, credentials and database name of {@code customDatabase}.
     *
     * @return a new connection; the caller is responsible for closing it
     */
    private PostgresConnection getConnection() {
        final HashMap<String, String> properties = new HashMap<>();
        properties.put("hostname", this.customDatabase.getHost());
        properties.put("port", String.valueOf(this.customDatabase.getDatabasePort()));
        properties.put("user", this.customDatabase.getUsername());
        properties.put("password", this.customDatabase.getPassword());
        properties.put("dbname", this.customDatabase.getDatabaseName());
        return createConnection(properties);
    }

    /**
     * Dispatches to the failover routine that matches {@code failoverType}.
     *
     * @param failoverType kind of failover; {@code NONE} does nothing and never runs the action
     * @param jobID job whose job master leadership is revoked for a {@code JM} failover
     * @param miniCluster cluster the job runs on
     * @param runnable action executed in the middle of the failover
     * @throws IllegalStateException if the type is not one of the known constants
     */
    private static void triggerFailover(FailoverType failoverType, JobID jobID, MiniCluster miniCluster, Runnable runnable) throws Exception {
        switch (failoverType) {
            case TM:
                restartTaskManager(miniCluster, runnable);
                break;
            case JM:
                triggerJobManagerFailover(jobID, miniCluster, runnable);
                break;
            case NONE:
                // Nothing to do.
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + failoverType);
        }
    }

    /**
     * Revokes the job master leadership of the job, runs the given action, and grants the
     * leadership again. Each leadership call is awaited before the next step.
     *
     * @param jobID job whose job master leadership is toggled
     * @param miniCluster cluster that provides the HA leadership control
     * @param runnable action executed while the leadership is revoked
     */
    private static void triggerJobManagerFailover(JobID jobID, MiniCluster miniCluster, Runnable runnable) throws Exception {
        final HaLeadershipControl leadershipControl = miniCluster.getHaLeadershipControl().get();

        leadershipControl.revokeJobMasterLeadership(jobID).get();
        runnable.run();
        leadershipControl.grantJobMasterLeadership(jobID).get();
    }

    /**
     * Terminates task manager 0 and waits for the termination, runs the given action, and then
     * starts a new task manager.
     *
     * @param miniCluster cluster whose task manager is restarted
     * @param runnable action executed while the task manager is down
     */
    private static void restartTaskManager(MiniCluster miniCluster, Runnable runnable) throws Exception {
        final int taskManagerIndex = 0;
        miniCluster.terminateTaskManager(taskManagerIndex).get();

        runnable.run();

        miniCluster.startTaskManager();
    }

    /**
     * Asserts that both lists are non-null and hold the same elements regardless of order. Sorted
     * copies are compared, so the arguments themselves are not modified.
     *
     * @param list expected elements
     * @param list2 actual elements
     */
    public static void assertEqualsInAnyOrder(List<String> list, List<String> list2) {
        Assert.assertTrue(list != null && list2 != null);
        final List<String> sortedExpected = list.stream().sorted().collect(Collectors.toList());
        final List<String> sortedActual = list2.stream().sorted().collect(Collectors.toList());
        assertEqualsInOrder(sortedExpected, sortedActual);
    }

    /**
     * Asserts that both lists are non-null, have the same size, and hold equal elements at every
     * position.
     *
     * @param list expected elements
     * @param list2 actual elements
     */
    public static void assertEqualsInOrder(List<String> list, List<String> list2) {
        final boolean bothPresent = list != null && list2 != null;
        Assert.assertTrue(bothPresent);
        Assert.assertEquals(list.size(), list2.size());

        final String[] expected = list.toArray(new String[0]);
        final String[] actual = list2.toArray(new String[0]);
        Assert.assertArrayEquals(expected, actual);
    }

    /**
     * Polls the job status every 5 seconds until it is {@code RUNNING}. The first check happens
     * only after an initial 5 second sleep.
     *
     * @param tableResult result whose job client is queried for the status
     */
    private void waitUntilJobRunning(TableResult tableResult) throws InterruptedException, ExecutionException {
        JobStatus currentStatus;
        do {
            Thread.sleep(5000L);
            currentStatus = tableResult.getJobClient().get().getJobStatus().get();
        } while (currentStatus != JobStatus.RUNNING);
    }

    /**
     * Checks whether the iterator produces another element within 3 seconds.
     *
     * <p>{@code hasNext()} runs on a separate single-thread executor so that this method can give up
     * after the timeout. The executor is shut down on every exit path.
     *
     * @param closeableIterator iterator to probe
     * @return the result of {@code hasNext()}, or {@code false} if it did not answer in time
     */
    private boolean hasNextData(CloseableIterator<?> closeableIterator) throws InterruptedException, ExecutionException {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final FutureTask<Boolean> probe = new FutureTask<>(closeableIterator::hasNext);
            executor.execute(probe);
            return probe.get(3L, TimeUnit.SECONDS);
        } catch (TimeoutException timedOut) {
            // No answer within the time limit is treated as "no more data".
            return false;
        } finally {
            executor.shutdown();
        }
    }
}
