package org.apache.storm.executor.spout;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.ICredentialsListener;
import org.apache.storm.daemon.Acker;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.Task;
import org.apache.storm.daemon.metrics.BuiltinMetrics;
import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
import org.apache.storm.daemon.metrics.BuiltinSpoutMetrics;
import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.hooks.info.SpoutAckInfo;
import org.apache.storm.hooks.info.SpoutFailInfo;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.stats.SpoutExecutorStats;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/executor/spout/SpoutExecutor.class */
public class SpoutExecutor extends Executor {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SpoutExecutor.class);
    private final IWaitStrategy spoutWaitStrategy;
    private final IWaitStrategy backPressureWaitStrategy;
    private final AtomicBoolean lastActive;
    private final MutableLong emittedCount;
    private final MutableLong emptyEmitStreak;
    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
    private final boolean hasAckers;
    private final SpoutExecutorStats stats;
    private final BuiltinMetrics builtInMetrics;
    SpoutOutputCollectorImpl spoutOutputCollector;
    private Integer maxSpoutPending;
    private List<ISpout> spouts;
    private List<SpoutOutputCollector> outputCollectors;
    private RotatingMap<Long, TupleInfo> pending;
    private long threadId;

    public SpoutExecutor(WorkerState workerState, List<Long> list, Map<String, String> map) {
        super(workerState, list, map, ClientStatsUtil.SPOUT);
        this.threadId = 0L;
        this.spoutWaitStrategy = (IWaitStrategy) ReflectionUtils.newInstance((String) this.topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
        this.spoutWaitStrategy.prepare(this.topoConf, IWaitStrategy.WaitSituation.SPOUT_WAIT);
        this.backPressureWaitStrategy = (IWaitStrategy) ReflectionUtils.newInstance((String) this.topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
        this.backPressureWaitStrategy.prepare(this.topoConf, IWaitStrategy.WaitSituation.BACK_PRESSURE_WAIT);
        this.lastActive = new AtomicBoolean(false);
        this.hasAckers = StormCommon.hasAckers(this.topoConf);
        this.emittedCount = new MutableLong(0L);
        this.emptyEmitStreak = new MutableLong(0L);
        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
        this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(getTopoConf()), ObjectReader.getInt(getTopoConf().get(Config.NUM_STAT_BUCKETS)).intValue());
        this.builtInMetrics = new BuiltinSpoutMetrics(this.stats);
    }

    @Override // org.apache.storm.executor.Executor
    public SpoutExecutorStats getStats() {
        return this.stats;
    }

    public void init(final ArrayList<Task> arrayList, final int i) throws InterruptedException {
        this.threadId = Thread.currentThread().getId();
        this.executorTransfer.initLocalRecvQueues();
        this.workerReady.await();
        while (!this.stormActive.get()) {
            Utils.sleepNoSimulation(100L);
        }
        LOG.info("Opening spout {}:{}", this.componentId, this.taskIds);
        this.idToTask = arrayList;
        this.maxSpoutPending = Integer.valueOf(ObjectReader.getInt(this.topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0).intValue() * arrayList.size());
        this.spouts = new ArrayList();
        Iterator<Task> it = arrayList.iterator();
        while (it.hasNext()) {
            Task next = it.next();
            if (next != null) {
                this.spouts.add((ISpout) next.getTaskObject());
            }
        }
        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() { // from class: org.apache.storm.executor.spout.SpoutExecutor.1
            @Override // org.apache.storm.utils.RotatingMap.ExpiredCallback
            public void expire(Long l, TupleInfo tupleInfo) {
                Long l2 = null;
                if (tupleInfo.getTimestamp() != 0) {
                    l2 = Long.valueOf(Time.deltaMs(tupleInfo.getTimestamp()));
                }
                SpoutExecutor.this.failSpoutMsg(SpoutExecutor.this, (Task) arrayList.get(tupleInfo.getTaskId() - i), l2, tupleInfo, "TIMEOUT");
            }
        });
        this.spoutThrottlingMetrics.registerAll(this.topoConf, arrayList.get(this.taskIds.get(0).intValue() - i).getUserContext());
        this.errorReportingMetrics.registerAll(this.topoConf, arrayList.get(this.taskIds.get(0).intValue() - i).getUserContext());
        this.outputCollectors = new ArrayList();
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Task task = arrayList.get(i2);
            if (task != null) {
                ISpout iSpout = (ISpout) task.getTaskObject();
                this.spoutOutputCollector = new SpoutOutputCollectorImpl(iSpout, this, task, this.emittedCount, this.hasAckers, this.rand, this.hasEventLoggers, this.isDebug, this.pending);
                SpoutOutputCollector spoutOutputCollector = new SpoutOutputCollector(this.spoutOutputCollector);
                this.outputCollectors.add(spoutOutputCollector);
                this.builtInMetrics.registerAll(this.topoConf, task.getUserContext());
                BuiltinMetricsUtil.registerQueueMetrics(ImmutableMap.of("receive", this.receiveQueue), this.topoConf, task.getUserContext());
                if (iSpout instanceof ICredentialsListener) {
                    ((ICredentialsListener) iSpout).setCredentials(this.credentials);
                }
                iSpout.open(this.topoConf, task.getUserContext(), spoutOutputCollector);
            }
        }
        this.openOrPrepareWasCalled.set(true);
        LOG.info("Opened spout {}:{}", this.componentId, this.taskIds);
        setupTicks(true);
        setupMetrics();
    }

    @Override // java.util.concurrent.Callable
    public Callable<Long> call() throws Exception {
        init(this.idToTask, this.idToTaskBase);
        return new Callable<Long>() { // from class: org.apache.storm.executor.spout.SpoutExecutor.2
            final int recvqCheckSkipCountMax;
            int recvqCheckSkips = 0;
            int swIdleCount = 0;
            int bpIdleCount = 0;
            int rmspCount = 0;

            {
                this.recvqCheckSkipCountMax = SpoutExecutor.this.getSpoutRecvqCheckSkipCount();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Long call() throws Exception {
                int i = 0;
                int i2 = this.recvqCheckSkips;
                this.recvqCheckSkips = i2 + 1;
                if (i2 == this.recvqCheckSkipCountMax) {
                    i = SpoutExecutor.this.receiveQueue.consume(SpoutExecutor.this);
                    this.recvqCheckSkips = 0;
                }
                long j = SpoutExecutor.this.emittedCount.get();
                boolean z = SpoutExecutor.this.maxSpoutPending.intValue() != 0 && SpoutExecutor.this.pending.size() >= SpoutExecutor.this.maxSpoutPending.intValue();
                if (!SpoutExecutor.this.stormActive.get()) {
                    SpoutExecutor.this.inactiveExecute();
                    return 0L;
                }
                if (!SpoutExecutor.this.lastActive.get()) {
                    SpoutExecutor.this.lastActive.set(true);
                    SpoutExecutor.this.activateSpouts();
                }
                boolean tryFlushPendingEmits = tryFlushPendingEmits();
                boolean z2 = true;
                long j2 = 0;
                if (!z && tryFlushPendingEmits) {
                    for (int i3 = 0; i3 < SpoutExecutor.this.spouts.size(); i3++) {
                        ((ISpout) SpoutExecutor.this.spouts.get(i3)).nextTuple();
                    }
                    z2 = j == SpoutExecutor.this.emittedCount.get();
                    if (z2) {
                        SpoutExecutor.this.emptyEmitStreak.increment();
                    } else {
                        j2 = SpoutExecutor.this.emptyEmitStreak.get();
                        SpoutExecutor.this.emptyEmitStreak.set(0L);
                    }
                }
                if (z) {
                    if (this.rmspCount == 0) {
                        SpoutExecutor.LOG.debug("Reached max spout pending");
                    }
                    this.rmspCount++;
                } else {
                    if (this.rmspCount > 0) {
                        SpoutExecutor.LOG.debug("Ended max spout pending stretch of {} iterations", Integer.valueOf(this.rmspCount));
                    }
                    this.rmspCount = 0;
                }
                if (i > 1) {
                    return 0L;
                }
                if (!SpoutExecutor.this.pendingEmits.isEmpty()) {
                    backPressureWaitStrategy();
                    return 0L;
                }
                this.bpIdleCount = 0;
                if (z2) {
                    spoutWaitStrategy(z, j2);
                    return 0L;
                }
                this.swIdleCount = 0;
                return 0L;
            }

            private void backPressureWaitStrategy() throws InterruptedException {
                long currentTimeMillis = Time.currentTimeMillis();
                if (this.bpIdleCount == 0) {
                    SpoutExecutor.LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
                }
                this.bpIdleCount = SpoutExecutor.this.backPressureWaitStrategy.idle(this.bpIdleCount);
                SpoutExecutor.this.spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - currentTimeMillis);
            }

            private void spoutWaitStrategy(boolean z, long j) throws InterruptedException {
                SpoutExecutor.this.emptyEmitStreak.increment();
                long currentTimeMillis = Time.currentTimeMillis();
                this.swIdleCount = SpoutExecutor.this.spoutWaitStrategy.idle(this.swIdleCount);
                if (z) {
                    SpoutExecutor.this.spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - currentTimeMillis);
                } else if (j > 0) {
                    SpoutExecutor.LOG.debug("Ending Spout Wait Stretch of {}", Long.valueOf(j));
                }
            }

            private boolean tryFlushPendingEmits() {
                Object peek = SpoutExecutor.this.pendingEmits.peek();
                while (true) {
                    AddressedTuple addressedTuple = (AddressedTuple) peek;
                    if (addressedTuple == null) {
                        return true;
                    }
                    if (!SpoutExecutor.this.executorTransfer.tryTransfer(addressedTuple, null)) {
                        return false;
                    }
                    SpoutExecutor.this.pendingEmits.poll();
                    peek = SpoutExecutor.this.pendingEmits.peek();
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void activateSpouts() {
        LOG.info("Activating spout {}:{}", this.componentId, this.taskIds);
        Iterator<ISpout> it = this.spouts.iterator();
        while (it.hasNext()) {
            it.next().activate();
        }
    }

    private void deactivateSpouts() {
        LOG.info("Deactivating spout {}:{}", this.componentId, this.taskIds);
        Iterator<ISpout> it = this.spouts.iterator();
        while (it.hasNext()) {
            it.next().deactivate();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void inactiveExecute() throws InterruptedException {
        if (this.lastActive.get()) {
            this.lastActive.set(false);
            deactivateSpouts();
        }
        long currentTimeMillis = Time.currentTimeMillis();
        Time.sleep(100L);
        this.spoutThrottlingMetrics.skippedInactiveMs(Time.currentTimeMillis() - currentTimeMillis);
    }

    @Override // org.apache.storm.executor.Executor
    public void tupleActionFn(int i, TupleImpl tupleImpl) throws Exception {
        String sourceStreamId = tupleImpl.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(sourceStreamId)) {
            this.spoutOutputCollector.flush();
            return;
        }
        if (sourceStreamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            this.pending.rotate();
            return;
        }
        if (sourceStreamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(this.idToTask.get(i - this.idToTaskBase), tupleImpl);
            return;
        }
        if (sourceStreamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
            Object taskObject = this.idToTask.get(i - this.idToTaskBase).getTaskObject();
            if (taskObject instanceof ICredentialsListener) {
                ((ICredentialsListener) taskObject).setCredentials((Map) tupleImpl.getValue(0));
                return;
            }
            return;
        }
        if (sourceStreamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long l = (Long) tupleImpl.getValue(0);
            TupleInfo tupleInfo = this.pending.get(l);
            if (tupleInfo != null) {
                this.pending.put(l, tupleInfo);
                return;
            }
            return;
        }
        Long l2 = (Long) tupleImpl.getValue(0);
        Long l3 = (Long) tupleImpl.getValue(1);
        TupleInfo remove = this.pending.remove(l2);
        if (remove == null || remove.getMessageId() == null) {
            return;
        }
        if (i != remove.getTaskId()) {
            throw new RuntimeException("Fatal error, mismatched task ids: " + i + " " + remove.getTaskId());
        }
        Long l4 = null;
        if (this.hasAckers && remove.getTimestamp() != 0) {
            l4 = l3;
        }
        if (sourceStreamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
            ackSpoutMsg(this, this.idToTask.get(i - this.idToTaskBase), l4, remove);
        } else if (sourceStreamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
            failSpoutMsg(this, this.idToTask.get(i - this.idToTaskBase), l4, remove, "FAIL-STREAM");
        }
    }

    public void ackSpoutMsg(SpoutExecutor spoutExecutor, Task task, Long l, TupleInfo tupleInfo) {
        try {
            ISpout iSpout = (ISpout) task.getTaskObject();
            int intValue = task.getTaskId().intValue();
            if (spoutExecutor.getIsDebug().booleanValue()) {
                LOG.info("SPOUT Acking message {} {}", Long.valueOf(tupleInfo.getRootId()), tupleInfo.getMessageId());
            }
            iSpout.ack(tupleInfo.getMessageId());
            if (!task.getUserContext().getHooks().isEmpty()) {
                new SpoutAckInfo(tupleInfo.getMessageId(), intValue, l).applyOn(task.getUserContext());
            }
            if (this.hasAckers && l != null) {
                spoutExecutor.getStats().spoutAckedTuple(tupleInfo.getStream(), l.longValue(), task.getTaskMetrics().getAcked(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public void failSpoutMsg(SpoutExecutor spoutExecutor, Task task, Long l, TupleInfo tupleInfo, String str) {
        try {
            ISpout iSpout = (ISpout) task.getTaskObject();
            int intValue = task.getTaskId().intValue();
            if (spoutExecutor.getIsDebug().booleanValue()) {
                LOG.info("SPOUT Failing {} : {} REASON: {}", Long.valueOf(tupleInfo.getRootId()), tupleInfo, str);
            }
            iSpout.fail(tupleInfo.getMessageId());
            new SpoutFailInfo(tupleInfo.getMessageId(), intValue, l).applyOn(task.getUserContext());
            if (l != null) {
                spoutExecutor.getStats().spoutFailedTuple(tupleInfo.getStream(), l.longValue(), task.getTaskMetrics().getFailed(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public int getSpoutRecvqCheckSkipCount() {
        if (this.ackingEnabled) {
            return 0;
        }
        return ObjectReader.getInt(this.conf.get(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS), 0).intValue();
    }

    public long getThreadId() {
        return this.threadId;
    }
}
