package org.objectweb.proactive.extensions.amqp.remoteobject;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.remoteobject.AbstractRemoteObjectFactory;
import org.objectweb.proactive.core.remoteobject.InternalRemoteRemoteObject;
import org.objectweb.proactive.core.remoteobject.InternalRemoteRemoteObjectImpl;
import org.objectweb.proactive.core.remoteobject.RemoteObject;
import org.objectweb.proactive.core.remoteobject.RemoteObjectAdapter;
import org.objectweb.proactive.core.remoteobject.RemoteObjectFactory;
import org.objectweb.proactive.core.remoteobject.RemoteRemoteObject;
import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl;
import org.objectweb.proactive.core.util.URIBuilder;
import org.objectweb.proactive.core.util.converter.remote.ProActiveMarshalInputStream;
import org.objectweb.proactive.core.util.converter.remote.ProActiveMarshalOutputStream;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.objectweb.proactive.extensions.amqp.AMQPConfig;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/* loaded from: input_file:WEB-INF/lib/proactive-programming-bundle-5.2.0-update-12.jar:org/objectweb/proactive/extensions/amqp/remoteobject/AMQPRemoteObjectFactory.class */
/**
 * {@link RemoteObjectFactory} implementation for the {@code amqp} protocol.
 *
 * <p>Remote objects are addressed by URIs of the form {@code amqp://<broker>:<port>/<name>}.
 * Channels are borrowed through {@link AMQPUtils#getChannel(URI)} and either handed back with
 * {@link AMQPUtils#returnChannel(ReusableChannel)} on success or closed when an
 * {@link IOException} occurred while using them.
 */
public class AMQPRemoteObjectFactory extends AbstractRemoteObjectFactory implements RemoteObjectFactory {
    static final Logger logger = ProActiveLogger.getLogger(AMQPConfig.Loggers.AMQP_REMOTE_OBJECT_FACTORY);

    /** Protocol identifier, also used as the scheme of the URIs built by this factory. */
    public static final String PROTOCOL_ID = "amqp";

    /** Set once both exchanges have been declared, so later calls skip the declaration. */
    private boolean exchangeInitialized;

    /**
     * Creates the client-side stub for an already existing internal remote object.
     *
     * @param internalRemoteRemoteObject the object whose URI the stub will target
     * @return a new {@link AMQPRemoteObject} bound to that URI
     * @throws ProActiveException if an {@link IOException} occurs while declaring the exchanges
     *         or creating the stub
     */
    @Override
    public RemoteRemoteObject newRemoteObject(InternalRemoteRemoteObject internalRemoteRemoteObject) throws ProActiveException {
        try {
            ensureExchangesExist(internalRemoteRemoteObject.getURI());
            return new AMQPRemoteObject(internalRemoteRemoteObject.getURI());
        } catch (IOException e) {
            throw new ProActiveException(String.format("AMQP unable to create the RemoteRemoteObject for %s", internalRemoteRemoteObject.toString()), e);
        }
    }

    /**
     * Exposes {@code internalRemoteRemoteObject} by connecting an {@link AMQPRemoteObjectServer}
     * for it, and returns a stub bound to {@code uri}.
     *
     * @param internalRemoteRemoteObject the object to expose
     * @param uri the URI under which the object is registered
     * @param z forwarded unchanged to {@link AMQPRemoteObjectServer#connect(boolean)}
     * @return a new {@link AMQPRemoteObject} bound to {@code uri}
     * @throws ProActiveException if an {@link IOException} occurs during registration
     */
    @Override
    public RemoteRemoteObject register(InternalRemoteRemoteObject internalRemoteRemoteObject, URI uri, boolean z) throws ProActiveException {
        try {
            ensureExchangesExist(uri);
            AMQPRemoteObjectServer server = new AMQPRemoteObjectServer(internalRemoteRemoteObject);
            server.connect(z);
            return new AMQPRemoteObject(uri);
        } catch (IOException e) {
            throw new ProActiveException(String.format("AMQP unable to register the object at %s", uri.toString()), e);
        }
    }

