package org.apache.kafka.streams.state.internals;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.VersionedBytesStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.class */
public class MeteredVersionedKeyValueStoreTest {
    private static final String STORE_NAME = "versioned_store";
    private static final String METRICS_SCOPE = "scope";
    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
    private static final String APPLICATION_ID = "test-app";
    private final VersionedBytesStore inner = (VersionedBytesStore) Mockito.mock(VersionedBytesStore.class);
    private final Metrics metrics = new Metrics();
    private final Time mockTime = new MockTime();
    private final String threadId = Thread.currentThread().getName();
    private InternalProcessorContext context = (InternalProcessorContext) Mockito.mock(InternalProcessorContext.class);
    private Map<String, String> tags;
    private MeteredVersionedKeyValueStore<String, String> store;
    private static final Serde<String> STRING_SERDE = new Serdes.StringSerde();
    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new ValueAndTimestampSerde(STRING_SERDE);
    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
    private static final String KEY = "k";
    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize((String) null, KEY));
    private static final String VALUE = "v";
    private static final byte[] RAW_VALUE = STRING_SERDE.serializer().serialize((String) null, VALUE);
    private static final long TIMESTAMP = 10;
    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer().serialize((String) null, ValueAndTimestamp.make(VALUE, TIMESTAMP));

    @Before
    public void setUp() {
        Mockito.when(this.inner.name()).thenReturn(STORE_NAME);
        Mockito.when(this.context.metrics()).thenReturn(new StreamsMetricsImpl(this.metrics, "test", "latest", this.mockTime));
        Mockito.when(this.context.applicationId()).thenReturn(APPLICATION_ID);
        Mockito.when(this.context.taskId()).thenReturn(TASK_ID);
        this.metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
        this.tags = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", this.threadId), Utils.mkEntry("task-id", TASK_ID.toString()), Utils.mkEntry("scope-state-id", STORE_NAME)});
        this.store = newMeteredStore(this.inner);
        this.store.init(this.context, this.store);
    }

    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(VersionedBytesStore versionedBytesStore) {
        return new MeteredVersionedKeyValueStore<>(versionedBytesStore, METRICS_SCOPE, this.mockTime, STRING_SERDE, STRING_SERDE);
    }

    @Test
    public void shouldDelegateDeprecatedInit() {
        this.store.close();
        VersionedBytesStore versionedBytesStore = (VersionedBytesStore) Mockito.mock(VersionedBytesStore.class);
        this.store = newMeteredStore(versionedBytesStore);
        this.store.init(this.context, this.store);
        ((VersionedBytesStore) Mockito.verify(versionedBytesStore)).init(this.context, this.store);
    }

    @Test
    public void shouldDelegateInit() {
        ((VersionedBytesStore) Mockito.verify(this.inner)).init(this.context, this.store);
    }

    @Test
    public void shouldPassChangelogTopicNameToStateStoreSerde() {
        Mockito.when(this.context.changelogFor(STORE_NAME)).thenReturn("changelog-topic");
        doShouldPassChangelogTopicNameToStateStoreSerde("changelog-topic");
    }

    @Test
    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
        String storeChangelogTopic = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
        Mockito.when(this.context.changelogFor(STORE_NAME)).thenReturn((Object) null);
        doShouldPassChangelogTopicNameToStateStoreSerde(storeChangelogTopic);
    }

    private void doShouldPassChangelogTopicNameToStateStoreSerde(String str) {
        Serde serde = (Serde) Mockito.mock(Serde.class);
        Serializer serializer = (Serializer) Mockito.mock(Serializer.class);
        Serde serde2 = (Serde) Mockito.mock(Serde.class);
        Serializer serializer2 = (Serializer) Mockito.mock(Serializer.class);
        Deserializer deserializer = (Deserializer) Mockito.mock(Deserializer.class);
        Mockito.when(serde.serializer()).thenReturn(serializer);
        Mockito.when(serde2.serializer()).thenReturn(serializer2);
        Mockito.when(serde2.deserializer()).thenReturn(deserializer);
        this.store.close();
        this.store = new MeteredVersionedKeyValueStore<>(this.inner, METRICS_SCOPE, this.mockTime, serde, serde2);
        this.store.init(this.context, this.store);
        this.store.put(KEY, VALUE, TIMESTAMP);
        ((Serializer) Mockito.verify(serializer)).serialize(str, KEY);
        ((Serializer) Mockito.verify(serializer2)).serialize(str, VALUE);
    }

    @Test
    public void shouldRecordMetricsOnInit() {
        MatcherAssert.assertThat((Double) getMetric("restore-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldDelegateAndRecordMetricsOnPut() {
        Mockito.when(Long.valueOf(this.inner.put(RAW_KEY, RAW_VALUE, TIMESTAMP))).thenReturn(-1L);
        MatcherAssert.assertThat(Long.valueOf(this.store.put(KEY, VALUE, TIMESTAMP)), Matchers.is(-1L));
        MatcherAssert.assertThat((Double) getMetric("put-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldDelegateAndRecordMetricsOnDelete() {
        Mockito.when(this.inner.delete(RAW_KEY, TIMESTAMP)).thenReturn(RAW_VALUE_AND_TIMESTAMP);
        MatcherAssert.assertThat(this.store.delete(KEY, TIMESTAMP), Matchers.is(new VersionedRecord(VALUE, TIMESTAMP)));
        MatcherAssert.assertThat((Double) getMetric("delete-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldDelegateAndRecordMetricsOnGet() {
        Mockito.when(this.inner.get(RAW_KEY)).thenReturn(RAW_VALUE_AND_TIMESTAMP);
        MatcherAssert.assertThat(this.store.get(KEY), Matchers.is(new VersionedRecord(VALUE, TIMESTAMP)));
        MatcherAssert.assertThat((Double) getMetric("get-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
        Mockito.when(this.inner.get(RAW_KEY, TIMESTAMP)).thenReturn(RAW_VALUE_AND_TIMESTAMP);
        MatcherAssert.assertThat(this.store.get(KEY, TIMESTAMP), Matchers.is(new VersionedRecord(VALUE, TIMESTAMP)));
        MatcherAssert.assertThat((Double) getMetric("get-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldDelegateAndRecordMetricsOnFlush() {
        this.store.flush();
        ((VersionedBytesStore) Mockito.verify(this.inner)).flush();
        MatcherAssert.assertThat((Double) getMetric("flush-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldDelegateAndRemoveMetricsOnClose() {
        MatcherAssert.assertThat(storeMetrics(), Matchers.not(Matchers.empty()));
        this.store.close();
        ((VersionedBytesStore) Mockito.verify(this.inner)).close();
        MatcherAssert.assertThat(storeMetrics(), Matchers.empty());
    }

    @Test
    public void shouldRemoveMetricsOnCloseEvenIfInnerThrows() {
        ((VersionedBytesStore) Mockito.doThrow(new Throwable[]{new RuntimeException("uh oh")}).when(this.inner)).close();
        MatcherAssert.assertThat(storeMetrics(), Matchers.not(Matchers.empty()));
        Assert.assertThrows(RuntimeException.class, () -> {
            this.store.close();
        });
        MatcherAssert.assertThat(storeMetrics(), Matchers.empty());
    }

    @Test
    public void shouldNotSetFlushListenerIfInnerIsNotCaching() {
        MatcherAssert.assertThat(Boolean.valueOf(this.store.setFlushListener((CacheFlushListener) null, false)), Matchers.is(false));
    }

    @Test
    public void shouldThrowNullPointerOnPutIfKeyIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.store.put((Object) null, VALUE, TIMESTAMP);
        });
    }

    @Test
    public void shouldThrowNullPointerOnDeleteIfKeyIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.store.delete((Object) null, TIMESTAMP);
        });
    }

    @Test
    public void shouldThrowNullPointerOnGetIfKeyIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.store.get((Object) null);
        });
    }

    @Test
    public void shouldThrowNullPointerOnGetWithTimestampIfKeyIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.store.get((Object) null, TIMESTAMP);
        });
    }

    @Test
    public void shouldThrowOnIQv2RangeQuery() {
        Assert.assertThrows(UnsupportedOperationException.class, () -> {
            this.store.query((Query) Mockito.mock(RangeQuery.class), (PositionBound) null, (QueryConfig) null);
        });
    }

    @Test
    public void shouldThrowOnIQv2KeyQuery() {
        Assert.assertThrows(UnsupportedOperationException.class, () -> {
            this.store.query((Query) Mockito.mock(KeyQuery.class), (PositionBound) null, (QueryConfig) null);
        });
    }

    @Test
    public void shouldDelegateAndAddExecutionInfoOnCustomQuery() {
        Query query = (Query) Mockito.mock(Query.class);
        PositionBound positionBound = (PositionBound) Mockito.mock(PositionBound.class);
        QueryConfig queryConfig = (QueryConfig) Mockito.mock(QueryConfig.class);
        QueryResult queryResult = (QueryResult) Mockito.mock(QueryResult.class);
        Mockito.when(this.inner.query(query, positionBound, queryConfig)).thenReturn(queryResult);
        Mockito.when(Boolean.valueOf(queryConfig.isCollectExecutionInfo())).thenReturn(true);
        MatcherAssert.assertThat(this.store.query(query, positionBound, queryConfig), Matchers.is(queryResult));
        ((QueryResult) Mockito.verify(queryResult)).addExecutionInfo(ArgumentMatchers.anyString());
    }

    @Test
    public void shouldDelegateName() {
        Mockito.when(this.inner.name()).thenReturn(STORE_NAME);
        MatcherAssert.assertThat(this.store.name(), Matchers.is(STORE_NAME));
    }

    @Test
    public void shouldDelegatePersistent() {
        Mockito.when(Boolean.valueOf(this.inner.persistent())).thenReturn(true);
        MatcherAssert.assertThat(Boolean.valueOf(this.store.persistent()), Matchers.is(true));
        Mockito.when(Boolean.valueOf(this.inner.persistent())).thenReturn(false);
        MatcherAssert.assertThat(Boolean.valueOf(this.store.persistent()), Matchers.is(false));
    }

    @Test
    public void shouldDelegateIsOpen() {
        Mockito.when(Boolean.valueOf(this.inner.isOpen())).thenReturn(true);
        MatcherAssert.assertThat(Boolean.valueOf(this.store.isOpen()), Matchers.is(true));
        Mockito.when(Boolean.valueOf(this.inner.isOpen())).thenReturn(false);
        MatcherAssert.assertThat(Boolean.valueOf(this.store.isOpen()), Matchers.is(false));
    }

    @Test
    public void shouldDelegateGetPosition() {
        Position position = (Position) Mockito.mock(Position.class);
        Mockito.when(this.inner.getPosition()).thenReturn(position);
        MatcherAssert.assertThat(this.store.getPosition(), Matchers.is(position));
    }

    private KafkaMetric getMetric(String str) {
        return this.metrics.metric(new MetricName(str, STORE_LEVEL_GROUP, "", this.tags));
    }

    private List<MetricName> storeMetrics() {
        return (List) this.metrics.metrics().keySet().stream().filter(metricName -> {
            return metricName.group().equals(STORE_LEVEL_GROUP) && metricName.tags().equals(this.tags);
        }).collect(Collectors.toList());
    }
}
