package org.apache.storm.utils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/utils/InprocMessaging.class */
/**
 * Static, in-process message passing keyed by an integer "port".
 *
 * <p>Each port owns an unbounded {@link LinkedBlockingQueue} that is created lazily on first
 * use, plus a flag recording whether any reader has ever called {@link #takeMessage(int)} or
 * {@link #pollMessage(int)} on that port. {@link #sendMessage(int, Object)} uses that flag to
 * hold back the first message until a reader has shown up (or a timeout elapses).
 *
 * <p>Queues and reader flags are never removed once created.
 */
public class InprocMessaging {
    private static final Logger LOG = LoggerFactory.getLogger(InprocMessaging.class);

    /** Pause between checks of the reader flag in {@link #waitForReader(int)}, in milliseconds. */
    private static final long READER_POLL_INTERVAL_MS = 10L;

    /**
     * Elapsed time, as measured by {@code Time.currentTimeMillis()}, after which
     * {@link #waitForReader(int)} gives up, in milliseconds.
     */
    private static final long READER_WAIT_TIMEOUT_MS = 20000L;

    /** Per-port queues; only accessed from {@link #getQueue(int)}, which is synchronized. */
    private static final Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<>();

    /** Per-port "a reader has arrived" flags; entries are created on demand. */
    private static final ConcurrentMap<Integer, AtomicBoolean> _hasReader = new ConcurrentHashMap<>();

    /** Next port to hand out; only accessed from the synchronized {@link #acquireNewPort()}. */
    private static int port = 1;

    /**
     * Hands out a fresh port number.
     *
     * @return the next unused port; the first call returns 1 and each call increments by one
     */
    public static synchronized int acquireNewPort() {
        int acquired = port;
        port++;
        return acquired;
    }

    /**
     * Waits for a reader on the port (see {@link #waitForReader(int)}) and then enqueues the
     * message. The message is enqueued even if the wait timed out.
     *
     * @param i   the port to send to
     * @param obj the message; must not be {@code null} because the backing
     *            {@link LinkedBlockingQueue} rejects null elements
     */
    public static void sendMessage(int i, Object obj) {
        waitForReader(i);
        getQueue(i).add(obj);
    }

    /**
     * Enqueues the message immediately, without checking whether a reader exists.
     *
     * @param i   the port to send to
     * @param obj the message; must not be {@code null} because the backing
     *            {@link LinkedBlockingQueue} rejects null elements
     */
    public static void sendMessageNoWait(int i, Object obj) {
        getQueue(i).add(obj);
    }

    /**
     * Marks the port as having a reader and blocks until a message is available.
     *
     * @param i the port to read from
     * @return the next message on the port
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static Object takeMessage(int i) throws InterruptedException {
        readerArrived(i);
        return getQueue(i).take();
    }

    /**
     * Marks the port as having a reader and returns the next message without blocking.
     *
     * @param i the port to read from
     * @return the next message on the port, or {@code null} if the queue is currently empty
     */
    public static Object pollMessage(int i) {
        readerArrived(i);
        return getQueue(i).poll();
    }

    /**
     * Returns the reader flag for the port, creating it (initially {@code false}) if needed.
     * All callers racing on the same port observe the same {@link AtomicBoolean} instance.
     */
    private static AtomicBoolean getHasReader(int i) {
        Integer key = Integer.valueOf(i);
        AtomicBoolean flag = _hasReader.get(key);
        if (flag == null) {
            AtomicBoolean fresh = new AtomicBoolean(false);
            // putIfAbsent returns the value another thread installed first, or null if ours won.
            AtomicBoolean raced = _hasReader.putIfAbsent(key, fresh);
            flag = (raced != null) ? raced : fresh;
        }
        return flag;
    }

    /**
     * Blocks until a reader has called {@link #takeMessage(int)} or {@link #pollMessage(int)}
     * on the port, or until more than {@value #READER_WAIT_TIMEOUT_MS} ms have elapsed according
     * to {@code Time.currentTimeMillis()}, in which case an error is logged and the method
     * returns normally.
     *
     * <p>An interrupt does not cut the wait short, but it is no longer lost: if the thread was
     * interrupted while sleeping, its interrupt status is restored before this method returns.
     *
     * @param i the port whose reader to wait for
     */
    public static void waitForReader(int i) {
        AtomicBoolean hasReader = getHasReader(i);
        long start = Time.currentTimeMillis();
        boolean interrupted = false;
        try {
            while (!hasReader.get()) {
                if (Time.isSimulating()) {
                    // Under simulated time the clock only moves when advanced explicitly;
                    // without this the timeout below could never be reached.
                    Time.advanceTime(READER_POLL_INTERVAL_MS);
                }
                try {
                    Thread.sleep(READER_POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Keep waiting (callers cannot receive InterruptedException from here),
                    // but remember the interrupt so it can be re-asserted on exit.
                    interrupted = true;
                }
                long elapsed = Time.currentTimeMillis() - start;
                if (elapsed > READER_WAIT_TIMEOUT_MS) {
                    LOG.error("DONE WAITING FOR READER AFTER {} ms", Long.valueOf(elapsed));
                    return;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Records that a reader has arrived on the port, releasing any {@link #waitForReader(int)}. */
    private static void readerArrived(int i) {
        getHasReader(i).set(true);
    }

    /**
     * Returns the queue for the port, creating it on first use. Synchronized because the
     * backing map is a plain {@link HashMap}.
     */
    private static synchronized LinkedBlockingQueue<Object> getQueue(int i) {
        Integer key = Integer.valueOf(i);
        LinkedBlockingQueue<Object> queue = _queues.get(key);
        if (queue == null) {
            queue = new LinkedBlockingQueue<>();
            _queues.put(key, queue);
        }
        return queue;
    }
}