    /**
     * Deletes the queue derived from the name part of {@code uri}.
     *
     * @param uri the URI of the object to unregister
     * @throws ProActiveException wrapping the {@link IOException} raised while obtaining the
     *         channel or deleting the queue
     */
    @Override
    public void unregister(URI uri) throws ProActiveException {
        ReusableChannel reusableChannel = null;
        try {
            reusableChannel = AMQPUtils.getChannel(uri);
            String queueName = AMQPUtils.computeQueueNameFromName(URIBuilder.getNameFromURI(uri));
            reusableChannel.getChannel().queueDelete(queueName);
            AMQPUtils.returnChannel(reusableChannel);
        } catch (IOException e) {
            // getChannel itself may have failed, in which case there is nothing to close;
            // without this guard a NullPointerException would mask the real I/O error.
            if (reusableChannel != null) {
                reusableChannel.close();
            }
            throw new ProActiveException(e);
        }
    }

    /**
     * Looks up the remote object registered at {@code uri} by passively declaring its queue.
     *
     * @param uri the URI of the object to look up
     * @return an adapter wrapping a new {@link AMQPRemoteObject} bound to {@code uri}
     * @throws ProActiveException if the passive queue declaration fails, or if any other
     *         {@link IOException} occurs during the lookup
     */
    @Override
    public <T> RemoteObject<T> lookup(URI uri) throws ProActiveException {
        try {
            ensureExchangesExist(uri);
            ReusableChannel channel = AMQPUtils.getChannel(uri);
            String queueName = AMQPUtils.computeQueueNameFromName(URIBuilder.getNameFromURI(uri));
            try {
                channel.getChannel().queueDeclarePassive(queueName);
                AMQPUtils.returnChannel(channel);
            } catch (IOException e) {
                // Close the channel instead of leaking it, consistent with the failure
                // handling in unregister and ensureExchangesExist.
                channel.close();
                throw new ProActiveException("Lookup failed to get response while sending request to the " + queueName, e);
            }
            // Built outside the inner try so that a failure here never closes a channel
            // that has already been returned.
            return new RemoteObjectAdapter(new AMQPRemoteObject(uri));
        } catch (IOException e2) {
            throw new ProActiveException(String.format("unable to lookup object at %s", uri.toString()), e2);
        }
    }

    /**
     * Lists the URIs reported by a {@link FindQueuesRPCClient} discovery on the discover exchange.
     *
     * @param uri the URI passed to the discovery client
     * @return the discovered URIs, possibly empty
     * @throws ProActiveException if the discovery fails for any reason
     */
    @Override
    public URI[] list(URI uri) throws ProActiveException {
        try {
            ensureExchangesExist(uri);
            // NOTE(review): the last argument is a Spring JMS constant unrelated to AMQP; it looks
            // like a decompiler constant-resolution artifact standing in for a plain timeout
            // literal - confirm against the original sources before replacing it.
            List<URI> discover = new FindQueuesRPCClient().discover(uri, AMQPConfig.PA_AMQP_DISCOVER_EXCHANGE_NAME.getValue(), DefaultMessageListenerContainer.DEFAULT_RECOVERY_INTERVAL);
            URI[] uriArr = (URI[]) discover.toArray(new URI[discover.size()]);
            logger.debug(String.format("AMQP Registry contains %s", Arrays.toString(uriArr)));
            return uriArr;
        } catch (Exception e) {
            throw new ProActiveException(String.format("unable to list the AMQP registry at %s", uri.toString()), e);
        }
    }

    /**
     * @return {@link #PROTOCOL_ID}
     */
    @Override
    public String getProtocolId() {
        return PROTOCOL_ID;
    }

    /**
     * Does nothing besides logging a debug message.
     *
     * @param remoteRemoteObject ignored
     */
    @Override
    public void unexport(RemoteRemoteObject remoteRemoteObject) throws ProActiveException {
        logger.debug("unexport is not supported for AMQP");
    }

