package org.apache.storm.trident.spout;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.ITridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.apache.storm.trident.util.TridentUtils;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.RotatingMap;

/* loaded from: input_file:org/apache/storm/trident/spout/RichSpoutBatchExecutor.class */
public class RichSpoutBatchExecutor implements ITridentSpout<Object> {
    public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
    IRichSpout spout;

    /* loaded from: input_file:org/apache/storm/trident/spout/RichSpoutBatchExecutor$CaptureCollector.class */
    /**
     * Spout output collector that forwards every tuple emitted by the wrapped spout to the
     * {@link TridentCollector} of the batch being built, and records the message ids of those
     * tuples so they can be acked or failed later.
     *
     * <p>{@link #reset(TridentCollector)} must be called before the collector is used: the
     * forwarding methods dereference the Trident collector that {@code reset} installs.
     */
    static class CaptureCollector implements ISpoutOutputCollector {
        /** Non-null message ids captured since the last {@link #reset(TridentCollector)}. */
        public List<Object> ids;
        /** Number of tuples emitted since the last {@link #reset(TridentCollector)}. */
        public int numEmitted;
        /** Value reported by {@link #getPendingCount()}; written by the enclosing emitter. */
        public long pendingCount;
        TridentCollector collector;

        CaptureCollector() {
        }

        /**
         * Starts capturing for a new batch.
         *
         * @param tridentCollector collector that receives the tuples emitted from now on
         */
        public void reset(TridentCollector tridentCollector) {
            this.collector = tridentCollector;
            this.ids = new ArrayList<>();
            // The emitter compares numEmitted with its per-batch loop index, so the counter has
            // to restart with every batch; left cumulative, that comparison can only ever
            // succeed during the very first batch.
            this.numEmitted = 0;
        }

        /**
         * Forwards the error to the current Trident collector.
         *
         * @param th the error to report
         */
        @Override
        public void reportError(Throwable th) {
            this.collector.reportError(th);
        }

        /**
         * Emits the tuple through the current Trident collector and remembers its message id.
         *
         * @param str  stream id; not used, the tuple is always passed to
         *             {@code TridentCollector.emit(List)}
         * @param list tuple values
         * @param obj  message id; recorded in {@link #ids} only when non-null
         * @return always {@code null}
         */
        @Override
        public List<Integer> emit(String str, List<Object> list, Object obj) {
            if (obj != null) {
                this.ids.add(obj);
            }
            this.numEmitted++;
            this.collector.emit(list);
            return null;
        }

        /**
         * Direct emits are rejected.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void emitDirect(int i, String str, List<Object> list, Object obj) {
            throw new UnsupportedOperationException("Trident does not support direct streams");
        }

        /** Flushes the current Trident collector. */
        @Override
        public void flush() {
            this.collector.flush();
        }

        /**
         * @return the current value of {@link #pendingCount}
         */
        @Override
        public long getPendingCount() {
            return this.pendingCount;
        }
    }

    /* loaded from: input_file:org/apache/storm/trident/spout/RichSpoutBatchExecutor$RichSpoutCoordinator.class */
    /**
     * Batch coordinator that keeps no state: it produces no transaction metadata, reports every
     * transaction as ready, and does nothing on success or close.
     */
    private static class RichSpoutCoordinator implements ITridentSpout.BatchCoordinator<Object> {
        private RichSpoutCoordinator() {
            // no state to initialise
        }

        /**
         * @param j transaction id (unused)
         * @return always {@code true}
         */
        @Override
        public boolean isReady(long j) {
            return true;
        }

        /**
         * @param j    transaction id (unused)
         * @param obj  unused
         * @param obj2 unused
         * @return always {@code null}; this coordinator has no metadata
         */
        @Override
        public Object initializeTransaction(long j, Object obj, Object obj2) {
            return null;
        }

        /**
         * No-op.
         *
         * @param j transaction id (unused)
         */
        @Override
        public void success(long j) {
            // nothing to record
        }

        /** No-op. */
        @Override
        public void close() {
            // nothing to release
        }
    }

    /* loaded from: input_file:org/apache/storm/trident/spout/RichSpoutBatchExecutor$RichSpoutEmitter.class */
    /**
     * Emitter that builds each Trident batch by calling {@code nextTuple()} on the wrapped spout
     * up to {@link #maxBatchSize} times, capturing the emitted tuples through a
     * {@link CaptureCollector}.
     *
     * <p>The message ids captured for a batch are stored in {@link #idsMap} under the transaction
     * id. They are acked on the spout when the transaction succeeds, and failed on the spout when
     * the same transaction id is emitted again or when the entry expires from the rotating map.
     */
    class RichSpoutEmitter implements ITridentSpout.Emitter<Object> {
        /** Upper bound on the number of {@code nextTuple()} calls per batch. */
        int maxBatchSize;
        CaptureCollector collector;
        /** Transaction id -> message ids emitted for that transaction and not yet acked/failed. */
        RotatingMap<Long, List<Object>> idsMap;
        Map<String, Object> conf;
        TopologyContext context;
        /** Minimum time in milliseconds between two rotations of {@link #idsMap}. */
        long rotateTime;
        /** Whether the wrapped spout has been opened yet. */
        boolean prepared = false;
        long lastRotate = System.currentTimeMillis();

