package org.apache.storm.scheduler.resource;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SingleTopologyCluster;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
import org.apache.storm.scheduler.utils.IConfigLoader;
import org.apache.storm.shade.com.google.common.collect.ImmutableList;
import org.apache.storm.utils.DisallowedStrategyException;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/scheduler/resource/ResourceAwareScheduler.class */
/**
 * Scheduler that walks the topologies in the order chosen by the configured
 * {@link ISchedulingPriorityStrategy} and schedules each one that still needs scheduling with the
 * {@link IStrategy} named in that topology's own configuration.
 *
 * <p>Every scheduling attempt runs on a single background thread and is bounded by a per-topology
 * timeout. When an attempt fails for lack of resources, topologies that come later in the priority
 * order are evicted from a working copy of the cluster and the attempt is retried, up to a
 * configured maximum number of attempts. The real cluster is only updated once an attempt succeeds.
 */
public class ResourceAwareScheduler implements IScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareScheduler.class);
    /** Topology configuration key holding the class name of the scheduling strategy. */
    private static final String TOPOLOGY_SCHEDULER_STRATEGY = "topology.scheduler.strategy";
    /** Package prefix used by strategy class names coming from older Storm versions. */
    private static final String LEGACY_PACKAGE = "backtype.storm";
    /** Package prefix that replaces {@link #LEGACY_PACKAGE}. */
    private static final String CURRENT_PACKAGE = "org.apache.storm";

    private Map<String, Object> conf;
    private ISchedulingPriorityStrategy schedulingPriorityStrategy;
    private IConfigLoader configLoader;
    private int maxSchedulingAttempts;
    private int schedulingTimeoutSeconds;
    private ExecutorService backgroundScheduling;

    /**
     * Records a scheduling failure that has no associated exception.
     *
     * @param user the submitter of the topology
     * @param cluster the cluster whose status for the topology is updated
     * @param topologyDetails the topology that could not be scheduled
     * @param str the failure message stored as status and logged
     */
    private static void markFailedTopology(User user, Cluster cluster, TopologyDetails topologyDetails, String str) {
        markFailedTopology(user, cluster, topologyDetails, str, null);
    }

    /**
     * Records a scheduling failure: sets the status on the cluster, logs the message prefixed with
     * the topology id at error level, and marks the topology as unsuccessful for the user.
     *
     * @param user the submitter of the topology
     * @param cluster the cluster whose status for the topology is updated
     * @param topologyDetails the topology that could not be scheduled
     * @param str the failure message stored as status and logged
     * @param th the cause to log alongside the message, or {@code null} when there is none
     */
    private static void markFailedTopology(User user, Cluster cluster, TopologyDetails topologyDetails, String str, Throwable th) {
        cluster.setStatus(topologyDetails, str);
        String logMessage = topologyDetails.getId() + " " + str;
        if (th != null) {
            LOG.error(logMessage, th);
        } else {
            LOG.error(logMessage);
        }
        user.markTopoUnsuccess(topologyDetails);
    }

    /**
     * Sums the CPU of every worker resource entry in the assignment.
     *
     * @param schedulerAssignment the assignment to inspect
     * @return the total CPU over all scheduled resources of the assignment
     */
    private static double getCpuUsed(SchedulerAssignment schedulerAssignment) {
        return schedulerAssignment.getScheduledResources().values().stream()
            .mapToDouble(resources -> resources.get_cpu())
            .sum();
    }

    /**
     * Sums the on-heap plus off-heap memory of every worker resource entry in the assignment.
     *
     * @param schedulerAssignment the assignment to inspect
     * @return the total memory over all scheduled resources of the assignment
     */
    private static double getMemoryUsed(SchedulerAssignment schedulerAssignment) {
        return schedulerAssignment.getScheduledResources().values().stream()
            .mapToDouble(resources -> resources.get_mem_on_heap() + resources.get_mem_off_heap())
            .sum();
    }

    /**
     * Narrows an untyped configuration value to the nested map shape used for user resource pools.
     *
     * @param raw the value read from a configuration source, possibly {@code null}
     * @return the same object viewed as user to (resource name to amount), or {@code null}
     */
    @SuppressWarnings("unchecked") // Configuration values are untyped; element types are only checked on access.
    private static Map<String, Map<String, Number>> asUserPools(Object raw) {
        return (Map<String, Map<String, Number>>) raw;
    }

    /**
     * Tries to make room for a topology by freeing, in the working copy of the cluster, the slots
     * of topologies that come after it in the priority order, starting from the very last one.
     * Eviction stops as soon as the freed CPU and memory cover what the topology still needs.
     *
     * @param topologyDetails the topology that could not be scheduled for lack of resources
     * @param cluster the real cluster, consulted for what the topology already has assigned
     * @param workingState the working copy of the cluster in which eviction candidates are looked up
     * @param nodes the node view built on {@code workingState}, used to free slots
     * @param orderedTopologies all topologies in scheduling priority order
     * @param result the failed scheduling result whose error message is appended to the failure text
     * @return {@code null} when at least one topology was evicted (scheduling should be retried),
     *     otherwise the failure message to report
     */
    private static String makeSpaceForTopology(TopologyDetails topologyDetails, Cluster cluster, Cluster workingState,
                                               RAS_Nodes nodes, List<TopologyDetails> orderedTopologies,
                                               SchedulingResult result) {
        List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
        LOG.debug("attempting to make space for topo {} from user {}", topologyDetails.getName(),
            topologyDetails.getTopologySubmitter());
        int topologyIndex = reversedList.indexOf(topologyDetails);

        double cpuNeeded = topologyDetails.getTotalRequestedCpu();
        double memoryNeeded = topologyDetails.getTotalRequestedMemOffHeap() + topologyDetails.getTotalRequestedMemOnHeap();
        // Whatever the topology already holds on the real cluster does not need to be freed again.
        SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId());
        if (existingAssignment != null) {
            cpuNeeded -= getCpuUsed(existingAssignment);
            memoryNeeded -= getMemoryUsed(existingAssignment);
        }

        boolean evictedSomething = false;
        // Entries before topologyIndex in the reversed list come after the topology in priority order.
        for (int index = 0; index < topologyIndex; index++) {
            TopologyDetails candidate = reversedList.get(index);
            SchedulerAssignment candidateAssignment = workingState.getAssignmentById(candidate.getId());
            if (candidateAssignment == null || candidateAssignment.getSlots().isEmpty()) {
                continue;
            }
            Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(candidate.getId());
            LOG.debug("Evicting Topology {} with workers: {} from user {}", candidate.getName(), workersToEvict,
                candidate.getTopologySubmitter());
            // Usage is read before the slots are freed.
            cpuNeeded -= getCpuUsed(candidateAssignment);
            memoryNeeded -= getMemoryUsed(candidateAssignment);
            evictedSomething = true;
            nodes.freeSlots(workersToEvict);
            if (cpuNeeded <= 0.0 && memoryNeeded <= 0.0) {
                break;
            }
        }
        if (evictedSomething) {
            return null;
        }

        StringBuilder message = new StringBuilder("Not enough resources to schedule ");
        if (memoryNeeded > 0.0 || cpuNeeded > 0.0) {
            if (memoryNeeded > 0.0) {
                message.append(memoryNeeded).append(" MB ");
            }
            if (cpuNeeded > 0.0) {
                message.append(cpuNeeded).append("% CPU ");
            }
            message.append("needed even after evicting lower priority topologies. ");
        }
        message.append(result.getErrorMessage());
        return message.toString();
    }

    /**
     * Stores the configuration and creates the priority strategy, the config loader and the
     * single-threaded executor on which scheduling attempts run.
     *
     * @param map the scheduler configuration; the maximum number of scheduling attempts defaults
     *     to 5 and the per-topology timeout to 60 seconds when not set
     */
    @Override
    public void prepare(Map<String, Object> map) {
        this.conf = map;
        String priorityStrategyClass = (String) map.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY);
        this.schedulingPriorityStrategy = (ISchedulingPriorityStrategy) ReflectionUtils.newInstance(priorityStrategyClass);
        this.configLoader = ConfigLoaderFactoryService.createConfigLoader(map);
        this.maxSchedulingAttempts =
            ObjectReader.getInt(map.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS), 5).intValue();
        this.schedulingTimeoutSeconds =
            ObjectReader.getInt(map.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 60).intValue();
        this.backgroundScheduling = Executors.newFixedThreadPool(1);
    }

    /**
     * Shuts down the background scheduling executor. Safe to call even if {@link #prepare(Map)}
     * was never invoked.
     */
    @Override
    public void cleanup() {
        LOG.info("Cleanup ResourceAwareScheduler scheduler");
        if (this.backgroundScheduling != null) {
            this.backgroundScheduling.shutdown();
        }
    }

    /**
     * Returns the currently effective user resource pools.
     *
     * @return a map of user name to (resource name to amount)
     */
    @Override
    public Map<String, Map<String, Double>> config() {
        return getUserResourcePools();
    }

    /**
     * Schedules every topology that needs scheduling, in the order given by the priority strategy.
     * Topologies that do not need scheduling get the status "Fully Scheduled" if they have none.
     *
     * @param topologies not used by this implementation; topologies are taken from {@code cluster}
     * @param cluster the cluster to schedule on
     */
    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        Map<String, User> users = getUsers(cluster);
        List<TopologyDetails> orderedTopologies =
            new ArrayList<>(this.schedulingPriorityStrategy.getOrderedTopologies(cluster, users));
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ordered list of topologies is: {}",
                orderedTopologies.stream().map(TopologyDetails::getId).collect(Collectors.toList()));
        }
        for (TopologyDetails topology : orderedTopologies) {
            if (cluster.needsSchedulingRas(topology)) {
                scheduleTopology(topology, cluster, users.get(topology.getTopologySubmitter()), orderedTopologies);
            } else {
                cluster.setStatusIfAbsent(topology.getId(), "Fully Scheduled");
            }
        }
    }

    /**
     * Schedules a single topology, retrying after evicting lower priority topologies when the
     * strategy reports that there are not enough resources.
     *
     * <p>All tentative work happens on a copy of {@code cluster}; the real cluster is only updated
     * from a successful attempt. Every failure path records a status through
     * {@code markFailedTopology} or {@code User.markTopoUnsuccess}.
     *
     * @param topologyDetails the topology to schedule
     * @param cluster the real cluster
     * @param user the submitter of the topology
     * @param list all topologies in scheduling priority order
     */
    private void scheduleTopology(TopologyDetails topologyDetails, Cluster cluster, User user, List<TopologyDetails> list) {
        Cluster workingState = new Cluster(cluster);
        RAS_Nodes nodes = new RAS_Nodes(workingState);
        String strategyConf = (String) topologyDetails.getConf().get(TOPOLOGY_SCHEDULER_STRATEGY);
        try {
            String strategyClass = strategyConf;
            // A missing strategy config fails here with a NullPointerException, which is reported
            // by the RuntimeException handler below.
            if (strategyClass.startsWith(LEGACY_PACKAGE)) {
                strategyClass = strategyClass.replace(LEGACY_PACKAGE, CURRENT_PACKAGE);
                LOG.debug("Replaced backtype.storm with org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY");
            }
            IStrategy strategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance(strategyClass, this.conf);
            strategy.prepare(this.conf);

            for (int attempt = 0; attempt < this.maxSchedulingAttempts; attempt++) {
                SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, topologyDetails.getId());
                try {
                    Future<SchedulingResult> schedulingFuture =
                        this.backgroundScheduling.submit(() -> strategy.schedule(toSchedule, topologyDetails));
                    SchedulingResult result;
                    try {
                        result = schedulingFuture.get(this.schedulingTimeoutSeconds, TimeUnit.SECONDS);
                    } catch (TimeoutException e) {
                        markFailedTopology(user, cluster, topologyDetails, "Scheduling took too long for "
                            + topologyDetails.getId() + " using strategy " + strategy.getClass().getName()
                            + " timeout after " + this.schedulingTimeoutSeconds + " seconds using config "
                            + DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY + ".");
                        schedulingFuture.cancel(true);
                        return;
                    } catch (InterruptedException e) {
                        // Stop the abandoned attempt and keep the interrupt visible to the caller
                        // instead of swallowing it in the generic handler below.
                        schedulingFuture.cancel(true);
                        Thread.currentThread().interrupt();
                        markFailedTopology(user, cluster, topologyDetails,
                            "Internal Error - Exception thrown when scheduling. Please check logs for details", e);
                        return;
                    }

                    LOG.debug("scheduling result: {}", result);
                    if (result == null) {
                        markFailedTopology(user, cluster, topologyDetails, "Internal scheduler error");
                        return;
                    }
                    if (result.isSuccess()) {
                        cluster.updateFrom(toSchedule);
                        cluster.setStatus(topologyDetails.getId(), "Running - " + result.getMessage());
                        return;
                    }
                    if (result.getStatus() != SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                        user.markTopoUnsuccess(topologyDetails, cluster);
                        return;
                    }

                    LOG.debug("Not enough resources to schedule {}", topologyDetails.getName());
                    String failureMessage = makeSpaceForTopology(topologyDetails, cluster, workingState, nodes, list, result);
                    if (failureMessage != null) {
                        // Nothing could be evicted, so another attempt cannot succeed.
                        markFailedTopology(user, cluster, topologyDetails, failureMessage);
                        return;
                    }
                } catch (Exception e) {
                    markFailedTopology(user, cluster, topologyDetails,
                        "Internal Error - Exception thrown when scheduling. Please check logs for details", e);
                    return;
                }
            }
            markFailedTopology(user, cluster, topologyDetails,
                "Failed to schedule within " + this.maxSchedulingAttempts + " attempts");
        } catch (DisallowedStrategyException e) {
            markFailedTopology(user, cluster, topologyDetails, "Unsuccessful in scheduling - " + e.getAttemptedClass()
                + " is not an allowed strategy. Please make sure your topology.scheduler.strategy config is one of the allowed strategies: "
                + e.getAllowedStrategies(), e);
        } catch (RuntimeException e) {
            markFailedTopology(user, cluster, topologyDetails,
                "Unsuccessful in scheduling - failed to create instance of topology strategy " + strategyConf
                    + ". Please check logs for details", e);
        }
    }

    /**
     * Builds one {@link User} per distinct topology submitter found in the cluster, each created
     * with that user's entry from the user resource pools. Topologies without a submitter are
     * logged and do not contribute a user.
     *
     * @param cluster the cluster whose topologies are inspected
     * @return a map of submitter name to user
     */
    private Map<String, User> getUsers(Cluster cluster) {
        Map<String, User> users = new HashMap<>();
        Map<String, Map<String, Double>> userResourcePools = getUserResourcePools();
        LOG.debug("userResourcePools: {}", userResourcePools);
        for (TopologyDetails topology : cluster.getTopologies()) {
            String submitter = topology.getTopologySubmitter();
            if (submitter == null || submitter.isEmpty()) {
                LOG.error("Cannot determine user for topology {}.  Will skip scheduling this topology", topology.getName());
            } else if (!users.containsKey(submitter)) {
                users.put(submitter, new User(submitter, userResourcePools.get(submitter)));
            }
        }
        return users;
    }

    /**
     * Copies a user resource pool configuration, converting every amount to a {@code Double}.
     * A user with no resource map gets an empty pool and a resource without an amount is skipped;
     * both cases are logged as warnings instead of failing.
     *
     * @param map user name to (resource name to amount); may be {@code null}
     * @return a new map with the same keys and {@code Double} amounts; empty when {@code map} is {@code null}
     */
    private Map<String, Map<String, Double>> convertToDouble(Map<String, Map<String, Number>> map) {
        Map<String, Map<String, Double>> converted = new HashMap<>();
        if (map == null) {
            return converted;
        }
        for (Map.Entry<String, Map<String, Number>> userEntry : map.entrySet()) {
            String userName = userEntry.getKey();
            Map<String, Double> resources = new HashMap<>();
            converted.put(userName, resources);
            Map<String, Number> rawResources = userEntry.getValue();
            if (rawResources == null) {
                LOG.warn("No resources configured for user {}. Treating the resource pool as empty", userName);
                continue;
            }
            for (Map.Entry<String, Number> resourceEntry : rawResources.entrySet()) {
                Number amount = resourceEntry.getValue();
                if (amount == null) {
                    LOG.warn("No value configured for resource {} of user {}. Skipping it", resourceEntry.getKey(), userName);
                    continue;
                }
                resources.put(resourceEntry.getKey(), amount.doubleValue());
            }
        }
        return converted;
    }

    /**
     * Resolves the user resource pools from the first source that provides them: the config
     * loader (when one exists), then the file user-resource-pools.yaml, then the scheduler
     * configuration passed to {@link #prepare(Map)}.
     *
     * @return user name to (resource name to amount); empty when no source defines any pools
     */
    private Map<String, Map<String, Double>> getUserResourcePools() {
        if (this.configLoader != null) {
            Map<?, ?> loaded = this.configLoader.load(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
            if (loaded != null) {
                return convertToDouble(asUserPools(loaded));
            }
            LOG.warn("Config loader returned null. Will try to read from user-resource-pools.yaml");
        }

        Object fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false)
            .get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
        if (fromFile != null) {
            return convertToDouble(asUserPools(fromFile));
        }
        LOG.warn("Reading from user-resource-pools.yaml returned null. This could because the file is not available. Will load configs from storm configuration");

        return convertToDouble(asUserPools(this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS)));
    }
}
