package com.ververica.cdc.connectors.postgres;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import com.ververica.cdc.connectors.utils.AssertUtils;
import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:com/ververica/cdc/connectors/postgres/PostgreSQLSourceTest.class */
public class PostgreSQLSourceTest extends PostgresTestBase {
    // Presumably the replication slot name used by the source under test; its use site is not visible in this part of the file.
    private static final String SLOT_NAME = "flink";
    // Logger of this test class; it is also handed to the container's Slf4jLogConsumer below.
    private static final Logger LOG = LoggerFactory.getLogger(PostgreSQLSourceTest.class);
    // Shared container built from the "debezium/postgres:9.6" image (declared as a compatible substitute for
    // PostgresTestBase.DEFAULT_DB). Database name, username and password are all set to PostgresTestBase.DEFAULT_DB,
    // and the container's log output is routed to LOG.
    private static final PostgreSQLContainer<?> POSTGRES_CONTAINER_OLD = new PostgreSQLContainer(DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor(PostgresTestBase.DEFAULT_DB)).withDatabaseName(PostgresTestBase.DEFAULT_DB).withUsername(PostgresTestBase.DEFAULT_DB).withPassword(PostgresTestBase.DEFAULT_DB).withLogConsumer(new Slf4jLogConsumer(LOG));

    /* loaded from: input_file:com/ververica/cdc/connectors/postgres/PostgreSQLSourceTest$BlockingSourceContext.class */
    /**
     * A {@link TestSourceContext} that parks the emitting thread once a configured number of
     * records has been collected.
     *
     * <p>The record that reaches the configured count is first handed to the parent context and
     * only then does {@link #collect(Object)} block on {@link #blocker}. The thread stays parked
     * until some other thread releases a permit on the semaphore.
     *
     * @param <T> type of the collected records
     */
    private static class BlockingSourceContext<T> extends TestSourceContext<T> {
        /** Created with zero permits, so {@code acquire()} blocks until a permit is released. */
        private final Semaphore blocker;
        /** Number of collected records at which the collecting thread is parked. */
        private final int expectedCount;
        /** Number of records collected so far. */
        private int currentCount;

        /**
         * @param expectedCount number of collected records at which {@link #collect(Object)} blocks
         */
        private BlockingSourceContext(int expectedCount) {
            this.blocker = new Semaphore(0);
            this.currentCount = 0;
            this.expectedCount = expectedCount;
        }

