package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockClientSupplier;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsNot;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Tests for {@link ActiveTaskCreator}.
 *
 * <p>Every test builds a creator through {@link #createTasks()}, which creates the two tasks
 * {@code 0_0} and {@code 0_1}. The tests are grouped by the value of the
 * {@code processing.guarantee} property: not set, {@code exactly_once} ("EOS alpha") and
 * {@code exactly_once_v2} ("EOS v2").
 */
@RunWith(EasyMockRunner.class)
public class ActiveTaskCreatorTest {

    private static final String PROCESSING_GUARANTEE = "processing.guarantee";
    private static final String EOS_ALPHA = "exactly_once";
    private static final String EOS_V2 = "exactly_once_v2";
    private static final String APPLICATION_ID = "appId";
    private static final String BLOCKED_TIME_METRIC = "flush-time-ns-total";
    private static final String CLOSE_FAILURE = "KABOOM!";
    private static final String THREAD_PRODUCER_CLOSE_ERROR = "Thread producer encounter error trying to close.";

    @Mock(type = MockType.NICE)
    private InternalTopologyBuilder builder;

    @Mock(type = MockType.NICE)
    private StateDirectory stateDirectory;

    @Mock(type = MockType.NICE)
    private ChangelogReader changeLogReader;

    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private final StreamsMetricsImpl streamsMetrics =
        new StreamsMetricsImpl(new Metrics(), "clientId", "latest", new MockTime());
    private final Map<String, Object> properties = Utils.mkMap(
        Utils.mkEntry("application.id", APPLICATION_ID),
        Utils.mkEntry("bootstrap.servers", "dummy:1234")
    );
    final UUID uuid = UUID.randomUUID();
    private ActiveTaskCreator activeTaskCreator;

    // ---------------------------------------------------------------- processing.guarantee not set

    @Test
    public void shouldConstructProducerMetricsWithEosDisabled() {
        shouldConstructThreadProducerMetric();
    }

    @Test
    public void shouldConstructClientIdWithEosDisabled() {
        createTasks();

        MatcherAssert.assertThat(
            activeTaskCreator.producerClientIds(),
            CoreMatchers.is(Collections.singleton("clientId-StreamThread-0-producer")));
    }

    @Test
    public void shouldCloseThreadProducerIfEosDisabled() {
        createTasks();

        activeTaskCreator.closeThreadProducerIfNeeded();

        MatcherAssert.assertThat(firstProducer().closed(), CoreMatchers.is(true));
    }

    @Test
    public void shouldNoOpCloseTaskProducerIfEosDisabled() {
        createTasks();

        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));

        MatcherAssert.assertThat(firstProducer().closed(), CoreMatchers.is(false));
    }

    @Test
    public void shouldReturnBlockedTimeWhenThreadProducer() {
        createTasks();
        addMetric(firstProducer(), BLOCKED_TIME_METRIC, 123.0d);

        MatcherAssert.assertThat(
            activeTaskCreator.totalProducerBlockedTime(),
            Matchers.closeTo(123.0d, 0.01d));
    }

    @Test
    public void shouldFailOnStreamsProducerPerTaskIfEosDisabled() {
        createTasks();

        final IllegalStateException thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> activeTaskCreator.streamsProducerForTask((TaskId) null));

        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.is("Expected EXACTLY_ONCE to be enabled, but the processing mode was AT_LEAST_ONCE"));
    }

    @Test
    public void shouldFailOnGetThreadProducerIfEosDisabled() {
        createTasks();

        final IllegalStateException thrown =
            Assert.assertThrows(IllegalStateException.class, activeTaskCreator::threadProducer);

        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.is("Expected EXACTLY_ONCE_V2 to be enabled, but the processing mode was AT_LEAST_ONCE"));
    }

    @Test
    public void shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosDisabled() {
        createTasks();
        firstProducer().closeException = new RuntimeException(CLOSE_FAILURE);

        final StreamsException thrown =
            Assert.assertThrows(StreamsException.class, activeTaskCreator::closeThreadProducerIfNeeded);

        MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.is(THREAD_PRODUCER_CLOSE_ERROR));
        MatcherAssert.assertThat(thrown.getCause().getMessage(), CoreMatchers.is(CLOSE_FAILURE));
    }

    // ---------------------------------------------------------------- processing.guarantee = exactly_once

    @Test
    public void shouldReturnStreamsProducerPerTaskIfEosAlphaEnabled() {
        properties.put(PROCESSING_GUARANTEE, EOS_ALPHA);
        shouldReturnStreamsProducerPerTask();
    }

    @Test
    public void shouldConstructProducerMetricsWithEosAlphaEnabled() {
        properties.put(PROCESSING_GUARANTEE, EOS_ALPHA);
        shouldConstructProducerMetricsPerTask();
    }

    @Test
    public void shouldConstructClientIdWithEosAlphaEnabled() {
        configureEos(EOS_ALPHA);
        createTasks();

        MatcherAssert.assertThat(
            activeTaskCreator.producerClientIds(),
            CoreMatchers.is(Utils.mkSet(
                "clientId-StreamThread-0-0_0-producer",
                "clientId-StreamThread-0-0_1-producer")));
    }

    @Test
    public void shouldNoOpCloseThreadProducerIfEosAlphaEnabled() {
        configureEos(EOS_ALPHA);
        createTasks();

        activeTaskCreator.closeThreadProducerIfNeeded();

        MatcherAssert.assertThat(mockClientSupplier.producers.get(0).closed(), CoreMatchers.is(false));
        MatcherAssert.assertThat(mockClientSupplier.producers.get(1).closed(), CoreMatchers.is(false));
    }

    @Test
    public void shouldCloseTaskProducersIfEosAlphaEnabled() {
        configureEos(EOS_ALPHA);
        createTasks();

        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));
        // task 0_2 is not among the tasks created by createTasks(); the call must simply not throw
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 2));

        MatcherAssert.assertThat(mockClientSupplier.producers.get(0).closed(), CoreMatchers.is(true));
        MatcherAssert.assertThat(mockClientSupplier.producers.get(1).closed(), CoreMatchers.is(true));

        // closing 0_0 a second time must not throw, even though its producer would now fail on close
        firstProducer().closeException = new RuntimeException(CLOSE_FAILURE);
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
    }

    @Test
    public void shouldReturnBlockedTimeWhenTaskProducers() {
        configureEos(EOS_ALPHA);
        createTasks();

        // give every producer a distinct blocked time (1, 2, ...) and expect their sum
        double expectedTotal = 0.0d;
        double blockedTime = 1.0d;
        for (final MockProducer<?, ?> producer : mockClientSupplier.producers) {
            addMetric(producer, BLOCKED_TIME_METRIC, blockedTime);
            expectedTotal += blockedTime;
            blockedTime += 1.0d;
        }

        MatcherAssert.assertThat(
            activeTaskCreator.totalProducerBlockedTime(),
            Matchers.closeTo(expectedTotal, 0.01d));
    }

    @Test
    public void shouldFailForUnknownTaskOnStreamsProducerPerTaskIfEosAlphaEnabled() {
        configureEos(EOS_ALPHA);
        createTasks();

        final IllegalStateException thrownForNull = Assert.assertThrows(
            IllegalStateException.class,
            () -> activeTaskCreator.streamsProducerForTask((TaskId) null));
        MatcherAssert.assertThat(thrownForNull.getMessage(), CoreMatchers.is("Unknown TaskId: null"));

        final IllegalStateException thrownForUnknown = Assert.assertThrows(
            IllegalStateException.class,
            () -> activeTaskCreator.streamsProducerForTask(new TaskId(0, 2)));
        MatcherAssert.assertThat(thrownForUnknown.getMessage(), CoreMatchers.is("Unknown TaskId: 0_2"));
    }

    @Test
    public void shouldFailOnGetThreadProducerIfEosAlphaEnabled() {
        configureEos(EOS_ALPHA);
        createTasks();

        final IllegalStateException thrown =
            Assert.assertThrows(IllegalStateException.class, activeTaskCreator::threadProducer);

        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.is("Expected EXACTLY_ONCE_V2 to be enabled, but the processing mode was EXACTLY_ONCE_ALPHA"));
    }

    @Test
    public void shouldThrowStreamsExceptionOnErrorCloseTaskProducerIfEosAlphaEnabled() {
        configureEos(EOS_ALPHA);
        createTasks();
        firstProducer().closeException = new RuntimeException(CLOSE_FAILURE);

        final StreamsException thrown = Assert.assertThrows(
            StreamsException.class,
            () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0)));

        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.is("[0_0] task producer encounter error trying to close."));
        MatcherAssert.assertThat(thrown.getCause().getMessage(), CoreMatchers.is(CLOSE_FAILURE));

        // a second close of the same task must not throw again
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
    }

    // ---------------------------------------------------------------- processing.guarantee = exactly_once_v2

    @Test
    public void shouldReturnThreadProducerIfEosV2Enabled() {
        configureEos(EOS_V2);
        createTasks();

        final StreamsProducer threadProducer = activeTaskCreator.threadProducer();

        MatcherAssert.assertThat(mockClientSupplier.producers.size(), CoreMatchers.is(1));
        MatcherAssert.assertThat(threadProducer.kafkaProducer(), CoreMatchers.is(mockClientSupplier.producers.get(0)));
    }

    @Test
    public void shouldConstructProducerMetricsWithEosV2Enabled() {
        configureEos(EOS_V2);
        shouldConstructThreadProducerMetric();
    }

    @Test
    public void shouldConstructClientIdWithEosV2Enabled() {
        configureEos(EOS_V2);
        createTasks();

        MatcherAssert.assertThat(
            activeTaskCreator.producerClientIds(),
            CoreMatchers.is(Collections.singleton("clientId-StreamThread-0-producer")));
    }

    @Test
    public void shouldCloseThreadProducerIfEosV2Enabled() {
        configureEos(EOS_V2);
        createTasks();

        activeTaskCreator.closeThreadProducerIfNeeded();

        MatcherAssert.assertThat(firstProducer().closed(), CoreMatchers.is(true));
    }

    @Test
    public void shouldNoOpCloseTaskProducerIfEosV2Enabled() {
        configureEos(EOS_V2);
        createTasks();

        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));

        MatcherAssert.assertThat(firstProducer().closed(), CoreMatchers.is(false));
    }

    @Test
    public void shouldFailOnStreamsProducerPerTaskIfEosV2Enabled() {
        configureEos(EOS_V2);
        createTasks();

        final IllegalStateException thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> activeTaskCreator.streamsProducerForTask((TaskId) null));

        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.is("Expected EXACTLY_ONCE to be enabled, but the processing mode was EXACTLY_ONCE_V2"));
    }

    @Test
    public void shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosV2Enabled() {
        configureEos(EOS_V2);
        createTasks();
        firstProducer().closeException = new RuntimeException(CLOSE_FAILURE);

        final StreamsException thrown =
            Assert.assertThrows(StreamsException.class, activeTaskCreator::closeThreadProducerIfNeeded);

        MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.is(THREAD_PRODUCER_CLOSE_ERROR));
        MatcherAssert.assertThat(thrown.getCause().getMessage(), CoreMatchers.is(CLOSE_FAILURE));
    }

    // ---------------------------------------------------------------- shared scenarios and helpers

    /** Asserts that the two created tasks are handed two different {@link StreamsProducer}s. */
    private void shouldReturnStreamsProducerPerTask() {
        mockClientSupplier.setApplicationIdForProducer(APPLICATION_ID);
        createTasks();

        final StreamsProducer producerForFirstTask = activeTaskCreator.streamsProducerForTask(new TaskId(0, 0));
        final StreamsProducer producerForSecondTask = activeTaskCreator.streamsProducerForTask(new TaskId(0, 1));

        MatcherAssert.assertThat(producerForFirstTask, IsNot.not(CoreMatchers.is(producerForSecondTask)));
    }

    /**
     * Registers two metrics on the first mock producer and asserts that
     * {@code producerMetrics()} reports exactly those two entries.
     */
    private void shouldConstructProducerMetricsPerTask() {
        mockClientSupplier.setApplicationIdForProducer(APPLICATION_ID);
        createTasks();

        final MetricName firstName = new MetricName("test_metric_1", "", "", new HashMap<>());
        final KafkaMetric firstMetric = zeroValuedMetric(firstName);
        firstProducer().setMockMetrics(firstName, firstMetric);

        final MetricName secondName = new MetricName("test_metric_2", "", "", new HashMap<>());
        final KafkaMetric secondMetric = zeroValuedMetric(secondName);
        firstProducer().setMockMetrics(secondName, secondMetric);

        final Map<MetricName, Metric> expected = Utils.mkMap(
            Utils.mkEntry(firstName, firstMetric),
            Utils.mkEntry(secondName, secondMetric)
        );
        MatcherAssert.assertThat(activeTaskCreator.producerMetrics(), CoreMatchers.is(expected));
    }

    /**
     * Registers one metric on the single mock producer and asserts that
     * {@code producerMetrics()} reports exactly that metric.
     */
    private void shouldConstructThreadProducerMetric() {
        createTasks();

        final MetricName name = new MetricName("test_metric", "", "", new HashMap<>());
        final KafkaMetric metric = zeroValuedMetric(name);
        firstProducer().setMockMetrics(name, metric);

        MatcherAssert.assertThat(mockClientSupplier.producers.size(), CoreMatchers.is(1));
        final Map<MetricName, Metric> producerMetrics = activeTaskCreator.producerMetrics();
        MatcherAssert.assertThat(producerMetrics.size(), CoreMatchers.is(1));
        MatcherAssert.assertThat(producerMetrics.get(name), CoreMatchers.is(metric));
    }

    /**
     * Records the mock expectations, instantiates {@link #activeTaskCreator} from the current
     * {@link #properties}, creates tasks {@code 0_0} and {@code 0_1} (one partition of "topic" each)
     * and asserts that exactly those two task ids come back.
     *
     * <p>The order of the recording statements is kept as-is on purpose; do not reorder them.
     */
    private void createTasks() {
        final TaskId task00 = new TaskId(0, 0);
        final TaskId task01 = new TaskId(0, 1);
        final ProcessorTopology topology = EasyMock.mock(ProcessorTopology.class);
        final SourceNode sourceNode = EasyMock.mock(SourceNode.class);

        EasyMock.reset(builder, stateDirectory);
        EasyMock.expect(builder.topologyConfigs()).andStubReturn(new TopologyConfig(new StreamsConfig(properties)));
        EasyMock.expect(builder.buildSubtopology(0)).andReturn(topology).anyTimes();
        EasyMock.expect(topology.sinkTopics()).andStubReturn(Collections.emptySet());
        EasyMock.expect(stateDirectory.getOrCreateDirectoryForTask(task00)).andReturn(EasyMock.mock(File.class));
        EasyMock.expect(stateDirectory.checkpointFileFor(task00)).andReturn(EasyMock.mock(File.class));
        EasyMock.expect(stateDirectory.getOrCreateDirectoryForTask(task01)).andReturn(EasyMock.mock(File.class));
        EasyMock.expect(stateDirectory.checkpointFileFor(task01)).andReturn(EasyMock.mock(File.class));
        EasyMock.expect(topology.storeToChangelogTopic()).andReturn(Collections.emptyMap()).anyTimes();
        EasyMock.expect(topology.source("topic")).andReturn(sourceNode).anyTimes();
        EasyMock.expect(sourceNode.getTimestampExtractor()).andReturn(EasyMock.mock(TimestampExtractor.class)).anyTimes();
        EasyMock.expect(topology.globalStateStores()).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.expect(topology.terminalNodes()).andStubReturn(Collections.singleton(sourceNode.name()));
        EasyMock.expect(topology.sources()).andStubReturn(Collections.singleton(sourceNode));
        EasyMock.replay(builder, stateDirectory, topology, sourceNode);

        final StreamsConfig config = new StreamsConfig(properties);
        activeTaskCreator = new ActiveTaskCreator(
            new TopologyMetadata(builder, config),
            config,
            streamsMetrics,
            stateDirectory,
            changeLogReader,
            new ThreadCache(new LogContext(), 0L, streamsMetrics),
            new MockTime(),
            mockClientSupplier,
            "clientId-StreamThread-0",
            uuid,
            new LogContext().logger(ActiveTaskCreator.class),
            false
        );

        MatcherAssert.assertThat(
            activeTaskCreator.createTasks(
                mockClientSupplier.consumer,
                Utils.mkMap(
                    Utils.mkEntry(task00, Collections.singleton(new TopicPartition("topic", 0))),
                    Utils.mkEntry(task01, Collections.singleton(new TopicPartition("topic", 1)))
                )
            ).stream().map(Task::id).collect(Collectors.toSet()),
            CoreMatchers.equalTo(Utils.mkSet(task00, task01))
        );
    }

    /** Sets the given processing guarantee and the application id used by the mock producers. */
    private void configureEos(final String guarantee) {
        properties.put(PROCESSING_GUARANTEE, guarantee);
        mockClientSupplier.setApplicationIdForProducer(APPLICATION_ID);
    }

    /** Returns the first producer handed out by the mock client supplier. */
    private MockProducer<byte[], byte[]> firstProducer() {
        return mockClientSupplier.producers.get(0);
    }

    /**
     * Builds a {@link KafkaMetric} with the given name whose value is always {@code 0.0}.
     * The explicit {@link Measurable} cast gives the lambda a functional-interface target.
     */
    private static KafkaMetric zeroValuedMetric(final MetricName name) {
        return new KafkaMetric(
            new Object(),
            name,
            (Measurable) (config, now) -> 0.0d,
            (MetricConfig) null,
            new MockTime()
        );
    }

    /** Registers a metric named {@code str} with the constant value {@code d} on {@code mockProducer}. */
    private void addMetric(final MockProducer<?, ?> mockProducer, final String str, final double d) {
        final MetricName metricName = metricName(str);
        mockProducer.setMockMetrics(metricName, new Metric() {
            @Override
            public MetricName metricName() {
                return metricName;
            }

            @Override
            public Object metricValue() {
                return d;
            }
        });
    }

    /** Creates a {@link MetricName} with the given name, empty group and description, and no tags. */
    private MetricName metricName(final String str) {
        return new MetricName(str, "", "", Collections.emptyMap());
    }
}
