package org.apache.storm.trident.spout;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.ICommitterTridentSpout;
import org.apache.storm.trident.spout.ITridentSpout;
import org.apache.storm.trident.topology.BatchInfo;
import org.apache.storm.trident.topology.ITridentBatchBolt;
import org.apache.storm.trident.topology.MasterBatchCoordinator;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.apache.storm.trident.tuple.ConsList;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/trident/spout/TridentSpoutExecutor.class */
public class TridentSpoutExecutor implements ITridentBatchBolt {
    public static final String ID_FIELD = "$tx";
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) TridentSpoutExecutor.class);
    AddIdCollector collector;
    ITridentSpout<Object> spout;
    ITridentSpout.Emitter<Object> emitter;
    String streamName;
    String txStateId;
    TreeMap<Long, TransactionAttempt> activeBatches = new TreeMap<>();

    /* loaded from: input_file:org/apache/storm/trident/spout/TridentSpoutExecutor$AddIdCollector.class */
    private static class AddIdCollector implements TridentCollector {
        BatchOutputCollector delegate;
        Object id;
        String stream;

        public AddIdCollector(String str, BatchOutputCollector batchOutputCollector) {
            this.delegate = batchOutputCollector;
            this.stream = str;
        }

        public void setBatch(Object obj) {
            this.id = obj;
        }

        @Override // org.apache.storm.trident.operation.TridentCollector
        public void emit(List<Object> list) {
            this.delegate.emit(this.stream, new ConsList(this.id, list));
        }

        @Override // org.apache.storm.trident.operation.TridentCollector
        public void flush() {
            this.delegate.flush();
        }

        @Override // org.apache.storm.trident.operation.TridentCollector
        public void reportError(Throwable th) {
            this.delegate.reportError(th);
        }
    }

    public TridentSpoutExecutor(String str, String str2, ITridentSpout<Object> iTridentSpout) {
        this.txStateId = str;
        this.spout = iTridentSpout;
        this.streamName = str2;
    }

    @Override // org.apache.storm.trident.topology.ITridentBatchBolt
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector) {
        this.emitter = this.spout.getEmitter(this.txStateId, map, topologyContext);
        this.collector = new AddIdCollector(this.streamName, batchOutputCollector);
    }

    @Override // org.apache.storm.trident.topology.ITridentBatchBolt
    public void execute(BatchInfo batchInfo, Tuple tuple) {
        TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        if (tuple.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if (!transactionAttempt.equals(this.activeBatches.get(transactionAttempt.getTransactionId()))) {
                throw new FailedException("Received commit for different transaction attempt");
            }
            ((ICommitterTridentSpout.Emitter) this.emitter).commit(transactionAttempt);
            this.activeBatches.remove(transactionAttempt.getTransactionId());
            return;
        }
        if (tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            this.activeBatches.headMap(transactionAttempt.getTransactionId()).clear();
            this.emitter.success(transactionAttempt);
        } else {
            this.collector.setBatch(batchInfo.batchId);
            this.emitter.emitBatch(transactionAttempt, tuple.getValue(1), this.collector);
            this.activeBatches.put(transactionAttempt.getTransactionId(), transactionAttempt);
        }
    }

    @Override // org.apache.storm.trident.topology.ITridentBatchBolt
    public void cleanup() {
        this.emitter.close();
    }

    @Override // org.apache.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        ArrayList arrayList = new ArrayList(this.spout.getOutputFields().toList());
        arrayList.add(0, ID_FIELD);
        outputFieldsDeclarer.declareStream(this.streamName, new Fields(arrayList));
    }

    @Override // org.apache.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        return this.spout.getComponentConfiguration();
    }

    @Override // org.apache.storm.trident.topology.ITridentBatchBolt
    public void finishBatch(BatchInfo batchInfo) {
    }

    @Override // org.apache.storm.trident.topology.ITridentBatchBolt
    public Object initBatchState(String str, Object obj) {
        return null;
    }
}
