package org.apache.storm.metric;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.metric.util.DataPointExpander;
import org.apache.storm.shade.com.google.common.base.Predicate;
import org.apache.storm.shade.com.google.common.collect.Iterables;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/metric/MetricsConsumerBolt.class */
/**
 * Bolt that forwards metric data points to a configured {@link IMetricsConsumer}.
 *
 * <p>{@link #execute(Tuple)} does not call the consumer directly. It expands and filters the
 * incoming data points, wraps them in a {@link MetricsTask} and places the task on an internal
 * queue. A dedicated daemon thread, started in {@link #prepare}, takes tasks off that queue and
 * passes them to {@link IMetricsConsumer#handleDataPoints}.
 *
 * <p>When {@code maxRetainMetricTuples} is positive the queue is bounded to that capacity and,
 * once full, the oldest queued task is discarded to make room for the newest one. Otherwise the
 * queue uses the default (effectively unbounded) capacity.
 */
public class MetricsConsumerBolt implements IBolt {
    public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);

    private final int maxRetainMetricTuples;
    private final Predicate<IMetricsConsumer.DataPoint> filterPredicate;
    private final DataPointExpander expander;
    private final BlockingQueue<MetricsTask> taskQueue;
    IMetricsConsumer metricsConsumer;
    String consumerClassName;
    OutputCollector collector;
    Object registrationArgument;
    private Thread taskExecuteThread;
    // Written by cleanup() and read by the handler thread, hence volatile.
    private volatile boolean running = true;

    /**
     * Drains {@link #taskQueue} and passes each task to the metrics consumer until the bolt is
     * cleaned up or the thread is interrupted.
     */
    class MetricsHandlerRunnable implements Runnable {
        MetricsHandlerRunnable() {
        }

        @Override
        public void run() {
            while (running) {
                try {
                    MetricsTask task = taskQueue.take();
                    metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the interruption stays observable, then stop.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Throwable th) {
                    // A failing consumer must not kill the handler thread; log and keep draining.
                    LOG.error("Exception occurred during handle metrics", th);
                }
            }
        }
    }

    /**
     * Immutable pairing of a task's info with the data points to hand to the consumer.
     */
    static class MetricsTask {
        private final IMetricsConsumer.TaskInfo taskInfo;
        private final Collection<IMetricsConsumer.DataPoint> dataPoints;

        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
            this.taskInfo = taskInfo;
            this.dataPoints = dataPoints;
        }

        public IMetricsConsumer.TaskInfo getTaskInfo() {
            return taskInfo;
        }

        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
            return dataPoints;
        }
    }

    /**
     * Creates the bolt; the consumer itself is only instantiated later, in {@link #prepare}.
     *
     * @param consumerClassName     fully qualified class name of the {@link IMetricsConsumer} to load
     * @param registrationArgument  opaque argument passed through to the consumer's {@code prepare}
     * @param maxRetainMetricTuples queue capacity when positive; otherwise the default capacity is used
     * @param filterPredicate       predicate selecting the data points that are passed to the consumer
     * @param expander              expander applied to the data points before filtering
     */
    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples,
                               Predicate<IMetricsConsumer.DataPoint> filterPredicate, DataPointExpander expander) {
        this.consumerClassName = consumerClassName;
        this.registrationArgument = registrationArgument;
        this.maxRetainMetricTuples = maxRetainMetricTuples;
        this.filterPredicate = filterPredicate;
        this.expander = expander;
        if (maxRetainMetricTuples > 0) {
            this.taskQueue = new LinkedBlockingDeque<>(maxRetainMetricTuples);
        } else {
            this.taskQueue = new LinkedBlockingDeque<>();
        }
    }

    /**
     * Instantiates and prepares the metrics consumer, then starts the daemon handler thread.
     *
     * @throws RuntimeException if the consumer class cannot be loaded, instantiated or cast to
     *                          {@link IMetricsConsumer}; failures raised by the consumer's own
     *                          {@code prepare} propagate unchanged
     */
    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        try {
            // Only the reflective instantiation is guarded here, so the "could not instantiate"
            // message is never attached to an unrelated failure from the consumer's prepare().
            this.metricsConsumer =
                (IMetricsConsumer) Class.forName(this.consumerClassName).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate a class listed in config under section topology.metrics.consumer.register with fully qualified name " + this.consumerClassName, e);
        }
        this.metricsConsumer.prepare(map, this.registrationArgument, topologyContext, outputCollector);
        this.collector = outputCollector;
        this.taskExecuteThread = new Thread(new MetricsHandlerRunnable());
        this.taskExecuteThread.setDaemon(true);
        this.taskExecuteThread.start();
    }

    /**
     * Queues the tuple's data points for the consumer and acks the tuple.
     *
     * <p>Value 0 of the tuple is read as the task info and value 1 as the collection of data
     * points. The tuple is acked once the task is queued, not once it has been handled.
     */
    @Override
    public void execute(Tuple tuple) {
        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) tuple.getValue(0);
        @SuppressWarnings("unchecked") // the tuple carries an untyped Object; element type cannot be checked
        Collection<IMetricsConsumer.DataPoint> rawDataPoints =
            (Collection<IMetricsConsumer.DataPoint>) tuple.getValue(1);
        Collection<IMetricsConsumer.DataPoint> expanded = this.expander.expandDataPoints(rawDataPoints);
        MetricsTask metricsTask = new MetricsTask(taskInfo, getFilteredDataPoints(expanded));

        // offer() fails only when a bounded queue is full; drop the oldest task and retry so the
        // most recent metrics are retained.
        while (!this.taskQueue.offer(metricsTask)) {
            this.taskQueue.poll();
        }
        this.collector.ack(tuple);
    }

    /**
     * Returns a new list holding the data points accepted by {@link #filterPredicate}.
     */
    private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
        return Lists.newArrayList(Iterables.filter(dataPoints, this.filterPredicate));
    }

    /**
     * Stops the handler loop, cleans up the consumer and interrupts the handler thread.
     *
     * <p>The thread is interrupted even if the consumer's cleanup throws. Both references are
     * null-checked because either may be unset if {@link #prepare} failed part-way.
     */
    @Override
    public void cleanup() {
        this.running = false;
        try {
            if (this.metricsConsumer != null) {
                this.metricsConsumer.cleanup();
            }
        } finally {
            if (this.taskExecuteThread != null) {
                this.taskExecuteThread.interrupt();
            }
        }
    }
}
