package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests that run Kafka Streams instances configured with one standby replica
 * ({@code num.standby.replicas = 1}) against an embedded Kafka cluster.
 *
 * <p>The class is parameterized over the {@code processing.guarantee} setting and every test is
 * executed once with {@code "exactly_once"} and once with {@code "exactly_once_v2"}.
 */
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class StandbyTaskEOSIntegrationTest {
    /** Key of the regular record that is expected to be processed successfully. */
    private static final int KEY_0 = 0;
    /** Key that makes the deduplication transformer throw (see {@link #buildWithDeduplicationTopology}). */
    private static final int KEY_1 = 1;

    private static final long REBALANCE_TIMEOUT = Duration.ofMinutes(2).toMillis();
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

    /** Value injected by the parameterized runner; used as {@code processing.guarantee}. */
    @Parameterized.Parameter
    public String eosConfig;

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);

    @Rule
    public TestName testName = new TestName();

    /**
     * When {@code true} the deduplication transformer forwards records without touching the
     * state store and without throwing. It is switched on by the transformer itself right
     * before it throws the injected error.
     */
    private final AtomicBoolean skipRecord = new AtomicBoolean(false);

    private String appId;
    private String inputTopic;
    private String storeName;
    private String outputTopic;
    private KafkaStreams streamInstanceOne;
    private KafkaStreams streamInstanceTwo;
    private KafkaStreams streamInstanceOneRecovery;

    /**
     * Supplies the processing guarantees the tests are run with.
     *
     * @return one single-element array per test run, holding the {@code processing.guarantee} value
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<String[]> data() {
        return Arrays.asList(new String[][] {
            {"exactly_once"},
            {"exactly_once_v2"}
        });
    }

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Derives per-test names for the application, topics and store, removes any left-over
     * topics with those names and (re-)creates the input and output topics.
     */
    @Before
    public void createTopics() throws Exception {
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        appId = "app-" + safeTestName;
        inputTopic = "input-" + safeTestName;
        outputTopic = "output-" + safeTestName;
        storeName = "store-" + safeTestName;

        CLUSTER.deleteTopicsAndWait(
            inputTopic,
            outputTopic,
            appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"
        );
        CLUSTER.createTopic(inputTopic, 1, 3);
        CLUSTER.createTopic(outputTopic, 1, 3);
    }

    /** Closes every Streams instance a test may have created. */
    @After
    public void cleanUp() {
        if (streamInstanceOne != null) {
            streamInstanceOne.close();
        }
        if (streamInstanceTwo != null) {
            streamInstanceTwo.close();
        }
        if (streamInstanceOneRecovery != null) {
            streamInstanceOneRecovery.close();
        }
    }

    /**
     * Starts two instances whose task directories were pre-populated with a checkpoint for an
     * unrelated topic and an empty store directory, and verifies that the single input record
     * is still processed.
     */
    @Test
    public void shouldSurviveWithOneTaskAsStandby() throws Exception {
        produceInputRecord(KEY_0, 10L);

        final String stateDirPath = TestUtils.tempDirectory(appId).getPath();
        final CountDownLatch recordProcessedLatch = new CountDownLatch(1);

        streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", recordProcessedLatch);
        streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", recordProcessedLatch);

        IntegrationTestUtils.startApplicationAndWaitUntilRunning(
            Arrays.asList(streamInstanceOne, streamInstanceTwo),
            Duration.ofSeconds(60L)
        );

        // The latch is counted down by the topology once a count result flows downstream.
        Assert.assertTrue(recordProcessedLatch.await(15L, TimeUnit.SECONDS));

        streamInstanceOne.close();
        streamInstanceTwo.close();

        streamInstanceOne.cleanUp();
        streamInstanceTwo.cleanUp();
    }

    /**
     * Builds a counting topology on top of a state directory that already contains data for
     * task {@code 0_0}: a {@code .checkpoint} file referring to partition 0 of
     * {@code "unknown-topic"} at offset 5 and an empty RocksDB store directory.
     *
     * @param stateDirPath        value used for {@code state.dir}
     * @param recordProcessLatch  counted down for every record emitted by the count aggregation
     * @return a not yet started Streams instance
     */
    private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
                                                      final CountDownLatch recordProcessLatch) throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        final TaskId taskId = new TaskId(0, 0);
        final Properties props = props(stateDirPath);

        final StateDirectory stateDirectory =
            new StateDirectory(new StreamsConfig(props), new MockTime(), true, false);
        final File taskDir = stateDirectory.getOrCreateDirectoryForTask(taskId);

        new OffsetCheckpoint(new File(taskDir, ".checkpoint"))
            .write(Collections.singletonMap(new TopicPartition("unknown-topic", 0), 5L));

        Assert.assertTrue(new File(taskDir, "rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000001").mkdirs());

        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .groupByKey()
            .count()
            .toStream()
            .peek((key, count) -> recordProcessLatch.countDown());

        return new KafkaStreams(builder.build(), props);
    }

    /**
     * Exercises a crash/restart cycle with an active and a standby instance:
     * <ol>
     *   <li>instance one processes {@link #KEY_0}; instance two is started and must expose the
     *       key through a stale-store query;</li>
     *   <li>a record with {@link #KEY_1} makes instance one fail and enter {@code ERROR};</li>
     *   <li>a second output record must become visible, a new instance is started on instance
     *       one's state directory and must expose {@link #KEY_0} again;</li>
     *   <li>instance two is closed, the restarted instance must serve the key from a
     *       non-stale query, and a further {@link #KEY_1} record must drive it into
     *       {@code ERROR} as well.</li>
     * </ol>
     */
    @Test
    public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
        final long recordTimestamp = 10L + System.currentTimeMillis();
        final String base = TestUtils.tempDirectory(appId).getPath();

        produceInputRecord(KEY_0, recordTimestamp);

        streamInstanceOne = buildWithDeduplicationTopology(base + "-1");
        streamInstanceTwo = buildWithDeduplicationTopology(base + "-2");

        // Start the first instance and wait until it has produced output.
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamInstanceOne);
        awaitOutputRecords(1);

        // Start the second instance and wait until its (possibly stale) store contains the key.
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamInstanceTwo);
        TestUtils.waitForCondition(
            () -> storeContainsKey0(streamInstanceTwo, true),
            REBALANCE_TIMEOUT,
            "Could not get key from standby store"
        );
        // A non-stale query against the first instance must still succeed.
        TestUtils.waitForCondition(
            () -> storeContainsKey0(streamInstanceOne, false),
            "Could not get key from main store"
        );

        // KEY_1 makes the transformer throw; wait for the first instance to fail.
        produceInputRecord(KEY_1, recordTimestamp);
        TestUtils.waitForCondition(
            () -> streamInstanceOne.state() == KafkaStreams.State.ERROR,
            "Stream instance 1 did not go into error state"
        );
        streamInstanceOne.close();

        awaitOutputRecords(2);

        // Start a fresh instance on the first instance's state directory.
        streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1");
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamInstanceOneRecovery);
        TestUtils.waitForCondition(
            () -> storeContainsKey0(streamInstanceOneRecovery, true),
            "Could not get key from recovered standby store"
        );

        streamInstanceTwo.close();
        TestUtils.waitForCondition(
            () -> storeContainsKey0(streamInstanceOneRecovery, false),
            REBALANCE_TIMEOUT,
            "Could not get key from recovered main store"
        );

        // Re-arm the error injection and send KEY_1 again.
        skipRecord.set(false);
        produceInputRecord(KEY_1, recordTimestamp);
        TestUtils.waitForCondition(
            () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR,
            // Supplier form: the state is read when the wait fails, not before it starts.
            () -> "Stream instance 1 did not go into error state. Is in "
                + streamInstanceOneRecovery.state() + " state."
        );
    }

    /**
     * Builds a topology that forwards each key at most once, tracking seen keys in the
     * persistent store {@code storeName}, and that throws when it first sees {@link #KEY_1}.
     *
     * @param stateDirPath value used for {@code state.dir}
     * @return a not yet started Streams instance
     */
    private KafkaStreams buildWithDeduplicationTopology(final String stateDirPath) {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(storeName),
            Serdes.Integer(),
            Serdes.Integer()
        ));

        // The explicit type witness is required: without it the stream is typed
        // <Object, Object> and the Integer-typed transformer supplier does not fit.
        builder.<Integer, Integer>stream(inputTopic)
            .transform(
                () -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
                    private KeyValueStore<Integer, Integer> store;

                    @Override
                    public void init(final ProcessorContext context) {
                        store = context.getStateStore(storeName);
                    }

                    @Override
                    public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
                        if (skipRecord.get()) {
                            // Forward only; the state store is deliberately left untouched.
                            return KeyValue.pair(key, value);
                        }

                        if (store.get(key) != null) {
                            // Key already seen: drop the duplicate.
                            return null;
                        }

                        store.put(key, value);
                        store.flush();

                        if (key == KEY_1) {
                            // Enable skipping before failing so that the record does not
                            // trigger the error again when it is re-processed.
                            skipRecord.set(true);
                            throw new RuntimeException("Injected test error");
                        }

                        return KeyValue.pair(key, value);
                    }

                    @Override
                    public void close() {
                    }
                },
                storeName
            )
            .to(outputTopic);

        return new KafkaStreams(builder.build(), props(stateDirPath));
    }

    /**
     * Synchronously produces a single record with the given key and value {@code 0} to the
     * input topic.
     */
    private void produceInputRecord(final int key, final long timestamp) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            inputTopic,
            Collections.singletonList(new KeyValue<>(key, 0)),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                IntegerSerializer.class,
                IntegerSerializer.class,
                new Properties()
            ),
            timestamp
        );
    }

    /** Blocks until at least {@code expectedRecords} records can be read from the output topic. */
    private void awaitOutputRecords(final int expectedRecords) throws Exception {
        IntegrationTestUtils.waitUntilMinRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                IntegerDeserializer.class,
                IntegerDeserializer.class
            ),
            outputTopic,
            expectedRecords
        );
    }

    /**
     * Queries {@code storeName} on the given instance for {@link #KEY_0}.
     *
     * @param streams          instance to query
     * @param allowStaleStores whether the query is issued with {@code enableStaleStores()}
     * @return {@code true} if the store returned a non-null value for {@link #KEY_0}
     */
    private boolean storeContainsKey0(final KafkaStreams streams, final boolean allowStaleStores) {
        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> query =
            StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.<Integer, Integer>keyValueStore());
        if (allowStaleStores) {
            query = query.enableStaleStores();
        }
        return streams.store(query).get(KEY_0) != null;
    }

    /**
     * Creates the Streams configuration shared by all instances in this class.
     *
     * @param stateDirPath value used for {@code state.dir}
     * @return a new, mutable configuration
     */
    private Properties props(final String stateDirPath) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", appId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("statestore.cache.max.bytes", 0);
        streamsConfiguration.put("state.dir", stateDirPath);
        streamsConfiguration.put("num.standby.replicas", 1);
        streamsConfiguration.put("processing.guarantee", eosConfig);
        streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("commit.interval.ms", 1000L);
        streamsConfiguration.put("acceptable.recovery.lag", 0);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        return streamsConfiguration;
    }
}
