package org.apache.storm.streams.processors;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.shade.com.google.common.collect.ArrayListMultimap;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.operations.ValueJoiner;
import org.apache.storm.streams.tuple.Tuple3;

/* loaded from: input_file:org/apache/storm/streams/processors/JoinProcessor.class */
/**
 * Joins key-value pairs arriving on two named streams and forwards, for every joined
 * row, a {@code Pair} of the key and the result of the configured {@link ValueJoiner}.
 *
 * <p>Rows from both streams are buffered. When the processor context is not windowed,
 * each incoming row is joined immediately against the rows buffered so far from the
 * other stream; when it is windowed, rows are only buffered and the join is performed
 * in {@link #finish()}, after which both buffers are cleared.
 *
 * @param <K> the key type shared by both streams
 * @param <R> the type produced by the value joiner
 * @param <V1> the value type of the left stream
 * @param <V2> the value type of the right stream
 */
public class JoinProcessor<K, R, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
    private final ValueJoiner<V1, V2, R> valueJoiner;
    private final String leftStream;
    private final String rightStream;
    private final List<Pair<K, V1>> leftRows = new ArrayList<>();
    private final List<Pair<K, V2>> rightRows = new ArrayList<>();
    private final JoinType leftType;
    private final JoinType rightType;

    /**
     * Controls what happens to rows of one side that have no match on the other side:
     * with {@code OUTER} they are still emitted, paired with {@code null}; with
     * {@code INNER} they are dropped.
     */
    public enum JoinType {
        INNER,
        OUTER
    }

    /**
     * Creates a processor that treats both sides as {@link JoinType#INNER}.
     *
     * @param str the name of the left stream
     * @param str2 the name of the right stream
     * @param valueJoiner combines a left value and a right value into the result value
     */
    public JoinProcessor(String str, String str2, ValueJoiner<V1, V2, R> valueJoiner) {
        this(str, str2, valueJoiner, JoinType.INNER, JoinType.INNER);
    }

    /**
     * Creates a processor with an explicit join type for each side.
     *
     * @param str the name of the left stream
     * @param str2 the name of the right stream
     * @param valueJoiner combines a left value and a right value into the result value
     * @param joinType the join type applied to the left side
     * @param joinType2 the join type applied to the right side
     */
    public JoinProcessor(String str, String str2, ValueJoiner<V1, V2, R> valueJoiner, JoinType joinType, JoinType joinType2) {
        this.valueJoiner = valueJoiner;
        this.leftStream = str;
        this.rightStream = str2;
        this.leftType = joinType;
        this.rightType = joinType2;
    }

    /**
     * Buffers the incoming pair on the side matching {@code str} and, unless the context
     * is windowed, immediately joins it against the rows buffered from the other side.
     * Pairs arriving on a stream that is neither the left nor the right stream are ignored.
     *
     * @param pair the incoming key-value pair
     * @param str the name of the stream the pair arrived on
     */
    @Override
    @SuppressWarnings("unchecked")
    public void execute(Pair<K, ?> pair, String str) {
        K key = pair.getFirst();
        if (str.equals(this.leftStream)) {
            // The value type is erased at runtime; the stream name decides which side it is.
            Pair<K, V1> leftRow = Pair.of(key, (V1) pair.getSecond());
            this.leftRows.add(leftRow);
            if (!this.context.isWindowed()) {
                joinAndForward(Collections.singletonList(leftRow), this.rightRows);
            }
        } else if (str.equals(this.rightStream)) {
            Pair<K, V2> rightRow = Pair.of(key, (V2) pair.getSecond());
            this.rightRows.add(rightRow);
            if (!this.context.isWindowed()) {
                joinAndForward(this.leftRows, Collections.singletonList(rightRow));
            }
        }
    }

    /**
     * Joins all buffered left rows with all buffered right rows, forwards the results
     * and then clears both buffers.
     */
    @Override
    public void finish() {
        joinAndForward(this.leftRows, this.rightRows);
        this.leftRows.clear();
        this.rightRows.clear();
    }

    /**
     * Returns the name of the left stream.
     *
     * @return the left stream name passed to the constructor
     */
    public String getLeftStream() {
        return this.leftStream;
    }

    /**
     * Returns the name of the right stream.
     *
     * @return the right stream name passed to the constructor
     */
    public String getRightStream() {
        return this.rightStream;
    }

    /*
     * Performs a hash join: builds a lookup table from the smaller of the two lists
     * (the right list on a tie), probes it with the other list and forwards one
     * (key, joined value) pair per result. The value joiner always receives the left
     * value first, regardless of which side was used to build the table.
     */
    private void joinAndForward(List<Pair<K, V1>> left, List<Pair<K, V2>> right) {
        if (left.size() < right.size()) {
            for (Tuple3<K, V1, V2> joined : join(getJoinTable(left), right, this.leftType, this.rightType)) {
                this.context.forward(Pair.of(joined.value1, this.valueJoiner.apply(joined.value2, joined.value3)));
            }
        } else {
            // Table built from the right side, so value2 is the right value and value3 the left.
            for (Tuple3<K, V2, V1> joined : join(getJoinTable(right), left, this.rightType, this.leftType)) {
                this.context.forward(Pair.of(joined.value1, this.valueJoiner.apply(joined.value3, joined.value2)));
            }
        }
    }

    /*
     * Returns a list of Tuple3 (key, value from table, value from row).
     *
     * Every probe row is combined with every table value stored under its key. The table
     * is only read, never consumed, so several probe rows sharing a key each see all the
     * table values for that key. Probe rows without a match are emitted with a null table
     * value when rowType is OUTER; table entries whose key was never matched are emitted
     * with a null row value when tableType is OUTER.
     */
    private <T1, T2> List<Tuple3<K, T1, T2>> join(Multimap<K, T1> table, List<Pair<K, T2>> rows, JoinType tableType, JoinType rowType) {
        List<Tuple3<K, T1, T2>> result = new ArrayList<>();
        // Keys that matched at least one probe row; needed to find unmatched table entries.
        Set<K> matchedKeys = new HashSet<>();
        for (Pair<K, T2> row : rows) {
            K key = row.getFirst();
            Collection<T1> tableValues = table.get(key);
            if (!tableValues.isEmpty()) {
                matchedKeys.add(key);
                for (T1 tableValue : tableValues) {
                    result.add(new Tuple3<>(key, tableValue, row.getSecond()));
                }
            } else if (rowType == JoinType.OUTER) {
                result.add(new Tuple3<>(key, null, row.getSecond()));
            }
        }
        if (tableType == JoinType.OUTER) {
            for (Map.Entry<K, T1> entry : table.entries()) {
                if (!matchedKeys.contains(entry.getKey())) {
                    result.add(new Tuple3<>(entry.getKey(), entry.getValue(), null));
                }
            }
        }
        return result;
    }

    /*
     * Builds a key -> values lookup table from the given rows; rows sharing a key are
     * all kept under that key.
     */
    private <T> Multimap<K, T> getJoinTable(List<Pair<K, T>> rows) {
        Multimap<K, T> table = ArrayListMultimap.create();
        for (Pair<K, T> row : rows) {
            table.put(row.getFirst(), row.getSecond());
        }
        return table;
    }

    /**
     * Delegates to the superclass implementation.
     *
     * @param str the argument passed through unchanged to {@code super.punctuate}
     */
    @Override // org.apache.storm.streams.processors.BaseProcessor, org.apache.storm.streams.processors.Processor
    public /* bridge */ /* synthetic */ void punctuate(String str) {
        super.punctuate(str);
    }

    /**
     * Delegates to the superclass implementation.
     *
     * @param processorContext the context passed through unchanged to {@code super.init}
     */
    @Override // org.apache.storm.streams.processors.BaseProcessor, org.apache.storm.streams.processors.Processor
    public /* bridge */ /* synthetic */ void init(ProcessorContext processorContext) {
        super.init(processorContext);
    }
}
