package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.testutil.ConsumerRecordUtil;
import org.apache.kafka.test.GlobalStateManagerStub;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.NoOpProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.class */
/**
 * Unit tests for {@link GlobalStateUpdateTask}.
 *
 * <p>Each test runs against a fresh task wired to a two-source topology
 * ({@code t1} with String serdes, {@code t2} with Integer serdes), a
 * {@link GlobalStateManagerStub} seeded with starting offsets, and a
 * {@link NoOpProcessorContext}.
 */
public class GlobalStateTaskTest {
    private final LogContext logContext = new LogContext();
    private final String topic1 = "t1";
    private final String topic2 = "t2";
    private final TopicPartition t1 = new TopicPartition(topic1, 1);
    private final TopicPartition t2 = new TopicPartition(topic2, 1);
    private final MockSourceNode<String, String> sourceOne = new MockSourceNode<>(new StringDeserializer(), new StringDeserializer());
    private final MockSourceNode<Integer, Integer> sourceTwo = new MockSourceNode<>(new IntegerDeserializer(), new IntegerDeserializer());
    private final MockProcessorNode<?, ?, ?, ?> processorOne = new MockProcessorNode<>();
    private final MockProcessorNode<?, ?, ?, ?> processorTwo = new MockProcessorNode<>();
    private final Map<TopicPartition, Long> offsets = new HashMap<>();
    private final File testDirectory = TestUtils.tempDirectory("global-store");
    private final NoOpProcessorContext context = new NoOpProcessorContext();
    private ProcessorTopology topology;
    private GlobalStateManagerStub stateMgr;
    private GlobalStateUpdateTask globalStateTask;

    /**
     * Builds the topology, seeds the starting offsets (t1 -> 50, t2 -> 100) and creates the
     * task under test with a {@link LogAndFailExceptionHandler}.
     */
    @Before
    public void before() {
        final Set<String> storeNames = Utils.mkSet("t1-store", "t2-store");

        final Map<String, String> storeToTopic = new HashMap<>();
        storeToTopic.put("t1-store", topic1);
        storeToTopic.put("t2-store", topic2);

        // The topic -> source map is built inline so that its value type is inferred from the
        // factory's parameter type instead of falling back to a raw HashMap.
        topology = ProcessorTopologyFactories.with(
            Arrays.asList(sourceOne, sourceTwo, processorOne, processorTwo),
            Utils.mkMap(Utils.mkEntry(topic1, sourceOne), Utils.mkEntry(topic2, sourceTwo)),
            Collections.emptyList(),
            storeToTopic);

        offsets.put(t1, 50L);
        offsets.put(t2, 100L);
        stateMgr = new GlobalStateManagerStub(storeNames, offsets, testDirectory);
        globalStateTask = new GlobalStateUpdateTask(
            logContext,
            topology,
            context,
            stateMgr,
            new LogAndFailExceptionHandler());
    }

    /** {@code initialize()} must initialize the state manager and return its offsets. */
    @Test
    public void shouldInitializeStateManager() {
        final Map<TopicPartition, Long> startingOffsets = globalStateTask.initialize();
        Assert.assertTrue(stateMgr.initialized);
        Assert.assertEquals(offsets, startingOffsets);
    }

    /** {@code initialize()} must initialize the processor context. */
    @Test
    public void shouldInitializeContext() {
        globalStateTask.initialize();
        Assert.assertTrue(context.initialized);
    }

    /** {@code initialize()} must initialize every node of the topology. */
    @Test
    public void shouldInitializeProcessorTopology() {
        globalStateTask.initialize();
        Assert.assertTrue(sourceOne.initialized);
        Assert.assertTrue(sourceTwo.initialized);
        Assert.assertTrue(processorOne.initialized);
        Assert.assertTrue(processorTwo.initialized);
    }

    /** A record on {@code t1} must reach only the source node registered for {@code t1}. */
    @Test
    public void shouldProcessRecordsForTopic() {
        globalStateTask.initialize();
        globalStateTask.update(ConsumerRecordUtil.record(topic1, 1, 1L, utf8("foo"), utf8("bar")));
        Assert.assertEquals(1L, sourceOne.numReceived);
        Assert.assertEquals(0L, sourceTwo.numReceived);
    }

    /** A record on {@code t2} must reach only the source node registered for {@code t2}. */
    @Test
    public void shouldProcessRecordsForOtherTopic() {
        final byte[] integerBytes = new IntegerSerializer().serialize("foo", 1);
        globalStateTask.initialize();
        globalStateTask.update(ConsumerRecordUtil.record(topic2, 1, 1L, integerBytes, integerBytes));
        Assert.assertEquals(1L, sourceTwo.numReceived);
        Assert.assertEquals(0L, sourceOne.numReceived);
    }

