Fix properties.
authormarkt <markt@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 23 Oct 2006 23:12:17 +0000 (23:12 +0000)
committermarkt <markt@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 23 Oct 2006 23:12:17 +0000 (23:12 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@467173 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/io/ChannelData.java
java/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java
java/org/apache/catalina/tribes/io/ListenCallback.java
java/org/apache/catalina/tribes/io/ObjectReader.java
java/org/apache/catalina/tribes/io/ReplicationStream.java
java/org/apache/catalina/tribes/io/XByteBuffer.java

index ba63b87..eff5d50 100644 (file)
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements.  See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License.  You may obtain a copy of the License at\r
- * \r
- *      http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-package org.apache.catalina.tribes.io;\r
-\r
-import java.util.Arrays;\r
-\r
-import org.apache.catalina.tribes.ChannelMessage;\r
-import org.apache.catalina.tribes.Member;\r
-import org.apache.catalina.tribes.membership.MemberImpl;\r
-import org.apache.catalina.tribes.util.UUIDGenerator;\r
-import org.apache.catalina.tribes.Channel;\r
-import java.sql.Timestamp;\r
-\r
-/**\r
- * The <code>ChannelData</code> object is used to transfer a message through the \r
- * channel interceptor stack and eventually out on a transport to be sent \r
- * to another node. While the message is being processed by the different \r
- * interceptors, the message data can be manipulated as each interceptor seems appropriate.\r
- * @author Peter Rossbach\r
- * @author Filip Hanik\r
- * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $\r
- * \r
- */\r
-public class ChannelData implements ChannelMessage {\r
-    public static ChannelData[] EMPTY_DATA_ARRAY = new ChannelData[0];\r
-    \r
-    public static boolean USE_SECURE_RANDOM_FOR_UUID = false;\r
-    \r
-    /**\r
-     * The options this message was sent with\r
-     */\r
-    private int options = 0 ;\r
-    /**\r
-     * The message data, stored in a dynamic buffer\r
-     */\r
-    private XByteBuffer message ;\r
-    /**\r
-     * The timestamp that goes with this message\r
-     */\r
-    private long timestamp ;\r
-    /**\r
-     * A unique message id\r
-     */\r
-    private byte[] uniqueId ;\r
-    /**\r
-     * The source or reply-to address for this message\r
-     */\r
-    private Member address;\r
-\r
-    /**\r
-     * Creates an empty channel data with a new unique Id\r
-     * @see #ChannelData(boolean)\r
-     */\r
-    public ChannelData() {\r
-        this(true);\r
-    }\r
-    \r
-    /**\r
-     * Create an empty channel data object\r
-     * @param generateUUID boolean - if true, a unique Id will be generated\r
-     */\r
-    public ChannelData(boolean generateUUID) {\r
-        if ( generateUUID ) generateUUID();\r
-    }\r
-    \r
-    \r
-    \r
-    /**\r
-     * Creates a new channel data object with data\r
-     * @param uniqueId - unique message id\r
-     * @param message - message data\r
-     * @param timestamp - message timestamp\r
-     */\r
-    public ChannelData(byte[] uniqueId, XByteBuffer message, long timestamp) {\r
-        this.uniqueId = uniqueId;\r
-        this.message = message;\r
-        this.timestamp = timestamp;\r
-    }\r
-    \r
-    /**\r
-     * @return Returns the message byte buffer\r
-     */\r
-    public XByteBuffer getMessage() {\r
-        return message;\r
-    }\r
-    /**\r
-     * @param message The message to send.\r
-     */\r
-    public void setMessage(XByteBuffer message) {\r
-        this.message = message;\r
-    }\r
-    /**\r
-     * @return Returns the timestamp.\r
-     */\r
-    public long getTimestamp() {\r
-        return timestamp;\r
-    }\r
-    /**\r
-     * @param timestamp The timestamp to send\r
-     */\r
-    public void setTimestamp(long timestamp) {\r
-        this.timestamp = timestamp;\r
-    }\r
-    /**\r
-     * @return Returns the uniqueId.\r
-     */\r
-    public byte[] getUniqueId() {\r
-        return uniqueId;\r
-    }\r
-    /**\r
-     * @param uniqueId The uniqueId to send.\r
-     */\r
-    public void setUniqueId(byte[] uniqueId) {\r
-        this.uniqueId = uniqueId;\r
-    }\r
-    /**\r
-     * @return returns the message options \r
-     * see org.apache.catalina.tribes.Channel#sendMessage(org.apache.catalina.tribes.Member[], java.io.Serializable, int)\r
-     *                                                 \r
-     */\r
-    public int getOptions() {\r
-        return options;\r
-    }\r
-    /**\r
-     * @param sets the message options\r
-     */\r
-    public void setOptions(int options) {\r
-        this.options = options;\r
-    }\r
-    \r
-    /**\r
-     * Returns the source or reply-to address\r
-     * @return Member\r
-     */\r
-    public Member getAddress() {\r
-        return address;\r
-    }\r
-\r
-    /**\r
-     * Sets the source or reply-to address\r
-     * @param address Member\r
-     */\r
-    public void setAddress(Member address) {\r
-        this.address = address;\r
-    }\r
-    \r
-    /**\r
-     * Generates a UUID and invokes setUniqueId\r
-     */\r
-    public void generateUUID() {\r
-        byte[] data = new byte[16];\r
-        UUIDGenerator.randomUUID(USE_SECURE_RANDOM_FOR_UUID,data,0);\r
-        setUniqueId(data);\r
-    }\r
-\r
-    public int getDataPackageLength() {\r
-        int length = \r
-            4 + //options\r
-            8 + //timestamp  off=4\r
-            4 + //unique id length off=12\r
-            uniqueId.length+ //id data off=12+uniqueId.length\r
-            4 + //addr length off=12+uniqueId.length+4\r
-            ((MemberImpl)address).getDataLength()+ //member data off=12+uniqueId.length+4+add.length\r
-            4 + //message length off=12+uniqueId.length+4+add.length+4\r
-            message.getLength();\r
-        return length;\r
-\r
-    }\r
-    \r
-    /**\r
-     * Serializes the ChannelData object into a byte[] array\r
-     * @return byte[]\r
-     */\r
-    public byte[] getDataPackage()  {\r
-        int length = getDataPackageLength();\r
-        byte[] data = new byte[length];\r
-        int offset = 0;\r
-        return getDataPackage(data,offset);\r
-    }\r
-\r
-    public byte[] getDataPackage(byte[] data, int offset)  {\r
-        byte[] addr = ((MemberImpl)address).getData(false);\r
-        XByteBuffer.toBytes(options,data,offset);\r
-        offset += 4; //options\r
-        XByteBuffer.toBytes(timestamp,data,offset);\r
-        offset += 8; //timestamp\r
-        XByteBuffer.toBytes(uniqueId.length,data,offset);\r
-        offset += 4; //uniqueId.length\r
-        System.arraycopy(uniqueId,0,data,offset,uniqueId.length);\r
-        offset += uniqueId.length; //uniqueId data\r
-        XByteBuffer.toBytes(addr.length,data,offset);\r
-        offset += 4; //addr.length\r
-        System.arraycopy(addr,0,data,offset,addr.length);\r
-        offset += addr.length; //addr data\r
-        XByteBuffer.toBytes(message.getLength(),data,offset);\r
-        offset += 4; //message.length\r
-        System.arraycopy(message.getBytesDirect(),0,data,offset,message.getLength());\r
-        offset += message.getLength(); //message data\r
-        return data;\r
-    }\r
-    \r
-    /**\r
-     * Deserializes a ChannelData object from a byte array\r
-     * @param b byte[]\r
-     * @return ChannelData\r
-     */\r
-    public static ChannelData getDataFromPackage(XByteBuffer xbuf)  {\r
-        ChannelData data = new ChannelData(false);\r
-        int offset = 0;\r
-        data.setOptions(XByteBuffer.toInt(xbuf.getBytesDirect(),offset));\r
-        offset += 4; //options\r
-        data.setTimestamp(XByteBuffer.toLong(xbuf.getBytesDirect(),offset));\r
-        offset += 8; //timestamp\r
-        data.uniqueId = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];\r
-        offset += 4; //uniqueId length\r
-        System.arraycopy(xbuf.getBytesDirect(),offset,data.uniqueId,0,data.uniqueId.length);\r
-        offset += data.uniqueId.length; //uniqueId data\r
-        byte[] addr = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];\r
-        offset += 4; //addr length\r
-        System.arraycopy(xbuf.getBytesDirect(),offset,addr,0,addr.length);\r
-        data.setAddress(MemberImpl.getMember(addr));\r
-        offset += addr.length; //addr data\r
-        int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset);\r
-        offset += 4; //xsize length\r
-        System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize);\r
-        xbuf.setLength(xsize);\r
-        data.message = xbuf;\r
-        return data;\r
-\r
-    }\r
-\r
-    public static ChannelData getDataFromPackage(byte[] b)  {\r
-        ChannelData data = new ChannelData(false);\r
-        int offset = 0;\r
-        data.setOptions(XByteBuffer.toInt(b,offset));\r
-        offset += 4; //options\r
-        data.setTimestamp(XByteBuffer.toLong(b,offset));\r
-        offset += 8; //timestamp\r
-        data.uniqueId = new byte[XByteBuffer.toInt(b,offset)];\r
-        offset += 4; //uniqueId length\r
-        System.arraycopy(b,offset,data.uniqueId,0,data.uniqueId.length);\r
-        offset += data.uniqueId.length; //uniqueId data\r
-        byte[] addr = new byte[XByteBuffer.toInt(b,offset)];\r
-        offset += 4; //addr length\r
-        System.arraycopy(b,offset,addr,0,addr.length);\r
-        data.setAddress(MemberImpl.getMember(addr));\r
-        offset += addr.length; //addr data\r
-        int xsize = XByteBuffer.toInt(b,offset);\r
-        //data.message = new XByteBuffer(new byte[xsize],false);\r
-        data.message = BufferPool.getBufferPool().getBuffer(xsize,false);\r
-        offset += 4; //message length\r
-        System.arraycopy(b,offset,data.message.getBytesDirect(),0,xsize);\r
-        data.message.append(b,offset,xsize);\r
-        offset += xsize; //message data\r
-        return data;\r
-    }\r
-    \r
-    public int hashCode() {\r
-        return XByteBuffer.toInt(getUniqueId(),0);\r
-    }\r
-    \r
-    /**\r
-     * Compares to ChannelData objects, only compares on getUniqueId().equals(o.getUniqueId())\r
-     * @param o Object\r
-     * @return boolean\r
-     */\r
-    public boolean equals(Object o) {\r
-        if ( o instanceof ChannelData ) {\r
-            return Arrays.equals(getUniqueId(),((ChannelData)o).getUniqueId());\r
-        } else return false;\r
-    }\r
-    \r
-    /**\r
-     * Create a shallow clone, only the data gets recreated\r
-     * @return ClusterData\r
-     */\r
-    public Object clone() {\r
-//        byte[] d = this.getDataPackage();\r
-//        return ClusterData.getDataFromPackage(d);\r
-        ChannelData clone = new ChannelData(false);\r
-        clone.options = this.options;\r
-        clone.message = new XByteBuffer(this.message.getBytesDirect(),false);\r
-        clone.timestamp = this.timestamp;\r
-        clone.uniqueId = this.uniqueId;\r
-        clone.address = this.address;\r
-        return clone;\r
-    }\r
-    \r
-    /**\r
-     * Complete clone\r
-     * @return ClusterData\r
-     */\r
-    public Object deepclone() {\r
-        byte[] d = this.getDataPackage();\r
-        return ChannelData.getDataFromPackage(d);\r
-    }\r
-    \r
-    /**\r
-     * Utility method, returns true if the options flag indicates that an ack\r
-     * is to be sent after the message has been received and processed\r
-     * @param options int - the options for the message\r
-     * @return boolean \r
-     * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK\r
-     * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK\r
-     */\r
-    public static boolean sendAckSync(int options) {\r
-        return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) &&\r
-            ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) == Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);\r
-    }\r
-\r
-\r
-    /**\r
-     * Utility method, returns true if the options flag indicates that an ack\r
-     * is to be sent after the message has been received but not yet processed\r
-     * @param options int - the options for the message\r
-     * @return boolean \r
-     * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK\r
-     * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK\r
-     */\r
-    public static boolean sendAckAsync(int options) {\r
-        return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) &&\r
-            ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) != Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);\r
-    }\r
-    \r
-    public String toString() {\r
-        StringBuffer buf = new StringBuffer();\r
-        buf.append("ClusterData[src=");\r
-        buf.append(getAddress()).append("; id=");\r
-        buf.append(bToS(getUniqueId())).append("; sent=");\r
-        buf.append(new Timestamp(this.getTimestamp()).toString()).append("]");\r
-        return buf.toString();\r
-    }\r
-    \r
-    public static String bToS(byte[] data) {\r
-        StringBuffer buf = new StringBuffer(4*16);\r
-        buf.append("{");\r
-        for (int i=0; data!=null && i<data.length; i++ ) buf.append(String.valueOf(data[i])).append(" ");\r
-        buf.append("}");\r
-        return buf.toString();\r
-    }\r
-\r
-    \r
-}\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import java.util.Arrays;
+
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.Channel;
+import java.sql.Timestamp;
+
+/**
+ * The <code>ChannelData</code> object is used to transfer a message through the 
+ * channel interceptor stack and eventually out on a transport to be sent 
+ * to another node. While the message is being processed by the different 
+ * interceptors, the message data can be manipulated as each interceptor seems appropriate.
+ * @author Peter Rossbach
+ * @author Filip Hanik
+ * @version $Revision$ $Date$
+ * 
+ */
+public class ChannelData implements ChannelMessage {
+    public static ChannelData[] EMPTY_DATA_ARRAY = new ChannelData[0];
+    
+    public static boolean USE_SECURE_RANDOM_FOR_UUID = false;
+    
+    /**
+     * The options this message was sent with
+     */
+    private int options = 0 ;
+    /**
+     * The message data, stored in a dynamic buffer
+     */
+    private XByteBuffer message ;
+    /**
+     * The timestamp that goes with this message
+     */
+    private long timestamp ;
+    /**
+     * A unique message id
+     */
+    private byte[] uniqueId ;
+    /**
+     * The source or reply-to address for this message
+     */
+    private Member address;
+
+    /**
+     * Creates an empty channel data with a new unique Id
+     * @see #ChannelData(boolean)
+     */
+    public ChannelData() {
+        this(true);
+    }
+    
+    /**
+     * Create an empty channel data object
+     * @param generateUUID boolean - if true, a unique Id will be generated
+     */
+    public ChannelData(boolean generateUUID) {
+        if ( generateUUID ) generateUUID();
+    }
+    
+    
+    
+    /**
+     * Creates a new channel data object with data
+     * @param uniqueId - unique message id
+     * @param message - message data
+     * @param timestamp - message timestamp
+     */
+    public ChannelData(byte[] uniqueId, XByteBuffer message, long timestamp) {
+        this.uniqueId = uniqueId;
+        this.message = message;
+        this.timestamp = timestamp;
+    }
+    
+    /**
+     * @return Returns the message byte buffer
+     */
+    public XByteBuffer getMessage() {
+        return message;
+    }
+    /**
+     * @param message The message to send.
+     */
+    public void setMessage(XByteBuffer message) {
+        this.message = message;
+    }
+    /**
+     * @return Returns the timestamp.
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+    /**
+     * @param timestamp The timestamp to send
+     */
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+    /**
+     * @return Returns the uniqueId.
+     */
+    public byte[] getUniqueId() {
+        return uniqueId;
+    }
+    /**
+     * @param uniqueId The uniqueId to send.
+     */
+    public void setUniqueId(byte[] uniqueId) {
+        this.uniqueId = uniqueId;
+    }
+    /**
+     * @return returns the message options 
+     * see org.apache.catalina.tribes.Channel#sendMessage(org.apache.catalina.tribes.Member[], java.io.Serializable, int)
+     *                                                 
+     */
+    public int getOptions() {
+        return options;
+    }
+    /**
+     * @param sets the message options
+     */
+    public void setOptions(int options) {
+        this.options = options;
+    }
+    
+    /**
+     * Returns the source or reply-to address
+     * @return Member
+     */
+    public Member getAddress() {
+        return address;
+    }
+
+    /**
+     * Sets the source or reply-to address
+     * @param address Member
+     */
+    public void setAddress(Member address) {
+        this.address = address;
+    }
+    
+    /**
+     * Generates a UUID and invokes setUniqueId
+     */
+    public void generateUUID() {
+        byte[] data = new byte[16];
+        UUIDGenerator.randomUUID(USE_SECURE_RANDOM_FOR_UUID,data,0);
+        setUniqueId(data);
+    }
+
+    public int getDataPackageLength() {
+        int length = 
+            4 + //options
+            8 + //timestamp  off=4
+            4 + //unique id length off=12
+            uniqueId.length+ //id data off=12+uniqueId.length
+            4 + //addr length off=12+uniqueId.length+4
+            ((MemberImpl)address).getDataLength()+ //member data off=12+uniqueId.length+4+add.length
+            4 + //message length off=12+uniqueId.length+4+add.length+4
+            message.getLength();
+        return length;
+
+    }
+    
+    /**
+     * Serializes the ChannelData object into a byte[] array
+     * @return byte[]
+     */
+    public byte[] getDataPackage()  {
+        int length = getDataPackageLength();
+        byte[] data = new byte[length];
+        int offset = 0;
+        return getDataPackage(data,offset);
+    }
+
+    public byte[] getDataPackage(byte[] data, int offset)  {
+        byte[] addr = ((MemberImpl)address).getData(false);
+        XByteBuffer.toBytes(options,data,offset);
+        offset += 4; //options
+        XByteBuffer.toBytes(timestamp,data,offset);
+        offset += 8; //timestamp
+        XByteBuffer.toBytes(uniqueId.length,data,offset);
+        offset += 4; //uniqueId.length
+        System.arraycopy(uniqueId,0,data,offset,uniqueId.length);
+        offset += uniqueId.length; //uniqueId data
+        XByteBuffer.toBytes(addr.length,data,offset);
+        offset += 4; //addr.length
+        System.arraycopy(addr,0,data,offset,addr.length);
+        offset += addr.length; //addr data
+        XByteBuffer.toBytes(message.getLength(),data,offset);
+        offset += 4; //message.length
+        System.arraycopy(message.getBytesDirect(),0,data,offset,message.getLength());
+        offset += message.getLength(); //message data
+        return data;
+    }
+    
+    /**
+     * Deserializes a ChannelData object from a byte array
+     * @param b byte[]
+     * @return ChannelData
+     */
+    public static ChannelData getDataFromPackage(XByteBuffer xbuf)  {
+        ChannelData data = new ChannelData(false);
+        int offset = 0;
+        data.setOptions(XByteBuffer.toInt(xbuf.getBytesDirect(),offset));
+        offset += 4; //options
+        data.setTimestamp(XByteBuffer.toLong(xbuf.getBytesDirect(),offset));
+        offset += 8; //timestamp
+        data.uniqueId = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];
+        offset += 4; //uniqueId length
+        System.arraycopy(xbuf.getBytesDirect(),offset,data.uniqueId,0,data.uniqueId.length);
+        offset += data.uniqueId.length; //uniqueId data
+        byte[] addr = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];
+        offset += 4; //addr length
+        System.arraycopy(xbuf.getBytesDirect(),offset,addr,0,addr.length);
+        data.setAddress(MemberImpl.getMember(addr));
+        offset += addr.length; //addr data
+        int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset);
+        offset += 4; //xsize length
+        System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize);
+        xbuf.setLength(xsize);
+        data.message = xbuf;
+        return data;
+
+    }
+
+    public static ChannelData getDataFromPackage(byte[] b)  {
+        ChannelData data = new ChannelData(false);
+        int offset = 0;
+        data.setOptions(XByteBuffer.toInt(b,offset));
+        offset += 4; //options
+        data.setTimestamp(XByteBuffer.toLong(b,offset));
+        offset += 8; //timestamp
+        data.uniqueId = new byte[XByteBuffer.toInt(b,offset)];
+        offset += 4; //uniqueId length
+        System.arraycopy(b,offset,data.uniqueId,0,data.uniqueId.length);
+        offset += data.uniqueId.length; //uniqueId data
+        byte[] addr = new byte[XByteBuffer.toInt(b,offset)];
+        offset += 4; //addr length
+        System.arraycopy(b,offset,addr,0,addr.length);
+        data.setAddress(MemberImpl.getMember(addr));
+        offset += addr.length; //addr data
+        int xsize = XByteBuffer.toInt(b,offset);
+        //data.message = new XByteBuffer(new byte[xsize],false);
+        data.message = BufferPool.getBufferPool().getBuffer(xsize,false);
+        offset += 4; //message length
+        System.arraycopy(b,offset,data.message.getBytesDirect(),0,xsize);
+        data.message.append(b,offset,xsize);
+        offset += xsize; //message data
+        return data;
+    }
+    
+    public int hashCode() {
+        return XByteBuffer.toInt(getUniqueId(),0);
+    }
+    
+    /**
+     * Compares to ChannelData objects, only compares on getUniqueId().equals(o.getUniqueId())
+     * @param o Object
+     * @return boolean
+     */
+    public boolean equals(Object o) {
+        if ( o instanceof ChannelData ) {
+            return Arrays.equals(getUniqueId(),((ChannelData)o).getUniqueId());
+        } else return false;
+    }
+    
+    /**
+     * Create a shallow clone, only the data gets recreated
+     * @return ClusterData
+     */
+    public Object clone() {
+//        byte[] d = this.getDataPackage();
+//        return ClusterData.getDataFromPackage(d);
+        ChannelData clone = new ChannelData(false);
+        clone.options = this.options;
+        clone.message = new XByteBuffer(this.message.getBytesDirect(),false);
+        clone.timestamp = this.timestamp;
+        clone.uniqueId = this.uniqueId;
+        clone.address = this.address;
+        return clone;
+    }
+    
+    /**
+     * Complete clone
+     * @return ClusterData
+     */
+    public Object deepclone() {
+        byte[] d = this.getDataPackage();
+        return ChannelData.getDataFromPackage(d);
+    }
+    
+    /**
+     * Utility method, returns true if the options flag indicates that an ack
+     * is to be sent after the message has been received and processed
+     * @param options int - the options for the message
+     * @return boolean 
+     * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK
+     * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK
+     */
+    public static boolean sendAckSync(int options) {
+        return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) &&
+            ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) == Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+    }
+
+
+    /**
+     * Utility method, returns true if the options flag indicates that an ack
+     * is to be sent after the message has been received but not yet processed
+     * @param options int - the options for the message
+     * @return boolean 
+     * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK
+     * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK
+     */
+    public static boolean sendAckAsync(int options) {
+        return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) &&
+            ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) != Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+    }
+    
+    public String toString() {
+        StringBuffer buf = new StringBuffer();
+        buf.append("ClusterData[src=");
+        buf.append(getAddress()).append("; id=");
+        buf.append(bToS(getUniqueId())).append("; sent=");
+        buf.append(new Timestamp(this.getTimestamp()).toString()).append("]");
+        return buf.toString();
+    }
+    
+    public static String bToS(byte[] data) {
+        StringBuffer buf = new StringBuffer(4*16);
+        buf.append("{");
+        for (int i=0; data!=null && i<data.length; i++ ) buf.append(String.valueOf(data[i])).append(" ");
+        buf.append("}");
+        return buf.toString();
+    }
+
+    
+}
index bf3ab37..7f908a5 100644 (file)
@@ -1,63 +1,63 @@
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements.  See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License.  You may obtain a copy of the License at\r
- *\r
- *      http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-\r
-package org.apache.catalina.tribes.io;\r
-\r
-import java.io.IOException;\r
-import java.io.OutputStream;\r
-\r
-/**\r
- * Byte array output stream that exposes the byte array directly\r
- *\r
- * @author not attributable\r
- * @version 1.0\r
- */\r
-public class DirectByteArrayOutputStream extends OutputStream {\r
-    \r
-    private XByteBuffer buffer;\r
-    \r
-    public DirectByteArrayOutputStream(int size) {\r
-        buffer = new XByteBuffer(size,false);\r
-    }\r
-\r
-    /**\r
-     * Writes the specified byte to this output stream.\r
-     *\r
-     * @param b the <code>byte</code>.\r
-     * @throws IOException if an I/O error occurs. In particular, an\r
-     *   <code>IOException</code> may be thrown if the output stream has\r
-     *   been closed.\r
-     * @todo Implement this java.io.OutputStream method\r
-     */\r
-    public void write(int b) throws IOException {\r
-        buffer.append((byte)b);\r
-    }\r
-    \r
-    public int size() {\r
-        return buffer.getLength();\r
-    }\r
-    \r
-    public byte[] getArrayDirect() {\r
-        return buffer.getBytesDirect();\r
-    }\r
-    \r
-    public byte[] getArray() {\r
-        return buffer.getBytes();\r
-    }\r
-\r
-\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Byte array output stream that exposes the byte array directly
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class DirectByteArrayOutputStream extends OutputStream {
+    
+    private XByteBuffer buffer;
+    
+    public DirectByteArrayOutputStream(int size) {
+        buffer = new XByteBuffer(size,false);
+    }
+
+    /**
+     * Writes the specified byte to this output stream.
+     *
+     * @param b the <code>byte</code>.
+     * @throws IOException if an I/O error occurs. In particular, an
+     *   <code>IOException</code> may be thrown if the output stream has
+     *   been closed.
+     * @todo Implement this java.io.OutputStream method
+     */
+    public void write(int b) throws IOException {
+        buffer.append((byte)b);
+    }
+    
+    public int size() {
+        return buffer.getLength();
+    }
+    
+    public byte[] getArrayDirect() {
+        return buffer.getBytesDirect();
+    }
+    
+    public byte[] getArray() {
+        return buffer.getBytes();
+    }
+
+
 }
\ No newline at end of file
index 3812ec2..183d578 100644 (file)
@@ -1,42 +1,42 @@
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements.  See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License.  You may obtain a copy of the License at\r
- * \r
- *      http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-\r
-package org.apache.catalina.tribes.io;\r
-\r
-import org.apache.catalina.tribes.ChannelMessage;\r
-\r
-\r
-\r
-/**\r
- * Internal interface, similar to the MessageListener but used \r
- * at the IO base\r
- * The listen callback interface is used by the replication system\r
- * when data has been received. The interface does not care about\r
- * objects and marshalling and just passes the bytes straight through.\r
- * @author Filip Hanik\r
- * @version $Revision: 303987 $, $Date: 2005-07-08 15:50:30 -0500 (Fri, 08 Jul 2005) $\r
- */\r
-public interface ListenCallback\r
-{\r
-    /**\r
-     * This method is invoked on the callback object to notify it that new data has\r
-     * been received from one of the cluster nodes.\r
-     * @param data - the message bytes received from the cluster/replication system\r
-     */\r
-     public void messageDataReceived(ChannelMessage data);\r
-     \r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.io;
+
+import org.apache.catalina.tribes.ChannelMessage;
+
+
+
+/**
+ * Internal interface, similar to the MessageListener but used 
+ * at the IO base
+ * The listen callback interface is used by the replication system
+ * when data has been received. The interface does not care about
+ * objects and marshalling and just passes the bytes straight through.
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+public interface ListenCallback
+{
+    /**
+     * This method is invoked on the callback object to notify it that new data has
+     * been received from one of the cluster nodes.
+     * @param data - the message bytes received from the cluster/replication system
+     */
+     public void messageDataReceived(ChannelMessage data);
+     
 }
\ No newline at end of file
index 62f337d..bb65607 100644 (file)
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements.  See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License.  You may obtain a copy of the License at\r
- * \r
- *      http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-package org.apache.catalina.tribes.io;\r
-\r
-import java.io.IOException;\r
-import java.net.Socket;\r
-import java.nio.ByteBuffer;\r
-import java.nio.channels.SocketChannel;\r
-\r
-import org.apache.catalina.tribes.ChannelMessage;\r
-\r
-\r
-\r
-/**\r
- * The object reader object is an object used in conjunction with\r
- * java.nio TCP messages. This object stores the message bytes in a\r
- * <code>XByteBuffer</code> until a full package has been received.\r
- * This object uses an XByteBuffer which is an extendable object buffer that also allows\r
- * for message encoding and decoding.\r
- *\r
- * @author Filip Hanik\r
- * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $\r
- */\r
-public class ObjectReader {\r
-\r
-    protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ObjectReader.class);\r
-\r
-    private XByteBuffer buffer;\r
-    \r
-    protected long lastAccess = System.currentTimeMillis();\r
-    \r
-    protected boolean accessed = false;\r
-    private boolean cancelled;\r
-\r
-    /**\r
-     * Creates an <code>ObjectReader</code> for a TCP NIO socket channel\r
-     * @param channel - the channel to be read.\r
-     */\r
-    public ObjectReader(SocketChannel channel) {\r
-        this(channel.socket());\r
-    }\r
-    \r
-    /**\r
-     * Creates an <code>ObjectReader</code> for a TCP socket\r
-     * @param socket Socket\r
-     */\r
-    public ObjectReader(Socket socket) {\r
-        try{\r
-            this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true);\r
-        }catch ( IOException x ) {\r
-            //unable to get buffer size\r
-            log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes.");\r
-            this.buffer = new XByteBuffer(43800,true);\r
-        }\r
-    }\r
-    \r
-    public synchronized void access() {\r
-        this.accessed = true;\r
-        this.lastAccess = System.currentTimeMillis();\r
-    }\r
-    \r
-    public synchronized void finish() {\r
-        this.accessed = false;\r
-        this.lastAccess = System.currentTimeMillis();\r
-    }\r
-    \r
-    public boolean isAccessed() {\r
-        return this.accessed;\r
-    }\r
-\r
-    /**\r
-     * Append new bytes to buffer. \r
-     * @see XByteBuffer#countPackages()\r
-     * @param data new transfer buffer\r
-     * @param off offset\r
-     * @param len length in buffer\r
-     * @return number of messages that sended to callback\r
-     * @throws java.io.IOException\r
-     */\r
-    public int append(ByteBuffer data, int len, boolean count) throws java.io.IOException {\r
-       buffer.append(data,len);\r
-       int pkgCnt = -1;\r
-       if ( count ) pkgCnt = buffer.countPackages();\r
-       return pkgCnt;\r
-   }\r
-\r
-     public int append(byte[] data,int off,int len, boolean count) throws java.io.IOException {\r
-        buffer.append(data,off,len);\r
-        int pkgCnt = -1;\r
-        if ( count ) pkgCnt = buffer.countPackages();\r
-        return pkgCnt;\r
-    }\r
-\r
-    /**\r
-     * Send buffer to cluster listener (callback).\r
-     * Is message complete receiver send message to callback?\r
-     *\r
-     * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#messageDataReceived(ChannelMessage)\r
-     * @see XByteBuffer#doesPackageExist()\r
-     * @see XByteBuffer#extractPackage(boolean)\r
-     *\r
-     * @return number of received packages/messages\r
-     * @throws java.io.IOException\r
-     */\r
-    public ChannelMessage[] execute() throws java.io.IOException {\r
-        int pkgCnt = buffer.countPackages();\r
-        ChannelMessage[] result = new ChannelMessage[pkgCnt];\r
-        for (int i=0; i<pkgCnt; i++)  {\r
-            ChannelMessage data = buffer.extractPackage(true);\r
-            result[i] = data;\r
-        }\r
-        return result;\r
-    }\r
-    \r
-    public int bufferSize() {\r
-        return buffer.getLength();\r
-    }\r
-    \r
-\r
-    public boolean hasPackage() {\r
-        return buffer.countPackages(true)>0;\r
-    }\r
-    /**\r
-     * Returns the number of packages that the reader has read\r
-     * @return int\r
-     */\r
-    public int count() {\r
-        return buffer.countPackages();\r
-    }\r
-    \r
-    public void close() {\r
-        this.buffer = null;\r
-    }\r
-\r
-    public long getLastAccess() {\r
-        return lastAccess;\r
-    }\r
-\r
-    public boolean isCancelled() {\r
-        return cancelled;\r
-    }\r
-\r
-    public void setLastAccess(long lastAccess) {\r
-        this.lastAccess = lastAccess;\r
-    }\r
-\r
-    public void setCancelled(boolean cancelled) {\r
-        this.cancelled = cancelled;\r
-    }\r
-\r
-}\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.catalina.tribes.ChannelMessage;
+
+
+
+/**
+ * The object reader object is an object used in conjunction with
+ * java.nio TCP messages. This object stores the message bytes in a
+ * <code>XByteBuffer</code> until a full package has been received.
+ * This object uses an XByteBuffer which is an extendable object buffer that also allows
+ * for message encoding and decoding.
+ *
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+public class ObjectReader {
+
+    protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ObjectReader.class);
+
+    private XByteBuffer buffer;
+    
+    protected long lastAccess = System.currentTimeMillis();
+    
+    protected boolean accessed = false;
+    private boolean cancelled;
+
+    /**
+     * Creates an <code>ObjectReader</code> for a TCP NIO socket channel
+     * @param channel - the channel to be read.
+     */
+    public ObjectReader(SocketChannel channel) {
+        this(channel.socket());
+    }
+    
+    /**
+     * Creates an <code>ObjectReader</code> for a TCP socket
+     * @param socket Socket
+     */
+    public ObjectReader(Socket socket) {
+        try{
+            this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true);
+        }catch ( IOException x ) {
+            //unable to get buffer size
+            log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes.");
+            this.buffer = new XByteBuffer(43800,true);
+        }
+    }
+    
+    public synchronized void access() {
+        this.accessed = true;
+        this.lastAccess = System.currentTimeMillis();
+    }
+    
+    public synchronized void finish() {
+        this.accessed = false;
+        this.lastAccess = System.currentTimeMillis();
+    }
+    
+    public boolean isAccessed() {
+        return this.accessed;
+    }
+
+    /**
+     * Append new bytes to buffer. 
+     * @see XByteBuffer#countPackages()
+     * @param data new transfer buffer
+     * @param off offset
+     * @param len length in buffer
+     * @return number of messages that sended to callback
+     * @throws java.io.IOException
+     */
+    public int append(ByteBuffer data, int len, boolean count) throws java.io.IOException {
+       buffer.append(data,len);
+       int pkgCnt = -1;
+       if ( count ) pkgCnt = buffer.countPackages();
+       return pkgCnt;
+   }
+
+     public int append(byte[] data,int off,int len, boolean count) throws java.io.IOException {
+        buffer.append(data,off,len);
+        int pkgCnt = -1;
+        if ( count ) pkgCnt = buffer.countPackages();
+        return pkgCnt;
+    }
+
+    /**
+     * Send buffer to cluster listener (callback).
+     * Is message complete receiver send message to callback?
+     *
+     * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#messageDataReceived(ChannelMessage)
+     * @see XByteBuffer#doesPackageExist()
+     * @see XByteBuffer#extractPackage(boolean)
+     *
+     * @return number of received packages/messages
+     * @throws java.io.IOException
+     */
+    public ChannelMessage[] execute() throws java.io.IOException {
+        int pkgCnt = buffer.countPackages();
+        ChannelMessage[] result = new ChannelMessage[pkgCnt];
+        for (int i=0; i<pkgCnt; i++)  {
+            ChannelMessage data = buffer.extractPackage(true);
+            result[i] = data;
+        }
+        return result;
+    }
+    
+    public int bufferSize() {
+        return buffer.getLength();
+    }
+    
+
+    public boolean hasPackage() {
+        return buffer.countPackages(true)>0;
+    }
+    /**
+     * Returns the number of packages that the reader has read
+     * @return int
+     */
+    public int count() {
+        return buffer.countPackages();
+    }
+    
+    public void close() {
+        this.buffer = null;
+    }
+
+    public long getLastAccess() {
+        return lastAccess;
+    }
+
+    public boolean isCancelled() {
+        return cancelled;
+    }
+
+    public void setLastAccess(long lastAccess) {
+        this.lastAccess = lastAccess;
+    }
+
+    public void setCancelled(boolean cancelled) {
+        this.cancelled = cancelled;
+    }
+
+}
index 1827dec..38cd29b 100644 (file)
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements.  See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License.  You may obtain a copy of the License at\r
- * \r
- *      http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-\r
-\r
-package org.apache.catalina.tribes.io;\r
-\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.io.ObjectInputStream;\r
-import java.io.ObjectStreamClass;\r
-\r
-/**\r
- * Custom subclass of <code>ObjectInputStream</code> that loads from the\r
- * class loader for this web application.  This allows classes defined only\r
- * with the web application to be found correctly.\r
- *\r
- * @author Craig R. McClanahan\r
- * @author Bip Thelin\r
- * @author Filip Hanik\r
- * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $\r
- */\r
-\r
-public final class ReplicationStream extends ObjectInputStream {\r
-\r
-    \r
-    /**\r
-     * The class loader we will use to resolve classes.\r
-     */\r
-    private ClassLoader[] classLoaders = null;\r
-    \r
-\r
-    /**\r
-     * Construct a new instance of CustomObjectInputStream\r
-     *\r
-     * @param stream The input stream we will read from\r
-     * @param classLoader The class loader used to instantiate objects\r
-     *\r
-     * @exception IOException if an input/output error occurs\r
-     */\r
-    public ReplicationStream(InputStream stream,\r
-                             ClassLoader[] classLoaders)\r
-        throws IOException {\r
-\r
-        super(stream);\r
-        this.classLoaders = classLoaders;\r
-    }\r
-\r
-    /**\r
-     * Load the local class equivalent of the specified stream class\r
-     * description, by using the class loader assigned to this Context.\r
-     *\r
-     * @param classDesc Class description from the input stream\r
-     *\r
-     * @exception ClassNotFoundException if this class cannot be found\r
-     * @exception IOException if an input/output error occurs\r
-     */\r
-    public Class resolveClass(ObjectStreamClass classDesc)\r
-        throws ClassNotFoundException, IOException {\r
-        String name = classDesc.getName();\r
-        boolean tryRepFirst = name.startsWith("org.apache.catalina.tribes");\r
-        try {\r
-            try\r
-            {\r
-                if ( tryRepFirst ) return findReplicationClass(name);\r
-                else return findExternalClass(name);\r
-            }\r
-            catch ( Exception x )\r
-            {\r
-                if ( tryRepFirst ) return findExternalClass(name);\r
-                else return findReplicationClass(name);\r
-            }\r
-        } catch (ClassNotFoundException e) {\r
-            return super.resolveClass(classDesc);\r
-        }\r
-    }\r
-    \r
-    public Class findReplicationClass(String name)\r
-        throws ClassNotFoundException, IOException {\r
-        Class clazz = Class.forName(name, false, getClass().getClassLoader());\r
-        return clazz;\r
-    }\r
-\r
-    public Class findExternalClass(String name) throws ClassNotFoundException  {\r
-        ClassNotFoundException cnfe = null;\r
-        for (int i=0; i<classLoaders.length; i++ ) {\r
-            try {\r
-                Class clazz = Class.forName(name, false, classLoaders[i]);\r
-                return clazz;\r
-            } catch ( ClassNotFoundException x ) {\r
-                cnfe = x;\r
-            } \r
-        }\r
-        if ( cnfe != null ) throw cnfe;\r
-        else throw new ClassNotFoundException(name);\r
-    }\r
-    \r
-    public void close() throws IOException  {\r
-        this.classLoaders = null;\r
-        super.close();\r
-    }\r
-\r
-\r
-}\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.tribes.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * Custom subclass of <code>ObjectInputStream</code> that loads from the
+ * class loader for this web application.  This allows classes defined only
+ * with the web application to be found correctly.
+ *
+ * @author Craig R. McClanahan
+ * @author Bip Thelin
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+
+public final class ReplicationStream extends ObjectInputStream {
+
+    
+    /**
+     * The class loader we will use to resolve classes.
+     */
+    private ClassLoader[] classLoaders = null;
+    
+
+    /**
+     * Construct a new instance of CustomObjectInputStream
+     *
+     * @param stream The input stream we will read from
+     * @param classLoader The class loader used to instantiate objects
+     *
+     * @exception IOException if an input/output error occurs
+     */
+    public ReplicationStream(InputStream stream,
+                             ClassLoader[] classLoaders)
+        throws IOException {
+
+        super(stream);
+        this.classLoaders = classLoaders;
+    }
+
+    /**
+     * Load the local class equivalent of the specified stream class
+     * description, by using the class loader assigned to this Context.
+     *
+     * @param classDesc Class description from the input stream
+     *
+     * @exception ClassNotFoundException if this class cannot be found
+     * @exception IOException if an input/output error occurs
+     */
+    public Class resolveClass(ObjectStreamClass classDesc)
+        throws ClassNotFoundException, IOException {
+        String name = classDesc.getName();
+        boolean tryRepFirst = name.startsWith("org.apache.catalina.tribes");
+        try {
+            try
+            {
+                if ( tryRepFirst ) return findReplicationClass(name);
+                else return findExternalClass(name);
+            }
+            catch ( Exception x )
+            {
+                if ( tryRepFirst ) return findExternalClass(name);
+                else return findReplicationClass(name);
+            }
+        } catch (ClassNotFoundException e) {
+            return super.resolveClass(classDesc);
+        }
+    }
+    
+    public Class findReplicationClass(String name)
+        throws ClassNotFoundException, IOException {
+        Class clazz = Class.forName(name, false, getClass().getClassLoader());
+        return clazz;
+    }
+
+    public Class findExternalClass(String name) throws ClassNotFoundException  {
+        ClassNotFoundException cnfe = null;
+        for (int i=0; i<classLoaders.length; i++ ) {
+            try {
+                Class clazz = Class.forName(name, false, classLoaders[i]);
+                return clazz;
+            } catch ( ClassNotFoundException x ) {
+                cnfe = x;
+            } 
+        }
+        if ( cnfe != null ) throw cnfe;
+        else throw new ClassNotFoundException(name);
+    }
+    
+    public void close() throws IOException  {
+        this.classLoaders = null;
+        super.close();
+    }
+
+
+}
index 082d2fa..021cb0e 100644 (file)
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements.  See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License.  You may obtain a copy of the License at\r
- * \r
- *      http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-\r
-package org.apache.catalina.tribes.io;\r
-\r
-import java.io.ByteArrayInputStream;\r
-import java.io.ByteArrayOutputStream;\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.io.ObjectInputStream;\r
-import java.io.ObjectOutputStream;\r
-import java.io.Serializable;\r
-import java.nio.ByteBuffer;\r
-\r
-/**\r
- * The XByteBuffer provides a dual functionality.\r
- * One, it stores message bytes and automatically extends the byte buffer if needed.<BR>\r
- * Two, it can encode and decode packages so that they can be defined and identified\r
- * as they come in on a socket.\r
- * <br>\r
- * <b>THIS CLASS IS NOT THREAD SAFE</B><BR>\r
- * <br/>\r
- * Transfer package:\r
- * <ul>\r
- * <li><b>START_DATA/b> - 7 bytes - <i>FLT2002</i></li>\r
- * <li><b>SIZE</b>      - 4 bytes - size of the data package</li>\r
- * <li><b>DATA</b>      - should be as many bytes as the prev SIZE</li>\r
- * <li><b>END_DATA</b>  - 7 bytes - <i>TLF2003</i></lI>\r
- * </ul>\r
- * @author Filip Hanik\r
- * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $\r
- */\r
-public class XByteBuffer\r
-{\r
-    \r
-    public static org.apache.juli.logging.Log log =\r
-        org.apache.juli.logging.LogFactory.getLog( XByteBuffer.class );\r
-    \r
-    /**\r
-     * This is a package header, 7 bytes (FLT2002)\r
-     */\r
-    public static final byte[] START_DATA = {70,76,84,50,48,48,50};\r
-    \r
-    /**\r
-     * This is the package footer, 7 bytes (TLF2003)\r
-     */\r
-    public static final byte[] END_DATA = {84,76,70,50,48,48,51};\r
\r
-    /**\r
-     * Default size on the initial byte buffer\r
-     */\r
-    private static final int DEF_SIZE = 2048;\r
\r
-    /**\r
-     * Default size to extend the buffer with\r
-     */\r
-    private static final int DEF_EXT  = 1024;\r
-    \r
-    /**\r
-     * Variable to hold the data\r
-     */\r
-    protected byte[] buf = null;\r
-    \r
-    /**\r
-     * Current length of data in the buffer\r
-     */\r
-    protected int bufSize = 0;\r
-    \r
-    /**\r
-     * Flag for discarding invalid packages\r
-     * If this flag is set to true, and append(byte[],...) is called,\r
-     * the data added will be inspected, and if it doesn't start with \r
-     * <code>START_DATA</code> it will be thrown away.\r
-     * \r
-     */\r
-    protected boolean discard = true;\r
-    \r
-    /**\r
-     * Constructs a new XByteBuffer\r
-     * @param size - the initial size of the byte buffer\r
-     * @todo use a pool of byte[] for performance\r
-     */\r
-    public XByteBuffer(int size, boolean discard) {\r
-        buf = new byte[size];\r
-        this.discard = discard;\r
-    }\r
-    \r
-    public XByteBuffer(byte[] data,boolean discard) {\r
-        this(data,data.length+128,discard);\r
-    }\r
-    \r
-    public XByteBuffer(byte[] data, int size,boolean discard) {\r
-        int length = Math.max(data.length,size);\r
-        buf = new byte[length];\r
-        System.arraycopy(data,0,buf,0,data.length);\r
-        bufSize = data.length;\r
-        this.discard = discard;\r
-    }\r
-    \r
-    public int getLength() {\r
-        return bufSize;\r
-    }\r
-\r
-    public void setLength(int size) {\r
-        if ( size > buf.length ) throw new ArrayIndexOutOfBoundsException("Size is larger than existing buffer.");\r
-        bufSize = size;\r
-    }\r
-    \r
-    public void trim(int length) {\r
-        if ( (bufSize - length) < 0 ) \r
-            throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length);\r
-        bufSize -= length;\r
-    }\r
-    \r
-    public void reset() {\r
-        bufSize = 0;\r
-    }\r
-            \r
-    public byte[] getBytesDirect() {\r
-        return this.buf;\r
-    }\r
-\r
-    /**\r
-     * Returns the bytes in the buffer, in its exact length\r
-     */\r
-    public byte[] getBytes() {\r
-        byte[] b = new byte[bufSize];\r
-        System.arraycopy(buf,0,b,0,bufSize);\r
-        return b;\r
-    }\r
-\r
-    /**\r
-     * Resets the buffer\r
-     */\r
-    public void clear() {\r
-        bufSize = 0;\r
-    }\r
-\r
-    /**\r
-     * Appends the data to the buffer. If the data is incorrectly formatted, ie, the data should always start with the\r
-     * header, false will be returned and the data will be discarded.\r
-     * @param b - bytes to be appended\r
-     * @param off - the offset to extract data from\r
-     * @param len - the number of bytes to append.\r
-     * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0\r
-     */\r
-    public boolean append(ByteBuffer b, int len) {\r
-        int newcount = bufSize + len;\r
-        if (newcount > buf.length) {\r
-            expand(newcount);\r
-        }\r
-        b.get(buf,bufSize,len);\r
-        \r
-        bufSize = newcount;\r
-        \r
-        if ( discard ) {\r
-            if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {\r
-                bufSize = 0;\r
-                log.error("Discarded the package, invalid header");\r
-                return false;\r
-            }\r
-        }\r
-        return true;\r
-\r
-    }\r
-    \r
-    public boolean append(byte i) {\r
-        int newcount = bufSize + 1;\r
-        if (newcount > buf.length) {\r
-            expand(newcount);\r
-        }\r
-        buf[bufSize] = i;\r
-        bufSize = newcount;\r
-        return true;\r
-    }\r
-\r
-\r
-    public boolean append(boolean i) {\r
-        int newcount = bufSize + 1;\r
-        if (newcount > buf.length) {\r
-            expand(newcount);\r
-        }\r
-        XByteBuffer.toBytes(i,buf,bufSize);\r
-        bufSize = newcount;\r
-        return true;\r
-    }\r
-\r
-    public boolean append(long i) {\r
-        int newcount = bufSize + 8;\r
-        if (newcount > buf.length) {\r
-            expand(newcount);\r
-        }\r
-        XByteBuffer.toBytes(i,buf,bufSize);\r
-        bufSize = newcount;\r
-        return true;\r
-    }\r
-    \r
-    public boolean append(int i) {\r
-        int newcount = bufSize + 4;\r
-        if (newcount > buf.length) {\r
-            expand(newcount);\r
-        }\r
-        XByteBuffer.toBytes(i,buf,bufSize);\r
-        bufSize = newcount;\r
-        return true;\r
-    }\r
-\r
-    public boolean append(byte[] b, int off, int len) {\r
-        if ((off < 0) || (off > b.length) || (len < 0) ||\r
-            ((off + len) > b.length) || ((off + len) < 0))  {\r
-            throw new IndexOutOfBoundsException();\r
-        } else if (len == 0) {\r
-            return false;\r
-        }\r
-\r
-        int newcount = bufSize + len;\r
-        if (newcount > buf.length) {\r
-            expand(newcount);\r
-        }\r
-        System.arraycopy(b, off, buf, bufSize, len);\r
-        bufSize = newcount;\r
-\r
-        if ( discard ) {\r
-            if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {\r
-                bufSize = 0;\r
-                log.error("Discarded the package, invalid header");\r
-                return false;\r
-            }\r
-        }\r
-        return true;\r
-    }\r
-\r
-    public void expand(int newcount) {\r
-        //don't change the allocation strategy\r
-        byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];\r
-        System.arraycopy(buf, 0, newbuf, 0, bufSize);\r
-        buf = newbuf;\r
-    }\r
-    \r
-    public int getCapacity() {\r
-        return buf.length;\r
-    }\r
-\r
-\r
-    /**\r
-     * Internal mechanism to make a check if a complete package exists\r
-     * within the buffer\r
-     * @return - true if a complete package (header,compress,size,data,footer) exists within the buffer\r
-     */\r
-    public int countPackages() {\r
-        return countPackages(false);\r
-    }\r
-\r
-    public int countPackages(boolean first)\r
-    {\r
-        int cnt = 0;\r
-        int pos = START_DATA.length;\r
-        int start = 0;\r
-\r
-        while ( start < bufSize ) {\r
-            //first check start header\r
-            int index = XByteBuffer.firstIndexOf(buf,start,START_DATA);\r
-            //if the header (START_DATA) isn't the first thing or\r
-            //the buffer isn't even 14 bytes\r
-            if ( index != start || ((bufSize-start)<14) ) break;\r
-            //next 4 bytes are compress flag not needed for count packages\r
-            //then get the size 4 bytes\r
-            int size = toInt(buf, pos);\r
-            //now the total buffer has to be long enough to hold\r
-            //START_DATA.length+4+size+END_DATA.length\r
-            pos = start + START_DATA.length + 4 + size;\r
-            if ( (pos + END_DATA.length) > bufSize) break;\r
-            //and finally check the footer of the package END_DATA\r
-            int newpos = firstIndexOf(buf, pos, END_DATA);\r
-            //mismatch, there is no package\r
-            if (newpos != pos) break;\r
-            //increase the packet count\r
-            cnt++;\r
-            //reset the values\r
-            start = pos + END_DATA.length;\r
-            pos = start + START_DATA.length;\r
-            //we only want to verify that we have at least one package\r
-            if ( first ) break;\r
-        }\r
-        return cnt;\r
-    }\r
-\r
-    /**\r
-     * Method to check if a package exists in this byte buffer.\r
-     * @return - true if a complete package (header,options,size,data,footer) exists within the buffer\r
-     */\r
-    public boolean doesPackageExist()  {\r
-        return (countPackages(true)>0);\r
-    }\r
-\r
-    /**\r
-     * Extracts the message bytes from a package.\r
-     * If no package exists, a IllegalStateException will be thrown.\r
-     * @param clearFromBuffer - if true, the package will be removed from the byte buffer\r
-     * @return - returns the actual message bytes (header, compress,size and footer not included).\r
-     */\r
-    public XByteBuffer extractDataPackage(boolean clearFromBuffer) {\r
-        int psize = countPackages(true);\r
-        if (psize == 0) {\r
-            throw new java.lang.IllegalStateException("No package exists in XByteBuffer");\r
-        }\r
-        int size = toInt(buf, START_DATA.length);\r
-        XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);\r
-        xbuf.setLength(size);\r
-        System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);\r
-        if (clearFromBuffer) {\r
-            int totalsize = START_DATA.length + 4 + size + END_DATA.length;\r
-            bufSize = bufSize - totalsize;\r
-            System.arraycopy(buf, totalsize, buf, 0, bufSize);\r
-        }\r
-        return xbuf;\r
-\r
-    }\r
-    \r
-    public ChannelData extractPackage(boolean clearFromBuffer) throws java.io.IOException {\r
-        XByteBuffer xbuf = extractDataPackage(clearFromBuffer);\r
-        ChannelData cdata = ChannelData.getDataFromPackage(xbuf);\r
-        return cdata;\r
-    }\r
-    \r
-    /**\r
-     * Creates a complete data package\r
-     * @param indata - the message data to be contained within the package\r
-     * @param compressed - compression flag for the indata buffer\r
-     * @return - a full package (header,size,data,footer)\r
-     * \r
-     */\r
-    public static byte[] createDataPackage(ChannelData cdata) {\r
-//        return createDataPackage(cdata.getDataPackage());\r
-        //avoid one extra byte array creation\r
-        int dlength = cdata.getDataPackageLength();\r
-        int length = getDataPackageLength(dlength);\r
-        byte[] data = new byte[length];\r
-        int offset = 0;\r
-        System.arraycopy(START_DATA, 0, data, offset, START_DATA.length);\r
-        offset += START_DATA.length;\r
-        toBytes(dlength,data, START_DATA.length);\r
-        offset += 4;\r
-        cdata.getDataPackage(data,offset);\r
-        offset += dlength;\r
-        System.arraycopy(END_DATA, 0, data, offset, END_DATA.length);\r
-        offset += END_DATA.length;\r
-        return data;\r
-    }\r
-    \r
-    public static byte[] createDataPackage(byte[] data, int doff, int dlength, byte[] buffer, int bufoff) {\r
-        if ( (buffer.length-bufoff) > getDataPackageLength(dlength) ) {\r
-            throw new ArrayIndexOutOfBoundsException("Unable to create data package, buffer is too small.");\r
-        }\r
-        System.arraycopy(START_DATA, 0, buffer, bufoff, START_DATA.length);\r
-        toBytes(data.length,buffer, bufoff+START_DATA.length);\r
-        System.arraycopy(data, doff, buffer, bufoff+START_DATA.length + 4, dlength);\r
-        System.arraycopy(END_DATA, 0, buffer, bufoff+START_DATA.length + 4 + data.length, END_DATA.length);\r
-        return buffer;\r
-    }\r
-\r
-    \r
-    public static int getDataPackageLength(int datalength) {\r
-        int length = \r
-            START_DATA.length + //header length\r
-            4 + //data length indicator\r
-            datalength + //actual data length\r
-            END_DATA.length; //footer length\r
-        return length;\r
-\r
-    }\r
-    \r
-    public static byte[] createDataPackage(byte[] data) {\r
-        int length = getDataPackageLength(data.length);\r
-        byte[] result = new byte[length];\r
-        return createDataPackage(data,0,data.length,result,0);\r
-    }\r
-    \r
-    \r
-\r
-//    public static void fillDataPackage(byte[] data, int doff, int dlength, XByteBuffer buf) {\r
-//        int pkglen = getDataPackageLength(dlength);\r
-//        if ( buf.getCapacity() <  pkglen ) buf.expand(pkglen);\r
-//        createDataPackage(data,doff,dlength,buf.getBytesDirect(),buf.getLength());\r
-//    }\r
-\r
-    /**\r
-     * Convert four bytes to an int\r
-     * @param b - the byte array containing the four bytes\r
-     * @param off - the offset\r
-     * @return the integer value constructed from the four bytes\r
-     * @exception java.lang.ArrayIndexOutOfBoundsException\r
-     */\r
-    public static int toInt(byte[] b,int off){\r
-        return ( ( (int) b[off+3]) & 0xFF) +\r
-            ( ( ( (int) b[off+2]) & 0xFF) << 8) +\r
-            ( ( ( (int) b[off+1]) & 0xFF) << 16) +\r
-            ( ( ( (int) b[off+0]) & 0xFF) << 24);\r
-    }\r
-\r
-    /**\r
-     * Convert eight bytes to a long\r
-     * @param b - the byte array containing the four bytes\r
-     * @param off - the offset\r
-     * @return the long value constructed from the eight bytes\r
-     * @exception java.lang.ArrayIndexOutOfBoundsException\r
-     */\r
-    public static long toLong(byte[] b,int off){\r
-        return ( ( (long) b[off+7]) & 0xFF) +\r
-            ( ( ( (long) b[off+6]) & 0xFF) << 8) +\r
-            ( ( ( (long) b[off+5]) & 0xFF) << 16) +\r
-            ( ( ( (long) b[off+4]) & 0xFF) << 24) +\r
-            ( ( ( (long) b[off+3]) & 0xFF) << 32) +\r
-            ( ( ( (long) b[off+2]) & 0xFF) << 40) +\r
-            ( ( ( (long) b[off+1]) & 0xFF) << 48) +\r
-            ( ( ( (long) b[off+0]) & 0xFF) << 56);\r
-    }\r
-\r
-    \r
-    /**\r
-     * Converts an integer to four bytes\r
-     * @param n - the integer\r
-     * @return - four bytes in an array\r
-     * @deprecated use toBytes(boolean,byte[],int)\r
-     */\r
-    public static byte[] toBytes(boolean bool) {\r
-        byte[] b = new byte[1] ;\r
-        return toBytes(bool,b,0);\r
-\r
-    }\r
-    \r
-    public static byte[] toBytes(boolean bool, byte[] data, int offset) {\r
-        data[offset] = (byte)(bool?1:0);\r
-        return data;\r
-    }\r
-    \r
-    /**\r
-     * \r
-     * @param <any> long\r
-     * @return use\r
-     */\r
-    public static boolean toBoolean(byte[] b, int offset) {\r
-        return b[offset] != 0;\r
-    }\r
-\r
-    \r
-    /**\r
-     * Converts an integer to four bytes\r
-     * @param n - the integer\r
-     * @return - four bytes in an array\r
-     * @deprecated use toBytes(int,byte[],int)\r
-     */\r
-    public static byte[] toBytes(int n) {\r
-        return toBytes(n,new byte[4],0);\r
-    }\r
-\r
-    public static byte[] toBytes(int n,byte[] b, int offset) {\r
-        b[offset+3] = (byte) (n);\r
-        n >>>= 8;\r
-        b[offset+2] = (byte) (n);\r
-        n >>>= 8;\r
-        b[offset+1] = (byte) (n);\r
-        n >>>= 8;\r
-        b[offset+0] = (byte) (n);\r
-        return b;\r
-    }\r
-\r
-    /**\r
-     * Converts an long to eight bytes\r
-     * @param n - the long\r
-     * @return - eight bytes in an array\r
-     * @deprecated use toBytes(long,byte[],int)\r
-     */\r
-    public static byte[] toBytes(long n) {\r
-        return toBytes(n,new byte[8],0);\r
-    }\r
-    public static byte[] toBytes(long n, byte[] b, int offset) {\r
-        b[offset+7] = (byte) (n);\r
-        n >>>= 8;\r
-        b[offset+6] = (byte) (n);\r
-        n >>>= 8;\r
-        b[offset+5] = (byte) (n);\r
-        n >>>= 8;\r
-        b[offset+4] = (byte) (n);\r
-        n >>>= 8;\r
-        b[offset+3] = (byte) (n);\r
-        n >>>= 8;\r
-        b[offset+2] = (byte) (n);\r
-        n >>>= 8;\r
-        b[offset+1] = (byte) (n);\r
-        n >>>= 8;\r
-        b[offset+0] = (byte) (n);\r
-        return b;\r
-    }\r
-\r
-    /**\r
-     * Similar to a String.IndexOf, but uses pure bytes\r
-     * @param src - the source bytes to be searched\r
-     * @param srcOff - offset on the source buffer\r
-     * @param find - the string to be found within src\r
-     * @return - the index of the first matching byte. -1 if the find array is not found\r
-     */\r
-    public static int firstIndexOf(byte[] src, int srcOff, byte[] find){\r
-        int result = -1;\r
-        if (find.length > src.length) return result;\r
-        if (find.length == 0 || src.length == 0) return result;\r
-        if (srcOff >= src.length ) throw new java.lang.ArrayIndexOutOfBoundsException();\r
-        boolean found = false;\r
-        int srclen = src.length;\r
-        int findlen = find.length;\r
-        byte first = find[0];\r
-        int pos = srcOff;\r
-        while (!found) {\r
-            //find the first byte\r
-            while (pos < srclen){\r
-                if (first == src[pos])\r
-                    break;\r
-                pos++;\r
-            }\r
-            if (pos >= srclen)\r
-                return -1;\r
-\r
-            //we found the first character\r
-            //match the rest of the bytes - they have to match\r
-            if ( (srclen - pos) < findlen)\r
-                return -1;\r
-            //assume it does exist\r
-            found = true;\r
-            for (int i = 1; ( (i < findlen) && found); i++)\r
-                found = found && (find[i] == src[pos + i]);\r
-            if (found)\r
-                result = pos;\r
-            else if ( (srclen - pos) < findlen)\r
-                return -1; //no more matches possible\r
-            else\r
-                pos++;\r
-        }\r
-        return result;\r
-    }\r
-\r
-    \r
-    public static Serializable deserialize(byte[] data) \r
-        throws IOException, ClassNotFoundException, ClassCastException {\r
-        return deserialize(data,0,data.length);\r
-    }\r
-    \r
-    public static Serializable deserialize(byte[] data, int offset, int length)  \r
-        throws IOException, ClassNotFoundException, ClassCastException {\r
-        return deserialize(data,offset,length,null);     \r
-    }\r
-    public static int invokecount = 0;\r
-    public static Serializable deserialize(byte[] data, int offset, int length, ClassLoader[] cls) \r
-        throws IOException, ClassNotFoundException, ClassCastException {\r
-        synchronized (XByteBuffer.class) { invokecount++;}\r
-        Object message = null;\r
-        if ( cls == null ) cls = new ClassLoader[0];\r
-        if (data != null) {\r
-            InputStream  instream = new ByteArrayInputStream(data,offset,length);\r
-            ObjectInputStream stream = null;\r
-            stream = (cls.length>0)? new ReplicationStream(instream,cls):new ObjectInputStream(instream);\r
-            message = stream.readObject();\r
-            instream.close();\r
-            stream.close();\r
-        }\r
-        if ( message == null ) {\r
-            return null;\r
-        } else if (message instanceof Serializable)\r
-            return (Serializable) message;\r
-        else {\r
-            throw new ClassCastException("Message has the wrong class. It should implement Serializable, instead it is:"+message.getClass().getName());\r
-        }\r
-    }\r
-\r
-    /**\r
-     * Serializes a message into cluster data\r
-     * @param msg ClusterMessage\r
-     * @param compress boolean\r
-     * @return \r
-     * @throws IOException\r
-     */\r
-    public static byte[] serialize(Serializable msg) throws IOException {\r
-        ByteArrayOutputStream outs = new ByteArrayOutputStream();\r
-        ObjectOutputStream out = new ObjectOutputStream(outs);\r
-        out.writeObject(msg);\r
-        out.flush();\r
-        byte[] data = outs.toByteArray();\r
-        return data;\r
-    }\r
-\r
-    public void setDiscard(boolean discard) {\r
-        this.discard = discard;\r
-    }\r
-\r
-    public boolean getDiscard() {\r
-        return discard;\r
-    }\r
-\r
-}\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * The XByteBuffer provides a dual functionality.
+ * One, it stores message bytes and automatically extends the byte buffer if needed.<BR>
+ * Two, it can encode and decode packages so that they can be defined and identified
+ * as they come in on a socket.
+ * <br>
+ * <b>THIS CLASS IS NOT THREAD SAFE</B><BR>
+ * <br/>
+ * Transfer package:
+ * <ul>
+ * <li><b>START_DATA/b> - 7 bytes - <i>FLT2002</i></li>
+ * <li><b>SIZE</b>      - 4 bytes - size of the data package</li>
+ * <li><b>DATA</b>      - should be as many bytes as the prev SIZE</li>
+ * <li><b>END_DATA</b>  - 7 bytes - <i>TLF2003</i></lI>
+ * </ul>
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+public class XByteBuffer
+{
+    
+    public static org.apache.juli.logging.Log log =
+        org.apache.juli.logging.LogFactory.getLog( XByteBuffer.class );
+    
+    /**
+     * This is a package header, 7 bytes (FLT2002)
+     */
+    public static final byte[] START_DATA = {70,76,84,50,48,48,50};
+    
+    /**
+     * This is the package footer, 7 bytes (TLF2003)
+     */
+    public static final byte[] END_DATA = {84,76,70,50,48,48,51};
+    /**
+     * Default size on the initial byte buffer
+     */
+    private static final int DEF_SIZE = 2048;
+    /**
+     * Default size to extend the buffer with
+     */
+    private static final int DEF_EXT  = 1024;
+    
+    /**
+     * Variable to hold the data
+     */
+    protected byte[] buf = null;
+    
+    /**
+     * Current length of data in the buffer
+     */
+    protected int bufSize = 0;
+    
+    /**
+     * Flag for discarding invalid packages
+     * If this flag is set to true, and append(byte[],...) is called,
+     * the data added will be inspected, and if it doesn't start with 
+     * <code>START_DATA</code> it will be thrown away.
+     * 
+     */
+    protected boolean discard = true;
+    
+    /**
+     * Constructs a new XByteBuffer
+     * @param size - the initial size of the byte buffer
+     * @todo use a pool of byte[] for performance
+     */
+    public XByteBuffer(int size, boolean discard) {
+        buf = new byte[size];
+        this.discard = discard;
+    }
+    
+    public XByteBuffer(byte[] data,boolean discard) {
+        this(data,data.length+128,discard);
+    }
+    
+    public XByteBuffer(byte[] data, int size,boolean discard) {
+        int length = Math.max(data.length,size);
+        buf = new byte[length];
+        System.arraycopy(data,0,buf,0,data.length);
+        bufSize = data.length;
+        this.discard = discard;
+    }
+    
+    public int getLength() {
+        return bufSize;
+    }
+
+    public void setLength(int size) {
+        if ( size > buf.length ) throw new ArrayIndexOutOfBoundsException("Size is larger than existing buffer.");
+        bufSize = size;
+    }
+    
+    public void trim(int length) {
+        if ( (bufSize - length) < 0 ) 
+            throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length);
+        bufSize -= length;
+    }
+    
+    public void reset() {
+        bufSize = 0;
+    }
+            
+    public byte[] getBytesDirect() {
+        return this.buf;
+    }
+
+    /**
+     * Returns the bytes in the buffer, in its exact length
+     */
+    public byte[] getBytes() {
+        byte[] b = new byte[bufSize];
+        System.arraycopy(buf,0,b,0,bufSize);
+        return b;
+    }
+
+    /**
+     * Resets the buffer
+     */
+    public void clear() {
+        bufSize = 0;
+    }
+
+    /**
+     * Appends the data to the buffer. If the data is incorrectly formatted, ie, the data should always start with the
+     * header, false will be returned and the data will be discarded.
+     * @param b - bytes to be appended
+     * @param off - the offset to extract data from
+     * @param len - the number of bytes to append.
+     * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0
+     */
+    public boolean append(ByteBuffer b, int len) {
+        int newcount = bufSize + len;
+        if (newcount > buf.length) {
+            expand(newcount);
+        }
+        b.get(buf,bufSize,len);
+        
+        bufSize = newcount;
+        
+        if ( discard ) {
+            if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {
+                bufSize = 0;
+                log.error("Discarded the package, invalid header");
+                return false;
+            }
+        }
+        return true;
+
+    }
+    
+    public boolean append(byte i) {
+        int newcount = bufSize + 1;
+        if (newcount > buf.length) {
+            expand(newcount);
+        }
+        buf[bufSize] = i;
+        bufSize = newcount;
+        return true;
+    }
+
+
+    public boolean append(boolean i) {
+        int newcount = bufSize + 1;
+        if (newcount > buf.length) {
+            expand(newcount);
+        }
+        XByteBuffer.toBytes(i,buf,bufSize);
+        bufSize = newcount;
+        return true;
+    }
+
+    public boolean append(long i) {
+        int newcount = bufSize + 8;
+        if (newcount > buf.length) {
+            expand(newcount);
+        }
+        XByteBuffer.toBytes(i,buf,bufSize);
+        bufSize = newcount;
+        return true;
+    }
+    
+    public boolean append(int i) {
+        int newcount = bufSize + 4;
+        if (newcount > buf.length) {
+            expand(newcount);
+        }
+        XByteBuffer.toBytes(i,buf,bufSize);
+        bufSize = newcount;
+        return true;
+    }
+
+    public boolean append(byte[] b, int off, int len) {
+        if ((off < 0) || (off > b.length) || (len < 0) ||
+            ((off + len) > b.length) || ((off + len) < 0))  {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return false;
+        }
+
+        int newcount = bufSize + len;
+        if (newcount > buf.length) {
+            expand(newcount);
+        }
+        System.arraycopy(b, off, buf, bufSize, len);
+        bufSize = newcount;
+
+        if ( discard ) {
+            if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {
+                bufSize = 0;
+                log.error("Discarded the package, invalid header");
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public void expand(int newcount) {
+        //don't change the allocation strategy
+        byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+        System.arraycopy(buf, 0, newbuf, 0, bufSize);
+        buf = newbuf;
+    }
+    
+    public int getCapacity() {
+        return buf.length;
+    }
+
+
+    /**
+     * Internal mechanism to make a check if a complete package exists
+     * within the buffer
+     * @return - true if a complete package (header,compress,size,data,footer) exists within the buffer
+     */
+    public int countPackages() {
+        return countPackages(false);
+    }
+
+    public int countPackages(boolean first)
+    {
+        int cnt = 0;
+        int pos = START_DATA.length;
+        int start = 0;
+
+        while ( start < bufSize ) {
+            //first check start header
+            int index = XByteBuffer.firstIndexOf(buf,start,START_DATA);
+            //if the header (START_DATA) isn't the first thing or
+            //the buffer isn't even 14 bytes
+            if ( index != start || ((bufSize-start)<14) ) break;
+            //next 4 bytes are compress flag not needed for count packages
+            //then get the size 4 bytes
+            int size = toInt(buf, pos);
+            //now the total buffer has to be long enough to hold
+            //START_DATA.length+4+size+END_DATA.length
+            pos = start + START_DATA.length + 4 + size;
+            if ( (pos + END_DATA.length) > bufSize) break;
+            //and finally check the footer of the package END_DATA
+            int newpos = firstIndexOf(buf, pos, END_DATA);
+            //mismatch, there is no package
+            if (newpos != pos) break;
+            //increase the packet count
+            cnt++;
+            //reset the values
+            start = pos + END_DATA.length;
+            pos = start + START_DATA.length;
+            //we only want to verify that we have at least one package
+            if ( first ) break;
+        }
+        return cnt;
+    }
+
+    /**
+     * Method to check if a package exists in this byte buffer.
+     * @return - true if a complete package (header,options,size,data,footer) exists within the buffer
+     */
+    public boolean doesPackageExist()  {
+        return (countPackages(true)>0);
+    }
+
+    /**
+     * Extracts the message bytes from a package.
+     * If no package exists, a IllegalStateException will be thrown.
+     * @param clearFromBuffer - if true, the package will be removed from the byte buffer
+     * @return - returns the actual message bytes (header, compress,size and footer not included).
+     */
+    public XByteBuffer extractDataPackage(boolean clearFromBuffer) {
+        int psize = countPackages(true);
+        if (psize == 0) {
+            throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
+        }
+        int size = toInt(buf, START_DATA.length);
+        XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);
+        xbuf.setLength(size);
+        System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);
+        if (clearFromBuffer) {
+            int totalsize = START_DATA.length + 4 + size + END_DATA.length;
+            bufSize = bufSize - totalsize;
+            System.arraycopy(buf, totalsize, buf, 0, bufSize);
+        }
+        return xbuf;
+
+    }
+    
+    public ChannelData extractPackage(boolean clearFromBuffer) throws java.io.IOException {
+        XByteBuffer xbuf = extractDataPackage(clearFromBuffer);
+        ChannelData cdata = ChannelData.getDataFromPackage(xbuf);
+        return cdata;
+    }
+    
+    /**
+     * Creates a complete data package
+     * @param indata - the message data to be contained within the package
+     * @param compressed - compression flag for the indata buffer
+     * @return - a full package (header,size,data,footer)
+     * 
+     */
+    public static byte[] createDataPackage(ChannelData cdata) {
+//        return createDataPackage(cdata.getDataPackage());
+        //avoid one extra byte array creation
+        int dlength = cdata.getDataPackageLength();
+        int length = getDataPackageLength(dlength);
+        byte[] data = new byte[length];
+        int offset = 0;
+        System.arraycopy(START_DATA, 0, data, offset, START_DATA.length);
+        offset += START_DATA.length;
+        toBytes(dlength,data, START_DATA.length);
+        offset += 4;
+        cdata.getDataPackage(data,offset);
+        offset += dlength;
+        System.arraycopy(END_DATA, 0, data, offset, END_DATA.length);
+        offset += END_DATA.length;
+        return data;
+    }
+    
+    public static byte[] createDataPackage(byte[] data, int doff, int dlength, byte[] buffer, int bufoff) {
+        if ( (buffer.length-bufoff) > getDataPackageLength(dlength) ) {
+            throw new ArrayIndexOutOfBoundsException("Unable to create data package, buffer is too small.");
+        }
+        System.arraycopy(START_DATA, 0, buffer, bufoff, START_DATA.length);
+        toBytes(data.length,buffer, bufoff+START_DATA.length);
+        System.arraycopy(data, doff, buffer, bufoff+START_DATA.length + 4, dlength);
+        System.arraycopy(END_DATA, 0, buffer, bufoff+START_DATA.length + 4 + data.length, END_DATA.length);
+        return buffer;
+    }
+
+    
+    public static int getDataPackageLength(int datalength) {
+        int length = 
+            START_DATA.length + //header length
+            4 + //data length indicator
+            datalength + //actual data length
+            END_DATA.length; //footer length
+        return length;
+
+    }
+    
+    public static byte[] createDataPackage(byte[] data) {
+        int length = getDataPackageLength(data.length);
+        byte[] result = new byte[length];
+        return createDataPackage(data,0,data.length,result,0);
+    }
+    
+    
+
+//    public static void fillDataPackage(byte[] data, int doff, int dlength, XByteBuffer buf) {
+//        int pkglen = getDataPackageLength(dlength);
+//        if ( buf.getCapacity() <  pkglen ) buf.expand(pkglen);
+//        createDataPackage(data,doff,dlength,buf.getBytesDirect(),buf.getLength());
+//    }
+
+    /**
+     * Convert four bytes to an int
+     * @param b - the byte array containing the four bytes
+     * @param off - the offset
+     * @return the integer value constructed from the four bytes
+     * @exception java.lang.ArrayIndexOutOfBoundsException
+     */
+    public static int toInt(byte[] b,int off){
+        return ( ( (int) b[off+3]) & 0xFF) +
+            ( ( ( (int) b[off+2]) & 0xFF) << 8) +
+            ( ( ( (int) b[off+1]) & 0xFF) << 16) +
+            ( ( ( (int) b[off+0]) & 0xFF) << 24);
+    }
+
+    /**
+     * Convert eight bytes to a long
+     * @param b - the byte array containing the four bytes
+     * @param off - the offset
+     * @return the long value constructed from the eight bytes
+     * @exception java.lang.ArrayIndexOutOfBoundsException
+     */
+    public static long toLong(byte[] b,int off){
+        return ( ( (long) b[off+7]) & 0xFF) +
+            ( ( ( (long) b[off+6]) & 0xFF) << 8) +
+            ( ( ( (long) b[off+5]) & 0xFF) << 16) +
+            ( ( ( (long) b[off+4]) & 0xFF) << 24) +
+            ( ( ( (long) b[off+3]) & 0xFF) << 32) +
+            ( ( ( (long) b[off+2]) & 0xFF) << 40) +
+            ( ( ( (long) b[off+1]) & 0xFF) << 48) +
+            ( ( ( (long) b[off+0]) & 0xFF) << 56);
+    }
+
+    
+    /**
+     * Converts an integer to four bytes
+     * @param n - the integer
+     * @return - four bytes in an array
+     * @deprecated use toBytes(boolean,byte[],int)
+     */
+    public static byte[] toBytes(boolean bool) {
+        byte[] b = new byte[1] ;
+        return toBytes(bool,b,0);
+
+    }
+    
+    public static byte[] toBytes(boolean bool, byte[] data, int offset) {
+        data[offset] = (byte)(bool?1:0);
+        return data;
+    }
+    
+    /**
+     * 
+     * @param <any> long
+     * @return use
+     */
+    public static boolean toBoolean(byte[] b, int offset) {
+        return b[offset] != 0;
+    }
+
+    
+    /**
+     * Converts an integer to four bytes
+     * @param n - the integer
+     * @return - four bytes in an array
+     * @deprecated use toBytes(int,byte[],int)
+     */
+    public static byte[] toBytes(int n) {
+        return toBytes(n,new byte[4],0);
+    }
+
+    public static byte[] toBytes(int n,byte[] b, int offset) {
+        b[offset+3] = (byte) (n);
+        n >>>= 8;
+        b[offset+2] = (byte) (n);
+        n >>>= 8;
+        b[offset+1] = (byte) (n);
+        n >>>= 8;
+        b[offset+0] = (byte) (n);
+        return b;
+    }
+
+    /**
+     * Converts an long to eight bytes
+     * @param n - the long
+     * @return - eight bytes in an array
+     * @deprecated use toBytes(long,byte[],int)
+     */
+    public static byte[] toBytes(long n) {
+        return toBytes(n,new byte[8],0);
+    }
+    public static byte[] toBytes(long n, byte[] b, int offset) {
+        b[offset+7] = (byte) (n);
+        n >>>= 8;
+        b[offset+6] = (byte) (n);
+        n >>>= 8;
+        b[offset+5] = (byte) (n);
+        n >>>= 8;
+        b[offset+4] = (byte) (n);
+        n >>>= 8;
+        b[offset+3] = (byte) (n);
+        n >>>= 8;
+        b[offset+2] = (byte) (n);
+        n >>>= 8;
+        b[offset+1] = (byte) (n);
+        n >>>= 8;
+        b[offset+0] = (byte) (n);
+        return b;
+    }
+
+    /**
+     * Similar to a String.IndexOf, but uses pure bytes
+     * @param src - the source bytes to be searched
+     * @param srcOff - offset on the source buffer
+     * @param find - the string to be found within src
+     * @return - the index of the first matching byte. -1 if the find array is not found
+     */
+    public static int firstIndexOf(byte[] src, int srcOff, byte[] find){
+        int result = -1;
+        if (find.length > src.length) return result;
+        if (find.length == 0 || src.length == 0) return result;
+        if (srcOff >= src.length ) throw new java.lang.ArrayIndexOutOfBoundsException();
+        boolean found = false;
+        int srclen = src.length;
+        int findlen = find.length;
+        byte first = find[0];
+        int pos = srcOff;
+        while (!found) {
+            //find the first byte
+            while (pos < srclen){
+                if (first == src[pos])
+                    break;
+                pos++;
+            }
+            if (pos >= srclen)
+                return -1;
+
+            //we found the first character
+            //match the rest of the bytes - they have to match
+            if ( (srclen - pos) < findlen)
+                return -1;
+            //assume it does exist
+            found = true;
+            for (int i = 1; ( (i < findlen) && found); i++)
+                found = found && (find[i] == src[pos + i]);
+            if (found)
+                result = pos;
+            else if ( (srclen - pos) < findlen)
+                return -1; //no more matches possible
+            else
+                pos++;
+        }
+        return result;
+    }
+
+    
+    public static Serializable deserialize(byte[] data) 
+        throws IOException, ClassNotFoundException, ClassCastException {
+        return deserialize(data,0,data.length);
+    }
+    
+    public static Serializable deserialize(byte[] data, int offset, int length)  
+        throws IOException, ClassNotFoundException, ClassCastException {
+        return deserialize(data,offset,length,null);     
+    }
+    public static int invokecount = 0;
+    public static Serializable deserialize(byte[] data, int offset, int length, ClassLoader[] cls) 
+        throws IOException, ClassNotFoundException, ClassCastException {
+        synchronized (XByteBuffer.class) { invokecount++;}
+        Object message = null;
+        if ( cls == null ) cls = new ClassLoader[0];
+        if (data != null) {
+            InputStream  instream = new ByteArrayInputStream(data,offset,length);
+            ObjectInputStream stream = null;
+            stream = (cls.length>0)? new ReplicationStream(instream,cls):new ObjectInputStream(instream);
+            message = stream.readObject();
+            instream.close();
+            stream.close();
+        }
+        if ( message == null ) {
+            return null;
+        } else if (message instanceof Serializable)
+            return (Serializable) message;
+        else {
+            throw new ClassCastException("Message has the wrong class. It should implement Serializable, instead it is:"+message.getClass().getName());
+        }
+    }
+
+    /**
+     * Serializes a message into cluster data
+     * @param msg ClusterMessage
+     * @param compress boolean
+     * @return 
+     * @throws IOException
+     */
+    public static byte[] serialize(Serializable msg) throws IOException {
+        ByteArrayOutputStream outs = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(outs);
+        out.writeObject(msg);
+        out.flush();
+        byte[] data = outs.toByteArray();
+        return data;
+    }
+
+    public void setDiscard(boolean discard) {
+        this.discard = discard;
+    }
+
+    public boolean getDiscard() {
+        return discard;
+    }
+
+}