        /**
         * Forwards the record to the parent context and blocks when the configured count is hit.
         *
         * @param t the record to collect
         */
        public void collect(T t) {
            super.collect(t);
            this.currentCount++;
            if (this.currentCount == this.expectedCount) {
                try {
                    this.blocker.acquire();
                } catch (InterruptedException e) {
                    // Do not lose the interruption: restore the flag so the code driving this
                    // thread can still observe that it was asked to stop.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ververica/cdc/connectors/postgres/PostgreSQLSourceTest$ForwardDeserializeSchema.class */
    /**
     * Deserialization schema that performs no conversion: every incoming {@link SourceRecord}
     * is passed to the collector unchanged.
     */
    public static class ForwardDeserializeSchema implements DebeziumDeserializationSchema<SourceRecord> {
        private static final long serialVersionUID = 2975058057832211228L;

        private ForwardDeserializeSchema() {
            // Private: instances can only be created from within the enclosing test class.
        }

        /**
         * Emits the given record as-is.
         *
         * @param event the record handed over for deserialization
         * @param out receiver of the (unchanged) record
         */
        public void deserialize(SourceRecord event, Collector<SourceRecord> out) throws Exception {
            out.collect(event);
        }

        /** @return type information for {@link SourceRecord}, the type this schema emits */
        public TypeInformation<SourceRecord> getProducedType() {
            final TypeInformation<SourceRecord> producedType = TypeInformation.of(SourceRecord.class);
            return producedType;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ververica/cdc/connectors/postgres/PostgreSQLSourceTest$MockFunctionInitializationContext.class */
    /**
     * Minimal {@link FunctionInitializationContext} for tests. It only carries the restored flag
     * and an operator state store; the checkpoint id and the keyed state store are not supported.
     */
    public static class MockFunctionInitializationContext implements FunctionInitializationContext {
        private final boolean isRestored;
        private final OperatorStateStore operatorStateStore;

        /**
         * @param restored value reported by {@link #isRestored()}
         * @param stateStore store returned by {@link #getOperatorStateStore()}
         */
        private MockFunctionInitializationContext(boolean restored, OperatorStateStore stateStore) {
            isRestored = restored;
            operatorStateStore = stateStore;
        }

        /** @return the restored flag given at construction time */
        public boolean isRestored() {
            return isRestored;
        }

        /** Not supported by this mock; always throws {@link UnsupportedOperationException}. */
        public OptionalLong getRestoredCheckpointId() {
            throw new UnsupportedOperationException();
        }

        /** @return the operator state store given at construction time */
        public OperatorStateStore getOperatorStateStore() {
            return operatorStateStore;
        }

        /** Not supported by this mock; always throws {@link UnsupportedOperationException}. */
        public KeyedStateStore getKeyedStateStore() {
            throw new UnsupportedOperationException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ververica/cdc/connectors/postgres/PostgreSQLSourceTest$MockOperatorStateStore.class */
    /**
     * {@link OperatorStateStore} stub that serves exactly two union list states, selected by the
     * descriptor name ("offset-states" and "history-records-states"). Every other operation
     * throws {@link UnsupportedOperationException}.
     */
    public static class MockOperatorStateStore implements OperatorStateStore {
        private final ListState<?> restoredOffsetListState;
        private final ListState<?> restoredHistoryListState;

        /**
         * @param offsetState state returned for the "offset-states" descriptor
         * @param historyState state returned for the "history-records-states" descriptor
         */
        private MockOperatorStateStore(ListState<?> offsetState, ListState<?> historyState) {
            restoredOffsetListState = offsetState;
            restoredHistoryListState = historyState;
        }

        /**
         * Looks up one of the two known states by the descriptor's name.
         *
         * @param descriptor descriptor whose name selects the state
         * @return the matching state, cast to the requested element type without any check
         * @throws IllegalStateException if the descriptor name is not one of the two known names
         */
        @SuppressWarnings("unchecked") // the stored states are untyped; callers pick the element type
        public <S> ListState<S> getUnionListState(ListStateDescriptor<S> descriptor) throws Exception {
            final String stateName = descriptor.getName();
            switch (stateName) {
                case "offset-states":
                    return (ListState<S>) restoredOffsetListState;
                case "history-records-states":
                    return (ListState<S>) restoredHistoryListState;
                default:
                    throw new IllegalStateException("Unknown state.");
            }
        }

        /** Not supported by this stub. */
        public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> descriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public <S> ListState<S> getListState(ListStateDescriptor<S> descriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public Set<String> getRegisteredStateNames() {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public Set<String> getRegisteredBroadcastStateNames() {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:com/ververica/cdc/connectors/postgres/PostgreSQLSourceTest$TestingListState.class */
    /**
     * In-memory {@link ListState} backed by an {@link ArrayList}, which additionally remembers
     * whether {@link #clear()} has ever been invoked.
     *
     * @param <T> element type of the state
     */
    private static final class TestingListState<T> implements ListState<T> {
        /** Backing storage; read directly by the enclosing test class. */
        private final List<T> list;
        /** Set to {@code true} by the first call to {@link #clear()} and never reset. */
        private boolean clearCalled;

        private TestingListState() {
            this.list = new ArrayList<>();
            this.clearCalled = false;
        }

        /** Removes all elements and records that a clear happened. */
        public void clear() {
            this.list.clear();
            this.clearCalled = true;
        }

        /**
         * Returns the live backing list (not a copy).
         *
         * <p>This is the state accessor under its real name: the decompiler had renamed it to
         * {@code m1get}, as stated in the comment it left behind.
         */
        public Iterable<T> get() throws Exception {
            return this.list;
        }

        /**
         * Decompiler-generated alias of {@link #get()}, retained only so that any existing
         * reference to this name keeps compiling.
         */
        public Iterable<T> m1get() throws Exception {
            return get();
        }

        /**
         * Appends a single element.
         *
         * @param t element to append; a {@code null} value is rejected by
         *     {@code Preconditions.checkNotNull}
         */
        public void add(T t) throws Exception {
            Preconditions.checkNotNull(t, "You cannot add null to a ListState.");
            this.list.add(t);
        }

        /** @return the live backing list */
        public List<T> getList() {
            return this.list;
        }

        /** @return whether {@link #clear()} has been called at least once */
        boolean isClearCalled() {
            return this.clearCalled;
        }

        /**
         * Replaces the content with the given elements. Implemented as {@link #clear()} followed
         * by {@link #addAll(List)}, so it also marks the state as cleared.
         */
        public void update(List<T> list) throws Exception {
            clear();
            addAll(list);
        }

        /**
         * Appends all given elements; a {@code null} list is ignored.
         *
         * <p>Every element is null-checked before anything is appended, so a list containing a
         * {@code null} leaves the state unchanged.
         */
        public void addAll(List<T> list) throws Exception {
            if (list != null) {
                list.forEach(obj -> {
                    Preconditions.checkNotNull(obj, "You cannot add null to a ListState.");
                });
                this.list.addAll(list);
            }
        }
    }

    /**
     * Starts the shared PostgreSQL container once before any test of this class runs and waits
     * until the start has completed.
     */
    @BeforeClass
    public static void startAll() {
        LOG.info("Starting containers...");
        final Stream<PostgreSQLContainer<?>> containers = Stream.of(POSTGRES_CONTAINER_OLD);
        // join() blocks until the asynchronous start has finished.
        Startables.deepStart(containers).join();
        LOG.info("Containers are started.");
    }

    /**
     * Stops the shared PostgreSQL container once all tests of this class have run, logging a
     * message before and after the stop call.
     */
    @AfterClass
    public static void stopAll() {
        LOG.info("Stopping containers...");
        POSTGRES_CONTAINER_OLD.stop();
        LOG.info("Containers are stopped.");
    }

    /**
     * Runs before every test: calls {@code initializePostgresTable} for the shared container with
     * the argument "inventory".
     */
    @Before
    public void before() {
        // NOTE(review): presumably (re)creates the inventory schema and seed rows the tests rely on;
        // the helper is defined outside this part of the file, so confirm there.
        initializePostgresTable(POSTGRES_CONTAINER_OLD, "inventory");
    }

    /**
     * Runs the source against the shared container and checks the records it emits, in order:
     * the nine initial read records, then the change events produced by inserts, updates
     * (including a primary-key update) and an update issued after an {@code ALTER TABLE}.
     *
     * <p>The JDBC connection and statement are managed with try-with-resources, so both are
     * closed on every exit path and a failure while closing is attached as a suppressed
     * exception instead of replacing the original one.
     */
    @Test
    public void testConsumingAllEvents() throws Exception {
        final DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSourceWithHeartbeatDisabled();
        final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
        setupSource(source);

        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
                Statement statement = connection.createStatement()) {
            // The source runs on its own thread; failures inside go() surface through sync().
            final CheckedThread runThread = new CheckedThread() {
                public void go() throws Exception {
                    source.run(sourceContext);
                }
            };
            runThread.start();

            // Initial phase: nine read records with consecutive ids starting at 101.
            final List<SourceRecord> initialRecords = drain(sourceContext, 9);
            Assert.assertEquals(9, initialRecords.size());
            for (int i = 0; i < initialRecords.size(); i++) {
                AssertUtils.assertRead(initialRecords.get(i), "id", 101 + i);
            }

            // Insert relying on the default id, which is expected to be 110.
            statement.execute("INSERT INTO inventory.products VALUES (default,'robot','Toy robot',1.304)");
            AssertUtils.assertInsert(drain(sourceContext, 1).get(0), "id", 110);

            // Insert with an explicit id.
            statement.execute("INSERT INTO inventory.products VALUES (1001,'roy','old robot',1234.56)");
            AssertUtils.assertInsert(drain(sourceContext, 1).get(0), "id", 1001);

            // Changing the primary key is expected as a delete of the old key followed by an
            // insert of the new key.
            statement.execute("UPDATE inventory.products SET id=2001, description='really old robot' WHERE id=1001");
            final List<SourceRecord> keyChangeRecords = drain(sourceContext, 2);
            AssertUtils.assertDelete(keyChangeRecords.get(0), "id", 1001);
            AssertUtils.assertInsert(keyChangeRecords.get(1), "id", 2001);

            // Plain update of a non-key column.
            statement.execute("UPDATE inventory.products SET weight=1345.67 WHERE id=2001");
            AssertUtils.assertUpdate(drain(sourceContext, 1).get(0), "id", 2001);

            // Add columns while the source is running, then update one of the new columns.
            statement.execute("ALTER TABLE inventory.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL");
            statement.execute("UPDATE inventory.products SET volume=13.5 WHERE id=2001");
            AssertUtils.assertUpdate(drain(sourceContext, 1).get(0), "id", 2001);

            // Stop the source and wait for the runner thread, rethrowing anything it failed with.
            source.cancel();
            source.close();
            runThread.sync();
        }
    }

    @Test
    public void testCheckpointAndRestore() throws Exception {
        Throwable th;
        int intValue;
        final DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatDisabled;
        final TestSourceContext testSourceContext;
        CheckedThread checkedThread;
        TestingListState testingListState = new TestingListState();
        TestingListState testingListState2 = new TestingListState();
        final DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatDisabled2 = createPostgreSqlSourceWithHeartbeatDisabled();
        final BlockingSourceContext blockingSourceContext = new BlockingSourceContext(8);
        setupSource(createPostgreSqlSourceWithHeartbeatDisabled2, false, testingListState, testingListState2, true, 0, 1);
        CheckedThread checkedThread2 = new CheckedThread() { // from class: com.ververica.cdc.connectors.postgres.PostgreSQLSourceTest.2
            public void go() throws Exception {
                createPostgreSqlSourceWithHeartbeatDisabled2.run(blockingSourceContext);
            }
        };
        checkedThread2.start();
        Assert.assertEquals(2L, drain(blockingSourceContext, 2).size());
        Assert.assertFalse(waitForCheckpointLock(blockingSourceContext.getCheckpointLock(), Duration.ofSeconds(3L)));
        blockingSourceContext.blocker.release();
        Assert.assertEquals(9L, drain(blockingSourceContext, 9 - r0).size() + r0);
        Assert.assertEquals(0L, testingListState.list.size());
        Assert.assertEquals(0L, testingListState2.list.size());
        synchronized (blockingSourceContext.getCheckpointLock()) {
            createPostgreSqlSourceWithHeartbeatDisabled2.snapshotState(new StateSnapshotContextSynchronousImpl(101L, 101L));
        }
        Assert.assertEquals(1L, testingListState.list.size());
        String str = new String((byte[]) testingListState.list.get(0), StandardCharsets.UTF_8);
        Assert.assertEquals("postgres_cdc_source", JsonPath.read(str, "$.sourcePartition.server", new Predicate[0]));
        Assert.assertEquals("557", JsonPath.read(str, "$.sourceOffset.txId", new Predicate[0]).toString());
        Assert.assertEquals("true", JsonPath.read(str, "$.sourceOffset.last_snapshot_record", new Predicate[0]).toString());
        Assert.assertEquals("true", JsonPath.read(str, "$.sourceOffset.snapshot", new Predicate[0]).toString());
        Assert.assertTrue(str.contains("ts_usec"));
        int intValue2 = ((Integer) JsonPath.read(str, "$.sourceOffset.lsn", new Predicate[0])).intValue();
        Assert.assertTrue(intValue2 > 0);
        createPostgreSqlSourceWithHeartbeatDisabled2.cancel();
        createPostgreSqlSourceWithHeartbeatDisabled2.close();
        checkedThread2.sync();
        final DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatDisabled3 = createPostgreSqlSourceWithHeartbeatDisabled();
        final TestSourceContext<?> testSourceContext2 = new TestSourceContext<>();
        setupSource(createPostgreSqlSourceWithHeartbeatDisabled3, true, testingListState, testingListState2, true, 0, 1);
        CheckedThread checkedThread3 = new CheckedThread() { // from class: com.ververica.cdc.connectors.postgres.PostgreSQLSourceTest.3
            public void go() throws Exception {
                createPostgreSqlSourceWithHeartbeatDisabled3.run(testSourceContext2);
            }
        };
        checkedThread3.start();
        Assert.assertFalse(waitForAvailableRecords(Duration.ofSeconds(5L), testSourceContext2));
        Connection jdbcConnection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
        Throwable th2 = null;
        try {
            Statement createStatement = jdbcConnection.createStatement();
            Throwable th3 = null;
            try {
                createStatement.execute("INSERT INTO inventory.products VALUES (default,'robot','Toy robot',1.304)");
                List drain = drain(testSourceContext2, 1);
                Assert.assertEquals(1L, drain.size());
                AssertUtils.assertInsert((SourceRecord) drain.get(0), "id", 110);
                synchronized (testSourceContext2.getCheckpointLock()) {
                    createPostgreSqlSourceWithHeartbeatDisabled3.snapshotState(new StateSnapshotContextSynchronousImpl(138L, 138L));
                }
                Assert.assertEquals(1L, testingListState.list.size());
                String str2 = new String((byte[]) testingListState.list.get(0), StandardCharsets.UTF_8);
                Assert.assertEquals("postgres_cdc_source", JsonPath.read(str2, "$.sourcePartition.server", new Predicate[0]));
                Assert.assertEquals("558", JsonPath.read(str2, "$.sourceOffset.txId", new Predicate[0]).toString());
                Assert.assertTrue(str2.contains("ts_usec"));
                Assert.assertFalse(str2.contains("snapshot"));
                int intValue3 = ((Integer) JsonPath.read(str2, "$.sourceOffset.lsn", new Predicate[0])).intValue();
                Assert.assertTrue(intValue3 > intValue2);
                createStatement.execute("INSERT INTO inventory.products VALUES (1001,'roy','old robot',1234.56)");
                createStatement.execute("UPDATE inventory.products SET weight=1345.67 WHERE id=1001");
                if (createStatement != null) {
                    if (0 != 0) {
                        try {
                            createStatement.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                createPostgreSqlSourceWithHeartbeatDisabled3.cancel();
                createPostgreSqlSourceWithHeartbeatDisabled3.close();
                checkedThread3.sync();
                final DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatDisabled4 = createPostgreSqlSourceWithHeartbeatDisabled();
                final TestSourceContext<?> testSourceContext3 = new TestSourceContext<>();
                setupSource(createPostgreSqlSourceWithHeartbeatDisabled4, true, testingListState, testingListState2, true, 0, 1);
                CheckedThread checkedThread4 = new CheckedThread() { // from class: com.ververica.cdc.connectors.postgres.PostgreSQLSourceTest.4
                    public void go() throws Exception {
                        createPostgreSqlSourceWithHeartbeatDisabled4.run(testSourceContext3);
                    }
                };
                checkedThread4.start();
                List drain2 = drain(testSourceContext3, 2);
                AssertUtils.assertInsert((SourceRecord) drain2.get(0), "id", 1001);
                AssertUtils.assertUpdate((SourceRecord) drain2.get(1), "id", 1001);
                Assert.assertFalse(waitForAvailableRecords(Duration.ofSeconds(3L), testSourceContext3));
                Connection jdbcConnection2 = getJdbcConnection(POSTGRES_CONTAINER_OLD);
                Throwable th5 = null;
                try {
                    Statement createStatement2 = jdbcConnection2.createStatement();
                    Throwable th6 = null;
                    try {
                        try {
                            createStatement2.execute("DELETE FROM inventory.products WHERE id=1001");
                            if (createStatement2 != null) {
                                if (0 != 0) {
                                    try {
                                        createStatement2.close();
                                    } catch (Throwable th7) {
                                        th6.addSuppressed(th7);
                                    }
                                } else {
                                    createStatement2.close();
                                }
                            }
                            AssertUtils.assertDelete((SourceRecord) drain(testSourceContext3, 1).get(0), "id", 1001);
                            synchronized (testSourceContext3.getCheckpointLock()) {
                                createPostgreSqlSourceWithHeartbeatDisabled4.snapshotState(new StateSnapshotContextSynchronousImpl(233L, 233L));
                            }
                            Assert.assertEquals(1L, testingListState.list.size());
                            String str3 = new String((byte[]) testingListState.list.get(0), StandardCharsets.UTF_8);
                            Assert.assertEquals("postgres_cdc_source", JsonPath.read(str3, "$.sourcePartition.server", new Predicate[0]));
                            Assert.assertEquals("561", JsonPath.read(str3, "$.sourceOffset.txId", new Predicate[0]).toString());
                            Assert.assertTrue(str3.contains("ts_usec"));
                            Assert.assertFalse(str3.contains("snapshot"));
                            Assert.assertTrue(((Integer) JsonPath.read(str3, "$.sourceOffset.lsn", new Predicate[0])).intValue() > intValue3);
                            createPostgreSqlSourceWithHeartbeatDisabled4.cancel();
                            createPostgreSqlSourceWithHeartbeatDisabled4.close();
                            checkedThread4.sync();
                            final DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatDisabled5 = createPostgreSqlSourceWithHeartbeatDisabled();
                            final TestSourceContext<?> testSourceContext4 = new TestSourceContext<>();
                            setupSource(createPostgreSqlSourceWithHeartbeatDisabled5, true, testingListState, testingListState2, true, 0, 1);
                            CheckedThread checkedThread5 = new CheckedThread() { // from class: com.ververica.cdc.connectors.postgres.PostgreSQLSourceTest.5
                                public void go() throws Exception {
                                    createPostgreSqlSourceWithHeartbeatDisabled5.run(testSourceContext4);
                                }
                            };
                            checkedThread5.start();
                            Assert.assertFalse(waitForAvailableRecords(Duration.ofSeconds(5L), testSourceContext4));
                            synchronized (testSourceContext4.getCheckpointLock()) {
                                createPostgreSqlSourceWithHeartbeatDisabled5.snapshotState(new StateSnapshotContextSynchronousImpl(254L, 254L));
                            }
                            Assert.assertEquals(1L, testingListState.list.size());
                            String str4 = new String((byte[]) testingListState.list.get(0), StandardCharsets.UTF_8);
                            Assert.assertEquals("postgres_cdc_source", JsonPath.read(str4, "$.sourcePartition.server", new Predicate[0]));
                            Assert.assertEquals("561", JsonPath.read(str4, "$.sourceOffset.txId", new Predicate[0]).toString());
                            Assert.assertTrue(str4.contains("ts_usec"));
                            Assert.assertFalse(str4.contains("snapshot"));
                            intValue = ((Integer) JsonPath.read(str4, "$.sourceOffset.lsn", new Predicate[0])).intValue();
                            Assert.assertTrue(intValue > intValue3);
                            createPostgreSqlSourceWithHeartbeatDisabled5.cancel();
                            createPostgreSqlSourceWithHeartbeatDisabled5.close();
                            checkedThread5.sync();
                            createPostgreSqlSourceWithHeartbeatDisabled = createPostgreSqlSourceWithHeartbeatDisabled();
                            testSourceContext = new TestSourceContext();
                            setupSource(createPostgreSqlSourceWithHeartbeatDisabled, true, testingListState, testingListState2, true, 0, 1);
                            checkedThread = new CheckedThread() { // from class: com.ververica.cdc.connectors.postgres.PostgreSQLSourceTest.6
                                public void go() throws Exception {
                                    createPostgreSqlSourceWithHeartbeatDisabled.run(testSourceContext);
                                }
                            };
                            checkedThread.start();
                            Connection jdbcConnection3 = getJdbcConnection(POSTGRES_CONTAINER_OLD);
                            Throwable th8 = null;
                            try {
                                createStatement2 = jdbcConnection3.createStatement();
                                th = null;
                            } finally {
                                if (jdbcConnection3 != null) {
                                    if (0 != 0) {
                                        try {
                                            jdbcConnection3.close();
                                        } catch (Throwable th9) {
                                            th8.addSuppressed(th9);
                                        }
                                    } else {
                                        jdbcConnection3.close();
                                    }
                                }
                            }
                        } finally {
                        }
                        try {
                            try {
                                createStatement2.execute("INSERT INTO inventory.products(id, description, weight) VALUES (default, 'Go go go', 111.1)");
                                createStatement2.execute("ALTER TABLE inventory.products ADD comment_col VARCHAR(100) DEFAULT 'cdc'");
                                AssertUtils.assertInsert((SourceRecord) drain(testSourceContext, 1).get(0), "id", 111);
                                if (createStatement2 != null) {
                                    if (0 != 0) {
                                        try {
                                            createStatement2.close();
                                        } catch (Throwable th10) {
                                            th.addSuppressed(th10);
                                        }
                                    } else {
                                        createStatement2.close();
                                    }
                                }
                                synchronized (testSourceContext.getCheckpointLock()) {
                                    createPostgreSqlSourceWithHeartbeatDisabled.snapshotState(new StateSnapshotContextSynchronousImpl(300L, 300L));
                                }
                                Assert.assertEquals(1L, testingListState.list.size());
                                String str5 = new String((byte[]) testingListState.list.get(0), StandardCharsets.UTF_8);
                                Assert.assertEquals("postgres_cdc_source", JsonPath.read(str5, "$.sourcePartition.server", new Predicate[0]));
                                Assert.assertEquals("562", JsonPath.read(str5, "$.sourceOffset.txId", new Predicate[0]).toString());
                                Assert.assertTrue(str5.contains("ts_usec"));
                                Assert.assertFalse(str5.contains("snapshot"));
                                Assert.assertTrue(((Integer) JsonPath.read(str5, "$.sourceOffset.lsn", new Predicate[0])).intValue() > intValue);
                                createPostgreSqlSourceWithHeartbeatDisabled.cancel();
                                createPostgreSqlSourceWithHeartbeatDisabled.close();
                                checkedThread.sync();
                                final DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatDisabled6 = createPostgreSqlSourceWithHeartbeatDisabled();
                                final TestSourceContext testSourceContext5 = new TestSourceContext();
                                setupSource(createPostgreSqlSourceWithHeartbeatDisabled6, true, testingListState, testingListState2, true, 0, 1);
                                CheckedThread checkedThread6 = new CheckedThread() { // from class: com.ververica.cdc.connectors.postgres.PostgreSQLSourceTest.7
                                    public void go() throws Exception {
                                        createPostgreSqlSourceWithHeartbeatDisabled6.run(testSourceContext5);
                                    }
                                };
                                checkedThread6.start();
                                Connection jdbcConnection4 = getJdbcConnection(POSTGRES_CONTAINER_OLD);
                                Throwable th11 = null;
                                try {
                                    Statement createStatement3 = jdbcConnection4.createStatement();
                                    Throwable th12 = null;
                                    try {
                                        try {
                                            createStatement3.execute("INSERT INTO inventory.products(id, description, weight) VALUES (default, 'Run!', 22.2)");
                                            AssertUtils.assertInsert((SourceRecord) drain(testSourceContext5, 1).get(0), "id", 112);
                                            if (createStatement3 != null) {
                                                if (0 != 0) {
                                                    try {
                                                        createStatement3.close();
                                                    } catch (Throwable th13) {
                                                        th12.addSuppressed(th13);
                                                    }
                                                } else {
                                                    createStatement3.close();
                                                }
                                            }
                                            createPostgreSqlSourceWithHeartbeatDisabled6.cancel();
                                            createPostgreSqlSourceWithHeartbeatDisabled6.close();
                                            checkedThread6.sync();
                                        } finally {
                                        }
                                    } finally {
                                        if (createStatement3 != null) {
                                            if (th12 != null) {
                                                try {
                                                    createStatement3.close();
                                                } catch (Throwable th14) {
                                                    th12.addSuppressed(th14);
                                                }
                                            } else {
                                                createStatement3.close();
                                            }
                                        }
                                    }
                                } finally {
                                    if (jdbcConnection4 != null) {
                                        if (0 != 0) {
                                            try {
                                                jdbcConnection4.close();
                                            } catch (Throwable th15) {
                                                th11.addSuppressed(th15);
                                            }
                                        } else {
                                            jdbcConnection4.close();
                                        }
                                    }
                                }
                            } finally {
                            }
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                    if (jdbcConnection2 != null) {
                        if (0 != 0) {
                            try {
                                jdbcConnection2.close();
                            } catch (Throwable th16) {
                                th5.addSuppressed(th16);
                            }
                        } else {
                            jdbcConnection2.close();
                        }
                    }
                }
            } catch (Throwable th17) {
                if (createStatement != null) {
                    if (0 != 0) {
                        try {
                            createStatement.close();
                        } catch (Throwable th18) {
                            th3.addSuppressed(th18);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th17;
            }
        } finally {
            if (jdbcConnection != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection.close();
                    } catch (Throwable th19) {
                        th2.addSuppressed(th19);
                    }
                } else {
                    jdbcConnection.close();
                }
            }
        }
    }

    /**
     * Checks when the replication slot's {@code confirmed_flush_lsn} changes.
     *
     * <p>The value returned by {@link #getConfirmedFlushLsn()} is expected to change only after
     * {@code notifyCheckpointComplete} has been called for a snapshotted checkpoint, and to stay
     * the same when a checkpoint is snapshotted but not acknowledged. The test runs two source
     * instances one after the other against the same pair of list states and expects five
     * distinct LSN values in total.
     */
    @Test
    public void testFlushLsn() throws Exception {
        // Both source instances below are set up with these two list states.
        TestingListState firstState = new TestingListState();
        TestingListState secondState = new TestingListState();
        // add() returns false for a value that is already present, i.e. when the LSN did not move.
        Set<String> seenFlushLsns = new LinkedHashSet<>();

        // ---- First source instance (second setupSource argument is false) ----
        final DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSourceWithHeartbeatEnabled();
        final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
        setupSource(source, false, firstState, secondState, true, 0, 1);
        CheckedThread runThread = new CheckedThread() {
            @Override
            public void go() throws Exception {
                source.run(sourceContext);
            }
        };
        runThread.start();

        // The source is expected to emit nine records before any row is inserted by this test.
        Assert.assertEquals(9, drain(sourceContext, 9).size());

        // Checkpoint 101: snapshot + acknowledge -> first LSN value.
        synchronized (sourceContext.getCheckpointLock()) {
            source.snapshotState(new StateSnapshotContextSynchronousImpl(101L, 101L));
        }
        source.notifyCheckpointComplete(101L);
        Assert.assertTrue(seenFlushLsns.add(getConfirmedFlushLsn()));

        // Checkpoint 201: one pending offset until it is acknowledged, then a new LSN value.
        batchInsertAndCheckpoint(5, source, sourceContext, 201L);
        Assert.assertEquals(1, source.getPendingOffsetsToCommit().size());
        source.notifyCheckpointComplete(201L);
        Assert.assertEquals(0, source.getPendingOffsetsToCommit().size());
        Assert.assertTrue(seenFlushLsns.add(getConfirmedFlushLsn()));

        // Checkpoint 301 is snapshotted but never acknowledged: the LSN must not change.
        batchInsertAndCheckpoint(1, source, sourceContext, 301L);
        Assert.assertFalse(seenFlushLsns.add(getConfirmedFlushLsn()));

        // Nothing else may arrive before the first instance is stopped.
        Assert.assertFalse(waitForAvailableRecords(Duration.ofSeconds(3L), sourceContext));
        source.cancel();
        source.close();
        runThread.sync();

        // ---- Second source instance, same list states (second setupSource argument is true) ----
        final DebeziumSourceFunction<SourceRecord> restoredSource = createPostgreSqlSourceWithHeartbeatEnabled();
        final TestSourceContext<SourceRecord> restoredContext = new TestSourceContext<>();
        setupSource(restoredSource, true, firstState, secondState, true, 0, 1);
        CheckedThread restoredRunThread = new CheckedThread() {
            @Override
            public void go() throws Exception {
                restoredSource.run(restoredContext);
            }
        };
        restoredRunThread.start();

        // Starting the second instance alone must not change the LSN.
        Assert.assertFalse(seenFlushLsns.add(getConfirmedFlushLsn()));

        // No rows are inserted here. The sources are created with heartbeat.interval.ms = 1000,
        // so the 3 s pause spans several heartbeat intervals before checkpoint 402 is taken.
        batchInsertAndCheckpoint(0, restoredSource, restoredContext, 401L);
        Thread.sleep(3000L);
        batchInsertAndCheckpoint(0, restoredSource, restoredContext, 402L);
        restoredSource.notifyCheckpointComplete(402L);
        Assert.assertTrue(seenFlushLsns.add(getConfirmedFlushLsn()));

        // Run a statement that does not touch inventory.products. The connection is only needed
        // for this one statement, so it is released before the test sleeps.
        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
                Statement statement = connection.createStatement()) {
            statement.execute("CREATE TABLE dummy (a int)");
        }
        Thread.sleep(3000L);
        batchInsertAndCheckpoint(0, restoredSource, restoredContext, 404L);
        restoredSource.notifyCheckpointComplete(404L);
        Assert.assertTrue(seenFlushLsns.add(getConfirmedFlushLsn()));

        // Three snapshotted checkpoints are pending until the last one is acknowledged;
        // acknowledging 503 is expected to leave none pending.
        batchInsertAndCheckpoint(3, restoredSource, restoredContext, 501L);
        batchInsertAndCheckpoint(2, restoredSource, restoredContext, 502L);
        batchInsertAndCheckpoint(1, restoredSource, restoredContext, 503L);
        Assert.assertEquals(3, restoredSource.getPendingOffsetsToCommit().size());
        restoredSource.notifyCheckpointComplete(503L);
        Assert.assertTrue(seenFlushLsns.add(getConfirmedFlushLsn()));
        Assert.assertEquals(0, restoredSource.getPendingOffsetsToCommit().size());

        Assert.assertFalse(waitForAvailableRecords(Duration.ofSeconds(3L), restoredContext));
        restoredSource.cancel();
        restoredSource.close();
        restoredRunThread.sync();

        // 101, 201, 402, 404 and 503 each produced a new value.
        Assert.assertEquals(5, seenFlushLsns.size());
    }

    /**
     * Inserts {@code i} rows into {@code inventory.products}, waits until the same number of
     * records has been collected by {@code testSourceContext}, and then snapshots the state of
     * {@code debeziumSourceFunction} under the context's checkpoint lock.
     *
     * <p>The checkpoint is only snapshotted here; {@code notifyCheckpointComplete} is left to the
     * caller.
     *
     * @param i number of rows to insert and number of records to drain afterwards; may be 0
     * @param debeziumSourceFunction source whose state is snapshotted
     * @param testSourceContext context the source emits its records into
     * @param j value passed as both constructor arguments of
     *     {@code StateSnapshotContextSynchronousImpl}
     * @throws Exception if the inserts fail, the records do not arrive in time, or the snapshot
     *     fails
     */
    private void batchInsertAndCheckpoint(int i, DebeziumSourceFunction<SourceRecord> debeziumSourceFunction, TestSourceContext<SourceRecord> testSourceContext, long j) throws Exception {
        // The connection is only needed for the inserts, so it is closed before waiting for
        // the resulting records.
        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
                Statement statement = connection.createStatement()) {
            for (int inserted = 0; inserted < i; inserted++) {
                statement.execute("INSERT INTO inventory.products VALUES (default,'dummy','My Dummy',1.1)");
            }
        }

        Assert.assertEquals(i, drain(testSourceContext, i).size());

        synchronized (testSourceContext.getCheckpointLock()) {
            debeziumSourceFunction.snapshotState(new StateSnapshotContextSynchronousImpl(j, j));
        }
    }

    /**
     * Creates a source via {@link #createPostgreSqlSource(int)} with a heartbeat interval of 0,
     * i.e. {@code heartbeat.interval.ms} is set to {@code "0"}.
     */
    private DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatDisabled() {
        final int heartbeatIntervalMs = 0;
        return createPostgreSqlSource(heartbeatIntervalMs);
    }

    /**
     * Creates a source via {@link #createPostgreSqlSource(int)} with a heartbeat interval of
     * 1000, i.e. {@code heartbeat.interval.ms} is set to {@code "1000"}.
     */
    private DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatEnabled() {
        final int heartbeatIntervalMs = 1000;
        return createPostgreSqlSource(heartbeatIntervalMs);
    }

    /**
     * Builds a source for {@code inventory.products} on {@code POSTGRES_CONTAINER_OLD}, using
     * {@code SLOT_NAME} as the slot name and {@code ForwardDeserializeSchema} as deserializer.
     *
     * @param i value written to the Debezium property {@code heartbeat.interval.ms}
     * @return the source produced by the builder
     */
    private DebeziumSourceFunction<SourceRecord> createPostgreSqlSource(int i) {
        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("heartbeat.interval.ms", String.valueOf(i));

        // The explicit <SourceRecord> pins the builder's element type. Without a type witness
        // a generic builder() at the head of a call chain is inferred as Object, which does not
        // match this method's return type.
        return PostgreSQLSource.<SourceRecord>builder()
                .hostname(POSTGRES_CONTAINER_OLD.getHost())
                .port(POSTGRES_CONTAINER_OLD.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT))
                .database(POSTGRES_CONTAINER_OLD.getDatabaseName())
                .username(POSTGRES_CONTAINER_OLD.getUsername())
                .password(POSTGRES_CONTAINER_OLD.getPassword())
                .schemaList(new String[] {"inventory"})
                .tableList(new String[] {"inventory.products"})
                .deserializer(new ForwardDeserializeSchema())
                .slotName(SLOT_NAME)
                .debeziumProperties(debeziumProperties)
                .build();
    }

    /**
     * Reads {@code confirmed_flush_lsn} from {@code pg_replication_slots} for the slot named
     * {@code SLOT_NAME} in the container's database with plugin {@code decoderbufs}.
     *
     * <p>Fails the test with "No replication slot info available" if the query returns no row.
     *
     * @return the {@code confirmed_flush_lsn} column of the first matching row
     * @throws SQLException if connecting or querying fails
     */
    private String getConfirmedFlushLsn() throws SQLException {
        // All values are test constants, so formatting them into the statement is acceptable here.
        String slotQuery = String.format(
                "select * from pg_replication_slots where slot_name = '%s' and database = '%s' and plugin = '%s'",
                SLOT_NAME,
                POSTGRES_CONTAINER_OLD.getDatabaseName(),
                "decoderbufs");

        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
                Statement statement = connection.createStatement();
                ResultSet slots = statement.executeQuery(slotQuery)) {
            // Throws AssertionError when there is no row, so the read below only runs on a hit.
            Assert.assertTrue("No replication slot info available", slots.next());
            return slots.getString("confirmed_flush_lsn");
        }
    }

    /**
     * Takes {@code i} records from the context's collected outputs and returns their values in
     * arrival order.
     *
     * @param testSourceContext context whose collected outputs are consumed
     * @param i number of records to take; for values of 0 or less nothing is polled
     * @return a list with exactly {@code i} values (empty when {@code i <= 0})
     * @throws RuntimeException if a single poll does not yield a record within 100 seconds
     * @throws Exception if the thread is interrupted while polling
     */
    private <T> List<T> drain(TestSourceContext<T> testSourceContext, int i) throws Exception {
        List<T> values = new ArrayList<>();
        LinkedBlockingQueue<StreamRecord<T>> outputs = testSourceContext.getCollectedOutputs();
        while (values.size() < i) {
            // The timeout applies to each record separately, not to the whole call.
            StreamRecord<T> next = outputs.poll(100L, TimeUnit.SECONDS);
            if (next == null) {
                throw new RuntimeException("Can't receive " + i + " elements before timeout.");
            }
            values.add(next.getValue());
        }
        return values;
    }

    /**
     * Checks whether the monitor of {@code obj} can be entered by another thread within
     * {@code duration}.
     *
     * <p>A helper thread synchronizes on {@code obj} and releases a semaphore once it is inside;
     * the calling thread waits for that semaphore with a timeout.
     *
     * @param obj object whose monitor is probed
     * @param duration maximum time to wait for the helper thread to enter the monitor
     * @return {@code true} if the helper thread entered the monitor in time, {@code false} on
     *     timeout
     * @throws Exception in particular {@link InterruptedException} if the calling thread is
     *     interrupted while waiting
     */
    private boolean waitForCheckpointLock(Object obj, Duration duration) throws Exception {
        final Semaphore monitorEntered = new Semaphore(0);
        final ExecutorService prober = Executors.newSingleThreadExecutor();
        try {
            prober.execute(() -> {
                synchronized (obj) {
                    monitorEntered.release();
                }
            });
            return monitorEntered.tryAcquire(duration.toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            // Runs on the interrupted path too; otherwise the executor's idle non-daemon worker
            // would outlive this call. A worker still blocked on the monitor cannot be
            // interrupted out of it and only ends once the monitor becomes free.
            prober.shutdownNow();
        }
    }

    /**
     * Polls the context's collected outputs every 10 ms until they are non-empty or
     * {@code duration} has elapsed.
     *
     * @param duration maximum time to keep polling
     * @param testSourceContext context whose collected outputs are inspected (nothing is removed)
     * @return {@code true} if the collected outputs are non-empty when polling stops
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private boolean waitForAvailableRecords(Duration duration, TestSourceContext<?> testSourceContext) throws InterruptedException {
        final long deadlineMillis = System.currentTimeMillis() + duration.toMillis();
        while (System.currentTimeMillis() < deadlineMillis) {
            if (!testSourceContext.getCollectedOutputs().isEmpty()) {
                break;
            }
            Thread.sleep(10L);
        }
        // Checked again after the loop so that a timeout and an early exit share one result path.
        return !testSourceContext.getCollectedOutputs().isEmpty();
    }

    /**
     * Sets up {@code debeziumSourceFunction} with the defaults of the seven-argument overload:
     * {@code false} for the initialization-context flag, no list states, and a runtime context
     * created from {@code (true, 1, 0)}.
     *
     * @param debeziumSourceFunction source to prepare
     * @throws Exception if any setup step of the overload fails
     */
    private static <T> void setupSource(DebeziumSourceFunction<T> debeziumSourceFunction) throws Exception {
        final boolean initContextFlag = false;
        final boolean runtimeContextFlag = true;
        final int fifthToLastArg = 0;
        final int lastArg = 1;
        setupSource(debeziumSourceFunction, initContextFlag, null, null, runtimeContextFlag, fifthToLastArg, lastArg);
    }

    /**
     * Prepares a source for running: installs a mock runtime context, initializes its state
     * from a mock initialization context, and opens it, in that order.
     *
     * @param debeziumSourceFunction source to prepare
     * @param z first argument of {@code MockFunctionInitializationContext}; callers in this
     *     file pass {@code true} when reusing list states from an earlier source instance
     *     (presumably an "is restored" flag)
     * @param listState first list state handed to {@code MockOperatorStateStore}; may be null
     * @param listState2 second list state handed to {@code MockOperatorStateStore}; may be null
     * @param z2 first argument of {@code MockStreamingRuntimeContext}
     * @param i third argument of {@code MockStreamingRuntimeContext}
     * @param i2 second argument of {@code MockStreamingRuntimeContext}
     * @throws Exception if initializing state or opening the source fails
     */
    private static <T, S1, S2> void setupSource(DebeziumSourceFunction<T> debeziumSourceFunction, boolean z, ListState<S1> listState, ListState<S2> listState2, boolean z2, int i, int i2) throws Exception {
        // Note the argument order: i2 comes before i.
        MockStreamingRuntimeContext runtimeContext = new MockStreamingRuntimeContext(z2, i2, i);
        debeziumSourceFunction.setRuntimeContext(runtimeContext);

        MockOperatorStateStore stateStore = new MockOperatorStateStore(listState, listState2);
        MockFunctionInitializationContext initializationContext = new MockFunctionInitializationContext(z, stateStore);
        debeziumSourceFunction.initializeState(initializationContext);

        Configuration openParameters = new Configuration();
        debeziumSourceFunction.open(openParameters);
    }
}
