JDK14/Java14源码在线阅读

JDK14/Java14源码在线阅读 / jdk.sctp / unix / classes / sun / nio / ch / sctp / SctpMultiChannelImpl.java
/*
 * Copyright (c) 2009, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package sun.nio.ch.sctp;

import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.InetSocketAddress;
import java.io.FileDescriptor;
import java.io.IOException;
import java.util.Collections;
import java.util.Map.Entry;
import java.util.Iterator;
import java.util.Set;
import java.util.HashSet;
import java.util.HashMap;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetBoundException;
import java.nio.channels.spi.SelectorProvider;
import com.sun.nio.sctp.AbstractNotificationHandler;
import com.sun.nio.sctp.Association;
import com.sun.nio.sctp.AssociationChangeNotification;
import com.sun.nio.sctp.HandlerResult;
import com.sun.nio.sctp.IllegalReceiveException;
import com.sun.nio.sctp.InvalidStreamException;
import com.sun.nio.sctp.IllegalUnbindException;
import com.sun.nio.sctp.NotificationHandler;
import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpMultiChannel;
import com.sun.nio.sctp.SctpSocketOption;
import sun.net.util.IPAddressUtil;
import sun.nio.ch.DirectBuffer;
import sun.nio.ch.NativeThread;
import sun.nio.ch.IOStatus;
import sun.nio.ch.IOUtil;
import sun.nio.ch.Net;
import sun.nio.ch.SelChImpl;
import sun.nio.ch.SelectionKeyImpl;
import sun.nio.ch.Util;
import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
import static sun.nio.ch.sctp.ResultContainer.*;

/**
 * An implementation of SctpMultiChannel
 */
