package org.apache.kafka.streams.kstream.internals.graph;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.class */
public class StreamsGraphTest {
    private Initializer<String> initializer;
    private Aggregator<String, String, String> aggregator;
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    private final String expectedJoinedTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> none\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n";
    private final String expectedJoinedFilteredTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-FILTER-0000000007\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n      --> none\n      <-- KSTREAM-MERGE-0000000006\n\n";
    private final String expectedFullTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-FILTER-0000000007\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000008\n      <-- KSTREAM-MERGE-0000000006\n    Processor: KSTREAM-MAPVALUES-0000000008 (stores: [])\n      --> KSTREAM-SINK-0000000009\n      <-- KSTREAM-FILTER-0000000007\n    Sink: KSTREAM-SINK-0000000009 (topic: output-topic)\n      <-- KSTREAM-MAPVALUES-0000000008\n\n";
    private final String expectedMergeOptimizedTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])\n      --> KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n      --> KSTREAM-MERGE-0000000005\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-MERGE-0000000005\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MERGE-0000000005 (stores: [])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-SINK-0000000007\n      <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004\n    Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n      <-- KSTREAM-MERGE-0000000006\n\n";
    private final String expectedComplexMergeOptimizeTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n      --> KSTREAM-TRANSFORM-0000000001\n    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink\n      <-- KSTREAM-TRANSFORM-0000000001\n    Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n      --> KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n      --> KTABLE-SUPPRESS-0000000007\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source\n    Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n      --> KSTREAM-PEEK-0000000020\n    Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n      --> KTABLE-TOSTREAM-0000000009\n      <-- KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-PEEK-0000000020 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000021\n      <-- KSTREAM-SOURCE-0000000019\n    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n      --> KSTREAM-FLATMAP-0000000010\n      <-- KTABLE-SUPPRESS-0000000007\n    Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n      --> KSTREAM-MERGE-0000000022\n      <-- KTABLE-TOSTREAM-0000000009\n    Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n      --> KSTREAM-MERGE-0000000022\n      <-- KSTREAM-PEEK-0000000020\n    Processor: KSTREAM-MERGE-0000000022 (stores: [])\n      --> KSTREAM-FILTER-0000000024\n      <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n    Processor: KSTREAM-FILTER-0000000024 (stores: [])\n      --> KSTREAM-SINK-0000000023\n      <-- KSTREAM-MERGE-0000000022\n    Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n      <-- KSTREAM-FILTER-0000000024\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n      --> KSTREAM-FLATMAP-0000000012\n    Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n      <-- KSTREAM-SOURCE-0000000011\n    Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-sink\n      <-- KSTREAM-FLATMAP-0000000012\n    Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n\n  Sub-topology: 3\n    Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n      --> KSTREAM-LEFTJOIN-0000000026\n    Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n      --> KSTREAM-BRANCH-0000000027\n      <-- KSTREAM-SOURCE-0000000025\n    Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n      --> KSTREAM-BRANCH-00000000270, KSTREAM-BRANCH-00000000271\n      <-- KSTREAM-LEFTJOIN-0000000026\n    Processor: KSTREAM-BRANCH-00000000270 (stores: [])\n      --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n      <-- KSTREAM-BRANCH-0000000027\n    Processor: KSTREAM-BRANCH-00000000271 (stores: [])\n      --> KSTREAM-MAP-0000000029\n      <-- KSTREAM-BRANCH-0000000027\n    Processor: KSTREAM-FILTER-0000000033 (stores: [])\n      --> KSTREAM-PEEK-0000000034\n      <-- KSTREAM-BRANCH-00000000270\n    Source: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n      --> KSTREAM-PEEK-0000000013\n    Processor: KSTREAM-MAP-0000000029 (stores: [])\n      --> KSTREAM-PEEK-0000000030\n      <-- KSTREAM-BRANCH-00000000271\n    Processor: KSTREAM-PEEK-0000000034 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000035\n      <-- KSTREAM-FILTER-0000000033\n    Processor: KSTREAM-MAP-0000000037 (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- KSTREAM-BRANCH-00000000270\n    Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n      --> KSTREAM-SINK-0000000036\n      <-- KSTREAM-PEEK-0000000034\n    Processor: KSTREAM-PEEK-0000000013 (stores: [])\n      --> KSTREAM-AGGREGATE-0000000015\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source\n    Processor: KSTREAM-PEEK-0000000030 (stores: [])\n      --> KSTREAM-SINK-0000000031\n      <-- KSTREAM-MAP-0000000029\n    Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n      --> none\n      <-- KSTREAM-PEEK-0000000013\n    Sink: KSTREAM-SINK-0000000031 (topic: external-command)\n      <-- KSTREAM-PEEK-0000000030\n    Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n      <-- KSTREAM-MAPVALUES-0000000035\n    Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n      <-- KSTREAM-MAP-0000000037\n\n";

    @Test
    public void shouldBeAbleToBuildTopologyIncrementally() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream join = streamsBuilder.stream("topic").join(streamsBuilder.stream("other-topic"), (str, str2) -> {
            return str + str2;
        }, JoinWindows.of(Duration.ofMillis(5000L)));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> none\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n", streamsBuilder.build().describe().toString());
        KStream filter = join.filter((str3, str4) -> {
            return str4.equals("foo");
        });
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-FILTER-0000000007\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n      --> none\n      <-- KSTREAM-MERGE-0000000006\n\n", streamsBuilder.build().describe().toString());
        filter.mapValues(str5 -> {
            return str5 + "some value";
        }).to("output-topic");
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-FILTER-0000000007\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000008\n      <-- KSTREAM-MERGE-0000000006\n    Processor: KSTREAM-MAPVALUES-0000000008 (stores: [])\n      --> KSTREAM-SINK-0000000009\n      <-- KSTREAM-FILTER-0000000007\n    Sink: KSTREAM-SINK-0000000009 (topic: output-topic)\n      <-- KSTREAM-MAPVALUES-0000000008\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void shouldBeAbleToProcessNestedMultipleKeyChangingNodes() {
        Properties properties = new Properties();
        properties.setProperty("application.id", "test-application");
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("topology.optimization", "all");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream selectKey = streamsBuilder.stream("inputTopic").selectKey((str, str2) -> {
            return str2.substring(0, 5);
        });
        selectKey.groupByKey(Grouped.as("count-repartition")).count(Materialized.as("count-store")).toStream().to("count-topic", Produced.with(Serdes.String(), Serdes.Long()));
        selectKey.groupByKey(Grouped.as("windowed-repartition")).windowedBy(TimeWindows.of(Duration.ofSeconds(5L))).count(Materialized.as("windowed-count-store")).toStream().map((windowed, l) -> {
            return KeyValue.pair(windowed.key(), l);
        }).to("windowed-count", Produced.with(Serdes.String(), Serdes.Long()));
        streamsBuilder.build(properties);
    }

    @Test
    public void shouldNotThrowNPEWithMergeNodes() {
        Properties properties = new Properties();
        properties.setProperty("application.id", "test-application");
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("topology.optimization", "all");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        this.initializer = () -> {
            return "";
        };
        this.aggregator = (str, str2, str3) -> {
            return str3 + str2.length();
        };
        KStream flatMap = streamsBuilder.stream("retryTopic", Consumed.with(Serdes.String(), Serdes.String())).transform(() -> {
            return new Transformer<String, String, KeyValue<String, String>>() { // from class: org.apache.kafka.streams.kstream.internals.graph.StreamsGraphTest.1
                public void init(ProcessorContext processorContext) {
                }

                public KeyValue<String, String> transform(String str4, String str5) {
                    return KeyValue.pair(str4, str5);
                }

                public void close() {
                }
            };
        }, new String[0]).groupByKey(Grouped.with(Serdes.String(), Serdes.String())).aggregate(this.initializer, this.aggregator, Materialized.with(Serdes.String(), Serdes.String())).suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(500L), Suppressed.BufferConfig.maxBytes(64000000L))).toStream().flatMap((str4, str5) -> {
            return new ArrayList();
        });
        streamsBuilder.stream("internal-topic-command", Consumed.with(Serdes.String(), Serdes.String())).peek((str6, str7) -> {
            System.out.println("stdoutput");
        }).mapValues((str8, str9) -> {
            return str9;
        }).merge(flatMap).leftJoin(streamsBuilder.stream("id-table-topic", Consumed.with(Serdes.String(), Serdes.String())).flatMap((str10, str11) -> {
            return new ArrayList();
        }).peek((str12, str13) -> {
            System.out.println("data " + str12 + " " + str13);
        }).groupByKey(Grouped.with(Serdes.String(), Serdes.String())).aggregate(this.initializer, this.aggregator, Materialized.with(Serdes.String(), Serdes.String())), (str14, str15) -> {
            return str14 + str15;
        }, Joined.with(Serdes.String(), Serdes.String(), Serdes.String())).split().branch((str16, str17) -> {
            return str17.equals("some-value");
        }, Branched.withConsumer(kStream -> {
            kStream.map((v0, v1) -> {
                return KeyValue.pair(v0, v1);
            }).peek((str18, str19) -> {
                System.out.println("printing out");
            }).to("external-command", Produced.with(Serdes.String(), Serdes.String()));
        })).defaultBranch(Branched.withConsumer(kStream2 -> {
            kStream2.filter((str18, str19) -> {
                return str19 != null;
            }).peek((str20, str21) -> {
                System.out.println("Printing output");
            }).mapValues((str22, str23) -> {
                return str23;
            }).to("dlq-topic", Produced.with(Serdes.String(), Serdes.String()));
            kStream2.map((v0, v1) -> {
                return KeyValue.pair(v0, v1);
            }).to("retryTopic", Produced.with(Serdes.String(), Serdes.String()));
        }));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n      --> KSTREAM-TRANSFORM-0000000001\n    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink\n      <-- KSTREAM-TRANSFORM-0000000001\n    Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n      --> KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n      --> KTABLE-SUPPRESS-0000000007\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source\n    Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n      --> KSTREAM-PEEK-0000000020\n    Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n      --> KTABLE-TOSTREAM-0000000009\n      <-- KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-PEEK-0000000020 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000021\n      <-- KSTREAM-SOURCE-0000000019\n    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n      --> KSTREAM-FLATMAP-0000000010\n      <-- KTABLE-SUPPRESS-0000000007\n    Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n      --> KSTREAM-MERGE-0000000022\n      <-- KTABLE-TOSTREAM-0000000009\n    Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n      --> KSTREAM-MERGE-0000000022\n      <-- KSTREAM-PEEK-0000000020\n    Processor: KSTREAM-MERGE-0000000022 (stores: [])\n      --> KSTREAM-FILTER-0000000024\n      <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n    Processor: KSTREAM-FILTER-0000000024 (stores: [])\n      --> KSTREAM-SINK-0000000023\n      <-- KSTREAM-MERGE-0000000022\n    Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n      <-- KSTREAM-FILTER-0000000024\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n      --> KSTREAM-FLATMAP-0000000012\n    Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n      <-- KSTREAM-SOURCE-0000000011\n    Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-sink\n      <-- KSTREAM-FLATMAP-0000000012\n    Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n\n  Sub-topology: 3\n    Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n      --> KSTREAM-LEFTJOIN-0000000026\n    Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n      --> KSTREAM-BRANCH-0000000027\n      <-- KSTREAM-SOURCE-0000000025\n    Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n      --> KSTREAM-BRANCH-00000000270, KSTREAM-BRANCH-00000000271\n      <-- KSTREAM-LEFTJOIN-0000000026\n    Processor: KSTREAM-BRANCH-00000000270 (stores: [])\n      --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n      <-- KSTREAM-BRANCH-0000000027\n    Processor: KSTREAM-BRANCH-00000000271 (stores: [])\n      --> KSTREAM-MAP-0000000029\n      <-- KSTREAM-BRANCH-0000000027\n    Processor: KSTREAM-FILTER-0000000033 (stores: [])\n      --> KSTREAM-PEEK-0000000034\n      <-- KSTREAM-BRANCH-00000000270\n    Source: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n      --> KSTREAM-PEEK-0000000013\n    Processor: KSTREAM-MAP-0000000029 (stores: [])\n      --> KSTREAM-PEEK-0000000030\n      <-- KSTREAM-BRANCH-00000000271\n    Processor: KSTREAM-PEEK-0000000034 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000035\n      <-- KSTREAM-FILTER-0000000033\n    Processor: KSTREAM-MAP-0000000037 (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- KSTREAM-BRANCH-00000000270\n    Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n      --> KSTREAM-SINK-0000000036\n      <-- KSTREAM-PEEK-0000000034\n    Processor: KSTREAM-PEEK-0000000013 (stores: [])\n      --> KSTREAM-AGGREGATE-0000000015\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source\n    Processor: KSTREAM-PEEK-0000000030 (stores: [])\n      --> KSTREAM-SINK-0000000031\n      <-- KSTREAM-MAP-0000000029\n    Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n      --> none\n      <-- KSTREAM-PEEK-0000000013\n    Sink: KSTREAM-SINK-0000000031 (topic: external-command)\n      <-- KSTREAM-PEEK-0000000030\n    Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n      <-- KSTREAM-MAPVALUES-0000000035\n    Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n      <-- KSTREAM-MAP-0000000037\n\n", streamsBuilder.build(properties).describe().toString());
    }

    @Test
    public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
        Assert.assertEquals(getTopologyWithChangingValuesAfterChangingKey("all").describe().toString(), getTopologyWithChangingValuesAfterChangingKey("none").describe().toString());
        Assert.assertEquals(2L, getCountOfRepartitionTopicsFound(r0.describe().toString()));
        Assert.assertEquals(2L, getCountOfRepartitionTopicsFound(r0.describe().toString()));
    }

    @Test
    @Deprecated
    public void shouldNotOptimizeWhenAThroughOperationIsDone() {
        Assert.assertEquals(getTopologyWithThroughOperation("all").describe().toString(), getTopologyWithThroughOperation("none").describe().toString());
        Assert.assertEquals(0L, getCountOfRepartitionTopicsFound(r0.describe().toString()));
        Assert.assertEquals(0L, getCountOfRepartitionTopicsFound(r0.describe().toString()));
    }

    @Test
    public void shouldOptimizeSeveralMergeNodesWithCommonKeyChangingParent() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream selectKey = streamsBuilder.stream("input_topic", Consumed.with(Serdes.Integer(), Serdes.Integer())).selectKey((v0, v1) -> {
            return Integer.sum(v0, v1);
        });
        KStream mapValues = selectKey.mapValues(num -> {
            return Integer.valueOf(num.intValue() + 1);
        });
        KStream mapValues2 = selectKey.mapValues(num2 -> {
            return Integer.valueOf(num2.intValue() + 2);
        });
        mapValues.merge(mapValues2).merge(selectKey.mapValues(num3 -> {
            return Integer.valueOf(num3.intValue() + 3);
        })).to("output_topic");
        Properties properties = new Properties();
        properties.setProperty("topology.optimization", "all");
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])\n      --> KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n      --> KSTREAM-MERGE-0000000005\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-MERGE-0000000005\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MERGE-0000000005 (stores: [])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-SINK-0000000007\n      <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004\n    Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n      <-- KSTREAM-MERGE-0000000006\n\n", streamsBuilder.build(properties).describe().toString());
    }

    @Test
    public void shouldNotOptimizeWhenRepartitionOperationIsDone() {
        Assert.assertEquals(getTopologyWithRepartitionOperation("all").describe().toString(), getTopologyWithRepartitionOperation("none").describe().toString());
        Assert.assertEquals(2L, getCountOfRepartitionTopicsFound(r0.describe().toString()));
        Assert.assertEquals(2L, getCountOfRepartitionTopicsFound(r0.describe().toString()));
    }

    private Topology getTopologyWithChangingValuesAfterChangingKey(String str) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", str);
        KStream selectKey = streamsBuilder.stream("input").selectKey((str2, str3) -> {
            return str2 + str3;
        });
        selectKey.mapValues(str4 -> {
            return str4.toUpperCase(Locale.getDefault());
        }).groupByKey().count().toStream().to("output");
        selectKey.flatMapValues(str5 -> {
            return Arrays.asList(str5.split("\\s"));
        }).groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(5000L))).count().toStream().to("windowed-output");
        return streamsBuilder.build(properties);
    }

    @Deprecated
    private Topology getTopologyWithThroughOperation(String str) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", str);
        KStream through = streamsBuilder.stream("input").selectKey((str2, str3) -> {
            return str2 + str3;
        }).through("through-topic");
        through.groupByKey().count().toStream().to("output");
        through.groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(5000L))).count().toStream().to("windowed-output");
        return streamsBuilder.build(properties);
    }

    private Topology getTopologyWithRepartitionOperation(String str) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", str);
        KStream selectKey = streamsBuilder.stream("input").selectKey((str2, str3) -> {
            return str2 + str3;
        });
        selectKey.repartition().groupByKey().count().toStream().to("output");
        selectKey.repartition().groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(5000L))).count().toStream().to("windowed-output");
        return streamsBuilder.build(properties);
    }

    private int getCountOfRepartitionTopicsFound(String str) {
        Matcher matcher = this.repartitionTopicPattern.matcher(str);
        ArrayList arrayList = new ArrayList();
        while (matcher.find()) {
            arrayList.add(matcher.group());
        }
        return arrayList.size();
    }
}
