package com.ververica.cdc.connectors.postgres.source;

import com.ververica.cdc.connectors.postgres.PostgresTestBase;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.connectors.postgres.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.containers.PostgreSQLContainer;

/* loaded from: input_file:com/ververica/cdc/connectors/postgres/source/PostgresSourceExampleTest.class */
public class PostgresSourceExampleTest extends PostgresTestBase {
    private static final String DB_NAME_PREFIX = "postgres";
    private static final String SCHEMA_NAME = "inventory";
    private static final String TABLE_ID = "inventory.products";
    private static final String SLOT_NAME = "flink";
    private static final String PLUGIN_NAME = "decoderbufs";
    private static final long CHECKPOINT_INTERVAL_MS = 3000;
    private static final int DEFAULT_PARALLELISM = 2;

    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
    private final UniqueDatabase inventoryDatabase = new UniqueDatabase(POSTGRES_CONTAINER, "postgres", SCHEMA_NAME, POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword());

    @Test
    @Ignore("Test ignored because it won't stop and is used for manual test")
    public void testConsumingScanEvents() throws Exception {
        this.inventoryDatabase.createAndInitialize();
        PostgresSourceBuilder.PostgresIncrementalSource build = PostgresSourceBuilder.PostgresIncrementalSource.builder().hostname(POSTGRES_CONTAINER.getHost()).port(POSTGRES_CONTAINER.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT.intValue()).intValue()).database(this.inventoryDatabase.getDatabaseName()).schemaList(new String[]{SCHEMA_NAME}).tableList(new String[]{TABLE_ID}).username(POSTGRES_CONTAINER.getUsername()).password(POSTGRES_CONTAINER.getPassword()).slotName(SLOT_NAME).decodingPluginName(PLUGIN_NAME).deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).splitSize(DEFAULT_PARALLELISM).build();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        executionEnvironment.fromSource(build, WatermarkStrategy.noWatermarks(), "PostgresParallelSource").setParallelism(DEFAULT_PARALLELISM).print();
        executionEnvironment.execute("Output Postgres Snapshot");
    }

    @Test
    @Ignore
    public void testConsumingAllEvents() throws Exception {
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("description", DataTypes.STRING()), DataTypes.FIELD("weight", DataTypes.FLOAT())});
        this.inventoryDatabase.createAndInitialize();
        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "never");
        PostgresSourceBuilder.PostgresIncrementalSource build = PostgresSourceBuilder.PostgresIncrementalSource.builder().hostname(POSTGRES_CONTAINER.getHost()).port(POSTGRES_CONTAINER.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT.intValue()).intValue()).database(this.inventoryDatabase.getDatabaseName()).schemaList(new String[]{SCHEMA_NAME}).tableList(new String[]{TABLE_ID}).username(POSTGRES_CONTAINER.getUsername()).password(POSTGRES_CONTAINER.getPassword()).slotName(SLOT_NAME).decodingPluginName(PLUGIN_NAME).deserializer(buildRowDataDebeziumDeserializeSchema(ROW)).includeSchemaChanges(true).splitSize(DEFAULT_PARALLELISM).debeziumProperties(properties).build();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        CloseableIterator executeAndCollect = executionEnvironment.fromSource(build, WatermarkStrategy.noWatermarks(), "PostgresParallelSource").setParallelism(DEFAULT_PARALLELISM).executeAndCollect();
        String[] strArr = {"+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]", "+I[105, hammer, 14oz carpenter's hammer, 0.875]", "+I[106, hammer, 16oz carpenter's hammer, 1.0]", "+I[107, rocks, box of assorted rocks, 5.3]", "+I[108, jacket, water resistent black wind breaker, 0.1]", "+I[109, spare tire, 24 inch spare tire, 22.2]"};
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < strArr.length && executeAndCollect.hasNext(); i++) {
            arrayList.add(executeAndCollect.next());
        }
        assertEqualsInAnyOrder(Arrays.asList(strArr), formatResult(arrayList, ROW));
        this.log.info("All snapshot data consumed!");
        makeWalEvents(getConnection(), TABLE_ID);
        String[] strArr2 = {"-U[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+U[103, cart, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[110, spare tire, 28 inch spare tire, 26.2]", "-D[110, spare tire, 28 inch spare tire, 26.2]", "-U[103, cart, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+U[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]"};
        List<RowData> arrayList2 = new ArrayList<>();
        for (int i2 = 0; i2 < strArr2.length && executeAndCollect.hasNext(); i2++) {
            RowData rowData = (RowData) executeAndCollect.next();
            this.log.info("step 3: consume wal event: {}", rowData);
            arrayList2.add(rowData);
        }
        assertEqualsInAnyOrder(Arrays.asList(strArr2), formatResult(arrayList2, ROW));
        this.log.info("All streaming events consumed!");
        executeAndCollect.close();
    }

    private DebeziumDeserializationSchema<RowData> buildRowDataDebeziumDeserializeSchema(DataType dataType) {
        return RowDataDebeziumDeserializeSchema.newBuilder().setPhysicalRowType(dataType.getLogicalType()).setResultTypeInfo(InternalTypeInfo.of(TypeConversions.fromDataToLogicalType(dataType))).build();
    }

    private List<String> formatResult(List<RowData> list, DataType dataType) {
        RowRowConverter create = RowRowConverter.create(dataType);
        create.open(Thread.currentThread().getContextClassLoader());
        Stream<RowData> stream = list.stream();
        create.getClass();
        return (List) stream.map(create::toExternal).map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toList());
    }

    private PostgresConnection getConnection() throws SQLException {
        HashMap hashMap = new HashMap();
        hashMap.put("hostname", POSTGRES_CONTAINER.getHost());
        hashMap.put("port", String.valueOf(POSTGRES_CONTAINER.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT.intValue())));
        hashMap.put("dbname", this.inventoryDatabase.getDatabaseName());
        hashMap.put("user", this.inventoryDatabase.getUsername());
        hashMap.put("password", this.inventoryDatabase.getPassword());
        PostgresConnection createConnection = createConnection(hashMap);
        createConnection.connect();
        return createConnection;
    }

    public static void assertEqualsInAnyOrder(List<String> list, List<String> list2) {
        Assert.assertTrue((list == null || list2 == null) ? false : true);
        assertEqualsInOrder((List) list.stream().sorted().collect(Collectors.toList()), (List) list2.stream().sorted().collect(Collectors.toList()));
    }

    public static void assertEqualsInOrder(List<String> list, List<String> list2) {
        Assert.assertTrue((list == null || list2 == null) ? false : true);
        Assert.assertEquals(list.size(), list2.size());
        Assert.assertArrayEquals(list.toArray(new String[0]), list2.toArray(new String[0]));
    }

    private void makeWalEvents(PostgresConnection postgresConnection, String str) throws SQLException {
        waitForReplicationSlotReady(postgresConnection);
        try {
            postgresConnection.setAutoCommit(false);
            postgresConnection.execute(new String[]{"UPDATE " + str + " SET name = 'cart' where id = 103", "INSERT INTO " + str + " VALUES(110,'spare tire','28 inch spare tire','26.2')", "DELETE FROM " + str + " where id = 110", "UPDATE " + str + " SET name = '12-pack drill bits' where id = 103"});
            postgresConnection.commit();
        } finally {
            postgresConnection.close();
        }
    }

    private void waitForReplicationSlotReady(PostgresConnection postgresConnection) throws SQLException {
        SlotState replicationSlotState = postgresConnection.getReplicationSlotState(SLOT_NAME, PLUGIN_NAME);
        while (replicationSlotState == null) {
            this.log.info("Waiting until the replication slot is ready ...");
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            replicationSlotState = postgresConnection.getReplicationSlotState(SLOT_NAME, PLUGIN_NAME);
        }
    }
}