public class SctpMultiChannelImpl extends SctpMultiChannel
    implements SelChImpl
{
    /* File descriptor of the underlying socket, obtained from
     * SctpNet.socket in the constructor */
    private final FileDescriptor fd;

    /* Integer value of fd (IOUtil.fdVal), passed to the SctpNet calls */
    private final int fdVal;

    /* IDs of native threads doing send and receives, for signalling.
     * Zero means that no thread is currently inside the operation. */
    private volatile long receiverThread = 0;
    private volatile long senderThread = 0;

    /* Lock held by current receiving thread */
    private final Object receiveLock = new Object();

    /* Lock held by current sending thread */
    private final Object sendLock = new Object();

    /* Lock held by any thread that modifies the state fields declared below
     * DO NOT invoke a blocking I/O operation while holding this lock! */
    private final Object stateLock = new Object();

    /* States of the channel as tracked by kill() */
    private enum ChannelState {
        /* Initial state of a newly constructed channel */
        UNINITIALIZED,
        /* kill() was postponed because a thread was sending or receiving */
        KILLPENDING,
        /* kill() has completed; further kill() calls return immediately */
        KILLED,
    }

    /* -- The following fields are protected by stateLock -- */
    private ChannelState state = ChannelState.UNINITIALIZED;

    /* Binding: Once bound the port will remain constant.
     * The value -1 means that the channel has not been bound yet. */
    int port = -1;
    /* Local addresses recorded by bind and bindAddress/unbindAddress */
    private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
    /* Has the channel been bound to the wildcard address */
    private boolean wildcard; /* false */

    /* Keeps a map of addresses to association, and vice versa */
    private HashMap<SocketAddress, Association> addressMap =
                         new HashMap<SocketAddress, Association>();
    private HashMap<Association, Set<SocketAddress>> associationMap =
                         new HashMap<Association, Set<SocketAddress>>();

    /* -- End of fields protected by stateLock -- */

    /* Per-thread slot holding an association that has been shut down and
     * must be removed once the user handler has been invoked.
     * A ThreadLocal's initial value is null, i.e. nothing to remove. */
    private final ThreadLocal<Association> associationToRemove =
        new ThreadLocal<Association>();

    /* A notification handler cannot invoke receive: this per-thread flag is
     * TRUE while the current thread is inside receive and starts as FALSE. */
    private final ThreadLocal<Boolean> receiveInvoked =
        ThreadLocal.withInitial(() -> Boolean.FALSE);

    public SctpMultiChannelImpl(SelectorProvider provider)
            throws IOException {
        //TODO: update provider, remove public modifier
        super(provider);
        this.fd = SctpNet.socket(false /*one-to-many*/);
        this.fdVal = IOUtil.fdVal(fd);
    }

    /**
     * Binds the socket to {@code local} (or to port 0 on the wildcard address
     * when {@code local} is null) and starts listening on it.
     *
     * All three locks are taken, in the order receiveLock, sendLock,
     * stateLock, for the whole operation.
     *
     * @param  local    the address to bind to, or null
     * @param  backlog  the listen backlog; values below 1 are replaced by 50
     * @return this channel
     * @throws IOException  if the channel is closed or binding/listening fails
     */
    @Override
    public SctpMultiChannel bind(SocketAddress local, int backlog)
            throws IOException {
        synchronized (receiveLock) {
            synchronized (sendLock) {
                synchronized (stateLock) {
                    ensureOpen();
                    if (isBound()) {
                        SctpNet.throwAlreadyBoundException();
                    }

                    final InetSocketAddress requested;
                    if (local == null) {
                        requested = new InetSocketAddress(0);
                    } else {
                        requested = Net.checkAddress(local);
                    }

                    final SecurityManager securityManager =
                            System.getSecurityManager();
                    if (securityManager != null) {
                        securityManager.checkListen(requested.getPort());
                    }
                    Net.bind(fd, requested.getAddress(), requested.getPort());

                    /* Record the port actually chosen, then the address
                     * that was asked for */
                    final InetSocketAddress actual = Net.localAddress(fd);
                    port = actual.getPort();
                    localAddresses.add(requested);
                    if (requested.getAddress().isAnyLocalAddress()) {
                        wildcard = true;
                    }

                    final int effectiveBacklog = (backlog < 1) ? 50 : backlog;
                    SctpNet.listen(fdVal, effectiveBacklog);
                }
            }
        }
        return this;
    }

    /**
     * Adds {@code address} to the set of addresses the socket is bound to.
     * Delegates to {@code bindUnbindAddress} in "add" mode.
     */
    @Override
    public SctpMultiChannel bindAddress(InetAddress address)
            throws IOException {
        final boolean add = true;
        return bindUnbindAddress(address, add);
    }

    /**
     * Removes {@code address} from the set of addresses the socket is bound
     * to. Delegates to {@code bindUnbindAddress} in "remove" mode.
     */
    @Override
    public SctpMultiChannel unbindAddress(InetAddress address)
            throws IOException {
        final boolean add = false;
        return bindUnbindAddress(address, add);
    }

    /**
     * Shared implementation of bindAddress and unbindAddress.
     *
     * @param  address  the local address to add or remove; must not be null
     *                  and must not be the wildcard address
     * @param  add      true to add the address, false to remove it
     * @return this channel
     * @throws IllegalArgumentException  if address is null or the wildcard
     * @throws IllegalStateException     if the channel is bound to the wildcard
     * @throws IllegalUnbindException    if removing the only bound address, or
     *                                   an address that is not bound
     * @throws IOException               if the channel is closed or the native
     *                                   call fails
     */
    private SctpMultiChannel bindUnbindAddress(InetAddress address,
                                               boolean add)
            throws IOException {
        if (address == null) {
            throw new IllegalArgumentException();
        }

        synchronized (receiveLock) {
            synchronized (sendLock) {
                synchronized (stateLock) {
                    if (!isOpen()) {
                        throw new ClosedChannelException();
                    }
                    if (!isBound()) {
                        throw new NotYetBoundException();
                    }
                    if (wildcard) {
                        throw new IllegalStateException(
                                "Cannot add or remove addresses from a channel that is bound to the wildcard address");
                    }
                    if (address.isAnyLocalAddress()) {
                        throw new IllegalArgumentException(
                                "Cannot add or remove the wildcard address");
                    }

                    /* Removal needs more than one address to be bound */
                    if (!add && localAddresses.size() <= 1) {
                        throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
                    }

                    /* Find the first recorded entry for this address, if any */
                    InetSocketAddress existing = null;
                    Iterator<InetSocketAddress> bound = localAddresses.iterator();
                    while (existing == null && bound.hasNext()) {
                        InetSocketAddress candidate = bound.next();
                        if (candidate.getAddress().equals(address)) {
                            existing = candidate;
                        }
                    }

                    if (add) {
                        if (existing != null) {
                            SctpNet.throwAlreadyBoundException();
                        }
                    } else if (existing == null) {
                        throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
                    }

                    SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);

                    /* Mirror the change in our own bookkeeping */
                    if (add) {
                        localAddresses.add(new InetSocketAddress(address, port));
                    } else {
                        localAddresses.remove(existing);
                    }
                }
            }
        }
        return this;
    }

    /**
     * Returns the associations currently known to this channel.
     *
     * The result is an unmodifiable snapshot taken while holding stateLock.
     * associationMap is guarded by stateLock and is modified by threads that
     * process notifications in receive, so handing out a live view of its
     * key set would let callers iterate the map without the lock.
     *
     * @return an unmodifiable set of associations, possibly empty
     * @throws ClosedChannelException  if the channel is closed
     * @throws NotYetBoundException    if the channel has not been bound
     */
    @Override
    public Set<Association> associations()
            throws ClosedChannelException, NotYetBoundException {
        synchronized (stateLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if (!isBound())
                throw new NotYetBoundException();

            /* Copy under the lock; never expose the map's own key set */
            return Collections.unmodifiableSet(
                    new HashSet<Association>(associationMap.keySet()));
        }
    }

    /* A channel is bound once its port has been set to something other
     * than the initial -1 */
    private boolean isBound() {
        synchronized (stateLock) {
            return port != -1;
        }
    }

    /* Throws ClosedChannelException unless the channel is open */
    private void ensureOpen() throws IOException {
        synchronized (stateLock) {
            if (isOpen()) {
                return;
            }
            throw new ClosedChannelException();
        }
    }

    /* Clears the receiving thread ID and completes a kill that was
     * postponed while a thread was sending or receiving */
    private void receiverCleanup() throws IOException {
        synchronized (stateLock) {
            receiverThread = 0;
            final boolean killPostponed = (state == ChannelState.KILLPENDING);
            if (killPostponed) {
                kill();
            }
        }
    }

    /* Clears the sending thread ID and completes a kill that was
     * postponed while a thread was sending or receiving */
    private void senderCleanup() throws IOException {
        synchronized (stateLock) {
            senderThread = 0;
            final boolean killPostponed = (state == ChannelState.KILLPENDING);
            if (killPostponed) {
                kill();
            }
        }
    }

    /**
     * Switches the underlying descriptor between blocking and non-blocking
     * mode via {@code IOUtil.configureBlocking}.
     */
    @Override
    protected void implConfigureBlocking(boolean block) throws IOException {
        final FileDescriptor descriptor = this.fd;
        IOUtil.configureBlocking(descriptor, block);
    }

    /**
     * Closes this channel: pre-closes the descriptor, signals any thread
     * currently recorded as sending or receiving, and kills the channel
     * right away when it is not registered with a selector.
     */
    @Override
    public void implCloseSelectableChannel() throws IOException {
        synchronized (stateLock) {
            SctpNet.preClose(fdVal);

            /* The thread IDs are only written under stateLock, so one read
             * of each is enough here */
            final long receiver = receiverThread;
            if (receiver != 0) {
                NativeThread.signal(receiver);
            }

            final long sender = senderThread;
            if (sender != 0) {
                NativeThread.signal(sender);
            }

            final boolean registered = isRegistered();
            if (!registered) {
                kill();
            }
        }
    }

    /** Returns the file descriptor of the underlying socket. */
    @Override
    public FileDescriptor getFD() {
        return this.fd;
    }

    /** Returns the integer value of the underlying file descriptor. */
    @Override
    public int getFDVal() {
        return this.fdVal;
    }

    /**
     * Translates native poll revent ops into ready operation ops and stores
     * them in the selection key.
     *
     * @param  ops         the native poll events
     * @param  initialOps  the ready set to start from
     * @param  sk          the selection key to update
     * @return true if the key gained at least one ready operation that was
     *         not in its previous ready set
     */
    private boolean translateReadyOps(int ops, int initialOps,
                                      SelectionKeyImpl sk) {
        final int interest = sk.nioInterestOps();
        final int previouslyReady = sk.nioReadyOps();

        if ((ops & Net.POLLNVAL) != 0) {
            /* This should only happen if this channel is pre-closed while a
             * selection operation is in progress
             * ## Throw an error if this channel has not been pre-closed */
            return false;
        }

        final int ready;
        if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
            /* On error or hang-up every operation of interest is reported */
            ready = interest;
        } else {
            int translated = initialOps;

            final boolean readable = (ops & Net.POLLIN) != 0;
            if (readable && (interest & SelectionKey.OP_READ) != 0) {
                translated |= SelectionKey.OP_READ;
            }

            final boolean writable = (ops & Net.POLLOUT) != 0;
            if (writable && (interest & SelectionKey.OP_WRITE) != 0) {
                translated |= SelectionKey.OP_WRITE;
            }

            ready = translated;
        }

        sk.nioReadyOps(ready);
        return (ready & ~previouslyReady) != 0;
    }

    /**
     * Translates {@code ops} and merges the result into the key's current
     * ready set.
     */
    @Override
    public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
        final int currentlyReady = sk.nioReadyOps();
        return translateReadyOps(ops, currentlyReady, sk);
    }

    /**
     * Translates {@code ops} starting from an empty ready set, replacing
     * whatever the key had before.
     */
    @Override
    public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
        final int emptyReadySet = 0;
        return translateReadyOps(ops, emptyReadySet, sk);
    }

    /**
     * Translates an interest set into native poll events: OP_READ maps to
     * POLLIN and OP_WRITE maps to POLLOUT; other bits are ignored.
     */
    @Override
    public int translateInterestOps(int ops) {
        final int pollIn =
                ((ops & SelectionKey.OP_READ) != 0) ? Net.POLLIN : 0;
        final int pollOut =
                ((ops & SelectionKey.OP_WRITE) != 0) ? Net.POLLOUT : 0;
        return pollIn | pollOut;
    }

    /**
     * Releases the underlying file descriptor once the channel is closed and
     * no longer registered.
     *
     * The state field is only ever written by this method, so the first
     * invocation always finds it in UNINITIALIZED. That branch previously
     * marked the channel KILLED without closing the descriptor, which leaked
     * the descriptor left behind by preClose. It now closes it.
     *
     * @throws IOException  propagated from {@code SctpNet.close}
     */
    @Override
    public void kill() throws IOException {
        synchronized (stateLock) {
            if (state == ChannelState.KILLED)
                return;
            if (state == ChannelState.UNINITIALIZED) {
                /* Mark as killed first so that a failing close is never
                 * retried on a descriptor in an unknown state */
                state = ChannelState.KILLED;
                SctpNet.close(fdVal);
                return;
            }
            assert !isOpen() && !isRegistered();

            /* Postpone the kill if there is a thread sending or receiving. */
            if (receiverThread == 0 && senderThread == 0) {
                SctpNet.close(fdVal);
                state = ChannelState.KILLED;
            } else {
                state = ChannelState.KILLPENDING;
            }
        }
    }

    /**
     * Sets a socket option, optionally scoped to one association.
     *
     * @param  name         the option; must be one of supportedOptions()
     * @param  value        the value to set
     * @param  association  the association the option applies to, or null
     * @return this channel
     * @throws NullPointerException           if name is null
     * @throws UnsupportedOperationException  if the option is not supported
     * @throws IllegalArgumentException       if a primary-address option is
     *         given an association not controlled by this channel
     * @throws IOException                    if the channel is closed or the
     *         native call fails
     */
    @Override
    public <T> SctpMultiChannel setOption(SctpSocketOption<T> name,
                                          T value,
                                          Association association)
            throws IOException {
        if (name == null) {
            throw new NullPointerException();
        }
        final boolean supported = supportedOptions().contains(name);
        if (!supported) {
            throw new UnsupportedOperationException("'" + name + "' not supported");
        }

        synchronized (stateLock) {
            /* Only the primary-address options require a known association */
            if (association != null) {
                if (name.equals(SCTP_PRIMARY_ADDR) ||
                        name.equals(SCTP_SET_PEER_PRIMARY_ADDR)) {
                    checkAssociation(association);
                }
            }
            if (!isOpen()) {
                throw new ClosedChannelException();
            }

            final int assocId;
            if (association == null) {
                assocId = 0;
            } else {
                assocId = association.associationID();
            }
            SctpNet.setSocketOption(fdVal, name, value, assocId);
        }
        return this;
    }

    /**
     * Reads a socket option, optionally scoped to one association.
     *
     * @param  name         the option; must be one of supportedOptions()
     * @param  association  the association the option applies to, or null
     * @return the option value as returned by {@code SctpNet.getSocketOption}
     * @throws NullPointerException           if name is null
     * @throws UnsupportedOperationException  if the option is not supported
     * @throws IllegalArgumentException       if a primary-address option is
     *         given an association not controlled by this channel
     * @throws IOException                    if the channel is closed or the
     *         native call fails
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getOption(SctpSocketOption<T> name, Association association)
            throws IOException {
        if (name == null) {
            throw new NullPointerException();
        }
        final boolean supported = supportedOptions().contains(name);
        if (!supported) {
            throw new UnsupportedOperationException("'" + name + "' not supported");
        }

        synchronized (stateLock) {
            /* Only the primary-address options require a known association */
            if (association != null) {
                if (name.equals(SCTP_PRIMARY_ADDR) ||
                        name.equals(SCTP_SET_PEER_PRIMARY_ADDR)) {
                    checkAssociation(association);
                }
            }
            if (!isOpen()) {
                throw new ClosedChannelException();
            }

            final int assocId;
            if (association == null) {
                assocId = 0;
            } else {
                assocId = association.associationID();
            }
            final Object optionValue =
                    SctpNet.getSocketOption(fdVal, name, assocId);
            return (T) optionValue;
        }
    }

    /* Holder for the unmodifiable set of supported socket options; the set
     * is built when this nested class is initialized */
    private static class DefaultOptionsHolder {
        static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();

        /* Builds the unmodifiable set of the ten supported options */
        private static Set<SctpSocketOption<?>> defaultOptions() {
            final SctpSocketOption<?>[] supported = {
                SCTP_DISABLE_FRAGMENTS,
                SCTP_EXPLICIT_COMPLETE,
                SCTP_FRAGMENT_INTERLEAVE,
                SCTP_INIT_MAXSTREAMS,
                SCTP_NODELAY,
                SCTP_PRIMARY_ADDR,
                SCTP_SET_PEER_PRIMARY_ADDR,
                SO_SNDBUF,
                SO_RCVBUF,
                SO_LINGER,
            };

            HashSet<SctpSocketOption<?>> options =
                    new HashSet<SctpSocketOption<?>>(10);
            for (SctpSocketOption<?> option : supported) {
                options.add(option);
            }
            return Collections.unmodifiableSet(options);
        }
    }

    /** Returns the unmodifiable set of socket options this channel supports. */
    @Override
    public final Set<SctpSocketOption<?>> supportedOptions() {
        final Set<SctpSocketOption<?>> options =
                DefaultOptionsHolder.defaultOptions;
        return options;
    }

    /**
     * Receives a message, dispatching any notifications that arrive first.
     *
     * Each pass of the outer loop performs one native receive under
     * receiveLock. A message is returned to the caller. A notification is
     * first processed internally (under stateLock) to keep the association
     * maps current and is then, outside receiveLock, offered to the
     * user-supplied handler; the loop continues while there is no handler or
     * the handler returns CONTINUE.
     *
     * @param  buffer      the buffer to receive into; must be writable
     * @param  attachment  object passed through to the handler
     * @param  handler     notification handler, or null
     * @return the message information, or null if nothing was received, the
     *         channel was closed, or the handler asked to stop
     * @throws IllegalArgumentException  if buffer is null or read-only
     * @throws IllegalReceiveException   if invoked from a notification handler
     * @throws NotYetBoundException      if the channel has not been bound
     * @throws IOException               if the channel is closed or an I/O
     *                                   error occurs
     */
    @Override
    public <T> MessageInfo receive(ByteBuffer buffer,
                                   T attachment,
                                   NotificationHandler<T> handler)
            throws IOException {
        if (buffer == null)
            throw new IllegalArgumentException("buffer cannot be null");

        if (buffer.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");

        /* The flag is per thread, so it is still set when a handler running
         * on this thread calls back into receive */
        if (receiveInvoked.get())
            throw new IllegalReceiveException(
                    "cannot invoke receive from handler");
        receiveInvoked.set(Boolean.TRUE);

        try {
            ResultContainer resultContainer = new ResultContainer();
            do {
                resultContainer.clear();
                synchronized (receiveLock) {
                    ensureOpen();
                    if (!isBound())
                        throw new NotYetBoundException();

                    int n = 0;
                    try {
                        begin();

                        synchronized (stateLock) {
                            if(!isOpen())
                                return null;
                            receiverThread = NativeThread.current();
                        }

                        /* Retry when interrupted, unless the channel was
                         * closed in the meantime */
                        do {
                            n = receive(fdVal, buffer, resultContainer);
                        } while ((n == IOStatus.INTERRUPTED) && isOpen());

                    } finally {
                        receiverCleanup();
                        end((n > 0) || (n == IOStatus.UNAVAILABLE));
                        assert IOStatus.check(n);
                    }

                    if (!resultContainer.isNotification()) {
                        /* message or nothing */
                        if (resultContainer.hasSomething()) {
                            /* Set the association before returning */
                            MessageInfoImpl info =
                                    resultContainer.getMessageInfo();
                            info.setAssociation(lookupAssociation(info.
                                    associationID()));
                            SecurityManager sm = System.getSecurityManager();
                            if (sm != null) {
                                InetSocketAddress isa  = (InetSocketAddress)info.address();
                                /* addressMap is guarded by stateLock and may
                                 * be modified by another thread removing an
                                 * association, so read it under the lock.
                                 * The security check itself runs outside it. */
                                boolean knownPeer;
                                synchronized (stateLock) {
                                    knownPeer = addressMap.containsKey(isa);
                                }
                                if (!knownPeer) {
                                    /* must be a new association */
                                    try {
                                        sm.checkAccept(isa.getAddress().getHostAddress(),
                                                       isa.getPort());
                                    } catch (SecurityException se) {
                                        /* do not hand the data to the caller */
                                        buffer.clear();
                                        throw se;
                                    }
                                }
                            }

                            assert info.association() != null;
                            return info;
                        } else  {
                          /* Non-blocking may return null if nothing available*/
                            return null;
                        }
                    } else { /* notification */
                        synchronized (stateLock) {
                            handleNotificationInternal(
                                    resultContainer);
                        }
                    }
                } /* receiveLock */
            } while (handler == null ? true :
                (invokeNotificationHandler(resultContainer, handler, attachment)
                 == HandlerResult.CONTINUE));
        } finally {
            receiveInvoked.set(Boolean.FALSE);
        }

        return null;
    }

    /**
     * Receives into {@code dst}, going through a temporary direct buffer
     * when {@code dst} is not direct or has no space remaining.
     *
     * @param  fd               the descriptor value to receive from
     * @param  dst              the destination buffer
     * @param  resultContainer  filled in by the native receive
     * @return the value returned by the native receive
     */
    private int receive(int fd,
                        ByteBuffer dst,
                        ResultContainer resultContainer)
            throws IOException {
        final int start = dst.position();
        final int limit = dst.limit();
        assert (start <= limit);
        final int remaining = Math.max(limit - start, 0);

        final boolean direct = dst instanceof DirectBuffer;
        if (direct && remaining > 0) {
            return receiveIntoNativeBuffer(fd, resultContainer, dst,
                                           remaining, start);
        }

        /* Substitute a native buffer of at least one byte. */
        final int tempSize = (remaining > 0) ? remaining : 1;
        final ByteBuffer temp = Util.getTemporaryDirectBuffer(tempSize);
        try {
            final int received =
                    receiveIntoNativeBuffer(fd, resultContainer, temp, tempSize, 0);
            temp.flip();
            /* Copy back only when data arrived and dst has room for it */
            if (received > 0 && remaining > 0) {
                dst.put(temp);
            }
            return received;
        } finally {
            Util.releaseTemporaryDirectBuffer(temp);
        }
    }

    /**
     * Invokes the native receive on the memory of a direct buffer, starting
     * {@code pos} bytes into it, and advances the buffer position by the
     * number of bytes received.
     *
     * @param  bb   a direct buffer
     * @param  rem  the number of bytes that may be written
     * @param  pos  the offset into the buffer to write at
     * @return the value returned by the native receive
     */
    private int receiveIntoNativeBuffer(int fd,
                                        ResultContainer resultContainer,
                                        ByteBuffer bb,
                                        int rem,
                                        int pos)
            throws IOException {
        final long target = ((DirectBuffer) bb).address() + pos;
        final int received = receive0(fd, resultContainer, target, rem);
        if (received > 0) {
            bb.position(pos + received);
        }
        return received;
    }

    /* Handler that handleNotificationInternal passes to
     * invokeNotificationHandler; it updates the association bookkeeping */
    private InternalNotificationHandler internalNotificationHandler =
            new InternalNotificationHandler();

    /* Runs the channel's own handler over the notification held by
     * resultContainer; the handler's result is ignored */
    private void handleNotificationInternal(ResultContainer resultContainer)
    {
        final Object noAttachment = null;
        invokeNotificationHandler(resultContainer,
                                  internalNotificationHandler,
                                  noAttachment);
    }

    /* Handler used by the channel itself to track association changes */
    private class InternalNotificationHandler
            extends AbstractNotificationHandler<Object>
    {
        /**
         * Records a new association on COMM_UP and marks the association for
         * later removal on SHUTDOWN or COMM_LOST. Always returns CONTINUE.
         */
        @Override
        public HandlerResult handleNotification(
                AssociationChangeNotification not, Object unused) {
            final AssociationChange change = (AssociationChange) not;

            /* Update map to reflect change in association */
            switch (not.event()) {
                case COMM_UP : {
                    final Association created = new AssociationImpl(
                            change.assocId(),
                            change.maxInStreams(),
                            change.maxOutStreams());
                    addAssociation(created);
                    break;
                }
                case SHUTDOWN :
                case COMM_LOST : {
                //case RESTART: ???
                    /* mark association for removal after user handler invoked*/
                    final Association closing =
                            lookupAssociation(change.assocId());
                    associationToRemove.set(closing);
                    break;
                }
                default :
                    /* other events do not affect the maps */
                    break;
            }
            return HandlerResult.CONTINUE;
        }
    }

    /**
     * Delivers the notification held by {@code resultContainer} to
     * {@code handler}.
     *
     * An AbstractNotificationHandler receives the notification through the
     * overload matching the notification type; any other handler receives it
     * through the generic method. After a handler other than the internal
     * one has run, an association marked for removal on this thread is
     * removed.
     *
     * @return the result returned by the handler
     */
    private <T> HandlerResult invokeNotificationHandler(
                                   ResultContainer resultContainer,
                                   NotificationHandler<T> handler,
                                   T attachment) {
        final SctpNotification notification = resultContainer.notification();
        notification.setAssociation(lookupAssociation(notification.assocId()));

        HandlerResult result;
        if (handler instanceof AbstractNotificationHandler) {
            final AbstractNotificationHandler<T> typedHandler =
                    (AbstractNotificationHandler<T>)handler;
            switch(resultContainer.type()) {
                case ASSOCIATION_CHANGED :
                    result = typedHandler.handleNotification(
                            resultContainer.getAssociationChanged(), attachment);
                    break;
                case PEER_ADDRESS_CHANGED :
                    result = typedHandler.handleNotification(
                            resultContainer.getPeerAddressChanged(), attachment);
                    break;
                case SEND_FAILED :
                    result = typedHandler.handleNotification(
                            resultContainer.getSendFailed(), attachment);
                    break;
                case SHUTDOWN :
                    result = typedHandler.handleNotification(
                            resultContainer.getShutdown(), attachment);
                    break;
                default :
                    /* implementation specific handlers */
                    result = typedHandler.handleNotification(
                            resultContainer.notification(), attachment);
            }
        } else {
            result = handler.handleNotification(notification, attachment);
        }

        /* The internal handler only marks associations; removal is left
         * until a user handler has finished with them */
        if (handler instanceof InternalNotificationHandler) {
            return result;
        }

        final Association pendingRemoval = associationToRemove.get();
        if (pendingRemoval != null) {
            removeAssociation(pendingRemoval);
            associationToRemove.set(null);
        }
        return result;
    }

    /* Returns the association in our internal map whose ID equals assocId,
     * or null when there is none */
    private Association lookupAssociation(int assocId) {
        synchronized (stateLock) {
            final Iterator<Association> known =
                    associationMap.keySet().iterator();
            while (known.hasNext()) {
                final Association candidate = known.next();
                if (candidate.associationID() == assocId) {
                    return candidate;
                }
            }
            return null;
        }
    }

    /* Records a new association together with its remote addresses.
     * When the addresses cannot be determined the association is still
     * recorded, mapped to null. */
    private void addAssociation(Association association) {
        synchronized (stateLock) {
            final int assocId = association.associationID();

            Set<SocketAddress> peers;
            try {
                peers = SctpNet.getRemoteAddresses(fdVal, assocId);
            } catch (IOException unused) {
                /* OK, determining connected addresses may not be possible
                 * shutdown, connection lost, etc */
                peers = null;
            }

            associationMap.put(association, peers);
            if (peers == null) {
                return;
            }
            for (SocketAddress peer : peers) {
                addressMap.put(peer, association);
            }
        }
    }

    /* Removes an association, and the addresses that map to it, from the
     * internal maps. When the remote addresses cannot be queried, every
     * address entry whose value equals the association is dropped. */
    private void removeAssociation(Association association) {
        synchronized (stateLock) {
            final int assocId = association.associationID();

            Set<SocketAddress> peers;
            try {
                peers = SctpNet.getRemoteAddresses(fdVal, assocId);
            } catch (IOException unused) {
                /* OK, determining connected addresses may not be possible
                 * shutdown, connection lost, etc */
                peers = null;
            }

            /* Drop the first association carrying this ID */
            final Iterator<Association> known =
                    associationMap.keySet().iterator();
            while (known.hasNext()) {
                if (known.next().associationID() == assocId) {
                    known.remove();
                    break;
                }
            }

            if (peers != null) {
                for (SocketAddress peer : peers) {
                    addressMap.remove(peer);
                }
                return;
            }

            /* We cannot determine the connected addresses */
            final Iterator<Entry<SocketAddress, Association>> entries =
                    addressMap.entrySet().iterator();
            while (entries.hasNext()) {
                final Entry<SocketAddress, Association> mapping = entries.next();
                if (mapping.getValue().equals(association)) {
                    entries.remove();
                }
            }
        }
    }

    /**
     * Verifies that the given association is one of the associations
     * currently recorded for this channel.
     *
     * @throws  IllegalArgumentException
     *          If the given association is not controlled by this channel
     *
     * @return  {@code true} when the association is controlled by this
     *          channel; the method never returns {@code false}
     */
    private boolean checkAssociation(Association messageAssoc) {
        synchronized (stateLock) {
            final Iterator<Association> controlled =
                    associationMap.keySet().iterator();
            while (controlled.hasNext()) {
                if (messageAssoc.equals(controlled.next())) {
                    return true;
                }
            }
        }
        throw new IllegalArgumentException(
              "Given Association is not controlled by this channel");
    }

    /* Throws InvalidStreamException unless
     * 0 <= streamNumber < assoc.maxOutboundStreams() */
    private void checkStreamNumber(Association assoc, int streamNumber) {
        synchronized (stateLock) {
            final boolean inRange = streamNumber >= 0
                    && streamNumber < assoc.maxOutboundStreams();
            if (!inRange) {
                throw new InvalidStreamException();
            }
        }
    }

    /* TODO: Add support for ttl and isComplete to both 121 12M
     *       (presumably the one-to-one and one-to-many channel flavours)
     *       SCTP_EOR not yet supported on reference platforms
     *       TTL support limited...
     */
    /**
     * Sends the remaining bytes of {@code buffer} to the association and/or
     * address named by {@code messageInfo}, binding the channel first if it
     * is not bound yet.
     *
     * The implicit bind is performed before sendLock is taken. bind()
     * acquires receiveLock and then sendLock; calling it while already
     * holding sendLock would take the two locks in the opposite order and
     * could deadlock against a concurrent bind or bindAddress/unbindAddress.
     *
     * @param  buffer       the data to send
     * @param  messageInfo  names the target association and/or address
     * @return the value of the native send, normalized by IOStatus.normalize;
     *         0 if the channel was closed before the send started
     * @throws IllegalArgumentException  if buffer or messageInfo is null, the
     *         association is not controlled by this channel, or the preferred
     *         address does not belong to the association
     * @throws InvalidStreamException    if the stream number is out of range
     * @throws IOException               if the channel is closed or an I/O
     *                                   error occurs
     */
    @Override
    public int send(ByteBuffer buffer, MessageInfo messageInfo)
            throws IOException {
        if (buffer == null)
            throw new IllegalArgumentException("buffer cannot be null");

        if (messageInfo == null)
            throw new IllegalArgumentException("messageInfo cannot be null");

        /* Implicit bind, taking the locks in the same order as bind().
         * Every binder holds receiveLock, so re-checking under it is enough
         * to avoid binding twice; bind() itself rejects a closed channel. */
        if (!isBound()) {
            synchronized (receiveLock) {
                if (!isBound())
                    bind(null, 0);
            }
        }

        synchronized (sendLock) {
            ensureOpen();

            int n = 0;
            try {
                int assocId = -1;
                SocketAddress address = null;
                begin();

                synchronized (stateLock) {
                    if(!isOpen())
                        return 0;
                    senderThread = NativeThread.current();

                    /* Determine what address or association to send to */
                    Association assoc = messageInfo.association();
                    InetSocketAddress addr = (InetSocketAddress)messageInfo.address();
                    if (assoc != null) {
                        checkAssociation(assoc);
                        checkStreamNumber(assoc, messageInfo.streamNumber());
                        assocId = assoc.associationID();
                        /* have we also got a preferred address */
                        if (addr != null) {
                            if (!assoc.equals(addressMap.get(addr)))
                                throw new IllegalArgumentException("given preferred address is not part of this association");
                            address = addr;
                        }
                    } else if (addr != null) {
                        address = addr;
                        Association association = addressMap.get(addr);
                        if (association != null) {
                            checkStreamNumber(association, messageInfo.streamNumber());
                            assocId = association.associationID();

                        } else { /* must be new association */
                            SecurityManager sm = System.getSecurityManager();
                            if (sm != null)
                                sm.checkConnect(addr.getAddress().getHostAddress(),
                                                addr.getPort());
                        }
                    } else {
                        throw new AssertionError(
                            "Both association and address cannot be null");
                    }
                }

                /* Retry when interrupted, unless the channel was closed in
                 * the meantime */
                do {
                    n = send(fdVal, buffer, assocId, address, messageInfo);
                } while ((n == IOStatus.INTERRUPTED) && isOpen());

                return IOStatus.normalize(n);
            } finally {
                senderCleanup();
                end((n > 0) || (n == IOStatus.UNAVAILABLE));
                assert IOStatus.check(n);
            }
        }
    }

    /**
     * Sends the remaining bytes of {@code src}, copying them into a
     * temporary direct buffer first when {@code src} is not direct.
     *
     * On the copying path the position of {@code src} is advanced only by
     * the number of bytes reported as sent.
     *
     * @return the value returned by sendFromNativeBuffer
     */
    private int send(int fd,
                     ByteBuffer src,
                     int assocId,
                     SocketAddress target,
                     MessageInfo messageInfo)
            throws IOException {
        final int streamNumber = messageInfo.streamNumber();
        final boolean unordered = messageInfo.isUnordered();
        final int ppid = messageInfo.payloadProtocolID();

        if (src instanceof DirectBuffer) {
            return sendFromNativeBuffer(fd, src, target, assocId,
                    streamNumber, unordered, ppid);
        }

        /* Substitute a native buffer */
        final int start = src.position();
        final int limit = src.limit();
        assert (start <= limit && streamNumber >= 0);

        final int remaining = Math.max(limit - start, 0);
        final ByteBuffer staging = Util.getTemporaryDirectBuffer(remaining);
        try {
            staging.put(src);
            staging.flip();
            /* Do not update src until we see how many bytes were written */
            src.position(start);

            final int written = sendFromNativeBuffer(fd, staging, target,
                    assocId, streamNumber, unordered, ppid);
            if (written > 0) {
                /* now update src */
                src.position(start + written);
            }
            return written;
        } finally {
            Util.releaseTemporaryDirectBuffer(staging);
        }
    }

    private int sendFromNativeBuffer(int fd,
                                     ByteBuffer bb,
                                     SocketAddress target,
                                     int assocId,
                                     int streamNumber,
                                     boolean unordered,
                                     int ppid)
            throws IOException {

/**代码未完, 请加载全部代码(NowJava.com).**/
展开阅读全文

关注时代Java

关注时代Java