package org.apache.storm.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.Utils;
import org.joni.constants.StackType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/scheduler/Cluster.class */
public class Cluster implements ISchedulingState {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) Cluster.class);
    /** Supervisor id to supervisor details; filled from the constructor argument. */
    private final Map<String, SupervisorDetails> supervisors;
    /** Resolved network location to the host names placed there (presumably rack to hosts; confirm against DNSToSwitchMapping). */
    private final Map<String, List<String>> networkTopography;
    /** Topology id to the assignment currently held for that topology. */
    private final Map<String, SchedulerAssignmentImpl> assignments;
    /** Topology id to its status string. */
    private final Map<String, String> status;
    /** Host name to the ids of the supervisors reporting that host. */
    private final Map<String, List<String>> hostToId;
    /** Configuration map handed to the constructor. */
    private final Map<String, Object> conf;
    /** Topologies this cluster state refers to. */
    private final Topologies topologies;
    /** Node id to the resources recorded per worker slot on that node. */
    private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> nodeToScheduledResourcesCache;
    /** NOTE(review): only initialised in the visible code; presumably node id to topology id to shared off-heap node memory. Confirm. */
    private final Map<String, Map<String, Double>> nodeToScheduledOffHeapNodeMemoryCache;
    /** Node id to the worker slots currently in use on that node. */
    private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
    /** Per-node cache; a node's entry is dropped whenever one of its slots is assigned or freed. */
    private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache;
    /** Metrics object passed along when resources are subtracted from an offer. */
    private final ResourceMetrics resourceMetrics;
    /** NOTE(review): not referenced anywhere in the visible part of this file. */
    private SchedulerAssignmentImpl assignment;
    /** Names of the hosts that are currently blacklisted. */
    private Set<String> blackListedHosts;
    /** Nimbus callback used to translate a node id into a host name. */
    private INimbus inimbus;
    /** Lower bound applied to the CPU of every worker; read from DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT. */
    private double minWorkerCpu;

    /**
     * Supplies a fresh, empty {@link HashMap}. The signature lets it be used as a method reference
     * for {@link Map#computeIfAbsent} on maps keyed by {@code String}.
     *
     * @param str the key being initialised; ignored
     * @return a new, mutable, empty map
     */
    private static <K, V> Map<K, V> makeMap(String str) {
        return new HashMap<>();
    }

    /**
     * Supplies a fresh, empty {@link HashSet}. The signature lets it be used as a method reference
     * for {@link java.util.Map#computeIfAbsent} on maps keyed by {@code String}.
     *
     * @param str the key being initialised; ignored
     * @return a new, mutable, empty set
     */
    private static <K> Set<K> makeSet(String str) {
        return new HashSet<>();
    }

    /**
     * Creates a cluster without initial status entries, blacklisted hosts or a pre-computed network
     * topography: all three are handed to the private constructor as {@code null}.
     *
     * @param iNimbus         nimbus callback used for host name lookups
     * @param resourceMetrics metrics object used when resources are subtracted from offers
     * @param map             supervisor id to supervisor details
     * @param map2            topology id to existing assignment
     * @param topologies      the topologies known to this cluster
     * @param map3            configuration map
     */
    public Cluster(
            INimbus iNimbus,
            ResourceMetrics resourceMetrics,
            Map<String, SupervisorDetails> map,
            Map<String, ? extends SchedulerAssignment> map2,
            Topologies topologies,
            Map<String, Object> map3) {
        this(iNimbus, resourceMetrics, map, map2, topologies, map3, null, null, null);
    }

    /**
     * Copy constructor. The configuration is copied into a new {@link HashMap}; every other piece of
     * state is passed to the private constructor, which copies it into collections of its own.
     *
     * @param cluster the cluster to copy
     */
    public Cluster(Cluster cluster) {
        this(
            cluster.inimbus,
            cluster.resourceMetrics,
            cluster.supervisors,
            cluster.assignments,
            cluster.topologies,
            new HashMap<>(cluster.conf),
            cluster.status,
            cluster.blackListedHosts,
            cluster.networkTopography);
    }

    /**
     * Copies {@code cluster} but substitutes a different set of topologies. The configuration is
     * copied into a new {@link HashMap}.
     *
     * @param cluster    the cluster whose state is copied
     * @param topologies the topologies the new cluster should use instead of the source's
     */
    @VisibleForTesting
    public Cluster(Cluster cluster, Topologies topologies) {
        this(
            cluster.inimbus,
            cluster.resourceMetrics,
            cluster.supervisors,
            cluster.assignments,
            topologies,
            new HashMap<>(cluster.conf),
            cluster.status,
            cluster.blackListedHosts,
            cluster.networkTopography);
    }

    /**
     * Builds the full cluster state. All collection arguments are copied into collections owned by
     * the new instance; only {@code map3} (the configuration) is stored by reference.
     *
     * @param iNimbus         nimbus callback used for host name lookups
     * @param resourceMetrics metrics object used when resources are subtracted from offers
     * @param map             supervisor id to supervisor details
     * @param map2            topology id to existing assignment; applied through
     *                        {@link #setAssignments(Map, boolean)} with single failures ignored
     * @param topologies      the topologies known to this cluster
     * @param map3            configuration map
     * @param map4            initial topology id to status entries, or {@code null} for none
     * @param set             initially blacklisted hosts, or {@code null} for none
     * @param map5            pre-computed network topography; when {@code null} or empty it is
     *                        resolved through the configured {@link DNSToSwitchMapping} plugin
     */
    private Cluster(INimbus iNimbus, ResourceMetrics resourceMetrics, Map<String, SupervisorDetails> map, Map<String, ? extends SchedulerAssignment> map2, Topologies topologies, Map<String, Object> map3, Map<String, String> map4, Set<String> set, Map<String, List<String>> map5) {
        this.supervisors = new HashMap<>();
        this.networkTopography = new HashMap<>();
        this.assignments = new HashMap<>();
        this.status = new HashMap<>();
        this.hostToId = new HashMap<>();
        this.totalResourcesPerNodeCache = new HashMap<>();
        this.blackListedHosts = new HashSet<>();
        this.inimbus = iNimbus;
        this.resourceMetrics = resourceMetrics;
        this.supervisors.putAll(map);
        this.nodeToScheduledResourcesCache = new HashMap<>(this.supervisors.size());
        this.nodeToScheduledOffHeapNodeMemoryCache = new HashMap<>();
        this.nodeToUsedSlotsCache = new HashMap<>(this.supervisors.size());

        // One pass over the supervisors: index them by host and collect the hosts for resolution.
        List<String> supervisorHosts = new ArrayList<>(map.size());
        for (Map.Entry<String, SupervisorDetails> entry : map.entrySet()) {
            String host = entry.getValue().getHost();
            this.hostToId.computeIfAbsent(host, ignored -> new ArrayList<>()).add(entry.getKey());
            supervisorHosts.add(host);
        }

        this.conf = map3;
        this.topologies = topologies;
        // Plain literal default instead of an unrelated third-party constant that merely equals 0.
        this.minWorkerCpu = ObjectReader.getDouble(map3.get(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT), Double.valueOf(0.0d)).doubleValue();

        if (map5 == null || map5.isEmpty()) {
            String pluginClassName = (String) map3.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN);
            if (pluginClassName == null || pluginClassName.isEmpty()) {
                pluginClassName = DefaultRackDNSToSwitchMapping.class.getName();
            }
            DNSToSwitchMapping mapping = (DNSToSwitchMapping) ReflectionUtils.newInstance(pluginClassName);
            // resolve() yields one entry per host; group the hosts by the value they resolve to.
            for (Map.Entry<String, String> resolved : mapping.resolve(supervisorHosts).entrySet()) {
                this.networkTopography.computeIfAbsent(resolved.getValue(), ignored -> new ArrayList<>()).add(resolved.getKey());
            }
        } else {
            this.networkTopography.putAll(map5);
        }
        if (map4 != null) {
            this.status.putAll(map4);
        }
        if (set != null) {
            this.blackListedHosts.addAll(set);
        }
        setAssignments(map2, true);
    }

    /**
     * Computes the memory assigned to a worker slot from the JVM options in a configuration map.
     *
     * <p>The heap size is taken from the first of these sources for which
     * {@code Utils.parseJvmHeapMemByChildOpts} returns a value:
     * <ol>
     *   <li>{@code Config.TOPOLOGY_WORKER_GC_CHILDOPTS}, then {@code Config.WORKER_GC_CHILDOPTS}</li>
     *   <li>{@code Config.TOPOLOGY_WORKER_CHILDOPTS}</li>
     *   <li>{@code Config.WORKER_CHILDOPTS}</li>
     *   <li>{@code Config.WORKER_HEAP_MEMORY_MB}, read with a default of 768</li>
     * </ol>
     * The value parsed from {@code Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS} (default 0) is then
     * added when that option list is not {@code null}.
     *
     * @param map the configuration to read
     * @return the worker heap size plus the log writer heap size
     */
    public static double getAssignedMemoryForSlot(Map<String, Object> map) {
        // Literal default; previously spelled as an unrelated regex-library constant equal to 768.
        final Integer defaultWorkerHeapMb = 768;

        // Every option list is read and parsed up front, in the original order, before a source is chosen.
        List<String> topologyGcOpts = ConfigUtils.getValueAsList(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, map);
        List<String> workerGcOpts = ConfigUtils.getValueAsList(Config.WORKER_GC_CHILDOPTS, map);
        Double gcHeap = Utils.parseJvmHeapMemByChildOpts(topologyGcOpts, null);
        if (gcHeap == null) {
            gcHeap = Utils.parseJvmHeapMemByChildOpts(workerGcOpts, null);
        }
        Double topologyChildOptsHeap = Utils.parseJvmHeapMemByChildOpts(ConfigUtils.getValueAsList(Config.TOPOLOGY_WORKER_CHILDOPTS, map), null);
        Double workerChildOptsHeap = Utils.parseJvmHeapMemByChildOpts(ConfigUtils.getValueAsList(Config.WORKER_CHILDOPTS, map), null);

        double totalWorkerMemory;
        if (gcHeap != null) {
            totalWorkerMemory = gcHeap.doubleValue();
        } else if (topologyChildOptsHeap != null) {
            totalWorkerMemory = topologyChildOptsHeap.doubleValue();
        } else if (workerChildOptsHeap != null) {
            totalWorkerMemory = workerChildOptsHeap.doubleValue();
        } else {
            totalWorkerMemory = ObjectReader.getInt(map.get(Config.WORKER_HEAP_MEMORY_MB), defaultWorkerHeapMb).intValue();
        }

        List<String> logWriterOpts = ConfigUtils.getValueAsList(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, map);
        if (logWriterOpts != null) {
            totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(logWriterOpts, Double.valueOf(0.0d)).doubleValue();
        }
        return totalWorkerMemory;
    }

    /**
     * Hook called by the mutating methods of this class before they touch state belonging to a
     * topology. This implementation does nothing.
     * NOTE(review): being protected, it is presumably overridden by subclasses that restrict which
     * topologies may be modified; confirm against the subclasses.
     *
     * @param str id of the topology about to be modified
     */
    protected void assertValidTopologyForModification(String str) {
    }

    /**
     * Returns the topologies this cluster was constructed with.
     *
     * @return the {@link Topologies} instance held by this cluster
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Topologies getTopologies() {
        return topologies;
    }

    /**
     * Returns the set of blacklisted host names. This is the internal set, not a copy.
     *
     * @return the live set of blacklisted hosts
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Set<String> getBlacklistedHosts() {
        return blackListedHosts;
    }

    /**
     * Replaces the contents of the blacklist with the given hosts. Passing the internal set itself
     * (as returned by {@link #getBlacklistedHosts()}) is a no-op.
     *
     * @param set the host names that should be blacklisted from now on
     */
    public void setBlacklistedHosts(Set<String> set) {
        // Guard against clearing the very set we are about to copy from.
        if (set != blackListedHosts) {
            blackListedHosts.clear();
            blackListedHosts.addAll(set);
        }
    }

    /**
     * Adds a single host to the blacklist.
     *
     * @param str the host name to blacklist
     */
    public void blacklistHost(String str) {
        blackListedHosts.add(str);
    }

    /**
     * Tells whether the host behind a node id is blacklisted. The id is first translated to a host
     * name via {@link #getHost(String)}.
     *
     * @param str the node (supervisor) id
     * @return {@code true} if the resolved host is in the blacklist
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public boolean isBlackListed(String str) {
        String host = getHost(str);
        return blackListedHosts.contains(host);
    }

    /**
     * Tells whether a host name is in the blacklist.
     *
     * @param str the host name
     * @return {@code true} if the host is blacklisted
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public boolean isBlacklistedHost(String str) {
        return blackListedHosts.contains(str);
    }

    /**
     * Translates a node id into a host name by asking the {@link INimbus} instance, which is given
     * the supervisors known to this cluster.
     *
     * @param str the node (supervisor) id
     * @return whatever {@code INimbus.getHostName} returns for that id
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public String getHost(String str) {
        return inimbus.getHostName(supervisors, str);
    }

    /**
     * Collects every topology for which {@link #needsScheduling(TopologyDetails)} is true.
     *
     * @return a new list of the topologies that still need scheduling, in iteration order
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public List<TopologyDetails> needsSchedulingTopologies() {
        List<TopologyDetails> pending = new ArrayList<>();
        for (TopologyDetails candidate : getTopologies()) {
            if (needsScheduling(candidate)) {
                pending.add(candidate);
            }
        }
        return pending;
    }

    /**
     * A topology needs scheduling when it wants more workers than it has been assigned, or when at
     * least one of its executors is unassigned.
     *
     * @param topologyDetails the topology to check
     * @return {@code true} if more scheduling work is required
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public boolean needsScheduling(TopologyDetails topologyDetails) {
        if (topologyDetails.getNumWorkers() > getAssignedNumWorkers(topologyDetails)) {
            return true;
        }
        return !getUnassignedExecutors(topologyDetails).isEmpty();
    }

    /**
     * Variant of {@link #needsScheduling(TopologyDetails)} that looks only at unassigned executors
     * and ignores the requested worker count.
     *
     * @param topologyDetails the topology to check
     * @return {@code true} if at least one executor is unassigned
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public boolean needsSchedulingRas(TopologyDetails topologyDetails) {
        return !getUnassignedExecutors(topologyDetails).isEmpty();
    }

    /**
     * Maps each executor of the topology that is not yet part of its assignment to the value
     * {@code TopologyDetails.selectExecutorToComponent} returns for it.
     *
     * @param topologyDetails the topology to inspect
     * @return the executor-to-component map for the unassigned executors
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(TopologyDetails topologyDetails) {
        Set<ExecutorDetails> unassigned = new HashSet<>(topologyDetails.getExecutors());
        SchedulerAssignmentImpl existing = assignments.get(topologyDetails.getId());
        if (existing != null) {
            unassigned.removeAll(existing.getExecutors());
        }
        return topologyDetails.selectExecutorToComponent(unassigned);
    }

    /**
     * Inverts {@link #getNeedsSchedulingExecutorToComponents(TopologyDetails)}: groups the
     * unassigned executors by component.
     *
     * @param topologyDetails the topology to inspect
     * @return a new map from component to the list of its unassigned executors
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors(TopologyDetails topologyDetails) {
        Map<String, List<ExecutorDetails>> byComponent = new HashMap<>();
        for (Map.Entry<ExecutorDetails, String> pair : getNeedsSchedulingExecutorToComponents(topologyDetails).entrySet()) {
            byComponent.computeIfAbsent(pair.getValue(), ignored -> new ArrayList<>()).add(pair.getKey());
        }
        return byComponent;
    }

    /**
     * Returns the ports of the slots recorded as used on a supervisor. As a side effect, an empty
     * entry is created in the used-slot cache if the supervisor has none yet.
     *
     * @param supervisorDetails the supervisor to inspect
     * @return a new set with the port of every used slot
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Set<Integer> getUsedPorts(SupervisorDetails supervisorDetails) {
        Set<WorkerSlot> usedSlots = nodeToUsedSlotsCache.computeIfAbsent(supervisorDetails.getId(), Cluster::makeSet);
        return usedSlots.stream()
            .map(WorkerSlot::getPort)
            .collect(Collectors.toSet());
    }

    /**
     * Returns the assignable ports of a supervisor that are not currently in use.
     *
     * @param supervisorDetails the supervisor to inspect
     * @return a new set: assignable ports minus used ports
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Set<Integer> getAvailablePorts(SupervisorDetails supervisorDetails) {
        Set<Integer> inUse = getUsedPorts(supervisorDetails);
        Set<Integer> free = new HashSet<>();
        free.addAll(getAssignablePorts(supervisorDetails));
        free.removeAll(inUse);
        return free;
    }

    /**
     * Returns the ports that may be assigned on a supervisor: none if the supervisor is
     * blacklisted, otherwise all of its ports.
     *
     * @param supervisorDetails the supervisor to inspect
     * @return an empty set for a blacklisted supervisor, else {@code supervisorDetails.getAllPorts()}
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Set<Integer> getAssignablePorts(SupervisorDetails supervisorDetails) {
        if (isBlackListed(supervisorDetails.getId())) {
            return Collections.emptySet();
        }
        return supervisorDetails.getAllPorts();
    }

    /**
     * Lists the available slots of every supervisor that is neither blacklisted nor named in the
     * exclusion list.
     *
     * @param list ids of supervisors to leave out
     * @return a new list of available slots on the remaining supervisors
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public List<WorkerSlot> getNonBlacklistedAvailableSlots(List<String> list) {
        List<WorkerSlot> slots = new ArrayList<>();
        for (SupervisorDetails supervisor : supervisors.values()) {
            String id = supervisor.getId();
            if (isBlackListed(id) || list.contains(id)) {
                continue;
            }
            slots.addAll(getAvailableSlots(supervisor));
        }
        return slots;
    }

    /**
     * Lists the available slots of all supervisors.
     *
     * @return a new list concatenating {@link #getAvailableSlots(SupervisorDetails)} of every supervisor
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public List<WorkerSlot> getAvailableSlots() {
        List<WorkerSlot> slots = new ArrayList<>();
        for (SupervisorDetails supervisor : supervisors.values()) {
            slots.addAll(getAvailableSlots(supervisor));
        }
        return slots;
    }

    /**
     * Turns the available ports of a supervisor into worker slots.
     *
     * @param supervisorDetails the supervisor to inspect
     * @return a new list with one slot per available port
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisorDetails) {
        Set<Integer> ports = getAvailablePorts(supervisorDetails);
        List<WorkerSlot> slots = new ArrayList<>(ports.size());
        for (Integer port : ports) {
            slots.add(new WorkerSlot(supervisorDetails.getId(), port));
        }
        return slots;
    }

    /**
     * Turns the assignable ports of a supervisor into worker slots.
     *
     * @param supervisorDetails the supervisor to inspect
     * @return a new list with one slot per assignable port
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisorDetails) {
        Set<Integer> ports = getAssignablePorts(supervisorDetails);
        List<WorkerSlot> slots = new ArrayList<>(ports.size());
        for (Integer port : ports) {
            slots.add(new WorkerSlot(supervisorDetails.getId(), port));
        }
        return slots;
    }

    /**
     * Lists the assignable slots of all supervisors.
     *
     * @return a new list concatenating {@link #getAssignableSlots(SupervisorDetails)} of every supervisor
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public List<WorkerSlot> getAssignableSlots() {
        List<WorkerSlot> slots = new ArrayList<>();
        for (SupervisorDetails supervisor : supervisors.values()) {
            slots.addAll(getAssignableSlots(supervisor));
        }
        return slots;
    }

    /**
     * Returns the executors of a topology that do not appear in its current assignment.
     *
     * @param topologyDetails the topology to inspect; may be {@code null}
     * @return an empty list for a {@code null} topology, otherwise a new set of unassigned executors
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Collection<ExecutorDetails> getUnassignedExecutors(TopologyDetails topologyDetails) {
        if (topologyDetails == null) {
            return new ArrayList<>(0);
        }
        Set<ExecutorDetails> unassigned = new HashSet<>(topologyDetails.getExecutors());
        SchedulerAssignment existing = getAssignmentById(topologyDetails.getId());
        if (existing != null) {
            unassigned.removeAll(existing.getExecutors());
        }
        return unassigned;
    }

    /**
     * Counts the distinct worker slots used by a topology's current assignment.
     *
     * @param topologyDetails the topology to inspect; may be {@code null}
     * @return the number of distinct slots, or 0 when the topology is {@code null} or unassigned
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public int getAssignedNumWorkers(TopologyDetails topologyDetails) {
        if (topologyDetails == null) {
            return 0;
        }
        SchedulerAssignment existing = getAssignmentById(topologyDetails.getId());
        if (existing == null) {
            return 0;
        }
        // Several executors can share a slot, so de-duplicate before counting.
        Set<WorkerSlot> distinctSlots = new HashSet<>();
        distinctSlots.addAll(existing.getExecutorToSlot().values());
        return distinctSlots.size();
    }

    /**
     * Computes what is left of a supervisor's total resources after subtracting the scheduled
     * resources of every slot, across all assignments, that lives on that supervisor.
     *
     * @param supervisorDetails the supervisor to inspect
     * @return a new offer holding the remaining resources
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public NormalizedResourceOffer getAvailableResources(SupervisorDetails supervisorDetails) {
        NormalizedResourceOffer remaining = new NormalizedResourceOffer(supervisorDetails.getTotalResources());
        for (SchedulerAssignmentImpl existing : assignments.values()) {
            for (Map.Entry<WorkerSlot, WorkerResources> scheduled : existing.getScheduledResources().entrySet()) {
                boolean onThisNode = supervisorDetails.getId().equals(scheduled.getKey().getNodeId());
                if (onThisNode) {
                    remaining.remove(scheduled.getValue(), getResourceMetrics());
                }
            }
        }
        return remaining;
    }

    /**
     * Adds an amount to the running total stored under a resource name, starting from zero when
     * the name is not yet present.
     *
     * @param map the totals, keyed by resource name; modified in place
     * @param str the resource name
     * @param d   the amount to add
     */
    private void addResource(Map<String, Double> map, String str, Double d) {
        // Single lookup with a literal zero default (was an unrelated library constant equal to 0).
        map.put(str, map.getOrDefault(str, 0.0d) + d);
    }

    /**
     * Sums up what a worker running the given executors of a topology requires.
     *
     * <p>The per-executor requests are added together, then the worker-level off-heap and on-heap
     * parts of every shared memory request touching these executors are added on top. If the
     * resulting CPU is below {@code minWorkerCpu}, it is raised to that minimum.
     *
     * @param topologyDetails the topology the executors belong to
     * @param collection      the executors that run (or would run) in the worker
     * @return a populated {@link WorkerResources}, including the shared-memory portions
     */
    private WorkerResources calculateWorkerResources(TopologyDetails topologyDetails, Collection<ExecutorDetails> collection) {
        NormalizedResourceRequest total = new NormalizedResourceRequest();
        Map<String, Double> sharedTotals = new HashMap<>();

        for (ExecutorDetails executor : collection) {
            NormalizedResourceRequest perExecutor = topologyDetails.getTotalResources(executor);
            if (perExecutor != null) {
                total.add(perExecutor);
            }
        }
        for (SharedMemory shared : topologyDetails.getSharedMemoryRequests(collection)) {
            total.addOffHeap(shared.get_off_heap_worker());
            total.addOnHeap(shared.get_on_heap());
            // Tracked separately so the shared part can be reported on its own.
            addResource(sharedTotals, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, Double.valueOf(shared.get_off_heap_worker()));
            addResource(sharedTotals, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, Double.valueOf(shared.get_on_heap()));
        }

        Map<String, Double> normalizedShared = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(sharedTotals);
        Map<String, Double> normalizedTotal = total.toNormalizedMap();

        double cpu = total.getTotalCpu();
        if (cpu < minWorkerCpu) {
            cpu = minWorkerCpu;
            normalizedTotal.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
        }

        WorkerResources result = new WorkerResources();
        result.set_resources(normalizedTotal);
        result.set_shared_resources(normalizedShared);
        result.set_cpu(cpu);
        result.set_mem_off_heap(total.getOffHeapMemoryMb());
        result.set_mem_on_heap(total.getOnHeapMemoryMb());
        // Literal 0.0 defaults (were an unrelated library constant equal to 0).
        result.set_shared_mem_off_heap(normalizedShared.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0d));
        result.set_shared_mem_on_heap(normalizedShared.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0d));
        return result;
    }

    /**
     * Decides whether adding one executor to a worker slot stays within the available resources.
     *
     * <p>After a quick {@code couldFit} check on the executor's own request, the worker's resources
     * are computed with and without the new executor. The executor fits when the CPU increase and
     * the memory increase (worker memory plus shared off-heap node memory) do not exceed what the
     * offer provides, and the worker's resulting on-heap memory does not exceed {@code d}.
     *
     * @param workerSlot              the slot the executor would be placed in
     * @param executorDetails         the executor to place
     * @param topologyDetails         the topology the executor belongs to
     * @param normalizedResourceOffer the resources still available on the node
     * @param d                       the maximum on-heap memory allowed for the worker
     * @return {@code true} if the executor fits, {@code false} otherwise
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public boolean wouldFit(WorkerSlot workerSlot, ExecutorDetails executorDetails, TopologyDetails topologyDetails, NormalizedResourceOffer normalizedResourceOffer, double d) {
        NormalizedResourceRequest requested = topologyDetails.getTotalResources(executorDetails);
        if (!normalizedResourceOffer.couldFit(minWorkerCpu, requested)) {
            return false;
        }

        double memoryBefore = 0.0d;
        double cpuBefore = 0.0d;
        Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
        wouldBeAssigned.add(executorDetails);

        SchedulerAssignmentImpl existing = assignments.get(topologyDetails.getId());
        if (existing != null) {
            Collection<ExecutorDetails> alreadyOnSlot = existing.getSlotToExecutors().get(workerSlot);
            if (alreadyOnSlot != null) {
                wouldBeAssigned.addAll(alreadyOnSlot);
                WorkerResources before = calculateWorkerResources(topologyDetails, alreadyOnSlot);
                memoryBefore = before.get_mem_off_heap() + before.get_mem_on_heap();
                cpuBefore = before.get_cpu();
            }
            memoryBefore += calculateSharedOffHeapNodeMemory(workerSlot.getNodeId(), existing, topologyDetails);
        }

        WorkerResources after = calculateWorkerResources(topologyDetails, wouldBeAssigned);
        double memoryAfter = after.get_mem_off_heap() + after.get_mem_on_heap()
            + calculateSharedOffHeapNodeMemory(workerSlot.getNodeId(), existing, topologyDetails, executorDetails);
        double onHeapAfter = after.get_mem_on_heap();

        double cpuAdded = after.get_cpu() - cpuBefore;
        double cpuAvailable = normalizedResourceOffer.getTotalCpu();
        if (cpuAdded > cpuAvailable) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}", topologyDetails.getName(), executorDetails, workerSlot, Double.valueOf(cpuAdded), Double.valueOf(cpuAvailable));
            }
            return false;
        }

        double memoryAdded = memoryAfter - memoryBefore;
        double memoryAvailable = normalizedResourceOffer.getTotalMemoryMb();
        if (memoryAdded > memoryAvailable) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", topologyDetails.getName(), executorDetails, workerSlot, Double.valueOf(memoryAdded), Double.valueOf(memoryAvailable));
            }
            return false;
        }

        if (onHeapAfter <= d) {
            return true;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Could not schedule {}:{} on {} HEAP would be too large {} > {}", topologyDetails.getName(), executorDetails, workerSlot, Double.valueOf(onHeapAfter), Double.valueOf(d));
        }
        return false;
    }

    /**
     * Assigns a group of executors of one topology to a worker slot and updates the caches.
     *
     * @param workerSlot the slot to fill
     * @param str        id of the topology the executors belong to
     * @param collection the executors to place in the slot
     * @throws RuntimeException         if the slot is already occupied, or if one of the executors
     *                                  is already assigned in the topology's existing assignment
     * @throws IllegalArgumentException if the topology id is unknown
     */
    public void assign(WorkerSlot workerSlot, String str, Collection<ExecutorDetails> collection) {
        assertValidTopologyForModification(str);
        if (isSlotOccupied(workerSlot)) {
            // Plain ", " literal (was an unrelated HBase constant with the same value).
            throw new RuntimeException("slot: [" + workerSlot.getNodeId() + ", " + workerSlot.getPort() + "] is already occupied.");
        }
        TopologyDetails topology = this.topologies.getById(str);
        if (topology == null) {
            throw new IllegalArgumentException("Trying to schedule for topo " + str + " but that is not a known topology " + this.topologies.getAllIds());
        }
        WorkerResources workerResources = calculateWorkerResources(topology, collection);
        SchedulerAssignmentImpl target = this.assignments.get(str);
        if (target == null) {
            target = new SchedulerAssignmentImpl(str);
            this.assignments.put(str, target);
        } else {
            for (ExecutorDetails executor : collection) {
                if (target.isExecutorAssigned(executor)) {
                    throw new RuntimeException("Attempting to assign executor: " + executor + " of topology: " + str + " to workerslot: " + workerSlot + ". The executor is already assigned to workerslot: " + target.getExecutorToSlot().get(executor) + ". The executor must unassigned before it can be assigned to another slot!");
                }
            }
        }
        target.assign(workerSlot, collection, workerResources);
        String nodeId = workerSlot.getNodeId();
        double sharedOffHeapNodeMemory = calculateSharedOffHeapNodeMemory(nodeId, target, topology);
        target.setTotalSharedOffHeapNodeMemory(nodeId, sharedOffHeapNodeMemory);
        updateCachesForWorkerSlot(workerSlot, workerResources, str, Double.valueOf(sharedOffHeapNodeMemory));
        // Something on this node changed, so its cached total is stale.
        this.totalResourcesPerNodeCache.remove(workerSlot.getNodeId());
    }

    /**
     * Applies every slot of an assignment through
     * {@link #assign(WorkerSlot, String, Collection)}.
     *
     * @param schedulerAssignment the assignment to apply
     * @param z                   when {@code true}, a {@link RuntimeException} raised for a single
     *                            slot is logged at debug level and the remaining slots are still
     *                            processed; when {@code false} it is rethrown
     */
    public void assign(SchedulerAssignment schedulerAssignment, boolean z) {
        String topologyId = schedulerAssignment.getTopologyId();
        assertValidTopologyForModification(topologyId);
        for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry : schedulerAssignment.getSlotToExecutors().entrySet()) {
            try {
                assign(entry.getKey(), topologyId, entry.getValue());
            } catch (RuntimeException e) {
                if (!z) {
                    throw e;
                }
                // Best-effort mode: keep going, but leave a trace of what was skipped.
                LOG.debug("Ignoring failed assignment of slot {} for topology {}", entry.getKey(), topologyId, e);
            }
        }
    }

    /**
     * Convenience overload of the four-argument variant with no additional executor.
     *
     * @param str                     the node id
     * @param schedulerAssignmentImpl the topology's assignment; may be {@code null}
     * @param topologyDetails         the topology whose shared memory requests are consulted
     * @return the shared off-heap node memory for the executors currently assigned on the node
     */
    private double calculateSharedOffHeapNodeMemory(String str, SchedulerAssignmentImpl schedulerAssignmentImpl, TopologyDetails topologyDetails) {
        return calculateSharedOffHeapNodeMemory(str, schedulerAssignmentImpl, topologyDetails, null);
    }

    /**
     * Sums the node-level off-heap part of every shared memory request that applies to the
     * topology's executors on a node, optionally pretending one more executor is placed there.
     *
     * @param str                     the node id
     * @param schedulerAssignmentImpl the topology's assignment; may be {@code null}
     * @param topologyDetails         the topology whose shared memory requests are consulted
     * @param executorDetails         an extra executor to include, or {@code null} for none
     * @return the summed {@code get_off_heap_node()} of the matching shared memory requests
     */
    private double calculateSharedOffHeapNodeMemory(String str, SchedulerAssignmentImpl schedulerAssignmentImpl, TopologyDetails topologyDetails, ExecutorDetails executorDetails) {
        Set<ExecutorDetails> executorsOnNode = new HashSet<>();
        if (schedulerAssignmentImpl != null) {
            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> slotEntry : schedulerAssignmentImpl.getSlotToExecutors().entrySet()) {
                if (str.equals(slotEntry.getKey().getNodeId())) {
                    executorsOnNode.addAll(slotEntry.getValue());
                }
            }
        }
        if (executorDetails != null) {
            executorsOnNode.add(executorDetails);
        }

        double sharedOffHeapNode = 0.0d;
        for (SharedMemory shared : topologyDetails.getSharedMemoryRequests(executorsOnNode)) {
            sharedOffHeapNode += shared.get_off_heap_node();
        }
        return sharedOffHeapNode;
    }

    /**
     * Releases a worker slot from every assignment that occupies it. For each such assignment the
     * shared off-heap node memory is recomputed, the slot's cached resources are reset to an empty
     * request and the slot is dropped from the used-slot cache. The node's cached total is
     * invalidated in all cases.
     *
     * @param workerSlot the slot to free
     */
    public void freeSlot(WorkerSlot workerSlot) {
        for (SchedulerAssignmentImpl existing : assignments.values()) {
            if (!existing.isSlotOccupied(workerSlot)) {
                continue;
            }
            assertValidTopologyForModification(existing.getTopologyId());
            existing.unassignBySlot(workerSlot);

            String nodeId = workerSlot.getNodeId();
            TopologyDetails owner = topologies.getById(existing.getTopologyId());
            double sharedOffHeapNodeMemory = calculateSharedOffHeapNodeMemory(nodeId, existing, owner);
            existing.setTotalSharedOffHeapNodeMemory(nodeId, sharedOffHeapNodeMemory);

            nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(workerSlot, new NormalizedResourceRequest());
            nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).remove(workerSlot);
        }
        // Something on this node may have changed, so its cached total is stale.
        totalResourcesPerNodeCache.remove(workerSlot.getNodeId());
    }

    /**
     * Frees every slot in the collection via {@link #freeSlot(WorkerSlot)}.
     *
     * @param collection the slots to free; {@code null} is treated as nothing to do
     */
    public void freeSlots(Collection<WorkerSlot> collection) {
        if (collection == null) {
            return;
        }
        for (WorkerSlot slot : collection) {
            freeSlot(slot);
        }
    }

    /**
     * Tells whether a slot is recorded as used. As a side effect, an empty entry is created in the
     * used-slot cache for the slot's node if none exists yet.
     *
     * @param workerSlot the slot to check
     * @return {@code true} if the slot is in the used-slot cache of its node
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public boolean isSlotOccupied(WorkerSlot workerSlot) {
        Set<WorkerSlot> usedOnNode = nodeToUsedSlotsCache.computeIfAbsent(workerSlot.getNodeId(), Cluster::makeSet);
        return usedOnNode.contains(workerSlot);
    }

    /**
     * Looks up the assignment of a topology.
     *
     * @param str the topology id
     * @return the assignment, or {@code null} if the topology has none
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public SchedulerAssignment getAssignmentById(String str) {
        // Map.get already yields null for an absent key, so no containsKey probe is needed.
        return assignments.get(str);
    }

    /**
     * Returns the slots used by a topology's assignment.
     *
     * @param str the topology id
     * @return the assignment's slots, or an empty set if the topology has no assignment
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Collection<WorkerSlot> getUsedSlotsByTopologyId(String str) {
        SchedulerAssignmentImpl existing = assignments.get(str);
        if (existing == null) {
            return Collections.emptySet();
        }
        return existing.getSlots();
    }

    /**
     * Looks up a supervisor by id.
     *
     * @param str the supervisor id
     * @return the supervisor's details, or {@code null} if the id is unknown
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public SupervisorDetails getSupervisorById(String str) {
        return supervisors.get(str);
    }

    /**
     * Returns all slots recorded as used, across every node.
     *
     * @return a new set holding the union of the per-node used-slot sets
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Collection<WorkerSlot> getUsedSlots() {
        return nodeToUsedSlotsCache.values().stream()
            .flatMap(Set::stream)
            .collect(Collectors.toSet());
    }

    /**
     * Returns the supervisors registered for a host.
     *
     * @param str the host name
     * @return a new list of the host's supervisors; empty if the host is unknown
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public List<SupervisorDetails> getSupervisorsByHost(String str) {
        List<SupervisorDetails> onHost = new ArrayList<>();
        List<String> supervisorIds = hostToId.get(str);
        if (supervisorIds == null) {
            return onHost;
        }
        for (String supervisorId : supervisorIds) {
            onHost.add(getSupervisorById(supervisorId));
        }
        return onHost;
    }

    /**
     * Returns a snapshot of the assignments keyed by topology id. The map is a copy; the
     * assignment objects themselves are shared.
     *
     * @return a new map of topology id to assignment
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Map<String, SchedulerAssignment> getAssignments() {
        return new HashMap<>(assignments);
    }

    /**
     * Replaces all assignments. The modification hook is run for every incoming and every existing
     * topology first; then the assignments and the total-resources cache are cleared, the per-node
     * resource and used-slot caches are emptied, and each new assignment is applied through
     * {@link #assign(SchedulerAssignment, boolean)}. Passing the internal assignment map itself is
     * a no-op.
     *
     * @param map the new assignments keyed by topology id
     * @param z   forwarded to {@link #assign(SchedulerAssignment, boolean)}: whether a failure for
     *            a single slot is ignored instead of rethrown
     */
    public void setAssignments(Map<String, ? extends SchedulerAssignment> map, boolean z) {
        if (map == assignments) {
            return;
        }
        for (SchedulerAssignment incoming : map.values()) {
            assertValidTopologyForModification(incoming.getTopologyId());
        }
        for (SchedulerAssignmentImpl existing : assignments.values()) {
            assertValidTopologyForModification(existing.getTopologyId());
        }

        assignments.clear();
        totalResourcesPerNodeCache.clear();
        // Keep the per-node entries but empty them.
        nodeToScheduledResourcesCache.values().forEach(Map::clear);
        nodeToUsedSlotsCache.values().forEach(Set::clear);

        for (SchedulerAssignment incoming : map.values()) {
            assign(incoming, z);
        }
    }

    /**
     * Returns the supervisors keyed by id. This is the internal map, not a copy.
     *
     * @return the live supervisor map
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Map<String, SupervisorDetails> getSupervisors() {
        return supervisors;
    }

    /**
     * Aggregates the resources still available on every supervisor that is neither blacklisted nor
     * named in the exclusion collection: each supervisor's total is added and what is scheduled on
     * it is subtracted.
     *
     * @param collection ids of supervisors to leave out
     * @return a new offer holding the aggregated available resources
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public NormalizedResourceOffer getNonBlacklistedClusterAvailableResources(Collection<String> collection) {
        NormalizedResourceOffer available = new NormalizedResourceOffer();
        for (SupervisorDetails supervisor : supervisors.values()) {
            String id = supervisor.getId();
            if (isBlackListed(id) || collection.contains(id)) {
                continue;
            }
            available.add(supervisor.getTotalResources());
            available.remove(getAllScheduledResourcesForNode(supervisor.getId()), getResourceMetrics());
        }
        return available;
    }

    /**
     * Sums the total CPU of all supervisors.
     *
     * @return the sum of {@code getTotalCpu()} over every supervisor
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public double getClusterTotalCpuResource() {
        double totalCpu = 0.0d;
        for (SupervisorDetails supervisor : supervisors.values()) {
            totalCpu += supervisor.getTotalCpu();
        }
        return totalCpu;
    }

    /**
     * Sums the total memory of all supervisors.
     *
     * @return the sum of {@code getTotalMemory()} over every supervisor
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public double getClusterTotalMemoryResource() {
        double totalMemory = 0.0d;
        for (SupervisorDetails supervisor : supervisors.values()) {
            totalMemory += supervisor.getTotalMemory();
        }
        return totalMemory;
    }

    /**
     * Returns the network topography. This is the internal map, not a copy.
     *
     * @return the live map from resolved network location to host names
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Map<String, List<String>> getNetworkTopography() {
        return networkTopography;
    }

    /**
     * Replaces the contents of the network topography with the given entries.
     *
     * <p>Passing the internal map itself (as returned by {@link #getNetworkTopography()}) is a
     * no-op. Without this guard the map would be cleared before being copied from, wiping the
     * topography. The other bulk setters of this class use the same identity check.
     *
     * @param map the new topography entries
     */
    @VisibleForTesting
    public void setNetworkTopography(Map<String, List<String>> map) {
        if (map == this.networkTopography) {
            return;
        }
        this.networkTopography.clear();
        this.networkTopography.putAll(map);
    }

    /**
     * Records a status for the given topology by delegating to
     * {@link #setStatus(String, String)} with the topology's id.
     *
     * @param topologyDetails the topology whose status is being set
     * @param str the status text to store
     */
    public void setStatus(TopologyDetails topologyDetails, String str) {
        String topologyId = topologyDetails.getId();
        setStatus(topologyId, str);
    }

    /**
     * Stores a status for a topology id, replacing any previous status.
     *
     * <p>The id is first passed to {@code assertValidTopologyForModification}; the new
     * status is logged at INFO level before it is stored.
     *
     * @param str the topology id
     * @param str2 the status text to store
     */
    public void setStatus(String str, String str2) {
        assertValidTopologyForModification(str);
        LOG.info("STATUS - {} {}", str, str2);
        status.put(str, str2);
    }

    /**
     * Stores a status for a topology id only when no status is currently recorded for it.
     *
     * <p>The id is first passed to {@code assertValidTopologyForModification}. Unlike
     * {@link #setStatus(String, String)}, nothing is logged.
     *
     * @param str the topology id
     * @param str2 the status text to store if the topology has none yet
     */
    public void setStatusIfAbsent(String str, String str2) {
        assertValidTopologyForModification(str);
        status.putIfAbsent(str, str2);
    }

    /**
     * Returns the topology-id-to-status map held by this cluster.
     *
     * @return the cluster's own status map (the live instance, not a copy)
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Map<String, String> getStatusMap() {
        return status;
    }

    /**
     * Replaces every recorded status with the entries of {@code map}.
     *
     * <p>Passing the cluster's own status map is a no-op. Otherwise every topology id
     * in the incoming map, and then every id currently recorded, is passed to
     * {@code assertValidTopologyForModification} before anything is changed.
     *
     * @param map the topology-id-to-status entries to copy into this cluster
     */
    public void setStatusMap(Map<String, String> map) {
        if (map != status) {
            // Validate the incoming ids first, then the ids about to be dropped.
            for (String incomingId : map.keySet()) {
                assertValidTopologyForModification(incomingId);
            }
            for (String existingId : status.keySet()) {
                assertValidTopologyForModification(existingId);
            }
            status.clear();
            status.putAll(map);
        }
    }

    /**
     * Looks up the status recorded for a topology id.
     *
     * @param str the topology id
     * @return the stored status, or {@code null} when the status map has no entry for the id
     */
    public String getStatus(String str) {
        return status.get(str);
    }

    /**
     * Builds a {@link TopologyResources} summary for every topology known to this cluster.
     *
     * <p>Each summary is constructed from the topology's details and whatever assignment
     * is currently stored under its id, which is {@code null} when there is none.
     *
     * @return a new map from topology id to its resource summary
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Map<String, TopologyResources> getTopologyResourcesMap() {
        // Typed map instead of a raw HashMap, so the return needs no unchecked conversion.
        Map<String, TopologyResources> resourcesByTopology = new HashMap<>(this.assignments.size());
        for (TopologyDetails topologyDetails : this.topologies.getTopologies()) {
            String id = topologyDetails.getId();
            resourcesByTopology.put(id, new TopologyResources(topologyDetails, this.assignments.get(id)));
        }
        return resourcesByTopology;
    }

    /**
     * Builds a per-node {@link SupervisorResources} summary from the current assignments.
     *
     * <p>Every known supervisor starts with its total memory and CPU and zeros for the
     * last two constructor arguments (presumably the used amounts; confirm against
     * {@code SupervisorResources}). Each scheduled worker's resources are then folded in
     * with {@code add}, and each assignment's shared off-heap node memory with
     * {@code addMem}. A node that appears in an assignment but is not a known supervisor
     * gets an all-zero starting entry.
     *
     * @return a new map from node id to its accumulated resource summary
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Map<String, SupervisorResources> getSupervisorsResourcesMap() {
        Map<String, SupervisorResources> resourcesByNode = new HashMap<>();
        for (SupervisorDetails supervisorDetails : this.supervisors.values()) {
            resourcesByNode.put(
                    supervisorDetails.getId(),
                    new SupervisorResources(supervisorDetails.getTotalMemory(), supervisorDetails.getTotalCpu(), 0.0d, 0.0d));
        }
        for (SchedulerAssignmentImpl assignment : this.assignments.values()) {
            for (Map.Entry<WorkerSlot, WorkerResources> scheduled : assignment.getScheduledResources().entrySet()) {
                String nodeId = scheduled.getKey().getNodeId();
                SupervisorResources current = resourcesByNode.get(nodeId);
                if (current == null) {
                    // Node is referenced by an assignment but is not a known supervisor.
                    current = new SupervisorResources(0.0d, 0.0d, 0.0d, 0.0d);
                }
                resourcesByNode.put(nodeId, current.add(scheduled.getValue()));
            }
            Map<String, Double> sharedOffHeapByNode = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
            if (sharedOffHeapByNode != null) {
                for (Map.Entry<String, Double> shared : sharedOffHeapByNode.entrySet()) {
                    String nodeId = shared.getKey();
                    SupervisorResources current = resourcesByNode.get(nodeId);
                    if (current == null) {
                        current = new SupervisorResources(0.0d, 0.0d, 0.0d, 0.0d);
                    }
                    resourcesByNode.put(nodeId, current.addMem(shared.getValue()));
                }
            }
        }
        return resourcesByNode;
    }

    /**
     * Collects the scheduled per-slot worker resources of every assignment.
     *
     * @return a new map from each key of the assignments map to the value returned by
     *     that assignment's {@code getScheduledResources()}
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Map<String, Map<WorkerSlot, WorkerResources>> getWorkerResourcesMap() {
        // Typed map instead of a raw HashMap, so the return needs no unchecked conversion.
        Map<String, Map<WorkerSlot, WorkerResources>> resourcesByTopology = new HashMap<>();
        for (Map.Entry<String, SchedulerAssignmentImpl> entry : this.assignments.entrySet()) {
            resourcesByTopology.put(entry.getKey(), entry.getValue().getScheduledResources());
        }
        return resourcesByTopology;
    }

    /**
     * Finds the resources scheduled on a worker slot by searching every assignment.
     *
     * @param workerSlot the slot to look up
     * @return the resources from the first assignment that has an entry for the slot,
     *     or {@code null} when no assignment does
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public WorkerResources getWorkerResources(WorkerSlot workerSlot) {
        for (SchedulerAssignmentImpl assignment : assignments.values()) {
            WorkerResources found = assignment.getScheduledResources().get(workerSlot);
            if (found != null) {
                return found;
            }
        }
        return null;
    }

    /**
     * Records a worker slot in the per-node caches.
     *
     * <p>Under the slot's node id this stores the slot's resources as a normalized request,
     * stores {@code d} under the key {@code str} in the off-heap node memory cache, and
     * adds the slot to the node's used-slot set. Missing per-node containers are created
     * on demand.
     *
     * @param workerSlot the slot being recorded
     * @param workerResources the resources scheduled on that slot
     * @param str key for the off-heap node memory entry (presumably the topology id; verify against callers)
     * @param d the off-heap node memory value stored under {@code str}
     */
    private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, String str, Double d) {
        NormalizedResourceRequest slotRequest = new NormalizedResourceRequest();
        slotRequest.add(workerResources);

        String node = workerSlot.getNodeId();
        nodeToScheduledResourcesCache
                .computeIfAbsent(node, Cluster::makeMap)
                .put(workerSlot, slotRequest);
        nodeToScheduledOffHeapNodeMemoryCache
                .computeIfAbsent(node, Cluster::makeMap)
                .put(str, d);
        nodeToUsedSlotsCache
                .computeIfAbsent(node, Cluster::makeSet)
                .add(workerSlot);
    }

    /**
     * Returns the resource metrics object held by this cluster.
     *
     * @return this cluster's {@link ResourceMetrics} instance
     */
    public ResourceMetrics getResourceMetrics() {
        return resourceMetrics;
    }

    /**
     * Returns the total resources scheduled on a node, computing and caching the total
     * on first request.
     *
     * <p>The total is the sum of every per-slot request cached for the node plus every
     * off-heap node memory value cached for it. Looking up a node with no cached data
     * creates empty per-node containers and yields an empty request.
     *
     * @param str the node id
     * @return the cached (or newly computed and cached) total for the node
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public NormalizedResourceRequest getAllScheduledResourcesForNode(String str) {
        return totalResourcesPerNodeCache.computeIfAbsent(str, nodeId -> {
            NormalizedResourceRequest total = new NormalizedResourceRequest();
            for (NormalizedResourceRequest slotRequest
                    : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).values()) {
                total.add(slotRequest);
            }
            for (Double offHeapMemory
                    : nodeToScheduledOffHeapNodeMemoryCache.computeIfAbsent(nodeId, Cluster::makeMap).values()) {
                total.addOffHeap(offHeapMemory.doubleValue());
            }
            return total;
        });
    }

    /**
     * Returns the total memory of everything scheduled on a node.
     *
     * @param str the node id
     * @return {@code getTotalMemoryMb()} of the node's aggregated scheduled resources
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public double getScheduledMemoryForNode(String str) {
        NormalizedResourceRequest scheduled = getAllScheduledResourcesForNode(str);
        return scheduled.getTotalMemoryMb();
    }

    /**
     * Returns the total CPU of everything scheduled on a node.
     *
     * @param str the node id
     * @return {@code getTotalCpu()} of the node's aggregated scheduled resources
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public double getScheduledCpuForNode(String str) {
        NormalizedResourceRequest scheduled = getAllScheduledResourcesForNode(str);
        return scheduled.getTotalCpu();
    }

    /**
     * Returns the {@link INimbus} instance held by this cluster.
     *
     * @return this cluster's {@code INimbus}
     */
    public INimbus getINimbus() {
        return inimbus;
    }

    /**
     * Returns the configuration map held by this cluster.
     *
     * @return the cluster's own configuration map (the live instance, not a copy)
     */
    @Override // org.apache.storm.scheduler.ISchedulingState
    public Map<String, Object> getConf() {
        return conf;
    }

    /**
     * Frees every slot currently used by a topology.
     *
     * <p>The id is first passed to {@code assertValidTopologyForModification}.
     *
     * @param str the topology id whose used slots are freed
     */
    public void unassign(String str) {
        assertValidTopologyForModification(str);
        Collection<WorkerSlot> usedSlots = getUsedSlotsByTopologyId(str);
        freeSlots(usedSlots);
    }

    /**
     * Copies the assignments and statuses of another cluster into this one.
     *
     * <p>The topology id of every assignment in {@code cluster} is passed to
     * {@code assertValidTopologyForModification} before anything is changed. The
     * assignments are then applied with the boolean flag set to {@code false},
     * followed by the status map.
     *
     * @param cluster the cluster to copy assignments and statuses from
     */
    public void updateFrom(Cluster cluster) {
        for (SchedulerAssignment assignment : cluster.getAssignments().values()) {
            assertValidTopologyForModification(assignment.getTopologyId());
        }
        setAssignments(cluster.getAssignments(), false);
        setStatusMap(cluster.getStatusMap());
    }

    /**
     * Returns the minimum worker CPU value held by this cluster.
     *
     * @return the value of the {@code minWorkerCpu} field
     */
    public double getMinWorkerCpu() {
        return minWorkerCpu;
    }
}
