From ddb96f61ff222f89cf62116c8d59482021be3eab Mon Sep 17 00:00:00 2001 From: fhanik Date: Tue, 19 Feb 2008 00:57:54 +0000 Subject: [PATCH] more UDP code git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@628940 13f79535-47bb-0310-9956-ffa450edef68 --- java/org/apache/catalina/tribes/Member.java | 38 +++++++------ .../catalina/tribes/transport/AbstractSender.java | 4 +- .../catalina/tribes/transport/ReceiverBase.java | 33 +++++++++++ .../catalina/tribes/transport/nio/NioReceiver.java | 12 ++-- .../tribes/transport/nio/NioReplicationTask.java | 61 ++++++++++++++------- .../catalina/tribes/transport/nio/NioSender.java | 50 ++++++++++++----- .../tribes/transport/nio/ParallelNioSender.java | 64 +++++++++++----------- 7 files changed, 176 insertions(+), 86 deletions(-) diff --git a/java/org/apache/catalina/tribes/Member.java b/java/org/apache/catalina/tribes/Member.java index 149a546a6..278b246f2 100644 --- a/java/org/apache/catalina/tribes/Member.java +++ b/java/org/apache/catalina/tribes/Member.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,7 +24,7 @@ package org.apache.catalina.tribes; * The host is what interface the member is listening to, to receive data
* The port is what port the member is listening to, to receive data
* The uniqueId defines the session id for the member. This is an important feature - * since a member that has crashed and the starts up again on the same port/host is + * since a member that has crashed and the starts up again on the same port/host is * not guaranteed to be the same member, so no state transfers will ever be confused * @author Filip Hanik * @version $Revision$, $Date$ @@ -32,18 +32,18 @@ package org.apache.catalina.tribes; public interface Member { - + /** * When a member leaves the cluster, the payload of the memberDisappeared member * will be the following bytes. This indicates a soft shutdown, and not a crash */ public static final byte[] SHUTDOWN_PAYLOAD = new byte[] {66, 65, 66, 89, 45, 65, 76, 69, 88}; - + /** * Returns the name of this node, should be unique within the group. */ public String getName(); - + /** * Returns the listen host for the ChannelReceiver implementation * @return IPv4 or IPv6 representation of the host address this member listens to incoming data @@ -57,7 +57,7 @@ public interface Member { * @see ChannelReceiver */ public int getPort(); - + /** * Returns the secure listen port for the ChannelReceiver implementation. * Returns -1 if its not listening to a secure port. @@ -65,7 +65,13 @@ public interface Member { * @see ChannelReceiver */ public int getSecurePort(); - + + /** + * Returns the UDP port that this member is listening to for UDP messages. + * @return the listen UDP port for this member, -1 if its not listening on a UDP port + */ + public int getUdpPort(); + /** * Contains information on how long this member has been online. @@ -74,7 +80,7 @@ public interface Member { * @return nr of milliseconds since this member started. */ public long getMemberAliveTime(); - + /** * The current state of the member * @return boolean - true if the member is functioning correctly @@ -85,32 +91,32 @@ public interface Member { * @return boolean - true if the member is suspect, but the crash has not been confirmed */ public boolean isSuspect(); - + /** - * - * @return boolean - true if the member has been confirmed to malfunction + * + * @return boolean - true if the member has been confirmed to malfunction */ public boolean isFailing(); - + /** * returns a UUID unique for this member over all sessions. * If the member crashes and restarts, the uniqueId will be different. * @return byte[] */ public byte[] getUniqueId(); - + /** * returns the payload associated with this member * @return byte[] */ public byte[] getPayload(); - + /** * returns the command associated with this member * @return byte[] */ public byte[] getCommand(); - + /** * Domain for this cluster * @return byte[] diff --git a/java/org/apache/catalina/tribes/transport/AbstractSender.java b/java/org/apache/catalina/tribes/transport/AbstractSender.java index c7193e181..aa3a2d059 100644 --- a/java/org/apache/catalina/tribes/transport/AbstractSender.java +++ b/java/org/apache/catalina/tribes/transport/AbstractSender.java @@ -116,7 +116,8 @@ public abstract class AbstractSender implements DataSender { */ public boolean keepalive() { boolean disconnect = false; - if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) disconnect = true; + if (isUdpBased()) disconnect = true; //always disconnect UDP, TODO optimize the keepalive handling + else if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) disconnect = true; else if ( keepAliveTime >= 0 && (System.currentTimeMillis()-connectTime)>keepAliveTime ) disconnect = true; if ( disconnect ) disconnect(); return disconnect; @@ -299,6 +300,7 @@ public abstract class AbstractSender implements DataSender { this.destination = destination; this.address = InetAddress.getByAddress(destination.getHost()); this.port = destination.getPort(); + this.udpPort = destination.getUdpPort(); } diff --git a/java/org/apache/catalina/tribes/transport/ReceiverBase.java b/java/org/apache/catalina/tribes/transport/ReceiverBase.java index 26ed35c50..3bc54a26d 100644 --- a/java/org/apache/catalina/tribes/transport/ReceiverBase.java +++ b/java/org/apache/catalina/tribes/transport/ReceiverBase.java @@ -17,6 +17,7 @@ package org.apache.catalina.tribes.transport; import java.io.IOException; +import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -221,6 +222,38 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R return retries; } + /** + * Same as bind() except it does it for the UDP port + * @param socket + * @param portstart + * @param retries + * @return + * @throws IOException + */ + protected int bindUdp(DatagramSocket socket, int portstart, int retries) throws IOException { + InetSocketAddress addr = null; + while ( retries > 0 ) { + try { + addr = new InetSocketAddress(getBind(), portstart); + socket.bind(addr); + setUdpPort(portstart); + log.info("UDP Receiver Server Socket bound to:"+addr); + return 0; + }catch ( IOException x) { + retries--; + if ( retries <= 0 ) { + log.info("Unable to bind UDP socket to:"+addr+" throwing error."); + throw x; + } + portstart++; + try {Thread.sleep(25);}catch( InterruptedException ti){Thread.currentThread().interrupted();} + retries = bindUdp(socket,portstart,retries); + } + } + return retries; + } + + public void messageDataReceived(ChannelMessage data) { if ( this.listener != null ) { if ( listener.accept(data) ) listener.messageReceived(data); diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index 6db55d7bd..b9ef3ba14 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -144,10 +144,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); //bind to the address to avoid security checks - InetSocketAddress daddr = new InetSocketAddress(getBind(),getUdpPort()); - //TODO should we auto increment the UDP port to avoid collisions? - //we could auto increment with the offset from the tcp listen port - datagramChannel.connect(daddr); + bindUdp(datagramChannel.socket(),getUdpPort(),getAutoBind()); } @@ -188,7 +185,10 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv } key.cancel(); key.attach(null); - try { ((SocketChannel)key.channel()).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); } + if (key.channel() instanceof SocketChannel) + try { ((SocketChannel)key.channel()).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); } + if (key.channel() instanceof DatagramChannel) + try { ((DatagramChannel)key.channel()).socket().close(); } catch (Exception e) { if (log.isDebugEnabled()) log.debug("", e); } try { key.channel().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); } } @@ -249,7 +249,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv setListen(true); if (selector!=null && datagramChannel!=null) { - ObjectReader oreader = new ObjectReader(1024*65); + ObjectReader oreader = new ObjectReader(65535); //max size for a datagram packet registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader); } diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java index f61fa9d8f..4fea62b0d 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java @@ -17,6 +17,7 @@ package org.apache.catalina.tribes.transport.nio; import java.io.IOException; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.ReadableByteChannel; @@ -147,19 +148,33 @@ public class NioReplicationTask extends AbstractRxTask { reader.setLastAccess(System.currentTimeMillis()); reader.access(); ReadableByteChannel channel = (ReadableByteChannel) key.channel(); - int count; + int count=-1; buffer.clear(); // make buffer empty - - // loop while data available, channel is non-blocking - while ((count = channel.read (buffer)) > 0) { - buffer.flip(); // make buffer readable + SocketAddress saddr = null; + + if (channel instanceof SocketChannel) { + // loop while data available, channel is non-blocking + while ((count = channel.read (buffer)) > 0) { + buffer.flip(); // make buffer readable + if ( buffer.hasArray() ) + reader.append(buffer.array(),0,count,false); + else + reader.append(buffer,count,false); + buffer.clear(); // make buffer empty + //do we have at least one package? + if ( reader.hasPackage() ) break; + } + } else if (channel instanceof DatagramChannel) { + DatagramChannel dchannel = (DatagramChannel)channel; + saddr = dchannel.receive(buffer); + buffer.flip(); // make buffer readable if ( buffer.hasArray() ) - reader.append(buffer.array(),0,count,false); + reader.append(buffer.array(),0,buffer.limit()-buffer.position(),false); else - reader.append(buffer,count,false); - buffer.clear(); // make buffer empty - //do we have at least one package? - if ( reader.hasPackage() ) break; + reader.append(buffer,buffer.limit()-buffer.position(),false); + buffer.clear(); // make buffer empty + //did we get a package + count = reader.hasPackage()?1:-1; } int pkgcnt = reader.count(); @@ -180,7 +195,7 @@ public class NioReplicationTask extends AbstractRxTask { * server before completing the request * This is considered an asynchronized request */ - if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND); + if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND,saddr); try { if ( Logs.MESSAGES.isTraceEnabled() ) { try { @@ -194,13 +209,13 @@ public class NioReplicationTask extends AbstractRxTask { * server before sending the ack to the remote server * This is considered a synchronized request */ - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND,saddr); }catch ( RemoteProcessException e ) { if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e); - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND,saddr); }catch ( Exception e ) { log.error("Processing of cluster message failed.",e); - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND,saddr); } if ( getUseBufferPool() ) { BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); @@ -275,17 +290,25 @@ public class NioReplicationTask extends AbstractRxTask { /** - * send a reply-acknowledgement (6,2,3) + * send a reply-acknowledgement (6,2,3), sends it doing a busy write, the ACK is so small + * that it should always go to the buffer * @param key * @param channel */ - protected void sendAck(SelectionKey key, WritableByteChannel channel, byte[] command) { - + protected void sendAck(SelectionKey key, WritableByteChannel channel, byte[] command, SocketAddress udpaddr) { try { + ByteBuffer buf = ByteBuffer.wrap(command); int total = 0; - while ( total < command.length ) { - total += channel.write(buf); + if (channel instanceof DatagramChannel) { + DatagramChannel dchannel = (DatagramChannel)channel; + while ( total < command.length ) { + total += dchannel.send(buf, udpaddr); + } + } else { + while ( total < command.length ) { + total += channel.write(buf); + } } if (log.isTraceEnabled()) { log.trace("ACK sent to " + diff --git a/java/org/apache/catalina/tribes/transport/nio/NioSender.java b/java/org/apache/catalina/tribes/transport/nio/NioSender.java index 9f5c010d3..fe21cbf45 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioSender.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioSender.java @@ -154,7 +154,7 @@ public class NioSender extends AbstractSender implements DataSender{ protected boolean read(SelectionKey key) throws IOException { //if there is no message here, we are done if ( current == null ) return true; - int read = socketChannel.read(readbuf); + int read = isUdpBased()?dataChannel.read(readbuf) : socketChannel.read(readbuf); //end of stream if ( read == -1 ) throw new IOException("Unable to receive an ack message. EOF on socket channel has been reached."); //no data read @@ -175,14 +175,14 @@ public class NioSender extends AbstractSender implements DataSender{ protected boolean write(SelectionKey key) throws IOException { - if ( (!isConnected()) || (this.socketChannel==null)) { + if ( (!isConnected()) || (this.socketChannel==null && this.dataChannel==null)) { throw new IOException("NioSender is not connected, this should not occur."); } if ( current != null ) { if ( remaining > 0 ) { //weve written everything, or we are starting a new package //protect against buffer overwrite - int byteswritten = socketChannel.write(writebuf); + int byteswritten = isUdpBased()?dataChannel.write(writebuf) : socketChannel.write(writebuf); if (byteswritten == -1 ) throw new EOFException(); remaining -= byteswritten; //if the entire message was written from the buffer @@ -204,7 +204,7 @@ public class NioSender extends AbstractSender implements DataSender{ * @todo Implement this org.apache.catalina.tribes.transport.IDataSender method */ public synchronized void connect() throws IOException { - if ( connecting ) return; + if ( connecting || isConnected()) return; connecting = true; if ( isConnected() ) throw new IOException("NioSender is already in connected state."); if ( readbuf == null ) { @@ -218,15 +218,23 @@ public class NioSender extends AbstractSender implements DataSender{ writebuf.clear(); } - InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort()); - if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress."); - socketChannel = SocketChannel.open(); - socketChannel.configureBlocking(false); - if ( socketChannel.connect(addr) ) { - completeConnect(); - socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); + if (isUdpBased()) { + InetSocketAddress daddr = new InetSocketAddress(getAddress(),getUdpPort()); + if ( dataChannel != null ) throw new IOException("Datagram channel has already been established. Connection might be in progress."); + dataChannel = DatagramChannel.open(); + dataChannel.configureBlocking(false); + dataChannel.connect(daddr); } else { - socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, this); + InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort()); + if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress."); + socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + if ( socketChannel.connect(addr) ) { + completeConnect(); + socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); + } else { + socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, this); + } } } @@ -252,6 +260,18 @@ public class NioSender extends AbstractSender implements DataSender{ socketChannel = null; } } + if ( dataChannel != null ) { + try { + try {dataChannel.socket().close();}catch ( Exception x){} + //error free close, all the way + //try {socket.shutdownOutput();}catch ( Exception x){} + //try {socket.shutdownInput();}catch ( Exception x){} + //try {socket.close();}catch ( Exception x){} + try {dataChannel.close();}catch ( Exception x){} + }finally { + dataChannel = null; + } + } } catch ( Exception x ) { log.error("Unable to disconnect NioSender. msg="+x.getMessage()); if ( log.isDebugEnabled() ) log.debug("Unable to disconnect NioSender. msg="+x.getMessage(),x); @@ -273,6 +293,7 @@ public class NioSender extends AbstractSender implements DataSender{ setAttempt(0); setRequestCount(0); setConnectTime(-1); + setUdpBased(false); } private ByteBuffer getReadBuffer() { @@ -312,7 +333,10 @@ public class NioSender extends AbstractSender implements DataSender{ //writebuf.limit(length); writebuf.flip(); if (isConnected()) { - socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); + if (isUdpBased()) + dataChannel.register(getSelector(), SelectionKey.OP_WRITE, this); + else + socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); } } } diff --git a/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java b/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java index 7eeea1e70..b8c3de56b 100644 --- a/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java +++ b/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -48,7 +48,7 @@ import org.apache.catalina.tribes.UniqueId; * @version 1.0 */ public class ParallelNioSender extends AbstractSender implements MultiPointSender { - + protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ParallelNioSender.class); protected long selectTimeout = 5000; //default 5 seconds, same as send timeout protected Selector selector; @@ -58,15 +58,16 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende selector = Selector.open(); setConnected(true); } - - + + public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException { long start = System.currentTimeMillis(); + this.setUdpBased((msg.getOptions()&Channel.SEND_OPTIONS_UDP) == Channel.SEND_OPTIONS_UDP); byte[] data = XByteBuffer.createDataPackage((ChannelData)msg); NioSender[] senders = setupForSend(destination); connect(senders); setData(senders,data); - + int remaining = senders.length; ChannelException cx = null; try { @@ -108,17 +109,17 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende if ( x instanceof ChannelException ) throw (ChannelException)x; else throw new ChannelException(x); } - + } - + private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) throws IOException, ChannelException { int completed = 0; int selectedKeys = selector.select(selectTimeOut); - + if (selectedKeys == 0) { return 0; } - + Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey sk = (SelectionKey) it.next(); @@ -140,16 +141,16 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende int attempt = sender.getAttempt()+1; boolean retry = (sender.getAttempt() <= maxAttempts && maxAttempts>0); synchronized (state) { - + //sk.cancel(); if (state.isSuspect()) state.setFailing(); if (state.isReady()) { state.setSuspect(); - if ( retry ) + if ( retry ) log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect and retrying."); - else + else log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect.", x); - } + } } if ( !isConnected() ) { log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected."); @@ -157,11 +158,11 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende cx.addFaultyMember(sender.getDestination(),x); throw cx; } - + byte[] data = sender.getMessage(); if ( retry ) { - try { - sender.disconnect(); + try { + sender.disconnect(); sender.connect(); sender.setAttempt(attempt); sender.setMessage(data); @@ -178,12 +179,12 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende return completed; } - + private void connect(NioSender[] senders) throws ChannelException { ChannelException x = null; for (int i=0; i