package com.ververica.cdc.connectors.postgres.table;

import com.ververica.cdc.connectors.postgres.PostgresTestBase;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.ExceptionUtils;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.testcontainers.containers.PostgreSQLContainer;

/* loaded from: input_file:com/ververica/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.class */
/**
 * Integration test that runs a {@code postgres-cdc} SQL pipeline with incremental snapshot
 * enabled, takes a savepoint, cancels the job, and starts a second job from that savepoint.
 *
 * <p>Changes written to the database before the savepoint and while no job is running must both
 * show up in the {@code values} sink.
 */
public class PostgreSQLSavepointITCase extends PostgresTestBase {

    private static final String SCHEMA_NAME = "inventory";
    private static final String TABLE_NAME = "products";
    private static final String SINK_NAME = "sink";

    /** Parallelism used for both the initial and the restored job. */
    private static final int PARALLELISM = 4;

    /** Upper bound on savepoint attempts; with the retry interval below this is about a minute. */
    private static final int MAX_SAVEPOINT_ATTEMPTS = 600;

    private static final long SAVEPOINT_RETRY_INTERVAL_MS = 100L;

    /** Message fragment of the {@link CheckpointException} that is treated as retryable. */
    private static final String RETRYABLE_CHECKPOINT_MESSAGE = "Checkpoint triggering task";

    private static final String SOURCE_DDL_TEMPLATE =
            "CREATE TABLE debezium_source ("
                    + " id INT NOT NULL,"
                    + " name STRING,"
                    + " description STRING,"
                    + " weight DECIMAL(10,3),"
                    + " PRIMARY KEY (id) NOT ENFORCED"
                    + ") WITH ("
                    + " 'connector' = 'postgres-cdc',"
                    + " 'hostname' = '%s',"
                    + " 'port' = '%s',"
                    + " 'username' = '%s',"
                    + " 'password' = '%s',"
                    + " 'database-name' = '%s',"
                    + " 'schema-name' = '%s',"
                    + " 'table-name' = '%s',"
                    + " 'scan.incremental.snapshot.enabled' = 'true',"
                    + " 'scan.incremental.snapshot.chunk.size' = '2',"
                    + " 'slot.name' = '%s'"
                    + ")";

    private static final String SINK_DDL =
            "CREATE TABLE sink "
                    + " WITH ("
                    + " 'connector' = 'values',"
                    + " 'sink-insert-only' = 'false'"
                    + ") LIKE debezium_source (EXCLUDING OPTIONS)";