    /**
     * Initializes {@code task}, feeds it one {@code t2} record with the given raw key and value,
     * and checks whether a {@link StreamsException} is raised.
     *
     * @param task         the task to initialize and update
     * @param key          serialized record key
     * @param value        serialized record value
     * @param failExpected {@code true} if the update must throw a {@link StreamsException},
     *                     {@code false} if it must complete normally
     */
    private void maybeDeserialize(final GlobalStateUpdateTask task,
                                  final byte[] key,
                                  final byte[] value,
                                  final boolean failExpected) {
        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            topic2,
            1,
            1L,
            0L,
            TimestampType.CREATE_TIME,
            0,
            0,
            key,
            value,
            new RecordHeaders(),
            Optional.empty());
        task.initialize();
        try {
            task.update(record);
        } catch (final StreamsException e) {
            if (failExpected) {
                return;
            }
            // Keep the original exception as the cause so the failure report shows what went wrong.
            throw new AssertionError("Shouldn't have failed to deserialize.", e);
        }
        // Checked outside the try block: reaching this point means update() did not throw.
        if (failExpected) {
            Assert.fail("Should have failed to deserialize.");
        }
    }

    /** With the fail handler, a key that cannot be read as an Integer must raise an exception. */
    @Test
    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
        final byte[] key = new LongSerializer().serialize(topic2, 1L);
        final byte[] value = new IntegerSerializer().serialize(topic2, 10);
        maybeDeserialize(globalStateTask, key, value, true);
    }

    /** With the fail handler, a value that cannot be read as an Integer must raise an exception. */
    @Test
    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
        final byte[] key = new IntegerSerializer().serialize(topic2, 1);
        final byte[] value = new LongSerializer().serialize(topic2, 10L);
        maybeDeserialize(globalStateTask, key, value, true);
    }

    /** With the continue handler, an undeserializable key must not raise an exception. */
    @Test
    public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
        final byte[] key = new LongSerializer().serialize(topic2, 1L);
        final byte[] value = new IntegerSerializer().serialize(topic2, 10);
        maybeDeserialize(newTaskWithContinueHandler(), key, value, false);
    }

    /** With the continue handler, an undeserializable value must not raise an exception. */
    @Test
    public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() {
        final byte[] key = new IntegerSerializer().serialize(topic2, 1);
        final byte[] value = new LongSerializer().serialize(topic2, 10L);
        maybeDeserialize(newTaskWithContinueHandler(), key, value, false);
    }

    /** After a {@code t1} record at offset 51 and a flush, the state manager must report t1 -> 52. */
    @Test
    public void shouldFlushStateManagerWithOffsets() {
        final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
        expectedOffsets.put(t1, 52L);
        expectedOffsets.put(t2, 100L);
        globalStateTask.initialize();
        globalStateTask.update(ConsumerRecordUtil.record(topic1, 1, 51L, utf8("foo"), utf8("foo")));
        globalStateTask.flushState();
        Assert.assertEquals(expectedOffsets, stateMgr.changelogOffsets());
    }

    /** After a {@code t1} record at offset 101 and a flush, the state manager must report t1 -> 102. */
    @Test
    public void shouldCheckpointOffsetsWhenStateIsFlushed() {
        final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
        expectedOffsets.put(t1, 102L);
        expectedOffsets.put(t2, 100L);
        globalStateTask.initialize();
        globalStateTask.update(ConsumerRecordUtil.record(topic1, 1, 101L, utf8("foo"), utf8("foo")));
        globalStateTask.flushState();
        MatcherAssert.assertThat(stateMgr.changelogOffsets(), CoreMatchers.equalTo(expectedOffsets));
    }

    /** {@code close(true)} must remove the state manager's base directory. */
    @Test
    public void shouldWipeGlobalStateDirectory() throws Exception {
        Assert.assertTrue(stateMgr.baseDir().exists());
        globalStateTask.close(true);
        Assert.assertFalse(stateMgr.baseDir().exists());
    }

    /** Creates a task that shares this test's fixtures but uses a {@link LogAndContinueExceptionHandler}. */
    private GlobalStateUpdateTask newTaskWithContinueHandler() {
        return new GlobalStateUpdateTask(
            logContext,
            topology,
            context,
            stateMgr,
            new LogAndContinueExceptionHandler());
    }

    /** Encodes {@code text} as UTF-8 so the bytes do not depend on the platform default charset. */
    private static byte[] utf8(final String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }
}
