package org.apache.storm.trident.spout;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.fs.shell.Count;
import org.apache.storm.Config;
import org.apache.storm.drpc.PrepareRequest;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.trident.topology.TridentBoltExecutor;
import org.apache.storm.trident.tuple.ConsList;
import org.apache.storm.trident.util.TridentUtils;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/* loaded from: input_file:org/apache/storm/trident/spout/RichSpoutBatchTriggerer.class */
/**
 * Wraps an {@link IRichSpout} so that every tuple the delegate emits is re-emitted on
 * {@link #stream} with a freshly generated {@link RichSpoutBatchId} prepended, followed by one
 * direct coordination tuple per downstream task on {@link #coordStream}.
 *
 * <p>Each coordination tuple is anchored with its own random message id. The delegate's original
 * message id is acked once every coordination tuple of that emit has been acked, and is failed as
 * soon as any one of them fails.
 */
public class RichSpoutBatchTriggerer implements IRichSpout {
    /** Stream on which the delegate's tuples are re-emitted. */
    String stream;
    /** The wrapped spout; all lifecycle calls are forwarded to it. */
    IRichSpout delegate;
    /** Task ids of every component subscribed to {@link #coordStream}; populated in {@code open}. */
    List<Integer> outputTasks;
    /** Source of batch ids and coordination message ids; created in {@code open}. */
    Random rand;
    /** Name of the coordination stream, derived from the batch group in the constructor. */
    String coordStream;
    /** Maps each outstanding coordination message id to the batch id it belongs to. */
    Map<Long, Long> msgIdToBatchId = new HashMap<>();
    /** Maps each outstanding batch id to the bookkeeping needed to ack/fail the delegate. */
    Map<Long, FinishCondition> finishConditions = new HashMap<>();

    /** Bookkeeping for one emitted tuple: the pending coordination ids and the delegate's id. */
    static class FinishCondition {
        /** Coordination message ids that have not been acked yet. */
        Set<Long> vals = new HashSet<>();
        /** Message id the delegate supplied with the emit (passed back on ack/fail). */
        Object msgId;
    }

    /**
     * Collector handed to the delegate. It ignores the stream the delegate asks for and routes
     * everything through the enclosing triggerer's {@code stream} and {@code coordStream}.
     */
    class StreamOverrideCollector implements ISpoutOutputCollector {
        SpoutOutputCollector collector;

        public StreamOverrideCollector(SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Emits {@code values} prefixed with a new batch id, then sends a coordination tuple to
         * every task in {@code outputTasks}.
         *
         * @param ignoredStream stream requested by the delegate; not used
         * @param values tuple values produced by the delegate
         * @param msgId delegate's message id, remembered for the later ack/fail callback
         * @return the task ids returned by the underlying collector for the data tuple
         */
        @Override
        public List<Integer> emit(String ignoredStream, List<Object> values, Object msgId) {
            long batchIdValue = rand.nextLong();
            RichSpoutBatchId batchId = new RichSpoutBatchId(batchIdValue);

            FinishCondition pending = new FinishCondition();
            pending.msgId = msgId;

            // The data tuple itself is emitted without a message id; only coord tuples are tracked.
            List<Integer> receivedBy = collector.emit(stream, new ConsList(batchId, values));
            Set<Integer> receivers = new HashSet<>(receivedBy);

            for (Integer task : outputTasks) {
                // Tell each downstream task whether it was sent the data tuple (1) or not (0).
                int count = receivers.contains(task) ? 1 : 0;
                Long coordMsgId = rand.nextLong();
                collector.emitDirect(task, coordStream, new Values(batchId, count), coordMsgId);
                pending.vals.add(coordMsgId);
                msgIdToBatchId.put(coordMsgId, batchIdValue);
            }

            finishConditions.put(batchIdValue, pending);
            return receivedBy;
        }

        /** Always throws: direct emits from the delegate are rejected. */
        @Override
        public void emitDirect(int taskId, String streamId, List<Object> values, Object msgId) {
            throw new RuntimeException("Trident does not support direct emits from spouts");
        }

        @Override
        public void flush() {
            collector.flush();
        }

        @Override
        public void reportError(Throwable error) {
            collector.reportError(error);
        }

        @Override
        public long getPendingCount() {
            return collector.getPendingCount();
        }
    }