        /**
         * @param map             topology configuration; {@code topology.spout.max.batch.size} is
         *                        optional (defaults to 1000), the message timeout entry is required
         * @param topologyContext context later handed to the wrapped spout's {@code open}
         */
        public RichSpoutEmitter(Map<String, Object> map, TopologyContext topologyContext) {
            this.conf = map;
            this.context = topologyContext;
            Number configuredBatchSize = (Number) map.get(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF);
            this.maxBatchSize = configuredBatchSize == null ? 1000 : configuredBatchSize.intValue();
            this.collector = new CaptureCollector();
            this.idsMap = new RotatingMap<>(3);
            Number timeoutSecs = (Number) map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
            // 1000L forces long arithmetic so a large timeout cannot overflow int.
            this.rotateTime = 1000L * timeoutSecs.intValue();
        }

        /**
         * Emits one batch for the given transaction attempt.
         *
         * @param transactionAttempt attempt whose transaction id keys the captured message ids
         * @param obj                coordinator metadata (unused)
         * @param tridentCollector   collector that receives the tuples of this batch
         */
        @Override
        public void emitBatch(TransactionAttempt transactionAttempt, Object obj, TridentCollector tridentCollector) {
            long txid = transactionAttempt.getTransactionId().longValue();
            long now = System.currentTimeMillis();
            if (now - this.lastRotate > this.rotateTime) {
                // rotate() hands back the expired bucket, whose entries are no longer in idsMap.
                // Looking them up by key again would find nothing, so fail the ids directly.
                for (List<Object> expiredIds : this.idsMap.rotate().values()) {
                    failAll(expiredIds);
                }
                this.lastRotate = now;
            }
            // The same transaction id being emitted again means the earlier attempt is abandoned.
            if (this.idsMap.containsKey(txid)) {
                fail(txid);
            }
            this.collector.reset(tridentCollector);
            if (!this.prepared) {
                // The spout is opened lazily, on the first batch.
                RichSpoutBatchExecutor.this.spout.open(this.conf, this.context, new SpoutOutputCollector(this.collector));
                this.prepared = true;
            }
            for (int i = 0; i < this.maxBatchSize; i++) {
                RichSpoutBatchExecutor.this.spout.nextTuple();
                // Stop early once the collector's emitted count has dropped below the loop index.
                if (this.collector.numEmitted < i) {
                    break;
                }
            }
            this.idsMap.put(txid, this.collector.ids);
            this.collector.pendingCount = this.idsMap.size();
        }

        /**
         * Acks on the wrapped spout every message id recorded for the attempt's transaction.
         *
         * @param transactionAttempt the attempt that succeeded
         */
        @Override
        public void success(TransactionAttempt transactionAttempt) {
            ack(transactionAttempt.getTransactionId().longValue());
        }

        /** Removes the ids stored for transaction {@code j}, if any, and acks each on the spout. */
        private void ack(long j) {
            List<Object> batchIds = this.idsMap.remove(j);
            if (batchIds != null) {
                for (Object messageId : batchIds) {
                    RichSpoutBatchExecutor.this.spout.ack(messageId);
                }
            }
        }

        /** Removes the ids stored for transaction {@code j}, if any, and fails each on the spout. */
        private void fail(long j) {
            failAll(this.idsMap.remove(j));
        }

        /** Fails every message id in {@code messageIds} on the spout; {@code null} is a no-op. */
        private void failAll(List<Object> messageIds) {
            if (messageIds == null) {
                return;
            }
            for (Object messageId : messageIds) {
                RichSpoutBatchExecutor.this.spout.fail(messageId);
            }
        }

        /** Closes the wrapped spout. */
        @Override
        public void close() {
            RichSpoutBatchExecutor.this.spout.close();
        }
    }

    /**
     * Wraps the given spout.
     *
     * @param iRichSpout the spout this executor delegates to
     */
    public RichSpoutBatchExecutor(IRichSpout iRichSpout) {
        spout = iRichSpout;
    }

    /**
     * @return the component configuration reported by the wrapped spout
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        final Map<String, Object> wrappedSpoutConf = spout.getComponentConfiguration();
        return wrappedSpoutConf;
    }

    /**
     * @return the result of {@code TridentUtils.getSingleOutputStreamFields} for the wrapped spout
     */
    @Override
    public Fields getOutputFields() {
        final Fields wrappedSpoutFields = TridentUtils.getSingleOutputStreamFields(spout);
        return wrappedSpoutFields;
    }

    /**
     * Creates a new {@link RichSpoutCoordinator}; none of the arguments are used.
     *
     * @param str             unused
     * @param map             unused
     * @param topologyContext unused
     * @return a fresh coordinator instance
     */
    @Override
    public ITridentSpout.BatchCoordinator<Object> getCoordinator(String str, Map<String, Object> map, TopologyContext topologyContext) {
        final ITridentSpout.BatchCoordinator<Object> coordinator = new RichSpoutCoordinator();
        return coordinator;
    }

    /**
     * Creates a new {@link RichSpoutEmitter} bound to this executor's spout.
     *
     * @param str             unused
     * @param map             configuration passed to the emitter
     * @param topologyContext context passed to the emitter
     * @return a fresh emitter instance
     */
    @Override
    public ITridentSpout.Emitter<Object> getEmitter(String str, Map<String, Object> map, TopologyContext topologyContext) {
        final ITridentSpout.Emitter<Object> emitter = new RichSpoutEmitter(map, topologyContext);
        return emitter;
    }
}