    /**
     * @return the value of {@code AMQPConfig.PA_AMQP_BROKER_PORT}
     */
    @Override
    public int getPort() {
        return AMQPConfig.PA_AMQP_BROKER_PORT.getValue();
    }

    /**
     * Builds the {@code amqp} URI for {@code str}, wraps {@code remoteObject} in an
     * {@link InternalRemoteRemoteObjectImpl} and registers it.
     *
     * @param remoteObject the object to expose
     * @param str the object name; a leading {@code "/"} is added when missing
     * @param z forwarded unchanged to {@link #register(InternalRemoteRemoteObject, URI, boolean)}
     * @return the registered internal remote object
     * @throws ProActiveException if the URI cannot be built or the registration fails
     */
    @Override
    public InternalRemoteRemoteObject createRemoteObject(RemoteObject<?> remoteObject, String str, boolean z) throws ProActiveException {
        // The path component of a hierarchical URI with an authority must start with '/'.
        String path = str.startsWith("/") ? str : "/" + str;
        try {
            URI uri = new URI(getProtocolId(), null, AMQPConfig.PA_AMQP_BROKER_ADDRESS.getValue(), getPort(), path, null, null);
            InternalRemoteRemoteObjectImpl internalRemoteRemoteObjectImpl = new InternalRemoteRemoteObjectImpl(remoteObject, uri);
            internalRemoteRemoteObjectImpl.setRemoteRemoteObject(register(internalRemoteRemoteObjectImpl, uri, z));
            return internalRemoteRemoteObjectImpl;
        } catch (URISyntaxException e) {
            throw new ProActiveException(String.format("Failed to create remote object %s", path), e);
        }
    }

    /**
     * @return the URI built from the configured broker address and port, the {@code amqp}
     *         protocol and an empty name
     */
    @Override
    public URI getBaseURI() {
        return URIBuilder.buildURI(AMQPConfig.PA_AMQP_BROKER_ADDRESS.getValue(), "", getProtocolId(), AMQPConfig.PA_AMQP_BROKER_PORT.getValue());
    }

    /**
     * @param inputStream the stream to wrap
     * @return a {@link ProActiveMarshalInputStream} reading from {@code inputStream}
     * @throws IOException if the wrapping stream cannot be created
     */
    @Override
    public ObjectInputStream getProtocolObjectInputStream(InputStream inputStream) throws IOException {
        return new ProActiveMarshalInputStream(inputStream);
    }

    /**
     * @param outputStream the stream to wrap
     * @return a {@link ProActiveMarshalOutputStream} writing to {@code outputStream}, created
     *         with the URL of the local ProActive runtime
     * @throws IOException if the wrapping stream cannot be created
     */
    @Override
    public ObjectOutputStream getProtocolObjectOutputStream(OutputStream outputStream) throws IOException {
        return new ProActiveMarshalOutputStream(outputStream, ProActiveRuntimeImpl.getProActiveRuntime().getURL());
    }

    /**
     * Declares the discover ({@code fanout}) and RPC ({@code direct}) exchanges the first time
     * it is called successfully; subsequent calls return immediately.
     *
     * @param uri the URI used to obtain a channel
     * @throws IOException if the channel cannot be obtained or a declaration fails
     */
    private void ensureExchangesExist(URI uri) throws IOException {
        if (this.exchangeInitialized) {
            return;
        }
        ReusableChannel channel = AMQPUtils.getChannel(uri);
        try {
            channel.getChannel().exchangeDeclare(AMQPConfig.PA_AMQP_DISCOVER_EXCHANGE_NAME.getValue(), "fanout", false, false, false, (Map) null);
            channel.getChannel().exchangeDeclare(AMQPConfig.PA_AMQP_RPC_EXCHANGE_NAME.getValue(), "direct", false, false, false, (Map) null);
            this.exchangeInitialized = true;
            AMQPUtils.returnChannel(channel);
        } catch (IOException e) {
            channel.close();
            throw e;
        }
    }
}