    /**
     * @param delegate spout to wrap
     * @param streamName stream on which the delegate's tuples are re-emitted
     * @param batchGroup batch group passed to {@code TridentBoltExecutor.coordStream} to obtain
     *     the coordination stream name
     */
    public RichSpoutBatchTriggerer(IRichSpout delegate, String streamName, String batchGroup) {
        this.delegate = delegate;
        this.stream = streamName;
        this.coordStream = TridentBoltExecutor.coordStream(batchGroup);
    }

    /**
     * Collects the tasks subscribed to the coordination stream, seeds the random generator, and
     * then opens the delegate with a {@link StreamOverrideCollector}.
     */
    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        // Initialise our own state first: the wrapper collector dereferences outputTasks and rand,
        // so a delegate that emits from inside open() must find them already set.
        outputTasks = new ArrayList<>();
        for (String component : Utils.get(context.getThisTargets(), coordStream, new HashMap<>()).keySet()) {
            outputTasks.addAll(context.getComponentTasks(component));
        }
        rand = new Random(Utils.secureRandomLong());

        delegate.open(conf, context, new SpoutOutputCollector(new StreamOverrideCollector(collector)));
    }

    @Override
    public void close() {
        delegate.close();
    }

    @Override
    public void activate() {
        delegate.activate();
    }

    @Override
    public void deactivate() {
        delegate.deactivate();
    }

    @Override
    public void nextTuple() {
        delegate.nextTuple();
    }

    /**
     * Records the ack of one coordination tuple; once the last one for a batch is acked, the
     * delegate is acked with its original message id.
     *
     * @param msgId coordination message id (a {@code Long} generated in the wrapper collector)
     */
    @Override
    public void ack(Object msgId) {
        Long coordMsgId = (Long) msgId;
        Long batchId = msgIdToBatchId.remove(coordMsgId);
        FinishCondition pending = finishConditions.get(batchId);
        if (pending == null) {
            // Unknown id, or the batch was already failed and dropped.
            return;
        }
        pending.vals.remove(coordMsgId);
        if (pending.vals.isEmpty()) {
            finishConditions.remove(batchId);
            delegate.ack(pending.msgId);
        }
    }

    /**
     * Fails the delegate's tuple as soon as any of its coordination tuples fails. The batch is
     * dropped, so later acks/fails for its other coordination tuples are no-ops.
     *
     * @param msgId coordination message id (a {@code Long} generated in the wrapper collector)
     */
    @Override
    public void fail(Object msgId) {
        Long batchId = msgIdToBatchId.remove((Long) msgId);
        FinishCondition pending = finishConditions.remove(batchId);
        if (pending != null) {
            delegate.fail(pending.msgId);
        }
    }

    /**
     * Declares the data stream (delegate's fields prefixed with {@code "$id$"}) and the direct
     * coordination stream with fields {@code "id"} and {@code "count"}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields delegateFields = TridentUtils.getSingleOutputStreamFields(delegate);
        declarer.declareStream(stream, TridentUtils.fieldsConcat(new Fields("$id$"), delegateFields));
        // Plain literals: these field names belong to the coord stream, not to DRPC or Hadoop.
        declarer.declareStream(coordStream, true, new Fields("id", "count"));
    }

    /**
     * Returns a copy of the delegate's component configuration (or a new map when it has none)
     * with the {@link RichSpoutBatchId} serializer registered.
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> delegateConf = delegate.getComponentConfiguration();
        Map<String, Object> conf = new HashMap<>();
        if (delegateConf != null) {
            conf.putAll(delegateConf);
        }
        Config.registerSerialization(conf, RichSpoutBatchId.class, RichSpoutBatchIdSerializer.class);
        return conf;
    }
}