    private static final String INSERT_INTO_SINK = "INSERT INTO sink SELECT * FROM debezium_source";

    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
    }

    @Test
    public void testSavepoint() throws Exception {
        testRestartFromSavepoint();
    }

    /**
     * Runs the pipeline, takes a savepoint, cancels the job, restarts it from the savepoint and
     * verifies the sink content at each stage.
     *
     * @throws Exception if any step of the scenario fails
     */
    private void testRestartFromSavepoint() throws Exception {
        initializePostgresTable(POSTGRES_CONTAINER, SCHEMA_NAME);

        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        try {
            String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
            // Built once so both jobs use the very same source definition (including slot name).
            String sourceDdl = buildSourceDdl();

            // Phase 1: fresh job, no savepoint to restore from.
            JobClient firstJob = submitPipeline(null, sourceDdl);
            applyProductChanges(110, 111);

            // Fixed pause before polling the sink.
            // NOTE(review): presumably gives the source time to start up - confirm.
            Thread.sleep(10000L);
            waitForSinkResult(
                    SINK_NAME,
                    Arrays.asList(
                            "+I[101, scooter, Small 2-wheel scooter, 3.140]",
                            "+I[102, car battery, 12V car battery, 8.100]",
                            "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
                            "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
                            "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
                            "+I[106, hammer, 16oz carpenter's hammer, 1.000]",
                            "+I[107, rocks, box of assorted rocks, 5.300]",
                            "+I[108, jacket, water resistent black wind breaker, 0.100]",
                            "+I[109, spare tire, 24 inch spare tire, 22.200]",
                            "+I[110, jacket, new water resistent white wind breaker, 0.500]"));

            String savepointPath = triggerSavepointWithRetry(firstJob, savepointDirectory);
            firstJob.cancel().get();

            // Phase 2: change the table while no job is running, then resume from the savepoint.
            applyProductChanges(112, 113);
            JobClient restoredJob = submitPipeline(savepointPath, sourceDdl);

            waitForSinkSize(SINK_NAME, 15);
            String[] expectedRows =
                    new String[] {
                        "+I[101, scooter, Small 2-wheel scooter, 3.140]",
                        "+I[102, car battery, 12V car battery, 8.100]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
                        "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
                        "+I[106, hammer, 16oz carpenter's hammer, 1.000]",
                        "+I[107, rocks, box of assorted rocks, 5.300]",
                        "+I[108, jacket, water resistent black wind breaker, 0.100]",
                        "+I[109, spare tire, 24 inch spare tire, 22.200]",
                        "+I[110, jacket, new water resistent white wind breaker, 0.500]",
                        "+I[112, jacket, new water resistent white wind breaker, 0.500]"
                    };
            Assert.assertThat(
                    TestValuesTableFactory.getResults(SINK_NAME),
                    Matchers.containsInAnyOrder(expectedRows));

            restoredJob.cancel().get();
        } finally {
            // Remove the savepoint directory even when an assertion or a job step fails.
            temporaryFolder.delete();
        }
    }

    /** Renders the source table DDL against the shared PostgreSQL test container. */
    private String buildSourceDdl() {
        return String.format(
                SOURCE_DDL_TEMPLATE,
                POSTGRES_CONTAINER.getHost(),
                POSTGRES_CONTAINER.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
                POSTGRES_CONTAINER.getUsername(),
                POSTGRES_CONTAINER.getPassword(),
                POSTGRES_CONTAINER.getDatabaseName(),
                SCHEMA_NAME,
                TABLE_NAME,
                getSlotName());
    }

    /**
     * Creates a table environment, registers source and sink tables and submits the
     * source-to-sink insert job.
     *
     * @param savepointPath savepoint to restore from, or {@code null} to start fresh
     * @param sourceDdl DDL statement of the CDC source table
     * @return client of the submitted job
     * @throws Exception if the environment cannot be set up or the job cannot be submitted
     */
    private JobClient submitPipeline(String savepointPath, String sourceDdl) throws Exception {
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(
                        getStreamExecutionEnvironment(savepointPath, PARALLELISM));
        tableEnv.executeSql(sourceDdl);
        tableEnv.executeSql(SINK_DDL);
        return tableEnv
                .executeSql(INSERT_INTO_SINK)
                .getJobClient()
                .orElseThrow(
                        () -> new IllegalStateException("INSERT job did not expose a JobClient"));
    }

    /**
     * Inserts two products, updates both of the given rows and deletes the second one.
     *
     * <p>The connection and statement are closed as soon as the statements have run, including
     * when one of them fails.
     *
     * @param updatedId id of the row that is updated and kept
     *     (presumably the id assigned to the first insert - verify against the seed data)
     * @param deletedId id of the row that is updated and then deleted
     * @throws Exception if the connection cannot be opened or a statement fails
     */
    private void applyProductChanges(int updatedId, int deletedId) throws Exception {
        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                Statement statement = connection.createStatement()) {
            statement.execute(
                    "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
            statement.execute(
                    "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute(
                    "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id="
                            + updatedId
                            + ";");
            statement.execute(
                    "UPDATE inventory.products SET weight='5.17' WHERE id=" + deletedId + ";");
            statement.execute("DELETE FROM inventory.products WHERE id=" + deletedId + ";");
        }
    }

    /**
     * Builds the execution environment used by the test jobs: checkpointing every 200 ms and no
     * restart strategy.
     *
     * @param savepointPath savepoint to restore from, or {@code null} to start without one
     * @param parallelism parallelism of the environment
     * @return the configured environment
     * @throws Exception if the environment's configuration cannot be accessed reflectively
     */
    private StreamExecutionEnvironment getStreamExecutionEnvironment(
            String savepointPath, int parallelism) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        if (savepointPath != null) {
            // The savepoint path is written straight into the environment's "configuration"
            // field, which is reached via reflection.
            Class<?> envClass =
                    Thread.currentThread()
                            .getContextClassLoader()
                            .loadClass(
                                    "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
            Field configurationField = envClass.getDeclaredField("configuration");
            configurationField.setAccessible(true);
            Configuration configuration = (Configuration) configurationField.get(env);
            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
        }
        env.setParallelism(parallelism);
        env.enableCheckpointing(200L);
        env.setRestartStrategy(RestartStrategies.noRestart());
        return env;
    }

    /**
     * Triggers a savepoint, retrying while the failure is a {@link CheckpointException} whose
     * message contains {@value #RETRYABLE_CHECKPOINT_MESSAGE}.
     *
     * @param jobClient client of the running job
     * @param savepointDirectory directory the savepoint is written to
     * @return path of the completed savepoint, never {@code null}
     * @throws ExecutionException if triggering fails for a non-retryable reason
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws IllegalStateException if every attempt failed with the retryable error
     */
    private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory)
            throws ExecutionException, InterruptedException {
        Exception lastFailure = null;
        for (int attempt = 0; attempt < MAX_SAVEPOINT_ATTEMPTS; attempt++) {
            try {
                return jobClient.triggerSavepoint(savepointDirectory).get();
            } catch (Exception e) {
                if (!isRetryableCheckpointFailure(e)) {
                    throw e;
                }
                lastFailure = e;
                Thread.sleep(SAVEPOINT_RETRY_INTERVAL_MS);
            }
        }
        // Fail loudly: a null path would make the caller start the second job without a
        // savepoint, and the restore would silently go untested.
        throw new IllegalStateException(
                "Could not trigger a savepoint in "
                        + savepointDirectory
                        + " after "
                        + MAX_SAVEPOINT_ATTEMPTS
                        + " attempts",
                lastFailure);
    }

    /** Tells whether the cause chain holds a {@link CheckpointException} with the retryable message. */
    private static boolean isRetryableCheckpointFailure(Throwable failure) {
        Optional<CheckpointException> checkpointFailure =
                ExceptionUtils.findThrowable(failure, CheckpointException.class);
        if (!checkpointFailure.isPresent()) {
            return false;
        }
        String message = checkpointFailure.get().getMessage();
        // A missing message is treated as non-retryable so the original failure is rethrown.
        return message != null && message.contains(RETRYABLE_CHECKPOINT_MESSAGE);
    }
}
