package com.ververica.cdc.connectors.postgres.table;

import com.ververica.cdc.connectors.postgres.PostgresTestBase;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.util.ExceptionUtils;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.testcontainers.containers.PostgreSQLContainer;

@RunWith(Parameterized.class)
/* loaded from: input_file:com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.class */
public class PostgreSQLConnectorITCase extends PostgresTestBase {
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv = StreamTableEnvironment.create(this.env, EnvironmentSettings.newInstance().inStreamingMode().build());

    @ClassRule
    public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
    private final boolean parallelismSnapshot;

    public PostgreSQLConnectorITCase(boolean z) {
        this.parallelismSnapshot = z;
    }

    @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
    public static Object[] parameters() {
        return new Object[]{new Object[]{true}, new Object[]{false}};
    }

    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        this.env.setRestartStrategy(RestartStrategies.noRestart());
        if (!this.parallelismSnapshot) {
            this.env.setParallelism(1);
        } else {
            this.env.setParallelism(4);
            this.env.enableCheckpointing(200L);
        }
    }

    @Test
    public void testConsumingAllEvents() throws SQLException, ExecutionException, InterruptedException {
        initializePostgresTable(POSTGRES_CONTAINER, "inventory");
        this.tEnv.executeSql(String.format("CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3)) WITH ( 'connector' = 'postgres-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'schema-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'slot.name' = '%s')", POSTGRES_CONTAINER.getHost(), POSTGRES_CONTAINER.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT.intValue()), POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword(), POSTGRES_CONTAINER.getDatabaseName(), "inventory", "products", Boolean.valueOf(this.parallelismSnapshot), getSlotName()));
        this.tEnv.executeSql("CREATE TABLE sink ( name STRING, weightSum DECIMAL(10,3), PRIMARY KEY (name) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')");
        TableResult executeSql = this.tEnv.executeSql("INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
        waitForSnapshotStarted("sink");
        Thread.sleep(5000L);
        Connection jdbcConnection = getJdbcConnection(POSTGRES_CONTAINER);
        Throwable th = null;
        try {
            Statement createStatement = jdbcConnection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.execute("UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;");
                    createStatement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;");
                    createStatement.execute("INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
                    createStatement.execute("INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
                    createStatement.execute("UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
                    createStatement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
                    createStatement.execute("DELETE FROM inventory.products WHERE id=111;");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    waitForSinkSize("sink", 20);
                    Assert.assertThat(TestValuesTableFactory.getResults("sink"), Matchers.containsInAnyOrder(new String[]{"scooter,3.140", "car battery,8.100", "12-pack drill bits,0.800", "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200"}));
                    ((JobClient) executeSql.getJobClient().get()).cancel().get();
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (jdbcConnection != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    jdbcConnection.close();
                }
            }
        }
    }

    @Test
    public void testStartupFromLatestOffset() throws Exception {
        if (this.parallelismSnapshot) {
            initializePostgresTable(POSTGRES_CONTAINER, "inventory");
            this.tEnv.executeSql(String.format("CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'postgres-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'schema-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'slot.name' = '%s', 'scan.startup.mode' = 'latest-offset')", POSTGRES_CONTAINER.getHost(), POSTGRES_CONTAINER.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT.intValue()), POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword(), POSTGRES_CONTAINER.getDatabaseName(), "inventory", "products", Boolean.valueOf(this.parallelismSnapshot), getSlotName()));
            this.tEnv.executeSql("CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)");
            TableResult executeSql = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
            Thread.sleep(10000L);
            Connection jdbcConnection = getJdbcConnection(POSTGRES_CONTAINER);
            Throwable th = null;
            try {
                Statement createStatement = jdbcConnection.createStatement();
                Throwable th2 = null;
                try {
                    try {
                        createStatement.execute("INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
                        createStatement.execute("INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
                        createStatement.execute("UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
                        createStatement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
                        createStatement.execute("DELETE FROM inventory.products WHERE id=111;");
                        if (createStatement != null) {
                            if (0 != 0) {
                                try {
                                    createStatement.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                createStatement.close();
                            }
                        }
                        waitForSinkSize("sink", 5);
                        Assert.assertThat(TestValuesTableFactory.getResults("sink"), Matchers.containsInAnyOrder(new String[]{"110,jacket,new water resistent white wind breaker,0.500"}));
                        ((JobClient) executeSql.getJobClient().get()).cancel().get();
                    } finally {
                    }
                } catch (Throwable th4) {
                    if (createStatement != null) {
                        if (th2 != null) {
                            try {
                                createStatement.close();
                            } catch (Throwable th5) {
                                th2.addSuppressed(th5);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    throw th4;
                }
            } finally {
                if (jdbcConnection != null) {
                    if (0 != 0) {
                        try {
                            jdbcConnection.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        jdbcConnection.close();
                    }
                }
            }
        }
    }

    @Test
    public void testExceptionForReplicaIdentity() throws Exception {
        initializePostgresTable(POSTGRES_CONTAINER, "replica_identity");
        this.tEnv.executeSql(String.format("CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3)) WITH ( 'connector' = 'postgres-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'schema-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'slot.name' = '%s')", POSTGRES_CONTAINER.getHost(), POSTGRES_CONTAINER.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT.intValue()), POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword(), POSTGRES_CONTAINER.getDatabaseName(), "inventory", "products", Boolean.valueOf(this.parallelismSnapshot), getSlotName()));
        this.tEnv.executeSql("CREATE TABLE sink ( name STRING, weightSum DECIMAL(10,3), PRIMARY KEY (name) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')");
        TableResult executeSql = this.tEnv.executeSql("INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
        waitForSnapshotStarted("sink");
        Thread.sleep(5000L);
        Connection jdbcConnection = getJdbcConnection(POSTGRES_CONTAINER);
        Throwable th = null;
        try {
            Statement createStatement = jdbcConnection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.execute("UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;");
                    createStatement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;");
                    createStatement.execute("INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
                    createStatement.execute("INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
                    createStatement.execute("UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
                    createStatement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
                    createStatement.execute("DELETE FROM inventory.products WHERE id=111;");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    try {
                        executeSql.await();
                    } catch (Exception e) {
                        Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "The \"before\" field of UPDATE/DELETE message is null, please check the Postgres table has been set REPLICA IDENTITY to FULL level.").isPresent());
                    }
                } catch (Throwable th4) {
                    th2 = th4;
                    throw th4;
                }
            } catch (Throwable th5) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th5;
            }
        } finally {
            if (jdbcConnection != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection.close();
                    } catch (Throwable th7) {
                        th.addSuppressed(th7);
                    }
                } else {
                    jdbcConnection.close();
                }
            }
        }
    }

    @Test
    public void testAllTypes() throws Throwable {
        initializePostgresTable(POSTGRES_CONTAINER, "column_type_test");
        this.tEnv.executeSql(String.format("CREATE TABLE full_types (    id INTEGER NOT NULL,    bytea_c BYTES,    small_c SMALLINT,    int_c INTEGER,    big_c BIGINT,    real_c FLOAT,    double_precision DOUBLE,    numeric_c DECIMAL(10, 5),    decimal_c DECIMAL(10, 1),    boolean_c BOOLEAN,    text_c STRING,    char_c STRING,    character_c STRING,    character_varying_c STRING,    timestamp3_c TIMESTAMP(3),    timestamp6_c TIMESTAMP(6),    date_c DATE,    time_c TIME(0),    default_numeric_c DECIMAL,    geography_c STRING,    geometry_c STRING) WITH ( 'connector' = 'postgres-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'schema-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'slot.name' = '%s')", POSTGRES_CONTAINER.getHost(), POSTGRES_CONTAINER.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT.intValue()), POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword(), POSTGRES_CONTAINER.getDatabaseName(), "inventory", "full_types", Boolean.valueOf(this.parallelismSnapshot), getSlotName()));
        this.tEnv.executeSql("CREATE TABLE sink (    id INTEGER NOT NULL,    bytea_c BYTES,    small_c SMALLINT,    int_c INTEGER,    big_c BIGINT,    real_c FLOAT,    double_precision DOUBLE,    numeric_c DECIMAL(10, 5),    decimal_c DECIMAL(10, 1),    boolean_c BOOLEAN,    text_c STRING,    char_c STRING,    character_c STRING,    character_varying_c STRING,    timestamp3_c TIMESTAMP(3),    timestamp6_c TIMESTAMP(6),    date_c DATE,    time_c TIME(0),    default_numeric_c DECIMAL,    geography_c STRING,    geometry_c STRING,    PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
        TableResult executeSql = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM full_types");
        waitForSinkSize("sink", 1);
        Thread.sleep(5000L);
        Connection jdbcConnection = getJdbcConnection(POSTGRES_CONTAINER);
        Throwable th = null;
        try {
            Statement createStatement = jdbcConnection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.execute("UPDATE inventory.full_types SET small_c=0 WHERE id=1;");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    waitForSinkSize("sink", 3);
                    Assert.assertEquals(Arrays.asList("+I(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", "-D(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", "+I(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})"), TestValuesTableFactory.getRawResults("sink"));
                    ((JobClient) executeSql.getJobClient().get()).cancel().get();
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (jdbcConnection != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    jdbcConnection.close();
                }
            }
        }
    }

    @Test
    public void testMetadataColumns() throws Throwable {
        initializePostgresTable(POSTGRES_CONTAINER, "inventory");
        this.tEnv.executeSql(String.format("CREATE TABLE debezium_source  ( db_name STRING METADATA FROM 'database_name' VIRTUAL, schema_name STRING METADATA VIRTUAL, table_name STRING METADATA VIRTUAL, id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'postgres-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'schema-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'slot.name' = '%s')", POSTGRES_CONTAINER.getHost(), POSTGRES_CONTAINER.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT.intValue()), POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword(), POSTGRES_CONTAINER.getDatabaseName(), "inventory", "products", Boolean.valueOf(this.parallelismSnapshot), getSlotName()));
        this.tEnv.executeSql("CREATE TABLE sink ( database_name STRING, schema_name STRING, table_name STRING, id INT, name STRING, description STRING, weight DECIMAL(10,3), PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')");
        TableResult executeSql = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
        waitForSnapshotStarted("sink");
        Thread.sleep(5000L);
        Connection jdbcConnection = getJdbcConnection(POSTGRES_CONTAINER);
        Throwable th = null;
        try {
            Statement createStatement = jdbcConnection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.execute("UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;");
                    createStatement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;");
                    createStatement.execute("INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
                    createStatement.execute("INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
                    createStatement.execute("UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
                    createStatement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
                    createStatement.execute("DELETE FROM inventory.products WHERE id=111;");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    waitForSinkSize("sink", 16);
                    String databaseName = POSTGRES_CONTAINER.getDatabaseName();
                    List asList = Arrays.asList("+I(" + databaseName + ",inventory,products,101,scooter,Small 2-wheel scooter,3.140)", "+I(" + databaseName + ",inventory,products,102,car battery,12V car battery,8.100)", "+I(" + databaseName + ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)", "+I(" + databaseName + ",inventory,products,104,hammer,12oz carpenter's hammer,0.750)", "+I(" + databaseName + ",inventory,products,105,hammer,14oz carpenter's hammer,0.875)", "+I(" + databaseName + ",inventory,products,106,hammer,16oz carpenter's hammer,1.000)", "+I(" + databaseName + ",inventory,products,107,rocks,box of assorted rocks,5.300)", "+I(" + databaseName + ",inventory,products,108,jacket,water resistent black wind breaker,0.100)", "+I(" + databaseName + ",inventory,products,109,spare tire,24 inch spare tire,22.200)", "+I(" + databaseName + ",inventory,products,110,jacket,water resistent white wind breaker,0.200)", "+I(" + databaseName + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.180)", "+U(" + databaseName + ",inventory,products,106,hammer,18oz carpenter hammer,1.000)", "+U(" + databaseName + ",inventory,products,107,rocks,box of assorted rocks,5.100)", "+U(" + databaseName + ",inventory,products,110,jacket,new water resistent white wind breaker,0.500)", "+U(" + databaseName + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)", "-D(" + databaseName + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)");
                    List rawResults = TestValuesTableFactory.getRawResults("sink");
                    Collections.sort(rawResults);
                    Collections.sort(asList);
                    Assert.assertEquals(asList, rawResults);
                    ((JobClient) executeSql.getJobClient().get()).cancel().get();
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (jdbcConnection != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    jdbcConnection.close();
                }
            }
        }
    }

    @Test
    public void testUpsertMode() throws Exception {
        initializePostgresTable(POSTGRES_CONTAINER, "replica_identity");
        this.tEnv.executeSql(String.format("CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'postgres-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'schema-name' = '%s', 'table-name' = '%s', 'slot.name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'changelog-mode' = '%s')", POSTGRES_CONTAINER.getHost(), POSTGRES_CONTAINER.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT.intValue()), POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword(), POSTGRES_CONTAINER.getDatabaseName(), "inventory", "products", getSlotName(), Boolean.valueOf(this.parallelismSnapshot), "upsert"));
        this.tEnv.executeSql("CREATE TABLE sink ( name STRING, weightSum DECIMAL(10,3), PRIMARY KEY (name) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')");
        TableResult executeSql = this.tEnv.executeSql("INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
        waitForSnapshotStarted("sink");
        Thread.sleep(5000L);
        Connection jdbcConnection = getJdbcConnection(POSTGRES_CONTAINER);
        Throwable th = null;
        try {
            Statement createStatement = jdbcConnection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.execute("UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;");
                    createStatement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;");
                    createStatement.execute("INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
                    createStatement.execute("INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
                    createStatement.execute("UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
                    createStatement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
                    createStatement.execute("DELETE FROM inventory.products WHERE id=111;");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    waitForSinkSize("sink", 20);
                    Assert.assertThat(TestValuesTableFactory.getResults("sink"), Matchers.containsInAnyOrder(new String[]{"scooter,3.140", "car battery,8.100", "12-pack drill bits,0.800", "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200"}));
                    ((JobClient) executeSql.getJobClient().get()).cancel().get();
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (jdbcConnection != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    jdbcConnection.close();
                }
            }
        }
    }
}
