-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements. See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License. You may obtain a copy of the License at\r
- * \r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-package org.apache.catalina.tribes.io;\r
-\r
-import java.util.Arrays;\r
-\r
-import org.apache.catalina.tribes.ChannelMessage;\r
-import org.apache.catalina.tribes.Member;\r
-import org.apache.catalina.tribes.membership.MemberImpl;\r
-import org.apache.catalina.tribes.util.UUIDGenerator;\r
-import org.apache.catalina.tribes.Channel;\r
-import java.sql.Timestamp;\r
-\r
-/**\r
- * The <code>ChannelData</code> object is used to transfer a message through the \r
- * channel interceptor stack and eventually out on a transport to be sent \r
- * to another node. While the message is being processed by the different \r
- * interceptors, the message data can be manipulated as each interceptor seems appropriate.\r
- * @author Peter Rossbach\r
- * @author Filip Hanik\r
- * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $\r
- * \r
- */\r
-public class ChannelData implements ChannelMessage {\r
- public static ChannelData[] EMPTY_DATA_ARRAY = new ChannelData[0];\r
- \r
- public static boolean USE_SECURE_RANDOM_FOR_UUID = false;\r
- \r
- /**\r
- * The options this message was sent with\r
- */\r
- private int options = 0 ;\r
- /**\r
- * The message data, stored in a dynamic buffer\r
- */\r
- private XByteBuffer message ;\r
- /**\r
- * The timestamp that goes with this message\r
- */\r
- private long timestamp ;\r
- /**\r
- * A unique message id\r
- */\r
- private byte[] uniqueId ;\r
- /**\r
- * The source or reply-to address for this message\r
- */\r
- private Member address;\r
-\r
- /**\r
- * Creates an empty channel data with a new unique Id\r
- * @see #ChannelData(boolean)\r
- */\r
- public ChannelData() {\r
- this(true);\r
- }\r
- \r
- /**\r
- * Create an empty channel data object\r
- * @param generateUUID boolean - if true, a unique Id will be generated\r
- */\r
- public ChannelData(boolean generateUUID) {\r
- if ( generateUUID ) generateUUID();\r
- }\r
- \r
- \r
- \r
- /**\r
- * Creates a new channel data object with data\r
- * @param uniqueId - unique message id\r
- * @param message - message data\r
- * @param timestamp - message timestamp\r
- */\r
- public ChannelData(byte[] uniqueId, XByteBuffer message, long timestamp) {\r
- this.uniqueId = uniqueId;\r
- this.message = message;\r
- this.timestamp = timestamp;\r
- }\r
- \r
- /**\r
- * @return Returns the message byte buffer\r
- */\r
- public XByteBuffer getMessage() {\r
- return message;\r
- }\r
- /**\r
- * @param message The message to send.\r
- */\r
- public void setMessage(XByteBuffer message) {\r
- this.message = message;\r
- }\r
- /**\r
- * @return Returns the timestamp.\r
- */\r
- public long getTimestamp() {\r
- return timestamp;\r
- }\r
- /**\r
- * @param timestamp The timestamp to send\r
- */\r
- public void setTimestamp(long timestamp) {\r
- this.timestamp = timestamp;\r
- }\r
- /**\r
- * @return Returns the uniqueId.\r
- */\r
- public byte[] getUniqueId() {\r
- return uniqueId;\r
- }\r
- /**\r
- * @param uniqueId The uniqueId to send.\r
- */\r
- public void setUniqueId(byte[] uniqueId) {\r
- this.uniqueId = uniqueId;\r
- }\r
- /**\r
- * @return returns the message options \r
- * see org.apache.catalina.tribes.Channel#sendMessage(org.apache.catalina.tribes.Member[], java.io.Serializable, int)\r
- * \r
- */\r
- public int getOptions() {\r
- return options;\r
- }\r
- /**\r
- * @param sets the message options\r
- */\r
- public void setOptions(int options) {\r
- this.options = options;\r
- }\r
- \r
- /**\r
- * Returns the source or reply-to address\r
- * @return Member\r
- */\r
- public Member getAddress() {\r
- return address;\r
- }\r
-\r
- /**\r
- * Sets the source or reply-to address\r
- * @param address Member\r
- */\r
- public void setAddress(Member address) {\r
- this.address = address;\r
- }\r
- \r
- /**\r
- * Generates a UUID and invokes setUniqueId\r
- */\r
- public void generateUUID() {\r
- byte[] data = new byte[16];\r
- UUIDGenerator.randomUUID(USE_SECURE_RANDOM_FOR_UUID,data,0);\r
- setUniqueId(data);\r
- }\r
-\r
- public int getDataPackageLength() {\r
- int length = \r
- 4 + //options\r
- 8 + //timestamp off=4\r
- 4 + //unique id length off=12\r
- uniqueId.length+ //id data off=12+uniqueId.length\r
- 4 + //addr length off=12+uniqueId.length+4\r
- ((MemberImpl)address).getDataLength()+ //member data off=12+uniqueId.length+4+add.length\r
- 4 + //message length off=12+uniqueId.length+4+add.length+4\r
- message.getLength();\r
- return length;\r
-\r
- }\r
- \r
- /**\r
- * Serializes the ChannelData object into a byte[] array\r
- * @return byte[]\r
- */\r
- public byte[] getDataPackage() {\r
- int length = getDataPackageLength();\r
- byte[] data = new byte[length];\r
- int offset = 0;\r
- return getDataPackage(data,offset);\r
- }\r
-\r
- public byte[] getDataPackage(byte[] data, int offset) {\r
- byte[] addr = ((MemberImpl)address).getData(false);\r
- XByteBuffer.toBytes(options,data,offset);\r
- offset += 4; //options\r
- XByteBuffer.toBytes(timestamp,data,offset);\r
- offset += 8; //timestamp\r
- XByteBuffer.toBytes(uniqueId.length,data,offset);\r
- offset += 4; //uniqueId.length\r
- System.arraycopy(uniqueId,0,data,offset,uniqueId.length);\r
- offset += uniqueId.length; //uniqueId data\r
- XByteBuffer.toBytes(addr.length,data,offset);\r
- offset += 4; //addr.length\r
- System.arraycopy(addr,0,data,offset,addr.length);\r
- offset += addr.length; //addr data\r
- XByteBuffer.toBytes(message.getLength(),data,offset);\r
- offset += 4; //message.length\r
- System.arraycopy(message.getBytesDirect(),0,data,offset,message.getLength());\r
- offset += message.getLength(); //message data\r
- return data;\r
- }\r
- \r
- /**\r
- * Deserializes a ChannelData object from a byte array\r
- * @param b byte[]\r
- * @return ChannelData\r
- */\r
- public static ChannelData getDataFromPackage(XByteBuffer xbuf) {\r
- ChannelData data = new ChannelData(false);\r
- int offset = 0;\r
- data.setOptions(XByteBuffer.toInt(xbuf.getBytesDirect(),offset));\r
- offset += 4; //options\r
- data.setTimestamp(XByteBuffer.toLong(xbuf.getBytesDirect(),offset));\r
- offset += 8; //timestamp\r
- data.uniqueId = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];\r
- offset += 4; //uniqueId length\r
- System.arraycopy(xbuf.getBytesDirect(),offset,data.uniqueId,0,data.uniqueId.length);\r
- offset += data.uniqueId.length; //uniqueId data\r
- byte[] addr = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];\r
- offset += 4; //addr length\r
- System.arraycopy(xbuf.getBytesDirect(),offset,addr,0,addr.length);\r
- data.setAddress(MemberImpl.getMember(addr));\r
- offset += addr.length; //addr data\r
- int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset);\r
- offset += 4; //xsize length\r
- System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize);\r
- xbuf.setLength(xsize);\r
- data.message = xbuf;\r
- return data;\r
-\r
- }\r
-\r
- public static ChannelData getDataFromPackage(byte[] b) {\r
- ChannelData data = new ChannelData(false);\r
- int offset = 0;\r
- data.setOptions(XByteBuffer.toInt(b,offset));\r
- offset += 4; //options\r
- data.setTimestamp(XByteBuffer.toLong(b,offset));\r
- offset += 8; //timestamp\r
- data.uniqueId = new byte[XByteBuffer.toInt(b,offset)];\r
- offset += 4; //uniqueId length\r
- System.arraycopy(b,offset,data.uniqueId,0,data.uniqueId.length);\r
- offset += data.uniqueId.length; //uniqueId data\r
- byte[] addr = new byte[XByteBuffer.toInt(b,offset)];\r
- offset += 4; //addr length\r
- System.arraycopy(b,offset,addr,0,addr.length);\r
- data.setAddress(MemberImpl.getMember(addr));\r
- offset += addr.length; //addr data\r
- int xsize = XByteBuffer.toInt(b,offset);\r
- //data.message = new XByteBuffer(new byte[xsize],false);\r
- data.message = BufferPool.getBufferPool().getBuffer(xsize,false);\r
- offset += 4; //message length\r
- System.arraycopy(b,offset,data.message.getBytesDirect(),0,xsize);\r
- data.message.append(b,offset,xsize);\r
- offset += xsize; //message data\r
- return data;\r
- }\r
- \r
- public int hashCode() {\r
- return XByteBuffer.toInt(getUniqueId(),0);\r
- }\r
- \r
- /**\r
- * Compares to ChannelData objects, only compares on getUniqueId().equals(o.getUniqueId())\r
- * @param o Object\r
- * @return boolean\r
- */\r
- public boolean equals(Object o) {\r
- if ( o instanceof ChannelData ) {\r
- return Arrays.equals(getUniqueId(),((ChannelData)o).getUniqueId());\r
- } else return false;\r
- }\r
- \r
- /**\r
- * Create a shallow clone, only the data gets recreated\r
- * @return ClusterData\r
- */\r
- public Object clone() {\r
-// byte[] d = this.getDataPackage();\r
-// return ClusterData.getDataFromPackage(d);\r
- ChannelData clone = new ChannelData(false);\r
- clone.options = this.options;\r
- clone.message = new XByteBuffer(this.message.getBytesDirect(),false);\r
- clone.timestamp = this.timestamp;\r
- clone.uniqueId = this.uniqueId;\r
- clone.address = this.address;\r
- return clone;\r
- }\r
- \r
- /**\r
- * Complete clone\r
- * @return ClusterData\r
- */\r
- public Object deepclone() {\r
- byte[] d = this.getDataPackage();\r
- return ChannelData.getDataFromPackage(d);\r
- }\r
- \r
- /**\r
- * Utility method, returns true if the options flag indicates that an ack\r
- * is to be sent after the message has been received and processed\r
- * @param options int - the options for the message\r
- * @return boolean \r
- * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK\r
- * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK\r
- */\r
- public static boolean sendAckSync(int options) {\r
- return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) &&\r
- ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) == Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);\r
- }\r
-\r
-\r
- /**\r
- * Utility method, returns true if the options flag indicates that an ack\r
- * is to be sent after the message has been received but not yet processed\r
- * @param options int - the options for the message\r
- * @return boolean \r
- * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK\r
- * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK\r
- */\r
- public static boolean sendAckAsync(int options) {\r
- return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) &&\r
- ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) != Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);\r
- }\r
- \r
- public String toString() {\r
- StringBuffer buf = new StringBuffer();\r
- buf.append("ClusterData[src=");\r
- buf.append(getAddress()).append("; id=");\r
- buf.append(bToS(getUniqueId())).append("; sent=");\r
- buf.append(new Timestamp(this.getTimestamp()).toString()).append("]");\r
- return buf.toString();\r
- }\r
- \r
- public static String bToS(byte[] data) {\r
- StringBuffer buf = new StringBuffer(4*16);\r
- buf.append("{");\r
- for (int i=0; data!=null && i<data.length; i++ ) buf.append(String.valueOf(data[i])).append(" ");\r
- buf.append("}");\r
- return buf.toString();\r
- }\r
-\r
- \r
-}\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import java.util.Arrays;
+
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.Channel;
+import java.sql.Timestamp;
+
+/**
+ * The <code>ChannelData</code> object is used to transfer a message through the
+ * channel interceptor stack and eventually out on a transport to be sent
+ * to another node. While the message is being processed by the different
+ * interceptors, the message data can be manipulated as each interceptor seems appropriate.
+ * @author Peter Rossbach
+ * @author Filip Hanik
+ * @version $Revision$ $Date$
+ *
+ */
+public class ChannelData implements ChannelMessage {
+ public static ChannelData[] EMPTY_DATA_ARRAY = new ChannelData[0];
+
+ public static boolean USE_SECURE_RANDOM_FOR_UUID = false;
+
+ /**
+ * The options this message was sent with
+ */
+ private int options = 0 ;
+ /**
+ * The message data, stored in a dynamic buffer
+ */
+ private XByteBuffer message ;
+ /**
+ * The timestamp that goes with this message
+ */
+ private long timestamp ;
+ /**
+ * A unique message id
+ */
+ private byte[] uniqueId ;
+ /**
+ * The source or reply-to address for this message
+ */
+ private Member address;
+
+ /**
+ * Creates an empty channel data with a new unique Id
+ * @see #ChannelData(boolean)
+ */
+ public ChannelData() {
+ this(true);
+ }
+
+ /**
+ * Create an empty channel data object
+ * @param generateUUID boolean - if true, a unique Id will be generated
+ */
+ public ChannelData(boolean generateUUID) {
+ if ( generateUUID ) generateUUID();
+ }
+
+
+
+ /**
+ * Creates a new channel data object with data
+ * @param uniqueId - unique message id
+ * @param message - message data
+ * @param timestamp - message timestamp
+ */
+ public ChannelData(byte[] uniqueId, XByteBuffer message, long timestamp) {
+ this.uniqueId = uniqueId;
+ this.message = message;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * @return Returns the message byte buffer
+ */
+ public XByteBuffer getMessage() {
+ return message;
+ }
+ /**
+ * @param message The message to send.
+ */
+ public void setMessage(XByteBuffer message) {
+ this.message = message;
+ }
+ /**
+ * @return Returns the timestamp.
+ */
+ public long getTimestamp() {
+ return timestamp;
+ }
+ /**
+ * @param timestamp The timestamp to send
+ */
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+ /**
+ * @return Returns the uniqueId.
+ */
+ public byte[] getUniqueId() {
+ return uniqueId;
+ }
+ /**
+ * @param uniqueId The uniqueId to send.
+ */
+ public void setUniqueId(byte[] uniqueId) {
+ this.uniqueId = uniqueId;
+ }
+ /**
+ * @return returns the message options
+ * see org.apache.catalina.tribes.Channel#sendMessage(org.apache.catalina.tribes.Member[], java.io.Serializable, int)
+ *
+ */
+ public int getOptions() {
+ return options;
+ }
+ /**
+ * @param sets the message options
+ */
+ public void setOptions(int options) {
+ this.options = options;
+ }
+
+ /**
+ * Returns the source or reply-to address
+ * @return Member
+ */
+ public Member getAddress() {
+ return address;
+ }
+
+ /**
+ * Sets the source or reply-to address
+ * @param address Member
+ */
+ public void setAddress(Member address) {
+ this.address = address;
+ }
+
+ /**
+ * Generates a UUID and invokes setUniqueId
+ */
+ public void generateUUID() {
+ byte[] data = new byte[16];
+ UUIDGenerator.randomUUID(USE_SECURE_RANDOM_FOR_UUID,data,0);
+ setUniqueId(data);
+ }
+
+ public int getDataPackageLength() {
+ int length =
+ 4 + //options
+ 8 + //timestamp off=4
+ 4 + //unique id length off=12
+ uniqueId.length+ //id data off=12+uniqueId.length
+ 4 + //addr length off=12+uniqueId.length+4
+ ((MemberImpl)address).getDataLength()+ //member data off=12+uniqueId.length+4+add.length
+ 4 + //message length off=12+uniqueId.length+4+add.length+4
+ message.getLength();
+ return length;
+
+ }
+
+ /**
+ * Serializes the ChannelData object into a byte[] array
+ * @return byte[]
+ */
+ public byte[] getDataPackage() {
+ int length = getDataPackageLength();
+ byte[] data = new byte[length];
+ int offset = 0;
+ return getDataPackage(data,offset);
+ }
+
+ public byte[] getDataPackage(byte[] data, int offset) {
+ byte[] addr = ((MemberImpl)address).getData(false);
+ XByteBuffer.toBytes(options,data,offset);
+ offset += 4; //options
+ XByteBuffer.toBytes(timestamp,data,offset);
+ offset += 8; //timestamp
+ XByteBuffer.toBytes(uniqueId.length,data,offset);
+ offset += 4; //uniqueId.length
+ System.arraycopy(uniqueId,0,data,offset,uniqueId.length);
+ offset += uniqueId.length; //uniqueId data
+ XByteBuffer.toBytes(addr.length,data,offset);
+ offset += 4; //addr.length
+ System.arraycopy(addr,0,data,offset,addr.length);
+ offset += addr.length; //addr data
+ XByteBuffer.toBytes(message.getLength(),data,offset);
+ offset += 4; //message.length
+ System.arraycopy(message.getBytesDirect(),0,data,offset,message.getLength());
+ offset += message.getLength(); //message data
+ return data;
+ }
+
+ /**
+ * Deserializes a ChannelData object from a byte array
+ * @param b byte[]
+ * @return ChannelData
+ */
+ public static ChannelData getDataFromPackage(XByteBuffer xbuf) {
+ ChannelData data = new ChannelData(false);
+ int offset = 0;
+ data.setOptions(XByteBuffer.toInt(xbuf.getBytesDirect(),offset));
+ offset += 4; //options
+ data.setTimestamp(XByteBuffer.toLong(xbuf.getBytesDirect(),offset));
+ offset += 8; //timestamp
+ data.uniqueId = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];
+ offset += 4; //uniqueId length
+ System.arraycopy(xbuf.getBytesDirect(),offset,data.uniqueId,0,data.uniqueId.length);
+ offset += data.uniqueId.length; //uniqueId data
+ byte[] addr = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];
+ offset += 4; //addr length
+ System.arraycopy(xbuf.getBytesDirect(),offset,addr,0,addr.length);
+ data.setAddress(MemberImpl.getMember(addr));
+ offset += addr.length; //addr data
+ int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset);
+ offset += 4; //xsize length
+ System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize);
+ xbuf.setLength(xsize);
+ data.message = xbuf;
+ return data;
+
+ }
+
+ public static ChannelData getDataFromPackage(byte[] b) {
+ ChannelData data = new ChannelData(false);
+ int offset = 0;
+ data.setOptions(XByteBuffer.toInt(b,offset));
+ offset += 4; //options
+ data.setTimestamp(XByteBuffer.toLong(b,offset));
+ offset += 8; //timestamp
+ data.uniqueId = new byte[XByteBuffer.toInt(b,offset)];
+ offset += 4; //uniqueId length
+ System.arraycopy(b,offset,data.uniqueId,0,data.uniqueId.length);
+ offset += data.uniqueId.length; //uniqueId data
+ byte[] addr = new byte[XByteBuffer.toInt(b,offset)];
+ offset += 4; //addr length
+ System.arraycopy(b,offset,addr,0,addr.length);
+ data.setAddress(MemberImpl.getMember(addr));
+ offset += addr.length; //addr data
+ int xsize = XByteBuffer.toInt(b,offset);
+ //data.message = new XByteBuffer(new byte[xsize],false);
+ data.message = BufferPool.getBufferPool().getBuffer(xsize,false);
+ offset += 4; //message length
+ System.arraycopy(b,offset,data.message.getBytesDirect(),0,xsize);
+ data.message.append(b,offset,xsize);
+ offset += xsize; //message data
+ return data;
+ }
+
+ public int hashCode() {
+ return XByteBuffer.toInt(getUniqueId(),0);
+ }
+
+ /**
+ * Compares to ChannelData objects, only compares on getUniqueId().equals(o.getUniqueId())
+ * @param o Object
+ * @return boolean
+ */
+ public boolean equals(Object o) {
+ if ( o instanceof ChannelData ) {
+ return Arrays.equals(getUniqueId(),((ChannelData)o).getUniqueId());
+ } else return false;
+ }
+
+ /**
+ * Create a shallow clone, only the data gets recreated
+ * @return ClusterData
+ */
+ public Object clone() {
+// byte[] d = this.getDataPackage();
+// return ClusterData.getDataFromPackage(d);
+ ChannelData clone = new ChannelData(false);
+ clone.options = this.options;
+ clone.message = new XByteBuffer(this.message.getBytesDirect(),false);
+ clone.timestamp = this.timestamp;
+ clone.uniqueId = this.uniqueId;
+ clone.address = this.address;
+ return clone;
+ }
+
+ /**
+ * Complete clone
+ * @return ClusterData
+ */
+ public Object deepclone() {
+ byte[] d = this.getDataPackage();
+ return ChannelData.getDataFromPackage(d);
+ }
+
+ /**
+ * Utility method, returns true if the options flag indicates that an ack
+ * is to be sent after the message has been received and processed
+ * @param options int - the options for the message
+ * @return boolean
+ * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK
+ * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK
+ */
+ public static boolean sendAckSync(int options) {
+ return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) &&
+ ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) == Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ }
+
+
+ /**
+ * Utility method, returns true if the options flag indicates that an ack
+ * is to be sent after the message has been received but not yet processed
+ * @param options int - the options for the message
+ * @return boolean
+ * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK
+ * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK
+ */
+ public static boolean sendAckAsync(int options) {
+ return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) &&
+ ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) != Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("ClusterData[src=");
+ buf.append(getAddress()).append("; id=");
+ buf.append(bToS(getUniqueId())).append("; sent=");
+ buf.append(new Timestamp(this.getTimestamp()).toString()).append("]");
+ return buf.toString();
+ }
+
+ public static String bToS(byte[] data) {
+ StringBuffer buf = new StringBuffer(4*16);
+ buf.append("{");
+ for (int i=0; data!=null && i<data.length; i++ ) buf.append(String.valueOf(data[i])).append(" ");
+ buf.append("}");
+ return buf.toString();
+ }
+
+
+}
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements. See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License. You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-\r
-package org.apache.catalina.tribes.io;\r
-\r
-import java.io.IOException;\r
-import java.io.OutputStream;\r
-\r
-/**\r
- * Byte array output stream that exposes the byte array directly\r
- *\r
- * @author not attributable\r
- * @version 1.0\r
- */\r
-public class DirectByteArrayOutputStream extends OutputStream {\r
- \r
- private XByteBuffer buffer;\r
- \r
- public DirectByteArrayOutputStream(int size) {\r
- buffer = new XByteBuffer(size,false);\r
- }\r
-\r
- /**\r
- * Writes the specified byte to this output stream.\r
- *\r
- * @param b the <code>byte</code>.\r
- * @throws IOException if an I/O error occurs. In particular, an\r
- * <code>IOException</code> may be thrown if the output stream has\r
- * been closed.\r
- * @todo Implement this java.io.OutputStream method\r
- */\r
- public void write(int b) throws IOException {\r
- buffer.append((byte)b);\r
- }\r
- \r
- public int size() {\r
- return buffer.getLength();\r
- }\r
- \r
- public byte[] getArrayDirect() {\r
- return buffer.getBytesDirect();\r
- }\r
- \r
- public byte[] getArray() {\r
- return buffer.getBytes();\r
- }\r
-\r
-\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Byte array output stream that exposes the byte array directly
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class DirectByteArrayOutputStream extends OutputStream {
+
+ private XByteBuffer buffer;
+
+ public DirectByteArrayOutputStream(int size) {
+ buffer = new XByteBuffer(size,false);
+ }
+
+ /**
+ * Writes the specified byte to this output stream.
+ *
+ * @param b the <code>byte</code>.
+ * @throws IOException if an I/O error occurs. In particular, an
+ * <code>IOException</code> may be thrown if the output stream has
+ * been closed.
+ * @todo Implement this java.io.OutputStream method
+ */
+ public void write(int b) throws IOException {
+ buffer.append((byte)b);
+ }
+
+ public int size() {
+ return buffer.getLength();
+ }
+
+ public byte[] getArrayDirect() {
+ return buffer.getBytesDirect();
+ }
+
+ public byte[] getArray() {
+ return buffer.getBytes();
+ }
+
+
}
\ No newline at end of file
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements. See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License. You may obtain a copy of the License at\r
- * \r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-\r
-package org.apache.catalina.tribes.io;\r
-\r
-import org.apache.catalina.tribes.ChannelMessage;\r
-\r
-\r
-\r
-/**\r
- * Internal interface, similar to the MessageListener but used \r
- * at the IO base\r
- * The listen callback interface is used by the replication system\r
- * when data has been received. The interface does not care about\r
- * objects and marshalling and just passes the bytes straight through.\r
- * @author Filip Hanik\r
- * @version $Revision: 303987 $, $Date: 2005-07-08 15:50:30 -0500 (Fri, 08 Jul 2005) $\r
- */\r
-public interface ListenCallback\r
-{\r
- /**\r
- * This method is invoked on the callback object to notify it that new data has\r
- * been received from one of the cluster nodes.\r
- * @param data - the message bytes received from the cluster/replication system\r
- */\r
- public void messageDataReceived(ChannelMessage data);\r
- \r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.io;
+
+import org.apache.catalina.tribes.ChannelMessage;
+
+
+
+/**
+ * Internal interface, similar to the MessageListener but used
+ * at the IO base
+ * The listen callback interface is used by the replication system
+ * when data has been received. The interface does not care about
+ * objects and marshalling and just passes the bytes straight through.
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+public interface ListenCallback
+{
+ /**
+ * This method is invoked on the callback object to notify it that new data has
+ * been received from one of the cluster nodes.
+ * @param data - the message bytes received from the cluster/replication system
+ */
+ public void messageDataReceived(ChannelMessage data);
+
}
\ No newline at end of file
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements. See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License. You may obtain a copy of the License at\r
- * \r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-package org.apache.catalina.tribes.io;\r
-\r
-import java.io.IOException;\r
-import java.net.Socket;\r
-import java.nio.ByteBuffer;\r
-import java.nio.channels.SocketChannel;\r
-\r
-import org.apache.catalina.tribes.ChannelMessage;\r
-\r
-\r
-\r
-/**\r
- * The object reader object is an object used in conjunction with\r
- * java.nio TCP messages. This object stores the message bytes in a\r
- * <code>XByteBuffer</code> until a full package has been received.\r
- * This object uses an XByteBuffer which is an extendable object buffer that also allows\r
- * for message encoding and decoding.\r
- *\r
- * @author Filip Hanik\r
- * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $\r
- */\r
-public class ObjectReader {\r
-\r
- protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ObjectReader.class);\r
-\r
- private XByteBuffer buffer;\r
- \r
- protected long lastAccess = System.currentTimeMillis();\r
- \r
- protected boolean accessed = false;\r
- private boolean cancelled;\r
-\r
- /**\r
- * Creates an <code>ObjectReader</code> for a TCP NIO socket channel\r
- * @param channel - the channel to be read.\r
- */\r
- public ObjectReader(SocketChannel channel) {\r
- this(channel.socket());\r
- }\r
- \r
- /**\r
- * Creates an <code>ObjectReader</code> for a TCP socket\r
- * @param socket Socket\r
- */\r
- public ObjectReader(Socket socket) {\r
- try{\r
- this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true);\r
- }catch ( IOException x ) {\r
- //unable to get buffer size\r
- log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes.");\r
- this.buffer = new XByteBuffer(43800,true);\r
- }\r
- }\r
- \r
- public synchronized void access() {\r
- this.accessed = true;\r
- this.lastAccess = System.currentTimeMillis();\r
- }\r
- \r
- public synchronized void finish() {\r
- this.accessed = false;\r
- this.lastAccess = System.currentTimeMillis();\r
- }\r
- \r
- public boolean isAccessed() {\r
- return this.accessed;\r
- }\r
-\r
- /**\r
- * Append new bytes to buffer. \r
- * @see XByteBuffer#countPackages()\r
- * @param data new transfer buffer\r
- * @param off offset\r
- * @param len length in buffer\r
- * @return number of messages that sended to callback\r
- * @throws java.io.IOException\r
- */\r
- public int append(ByteBuffer data, int len, boolean count) throws java.io.IOException {\r
- buffer.append(data,len);\r
- int pkgCnt = -1;\r
- if ( count ) pkgCnt = buffer.countPackages();\r
- return pkgCnt;\r
- }\r
-\r
- public int append(byte[] data,int off,int len, boolean count) throws java.io.IOException {\r
- buffer.append(data,off,len);\r
- int pkgCnt = -1;\r
- if ( count ) pkgCnt = buffer.countPackages();\r
- return pkgCnt;\r
- }\r
-\r
- /**\r
- * Send buffer to cluster listener (callback).\r
- * Is message complete receiver send message to callback?\r
- *\r
- * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#messageDataReceived(ChannelMessage)\r
- * @see XByteBuffer#doesPackageExist()\r
- * @see XByteBuffer#extractPackage(boolean)\r
- *\r
- * @return number of received packages/messages\r
- * @throws java.io.IOException\r
- */\r
- public ChannelMessage[] execute() throws java.io.IOException {\r
- int pkgCnt = buffer.countPackages();\r
- ChannelMessage[] result = new ChannelMessage[pkgCnt];\r
- for (int i=0; i<pkgCnt; i++) {\r
- ChannelMessage data = buffer.extractPackage(true);\r
- result[i] = data;\r
- }\r
- return result;\r
- }\r
- \r
- public int bufferSize() {\r
- return buffer.getLength();\r
- }\r
- \r
-\r
- public boolean hasPackage() {\r
- return buffer.countPackages(true)>0;\r
- }\r
- /**\r
- * Returns the number of packages that the reader has read\r
- * @return int\r
- */\r
- public int count() {\r
- return buffer.countPackages();\r
- }\r
- \r
- public void close() {\r
- this.buffer = null;\r
- }\r
-\r
- public long getLastAccess() {\r
- return lastAccess;\r
- }\r
-\r
- public boolean isCancelled() {\r
- return cancelled;\r
- }\r
-\r
- public void setLastAccess(long lastAccess) {\r
- this.lastAccess = lastAccess;\r
- }\r
-\r
- public void setCancelled(boolean cancelled) {\r
- this.cancelled = cancelled;\r
- }\r
-\r
-}\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.catalina.tribes.ChannelMessage;
+
+
+
+/**
+ * The object reader object is an object used in conjunction with
+ * java.nio TCP messages. This object stores the message bytes in a
+ * <code>XByteBuffer</code> until a full package has been received.
+ * This object uses an XByteBuffer which is an extendable object buffer that also allows
+ * for message encoding and decoding.
+ *
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+public class ObjectReader {
+
+ protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ObjectReader.class);
+
+ private XByteBuffer buffer;
+
+ protected long lastAccess = System.currentTimeMillis();
+
+ protected boolean accessed = false;
+ private boolean cancelled;
+
+ /**
+ * Creates an <code>ObjectReader</code> for a TCP NIO socket channel
+ * @param channel - the channel to be read.
+ */
+ public ObjectReader(SocketChannel channel) {
+ this(channel.socket());
+ }
+
+ /**
+ * Creates an <code>ObjectReader</code> for a TCP socket
+ * @param socket Socket
+ */
+ public ObjectReader(Socket socket) {
+ try{
+ this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true);
+ }catch ( IOException x ) {
+ //unable to get buffer size
+ log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes.");
+ this.buffer = new XByteBuffer(43800,true);
+ }
+ }
+
+ public synchronized void access() {
+ this.accessed = true;
+ this.lastAccess = System.currentTimeMillis();
+ }
+
+ public synchronized void finish() {
+ this.accessed = false;
+ this.lastAccess = System.currentTimeMillis();
+ }
+
+ public boolean isAccessed() {
+ return this.accessed;
+ }
+
+ /**
+ * Append new bytes to buffer.
+ * @see XByteBuffer#countPackages()
+ * @param data new transfer buffer
+ * @param off offset
+ * @param len length in buffer
+ * @return number of messages that sended to callback
+ * @throws java.io.IOException
+ */
+ public int append(ByteBuffer data, int len, boolean count) throws java.io.IOException {
+ buffer.append(data,len);
+ int pkgCnt = -1;
+ if ( count ) pkgCnt = buffer.countPackages();
+ return pkgCnt;
+ }
+
+ public int append(byte[] data,int off,int len, boolean count) throws java.io.IOException {
+ buffer.append(data,off,len);
+ int pkgCnt = -1;
+ if ( count ) pkgCnt = buffer.countPackages();
+ return pkgCnt;
+ }
+
+ /**
+ * Send buffer to cluster listener (callback).
+ * Is message complete receiver send message to callback?
+ *
+ * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#messageDataReceived(ChannelMessage)
+ * @see XByteBuffer#doesPackageExist()
+ * @see XByteBuffer#extractPackage(boolean)
+ *
+ * @return number of received packages/messages
+ * @throws java.io.IOException
+ */
+ public ChannelMessage[] execute() throws java.io.IOException {
+ int pkgCnt = buffer.countPackages();
+ ChannelMessage[] result = new ChannelMessage[pkgCnt];
+ for (int i=0; i<pkgCnt; i++) {
+ ChannelMessage data = buffer.extractPackage(true);
+ result[i] = data;
+ }
+ return result;
+ }
+
+ public int bufferSize() {
+ return buffer.getLength();
+ }
+
+
+ public boolean hasPackage() {
+ return buffer.countPackages(true)>0;
+ }
+ /**
+ * Returns the number of packages that the reader has read
+ * @return int
+ */
+ public int count() {
+ return buffer.countPackages();
+ }
+
+ public void close() {
+ this.buffer = null;
+ }
+
+ public long getLastAccess() {
+ return lastAccess;
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ public void setLastAccess(long lastAccess) {
+ this.lastAccess = lastAccess;
+ }
+
+ public void setCancelled(boolean cancelled) {
+ this.cancelled = cancelled;
+ }
+
+}
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements. See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License. You may obtain a copy of the License at\r
- * \r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-\r
-\r
-package org.apache.catalina.tribes.io;\r
-\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.io.ObjectInputStream;\r
-import java.io.ObjectStreamClass;\r
-\r
-/**\r
- * Custom subclass of <code>ObjectInputStream</code> that loads from the\r
- * class loader for this web application. This allows classes defined only\r
- * with the web application to be found correctly.\r
- *\r
- * @author Craig R. McClanahan\r
- * @author Bip Thelin\r
- * @author Filip Hanik\r
- * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $\r
- */\r
-\r
-public final class ReplicationStream extends ObjectInputStream {\r
-\r
- \r
- /**\r
- * The class loader we will use to resolve classes.\r
- */\r
- private ClassLoader[] classLoaders = null;\r
- \r
-\r
- /**\r
- * Construct a new instance of CustomObjectInputStream\r
- *\r
- * @param stream The input stream we will read from\r
- * @param classLoader The class loader used to instantiate objects\r
- *\r
- * @exception IOException if an input/output error occurs\r
- */\r
- public ReplicationStream(InputStream stream,\r
- ClassLoader[] classLoaders)\r
- throws IOException {\r
-\r
- super(stream);\r
- this.classLoaders = classLoaders;\r
- }\r
-\r
- /**\r
- * Load the local class equivalent of the specified stream class\r
- * description, by using the class loader assigned to this Context.\r
- *\r
- * @param classDesc Class description from the input stream\r
- *\r
- * @exception ClassNotFoundException if this class cannot be found\r
- * @exception IOException if an input/output error occurs\r
- */\r
- public Class resolveClass(ObjectStreamClass classDesc)\r
- throws ClassNotFoundException, IOException {\r
- String name = classDesc.getName();\r
- boolean tryRepFirst = name.startsWith("org.apache.catalina.tribes");\r
- try {\r
- try\r
- {\r
- if ( tryRepFirst ) return findReplicationClass(name);\r
- else return findExternalClass(name);\r
- }\r
- catch ( Exception x )\r
- {\r
- if ( tryRepFirst ) return findExternalClass(name);\r
- else return findReplicationClass(name);\r
- }\r
- } catch (ClassNotFoundException e) {\r
- return super.resolveClass(classDesc);\r
- }\r
- }\r
- \r
- public Class findReplicationClass(String name)\r
- throws ClassNotFoundException, IOException {\r
- Class clazz = Class.forName(name, false, getClass().getClassLoader());\r
- return clazz;\r
- }\r
-\r
- public Class findExternalClass(String name) throws ClassNotFoundException {\r
- ClassNotFoundException cnfe = null;\r
- for (int i=0; i<classLoaders.length; i++ ) {\r
- try {\r
- Class clazz = Class.forName(name, false, classLoaders[i]);\r
- return clazz;\r
- } catch ( ClassNotFoundException x ) {\r
- cnfe = x;\r
- } \r
- }\r
- if ( cnfe != null ) throw cnfe;\r
- else throw new ClassNotFoundException(name);\r
- }\r
- \r
- public void close() throws IOException {\r
- this.classLoaders = null;\r
- super.close();\r
- }\r
-\r
-\r
-}\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.tribes.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * Custom subclass of <code>ObjectInputStream</code> that loads from the
+ * class loader for this web application. This allows classes defined only
+ * with the web application to be found correctly.
+ *
+ * @author Craig R. McClanahan
+ * @author Bip Thelin
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+
+public final class ReplicationStream extends ObjectInputStream {
+
+
+ /**
+ * The class loader we will use to resolve classes.
+ */
+ private ClassLoader[] classLoaders = null;
+
+
+ /**
+ * Construct a new instance of CustomObjectInputStream
+ *
+ * @param stream The input stream we will read from
+ * @param classLoader The class loader used to instantiate objects
+ *
+ * @exception IOException if an input/output error occurs
+ */
+ public ReplicationStream(InputStream stream,
+ ClassLoader[] classLoaders)
+ throws IOException {
+
+ super(stream);
+ this.classLoaders = classLoaders;
+ }
+
+ /**
+ * Load the local class equivalent of the specified stream class
+ * description, by using the class loader assigned to this Context.
+ *
+ * @param classDesc Class description from the input stream
+ *
+ * @exception ClassNotFoundException if this class cannot be found
+ * @exception IOException if an input/output error occurs
+ */
+ public Class resolveClass(ObjectStreamClass classDesc)
+ throws ClassNotFoundException, IOException {
+ String name = classDesc.getName();
+ boolean tryRepFirst = name.startsWith("org.apache.catalina.tribes");
+ try {
+ try
+ {
+ if ( tryRepFirst ) return findReplicationClass(name);
+ else return findExternalClass(name);
+ }
+ catch ( Exception x )
+ {
+ if ( tryRepFirst ) return findExternalClass(name);
+ else return findReplicationClass(name);
+ }
+ } catch (ClassNotFoundException e) {
+ return super.resolveClass(classDesc);
+ }
+ }
+
+ public Class findReplicationClass(String name)
+ throws ClassNotFoundException, IOException {
+ Class clazz = Class.forName(name, false, getClass().getClassLoader());
+ return clazz;
+ }
+
+ public Class findExternalClass(String name) throws ClassNotFoundException {
+ ClassNotFoundException cnfe = null;
+ for (int i=0; i<classLoaders.length; i++ ) {
+ try {
+ Class clazz = Class.forName(name, false, classLoaders[i]);
+ return clazz;
+ } catch ( ClassNotFoundException x ) {
+ cnfe = x;
+ }
+ }
+ if ( cnfe != null ) throw cnfe;
+ else throw new ClassNotFoundException(name);
+ }
+
+ public void close() throws IOException {
+ this.classLoaders = null;
+ super.close();
+ }
+
+
+}
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements. See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License. You may obtain a copy of the License at\r
- * \r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-\r
-package org.apache.catalina.tribes.io;\r
-\r
-import java.io.ByteArrayInputStream;\r
-import java.io.ByteArrayOutputStream;\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.io.ObjectInputStream;\r
-import java.io.ObjectOutputStream;\r
-import java.io.Serializable;\r
-import java.nio.ByteBuffer;\r
-\r
-/**\r
- * The XByteBuffer provides a dual functionality.\r
- * One, it stores message bytes and automatically extends the byte buffer if needed.<BR>\r
- * Two, it can encode and decode packages so that they can be defined and identified\r
- * as they come in on a socket.\r
- * <br>\r
- * <b>THIS CLASS IS NOT THREAD SAFE</B><BR>\r
- * <br/>\r
- * Transfer package:\r
- * <ul>\r
- * <li><b>START_DATA/b> - 7 bytes - <i>FLT2002</i></li>\r
- * <li><b>SIZE</b> - 4 bytes - size of the data package</li>\r
- * <li><b>DATA</b> - should be as many bytes as the prev SIZE</li>\r
- * <li><b>END_DATA</b> - 7 bytes - <i>TLF2003</i></lI>\r
- * </ul>\r
- * @author Filip Hanik\r
- * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $\r
- */\r
-public class XByteBuffer\r
-{\r
- \r
- public static org.apache.juli.logging.Log log =\r
- org.apache.juli.logging.LogFactory.getLog( XByteBuffer.class );\r
- \r
- /**\r
- * This is a package header, 7 bytes (FLT2002)\r
- */\r
- public static final byte[] START_DATA = {70,76,84,50,48,48,50};\r
- \r
- /**\r
- * This is the package footer, 7 bytes (TLF2003)\r
- */\r
- public static final byte[] END_DATA = {84,76,70,50,48,48,51};\r
- \r
- /**\r
- * Default size on the initial byte buffer\r
- */\r
- private static final int DEF_SIZE = 2048;\r
- \r
- /**\r
- * Default size to extend the buffer with\r
- */\r
- private static final int DEF_EXT = 1024;\r
- \r
- /**\r
- * Variable to hold the data\r
- */\r
- protected byte[] buf = null;\r
- \r
- /**\r
- * Current length of data in the buffer\r
- */\r
- protected int bufSize = 0;\r
- \r
- /**\r
- * Flag for discarding invalid packages\r
- * If this flag is set to true, and append(byte[],...) is called,\r
- * the data added will be inspected, and if it doesn't start with \r
- * <code>START_DATA</code> it will be thrown away.\r
- * \r
- */\r
- protected boolean discard = true;\r
- \r
- /**\r
- * Constructs a new XByteBuffer\r
- * @param size - the initial size of the byte buffer\r
- * @todo use a pool of byte[] for performance\r
- */\r
- public XByteBuffer(int size, boolean discard) {\r
- buf = new byte[size];\r
- this.discard = discard;\r
- }\r
- \r
- public XByteBuffer(byte[] data,boolean discard) {\r
- this(data,data.length+128,discard);\r
- }\r
- \r
- public XByteBuffer(byte[] data, int size,boolean discard) {\r
- int length = Math.max(data.length,size);\r
- buf = new byte[length];\r
- System.arraycopy(data,0,buf,0,data.length);\r
- bufSize = data.length;\r
- this.discard = discard;\r
- }\r
- \r
- public int getLength() {\r
- return bufSize;\r
- }\r
-\r
- public void setLength(int size) {\r
- if ( size > buf.length ) throw new ArrayIndexOutOfBoundsException("Size is larger than existing buffer.");\r
- bufSize = size;\r
- }\r
- \r
- public void trim(int length) {\r
- if ( (bufSize - length) < 0 ) \r
- throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length);\r
- bufSize -= length;\r
- }\r
- \r
- public void reset() {\r
- bufSize = 0;\r
- }\r
- \r
- public byte[] getBytesDirect() {\r
- return this.buf;\r
- }\r
-\r
- /**\r
- * Returns the bytes in the buffer, in its exact length\r
- */\r
- public byte[] getBytes() {\r
- byte[] b = new byte[bufSize];\r
- System.arraycopy(buf,0,b,0,bufSize);\r
- return b;\r
- }\r
-\r
- /**\r
- * Resets the buffer\r
- */\r
- public void clear() {\r
- bufSize = 0;\r
- }\r
-\r
- /**\r
- * Appends the data to the buffer. If the data is incorrectly formatted, ie, the data should always start with the\r
- * header, false will be returned and the data will be discarded.\r
- * @param b - bytes to be appended\r
- * @param off - the offset to extract data from\r
- * @param len - the number of bytes to append.\r
- * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0\r
- */\r
- public boolean append(ByteBuffer b, int len) {\r
- int newcount = bufSize + len;\r
- if (newcount > buf.length) {\r
- expand(newcount);\r
- }\r
- b.get(buf,bufSize,len);\r
- \r
- bufSize = newcount;\r
- \r
- if ( discard ) {\r
- if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {\r
- bufSize = 0;\r
- log.error("Discarded the package, invalid header");\r
- return false;\r
- }\r
- }\r
- return true;\r
-\r
- }\r
- \r
- public boolean append(byte i) {\r
- int newcount = bufSize + 1;\r
- if (newcount > buf.length) {\r
- expand(newcount);\r
- }\r
- buf[bufSize] = i;\r
- bufSize = newcount;\r
- return true;\r
- }\r
-\r
-\r
- public boolean append(boolean i) {\r
- int newcount = bufSize + 1;\r
- if (newcount > buf.length) {\r
- expand(newcount);\r
- }\r
- XByteBuffer.toBytes(i,buf,bufSize);\r
- bufSize = newcount;\r
- return true;\r
- }\r
-\r
- public boolean append(long i) {\r
- int newcount = bufSize + 8;\r
- if (newcount > buf.length) {\r
- expand(newcount);\r
- }\r
- XByteBuffer.toBytes(i,buf,bufSize);\r
- bufSize = newcount;\r
- return true;\r
- }\r
- \r
- public boolean append(int i) {\r
- int newcount = bufSize + 4;\r
- if (newcount > buf.length) {\r
- expand(newcount);\r
- }\r
- XByteBuffer.toBytes(i,buf,bufSize);\r
- bufSize = newcount;\r
- return true;\r
- }\r
-\r
- public boolean append(byte[] b, int off, int len) {\r
- if ((off < 0) || (off > b.length) || (len < 0) ||\r
- ((off + len) > b.length) || ((off + len) < 0)) {\r
- throw new IndexOutOfBoundsException();\r
- } else if (len == 0) {\r
- return false;\r
- }\r
-\r
- int newcount = bufSize + len;\r
- if (newcount > buf.length) {\r
- expand(newcount);\r
- }\r
- System.arraycopy(b, off, buf, bufSize, len);\r
- bufSize = newcount;\r
-\r
- if ( discard ) {\r
- if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {\r
- bufSize = 0;\r
- log.error("Discarded the package, invalid header");\r
- return false;\r
- }\r
- }\r
- return true;\r
- }\r
-\r
- public void expand(int newcount) {\r
- //don't change the allocation strategy\r
- byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];\r
- System.arraycopy(buf, 0, newbuf, 0, bufSize);\r
- buf = newbuf;\r
- }\r
- \r
- public int getCapacity() {\r
- return buf.length;\r
- }\r
-\r
-\r
- /**\r
- * Internal mechanism to make a check if a complete package exists\r
- * within the buffer\r
- * @return - true if a complete package (header,compress,size,data,footer) exists within the buffer\r
- */\r
- public int countPackages() {\r
- return countPackages(false);\r
- }\r
-\r
- public int countPackages(boolean first)\r
- {\r
- int cnt = 0;\r
- int pos = START_DATA.length;\r
- int start = 0;\r
-\r
- while ( start < bufSize ) {\r
- //first check start header\r
- int index = XByteBuffer.firstIndexOf(buf,start,START_DATA);\r
- //if the header (START_DATA) isn't the first thing or\r
- //the buffer isn't even 14 bytes\r
- if ( index != start || ((bufSize-start)<14) ) break;\r
- //next 4 bytes are compress flag not needed for count packages\r
- //then get the size 4 bytes\r
- int size = toInt(buf, pos);\r
- //now the total buffer has to be long enough to hold\r
- //START_DATA.length+4+size+END_DATA.length\r
- pos = start + START_DATA.length + 4 + size;\r
- if ( (pos + END_DATA.length) > bufSize) break;\r
- //and finally check the footer of the package END_DATA\r
- int newpos = firstIndexOf(buf, pos, END_DATA);\r
- //mismatch, there is no package\r
- if (newpos != pos) break;\r
- //increase the packet count\r
- cnt++;\r
- //reset the values\r
- start = pos + END_DATA.length;\r
- pos = start + START_DATA.length;\r
- //we only want to verify that we have at least one package\r
- if ( first ) break;\r
- }\r
- return cnt;\r
- }\r
-\r
- /**\r
- * Method to check if a package exists in this byte buffer.\r
- * @return - true if a complete package (header,options,size,data,footer) exists within the buffer\r
- */\r
- public boolean doesPackageExist() {\r
- return (countPackages(true)>0);\r
- }\r
-\r
- /**\r
- * Extracts the message bytes from a package.\r
- * If no package exists, a IllegalStateException will be thrown.\r
- * @param clearFromBuffer - if true, the package will be removed from the byte buffer\r
- * @return - returns the actual message bytes (header, compress,size and footer not included).\r
- */\r
- public XByteBuffer extractDataPackage(boolean clearFromBuffer) {\r
- int psize = countPackages(true);\r
- if (psize == 0) {\r
- throw new java.lang.IllegalStateException("No package exists in XByteBuffer");\r
- }\r
- int size = toInt(buf, START_DATA.length);\r
- XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);\r
- xbuf.setLength(size);\r
- System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);\r
- if (clearFromBuffer) {\r
- int totalsize = START_DATA.length + 4 + size + END_DATA.length;\r
- bufSize = bufSize - totalsize;\r
- System.arraycopy(buf, totalsize, buf, 0, bufSize);\r
- }\r
- return xbuf;\r
-\r
- }\r
- \r
- public ChannelData extractPackage(boolean clearFromBuffer) throws java.io.IOException {\r
- XByteBuffer xbuf = extractDataPackage(clearFromBuffer);\r
- ChannelData cdata = ChannelData.getDataFromPackage(xbuf);\r
- return cdata;\r
- }\r
- \r
- /**\r
- * Creates a complete data package\r
- * @param indata - the message data to be contained within the package\r
- * @param compressed - compression flag for the indata buffer\r
- * @return - a full package (header,size,data,footer)\r
- * \r
- */\r
- public static byte[] createDataPackage(ChannelData cdata) {\r
-// return createDataPackage(cdata.getDataPackage());\r
- //avoid one extra byte array creation\r
- int dlength = cdata.getDataPackageLength();\r
- int length = getDataPackageLength(dlength);\r
- byte[] data = new byte[length];\r
- int offset = 0;\r
- System.arraycopy(START_DATA, 0, data, offset, START_DATA.length);\r
- offset += START_DATA.length;\r
- toBytes(dlength,data, START_DATA.length);\r
- offset += 4;\r
- cdata.getDataPackage(data,offset);\r
- offset += dlength;\r
- System.arraycopy(END_DATA, 0, data, offset, END_DATA.length);\r
- offset += END_DATA.length;\r
- return data;\r
- }\r
- \r
- public static byte[] createDataPackage(byte[] data, int doff, int dlength, byte[] buffer, int bufoff) {\r
- if ( (buffer.length-bufoff) > getDataPackageLength(dlength) ) {\r
- throw new ArrayIndexOutOfBoundsException("Unable to create data package, buffer is too small.");\r
- }\r
- System.arraycopy(START_DATA, 0, buffer, bufoff, START_DATA.length);\r
- toBytes(data.length,buffer, bufoff+START_DATA.length);\r
- System.arraycopy(data, doff, buffer, bufoff+START_DATA.length + 4, dlength);\r
- System.arraycopy(END_DATA, 0, buffer, bufoff+START_DATA.length + 4 + data.length, END_DATA.length);\r
- return buffer;\r
- }\r
-\r
- \r
- public static int getDataPackageLength(int datalength) {\r
- int length = \r
- START_DATA.length + //header length\r
- 4 + //data length indicator\r
- datalength + //actual data length\r
- END_DATA.length; //footer length\r
- return length;\r
-\r
- }\r
- \r
- public static byte[] createDataPackage(byte[] data) {\r
- int length = getDataPackageLength(data.length);\r
- byte[] result = new byte[length];\r
- return createDataPackage(data,0,data.length,result,0);\r
- }\r
- \r
- \r
-\r
-// public static void fillDataPackage(byte[] data, int doff, int dlength, XByteBuffer buf) {\r
-// int pkglen = getDataPackageLength(dlength);\r
-// if ( buf.getCapacity() < pkglen ) buf.expand(pkglen);\r
-// createDataPackage(data,doff,dlength,buf.getBytesDirect(),buf.getLength());\r
-// }\r
-\r
- /**\r
- * Convert four bytes to an int\r
- * @param b - the byte array containing the four bytes\r
- * @param off - the offset\r
- * @return the integer value constructed from the four bytes\r
- * @exception java.lang.ArrayIndexOutOfBoundsException\r
- */\r
- public static int toInt(byte[] b,int off){\r
- return ( ( (int) b[off+3]) & 0xFF) +\r
- ( ( ( (int) b[off+2]) & 0xFF) << 8) +\r
- ( ( ( (int) b[off+1]) & 0xFF) << 16) +\r
- ( ( ( (int) b[off+0]) & 0xFF) << 24);\r
- }\r
-\r
- /**\r
- * Convert eight bytes to a long\r
- * @param b - the byte array containing the four bytes\r
- * @param off - the offset\r
- * @return the long value constructed from the eight bytes\r
- * @exception java.lang.ArrayIndexOutOfBoundsException\r
- */\r
- public static long toLong(byte[] b,int off){\r
- return ( ( (long) b[off+7]) & 0xFF) +\r
- ( ( ( (long) b[off+6]) & 0xFF) << 8) +\r
- ( ( ( (long) b[off+5]) & 0xFF) << 16) +\r
- ( ( ( (long) b[off+4]) & 0xFF) << 24) +\r
- ( ( ( (long) b[off+3]) & 0xFF) << 32) +\r
- ( ( ( (long) b[off+2]) & 0xFF) << 40) +\r
- ( ( ( (long) b[off+1]) & 0xFF) << 48) +\r
- ( ( ( (long) b[off+0]) & 0xFF) << 56);\r
- }\r
-\r
- \r
- /**\r
- * Converts an integer to four bytes\r
- * @param n - the integer\r
- * @return - four bytes in an array\r
- * @deprecated use toBytes(boolean,byte[],int)\r
- */\r
- public static byte[] toBytes(boolean bool) {\r
- byte[] b = new byte[1] ;\r
- return toBytes(bool,b,0);\r
-\r
- }\r
- \r
- public static byte[] toBytes(boolean bool, byte[] data, int offset) {\r
- data[offset] = (byte)(bool?1:0);\r
- return data;\r
- }\r
- \r
- /**\r
- * \r
- * @param <any> long\r
- * @return use\r
- */\r
- public static boolean toBoolean(byte[] b, int offset) {\r
- return b[offset] != 0;\r
- }\r
-\r
- \r
- /**\r
- * Converts an integer to four bytes\r
- * @param n - the integer\r
- * @return - four bytes in an array\r
- * @deprecated use toBytes(int,byte[],int)\r
- */\r
- public static byte[] toBytes(int n) {\r
- return toBytes(n,new byte[4],0);\r
- }\r
-\r
- public static byte[] toBytes(int n,byte[] b, int offset) {\r
- b[offset+3] = (byte) (n);\r
- n >>>= 8;\r
- b[offset+2] = (byte) (n);\r
- n >>>= 8;\r
- b[offset+1] = (byte) (n);\r
- n >>>= 8;\r
- b[offset+0] = (byte) (n);\r
- return b;\r
- }\r
-\r
- /**\r
- * Converts an long to eight bytes\r
- * @param n - the long\r
- * @return - eight bytes in an array\r
- * @deprecated use toBytes(long,byte[],int)\r
- */\r
- public static byte[] toBytes(long n) {\r
- return toBytes(n,new byte[8],0);\r
- }\r
- public static byte[] toBytes(long n, byte[] b, int offset) {\r
- b[offset+7] = (byte) (n);\r
- n >>>= 8;\r
- b[offset+6] = (byte) (n);\r
- n >>>= 8;\r
- b[offset+5] = (byte) (n);\r
- n >>>= 8;\r
- b[offset+4] = (byte) (n);\r
- n >>>= 8;\r
- b[offset+3] = (byte) (n);\r
- n >>>= 8;\r
- b[offset+2] = (byte) (n);\r
- n >>>= 8;\r
- b[offset+1] = (byte) (n);\r
- n >>>= 8;\r
- b[offset+0] = (byte) (n);\r
- return b;\r
- }\r
-\r
- /**\r
- * Similar to a String.IndexOf, but uses pure bytes\r
- * @param src - the source bytes to be searched\r
- * @param srcOff - offset on the source buffer\r
- * @param find - the string to be found within src\r
- * @return - the index of the first matching byte. -1 if the find array is not found\r
- */\r
- public static int firstIndexOf(byte[] src, int srcOff, byte[] find){\r
- int result = -1;\r
- if (find.length > src.length) return result;\r
- if (find.length == 0 || src.length == 0) return result;\r
- if (srcOff >= src.length ) throw new java.lang.ArrayIndexOutOfBoundsException();\r
- boolean found = false;\r
- int srclen = src.length;\r
- int findlen = find.length;\r
- byte first = find[0];\r
- int pos = srcOff;\r
- while (!found) {\r
- //find the first byte\r
- while (pos < srclen){\r
- if (first == src[pos])\r
- break;\r
- pos++;\r
- }\r
- if (pos >= srclen)\r
- return -1;\r
-\r
- //we found the first character\r
- //match the rest of the bytes - they have to match\r
- if ( (srclen - pos) < findlen)\r
- return -1;\r
- //assume it does exist\r
- found = true;\r
- for (int i = 1; ( (i < findlen) && found); i++)\r
- found = found && (find[i] == src[pos + i]);\r
- if (found)\r
- result = pos;\r
- else if ( (srclen - pos) < findlen)\r
- return -1; //no more matches possible\r
- else\r
- pos++;\r
- }\r
- return result;\r
- }\r
-\r
- \r
- public static Serializable deserialize(byte[] data) \r
- throws IOException, ClassNotFoundException, ClassCastException {\r
- return deserialize(data,0,data.length);\r
- }\r
- \r
- public static Serializable deserialize(byte[] data, int offset, int length) \r
- throws IOException, ClassNotFoundException, ClassCastException {\r
- return deserialize(data,offset,length,null); \r
- }\r
- public static int invokecount = 0;\r
- public static Serializable deserialize(byte[] data, int offset, int length, ClassLoader[] cls) \r
- throws IOException, ClassNotFoundException, ClassCastException {\r
- synchronized (XByteBuffer.class) { invokecount++;}\r
- Object message = null;\r
- if ( cls == null ) cls = new ClassLoader[0];\r
- if (data != null) {\r
- InputStream instream = new ByteArrayInputStream(data,offset,length);\r
- ObjectInputStream stream = null;\r
- stream = (cls.length>0)? new ReplicationStream(instream,cls):new ObjectInputStream(instream);\r
- message = stream.readObject();\r
- instream.close();\r
- stream.close();\r
- }\r
- if ( message == null ) {\r
- return null;\r
- } else if (message instanceof Serializable)\r
- return (Serializable) message;\r
- else {\r
- throw new ClassCastException("Message has the wrong class. It should implement Serializable, instead it is:"+message.getClass().getName());\r
- }\r
- }\r
-\r
- /**\r
- * Serializes a message into cluster data\r
- * @param msg ClusterMessage\r
- * @param compress boolean\r
- * @return \r
- * @throws IOException\r
- */\r
- public static byte[] serialize(Serializable msg) throws IOException {\r
- ByteArrayOutputStream outs = new ByteArrayOutputStream();\r
- ObjectOutputStream out = new ObjectOutputStream(outs);\r
- out.writeObject(msg);\r
- out.flush();\r
- byte[] data = outs.toByteArray();\r
- return data;\r
- }\r
-\r
- public void setDiscard(boolean discard) {\r
- this.discard = discard;\r
- }\r
-\r
- public boolean getDiscard() {\r
- return discard;\r
- }\r
-\r
-}\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * The XByteBuffer provides a dual functionality.
+ * One, it stores message bytes and automatically extends the byte buffer if needed.<BR>
+ * Two, it can encode and decode packages so that they can be defined and identified
+ * as they come in on a socket.
+ * <br>
+ * <b>THIS CLASS IS NOT THREAD SAFE</B><BR>
+ * <br/>
+ * Transfer package:
+ * <ul>
+ * <li><b>START_DATA/b> - 7 bytes - <i>FLT2002</i></li>
+ * <li><b>SIZE</b> - 4 bytes - size of the data package</li>
+ * <li><b>DATA</b> - should be as many bytes as the prev SIZE</li>
+ * <li><b>END_DATA</b> - 7 bytes - <i>TLF2003</i></lI>
+ * </ul>
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+public class XByteBuffer
+{
+
+ public static org.apache.juli.logging.Log log =
+ org.apache.juli.logging.LogFactory.getLog( XByteBuffer.class );
+
+ /**
+ * This is a package header, 7 bytes (FLT2002)
+ */
+ public static final byte[] START_DATA = {70,76,84,50,48,48,50};
+
+ /**
+ * This is the package footer, 7 bytes (TLF2003)
+ */
+ public static final byte[] END_DATA = {84,76,70,50,48,48,51};
+
+ /**
+ * Default size on the initial byte buffer
+ */
+ private static final int DEF_SIZE = 2048;
+
+ /**
+ * Default size to extend the buffer with
+ */
+ private static final int DEF_EXT = 1024;
+
+ /**
+ * Variable to hold the data
+ */
+ protected byte[] buf = null;
+
+ /**
+ * Current length of data in the buffer
+ */
+ protected int bufSize = 0;
+
+ /**
+ * Flag for discarding invalid packages
+ * If this flag is set to true, and append(byte[],...) is called,
+ * the data added will be inspected, and if it doesn't start with
+ * <code>START_DATA</code> it will be thrown away.
+ *
+ */
+ protected boolean discard = true;
+
+ /**
+ * Constructs a new XByteBuffer
+ * @param size - the initial size of the byte buffer
+ * @todo use a pool of byte[] for performance
+ */
+ public XByteBuffer(int size, boolean discard) {
+ buf = new byte[size];
+ this.discard = discard;
+ }
+
+ public XByteBuffer(byte[] data,boolean discard) {
+ this(data,data.length+128,discard);
+ }
+
+ public XByteBuffer(byte[] data, int size,boolean discard) {
+ int length = Math.max(data.length,size);
+ buf = new byte[length];
+ System.arraycopy(data,0,buf,0,data.length);
+ bufSize = data.length;
+ this.discard = discard;
+ }
+
+ public int getLength() {
+ return bufSize;
+ }
+
+ public void setLength(int size) {
+ if ( size > buf.length ) throw new ArrayIndexOutOfBoundsException("Size is larger than existing buffer.");
+ bufSize = size;
+ }
+
+ public void trim(int length) {
+ if ( (bufSize - length) < 0 )
+ throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length);
+ bufSize -= length;
+ }
+
+ public void reset() {
+ bufSize = 0;
+ }
+
+ public byte[] getBytesDirect() {
+ return this.buf;
+ }
+
+ /**
+ * Returns the bytes in the buffer, in its exact length
+ */
+ public byte[] getBytes() {
+ byte[] b = new byte[bufSize];
+ System.arraycopy(buf,0,b,0,bufSize);
+ return b;
+ }
+
+ /**
+ * Resets the buffer
+ */
+ public void clear() {
+ bufSize = 0;
+ }
+
+ /**
+ * Appends the data to the buffer. If the data is incorrectly formatted, ie, the data should always start with the
+ * header, false will be returned and the data will be discarded.
+ * @param b - bytes to be appended
+ * @param off - the offset to extract data from
+ * @param len - the number of bytes to append.
+ * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0
+ */
+ public boolean append(ByteBuffer b, int len) {
+ int newcount = bufSize + len;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ b.get(buf,bufSize,len);
+
+ bufSize = newcount;
+
+ if ( discard ) {
+ if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {
+ bufSize = 0;
+ log.error("Discarded the package, invalid header");
+ return false;
+ }
+ }
+ return true;
+
+ }
+
+ public boolean append(byte i) {
+ int newcount = bufSize + 1;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ buf[bufSize] = i;
+ bufSize = newcount;
+ return true;
+ }
+
+
+ public boolean append(boolean i) {
+ int newcount = bufSize + 1;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ XByteBuffer.toBytes(i,buf,bufSize);
+ bufSize = newcount;
+ return true;
+ }
+
+ public boolean append(long i) {
+ int newcount = bufSize + 8;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ XByteBuffer.toBytes(i,buf,bufSize);
+ bufSize = newcount;
+ return true;
+ }
+
+ public boolean append(int i) {
+ int newcount = bufSize + 4;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ XByteBuffer.toBytes(i,buf,bufSize);
+ bufSize = newcount;
+ return true;
+ }
+
+ public boolean append(byte[] b, int off, int len) {
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return false;
+ }
+
+ int newcount = bufSize + len;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ System.arraycopy(b, off, buf, bufSize, len);
+ bufSize = newcount;
+
+ if ( discard ) {
+ if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {
+ bufSize = 0;
+ log.error("Discarded the package, invalid header");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void expand(int newcount) {
+ //don't change the allocation strategy
+ byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+ System.arraycopy(buf, 0, newbuf, 0, bufSize);
+ buf = newbuf;
+ }
+
+ public int getCapacity() {
+ return buf.length;
+ }
+
+
+ /**
+ * Internal mechanism to make a check if a complete package exists
+ * within the buffer
+ * @return - true if a complete package (header,compress,size,data,footer) exists within the buffer
+ */
+ public int countPackages() {
+ return countPackages(false);
+ }
+
+ public int countPackages(boolean first)
+ {
+ int cnt = 0;
+ int pos = START_DATA.length;
+ int start = 0;
+
+ while ( start < bufSize ) {
+ //first check start header
+ int index = XByteBuffer.firstIndexOf(buf,start,START_DATA);
+ //if the header (START_DATA) isn't the first thing or
+ //the buffer isn't even 14 bytes
+ if ( index != start || ((bufSize-start)<14) ) break;
+ //next 4 bytes are compress flag not needed for count packages
+ //then get the size 4 bytes
+ int size = toInt(buf, pos);
+ //now the total buffer has to be long enough to hold
+ //START_DATA.length+4+size+END_DATA.length
+ pos = start + START_DATA.length + 4 + size;
+ if ( (pos + END_DATA.length) > bufSize) break;
+ //and finally check the footer of the package END_DATA
+ int newpos = firstIndexOf(buf, pos, END_DATA);
+ //mismatch, there is no package
+ if (newpos != pos) break;
+ //increase the packet count
+ cnt++;
+ //reset the values
+ start = pos + END_DATA.length;
+ pos = start + START_DATA.length;
+ //we only want to verify that we have at least one package
+ if ( first ) break;
+ }
+ return cnt;
+ }
+
+ /**
+ * Method to check if a package exists in this byte buffer.
+ * @return - true if a complete package (header,options,size,data,footer) exists within the buffer
+ */
+ public boolean doesPackageExist() {
+ return (countPackages(true)>0);
+ }
+
+ /**
+ * Extracts the message bytes from a package.
+ * If no package exists, a IllegalStateException will be thrown.
+ * @param clearFromBuffer - if true, the package will be removed from the byte buffer
+ * @return - returns the actual message bytes (header, compress,size and footer not included).
+ */
+ public XByteBuffer extractDataPackage(boolean clearFromBuffer) {
+ int psize = countPackages(true);
+ if (psize == 0) {
+ throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
+ }
+ int size = toInt(buf, START_DATA.length);
+ XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);
+ xbuf.setLength(size);
+ System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);
+ if (clearFromBuffer) {
+ int totalsize = START_DATA.length + 4 + size + END_DATA.length;
+ bufSize = bufSize - totalsize;
+ System.arraycopy(buf, totalsize, buf, 0, bufSize);
+ }
+ return xbuf;
+
+ }
+
+ public ChannelData extractPackage(boolean clearFromBuffer) throws java.io.IOException {
+ XByteBuffer xbuf = extractDataPackage(clearFromBuffer);
+ ChannelData cdata = ChannelData.getDataFromPackage(xbuf);
+ return cdata;
+ }
+
+ /**
+ * Creates a complete data package
+ * @param indata - the message data to be contained within the package
+ * @param compressed - compression flag for the indata buffer
+ * @return - a full package (header,size,data,footer)
+ *
+ */
+ public static byte[] createDataPackage(ChannelData cdata) {
+// return createDataPackage(cdata.getDataPackage());
+ //avoid one extra byte array creation
+ int dlength = cdata.getDataPackageLength();
+ int length = getDataPackageLength(dlength);
+ byte[] data = new byte[length];
+ int offset = 0;
+ System.arraycopy(START_DATA, 0, data, offset, START_DATA.length);
+ offset += START_DATA.length;
+ toBytes(dlength,data, START_DATA.length);
+ offset += 4;
+ cdata.getDataPackage(data,offset);
+ offset += dlength;
+ System.arraycopy(END_DATA, 0, data, offset, END_DATA.length);
+ offset += END_DATA.length;
+ return data;
+ }
+
+ public static byte[] createDataPackage(byte[] data, int doff, int dlength, byte[] buffer, int bufoff) {
+ if ( (buffer.length-bufoff) > getDataPackageLength(dlength) ) {
+ throw new ArrayIndexOutOfBoundsException("Unable to create data package, buffer is too small.");
+ }
+ System.arraycopy(START_DATA, 0, buffer, bufoff, START_DATA.length);
+ toBytes(data.length,buffer, bufoff+START_DATA.length);
+ System.arraycopy(data, doff, buffer, bufoff+START_DATA.length + 4, dlength);
+ System.arraycopy(END_DATA, 0, buffer, bufoff+START_DATA.length + 4 + data.length, END_DATA.length);
+ return buffer;
+ }
+
+
+ public static int getDataPackageLength(int datalength) {
+ int length =
+ START_DATA.length + //header length
+ 4 + //data length indicator
+ datalength + //actual data length
+ END_DATA.length; //footer length
+ return length;
+
+ }
+
+ public static byte[] createDataPackage(byte[] data) {
+ int length = getDataPackageLength(data.length);
+ byte[] result = new byte[length];
+ return createDataPackage(data,0,data.length,result,0);
+ }
+
+
+
+// public static void fillDataPackage(byte[] data, int doff, int dlength, XByteBuffer buf) {
+// int pkglen = getDataPackageLength(dlength);
+// if ( buf.getCapacity() < pkglen ) buf.expand(pkglen);
+// createDataPackage(data,doff,dlength,buf.getBytesDirect(),buf.getLength());
+// }
+
+ /**
+ * Convert four bytes to an int
+ * @param b - the byte array containing the four bytes
+ * @param off - the offset
+ * @return the integer value constructed from the four bytes
+ * @exception java.lang.ArrayIndexOutOfBoundsException
+ */
+ public static int toInt(byte[] b,int off){
+ return ( ( (int) b[off+3]) & 0xFF) +
+ ( ( ( (int) b[off+2]) & 0xFF) << 8) +
+ ( ( ( (int) b[off+1]) & 0xFF) << 16) +
+ ( ( ( (int) b[off+0]) & 0xFF) << 24);
+ }
+
+ /**
+ * Convert eight bytes to a long
+ * @param b - the byte array containing the four bytes
+ * @param off - the offset
+ * @return the long value constructed from the eight bytes
+ * @exception java.lang.ArrayIndexOutOfBoundsException
+ */
+ public static long toLong(byte[] b,int off){
+ return ( ( (long) b[off+7]) & 0xFF) +
+ ( ( ( (long) b[off+6]) & 0xFF) << 8) +
+ ( ( ( (long) b[off+5]) & 0xFF) << 16) +
+ ( ( ( (long) b[off+4]) & 0xFF) << 24) +
+ ( ( ( (long) b[off+3]) & 0xFF) << 32) +
+ ( ( ( (long) b[off+2]) & 0xFF) << 40) +
+ ( ( ( (long) b[off+1]) & 0xFF) << 48) +
+ ( ( ( (long) b[off+0]) & 0xFF) << 56);
+ }
+
+
+ /**
+ * Converts an integer to four bytes
+ * @param n - the integer
+ * @return - four bytes in an array
+ * @deprecated use toBytes(boolean,byte[],int)
+ */
+ public static byte[] toBytes(boolean bool) {
+ byte[] b = new byte[1] ;
+ return toBytes(bool,b,0);
+
+ }
+
+ public static byte[] toBytes(boolean bool, byte[] data, int offset) {
+ data[offset] = (byte)(bool?1:0);
+ return data;
+ }
+
+ /**
+ *
+ * @param <any> long
+ * @return use
+ */
+ public static boolean toBoolean(byte[] b, int offset) {
+ return b[offset] != 0;
+ }
+
+
+ /**
+ * Converts an integer to four bytes
+ * @param n - the integer
+ * @return - four bytes in an array
+ * @deprecated use toBytes(int,byte[],int)
+ */
+ public static byte[] toBytes(int n) {
+ return toBytes(n,new byte[4],0);
+ }
+
+ public static byte[] toBytes(int n,byte[] b, int offset) {
+ b[offset+3] = (byte) (n);
+ n >>>= 8;
+ b[offset+2] = (byte) (n);
+ n >>>= 8;
+ b[offset+1] = (byte) (n);
+ n >>>= 8;
+ b[offset+0] = (byte) (n);
+ return b;
+ }
+
+ /**
+ * Converts an long to eight bytes
+ * @param n - the long
+ * @return - eight bytes in an array
+ * @deprecated use toBytes(long,byte[],int)
+ */
+ public static byte[] toBytes(long n) {
+ return toBytes(n,new byte[8],0);
+ }
+ public static byte[] toBytes(long n, byte[] b, int offset) {
+ b[offset+7] = (byte) (n);
+ n >>>= 8;
+ b[offset+6] = (byte) (n);
+ n >>>= 8;
+ b[offset+5] = (byte) (n);
+ n >>>= 8;
+ b[offset+4] = (byte) (n);
+ n >>>= 8;
+ b[offset+3] = (byte) (n);
+ n >>>= 8;
+ b[offset+2] = (byte) (n);
+ n >>>= 8;
+ b[offset+1] = (byte) (n);
+ n >>>= 8;
+ b[offset+0] = (byte) (n);
+ return b;
+ }
+
+ /**
+ * Similar to a String.IndexOf, but uses pure bytes
+ * @param src - the source bytes to be searched
+ * @param srcOff - offset on the source buffer
+ * @param find - the string to be found within src
+ * @return - the index of the first matching byte. -1 if the find array is not found
+ */
+ public static int firstIndexOf(byte[] src, int srcOff, byte[] find){
+ int result = -1;
+ if (find.length > src.length) return result;
+ if (find.length == 0 || src.length == 0) return result;
+ if (srcOff >= src.length ) throw new java.lang.ArrayIndexOutOfBoundsException();
+ boolean found = false;
+ int srclen = src.length;
+ int findlen = find.length;
+ byte first = find[0];
+ int pos = srcOff;
+ while (!found) {
+ //find the first byte
+ while (pos < srclen){
+ if (first == src[pos])
+ break;
+ pos++;
+ }
+ if (pos >= srclen)
+ return -1;
+
+ //we found the first character
+ //match the rest of the bytes - they have to match
+ if ( (srclen - pos) < findlen)
+ return -1;
+ //assume it does exist
+ found = true;
+ for (int i = 1; ( (i < findlen) && found); i++)
+ found = found && (find[i] == src[pos + i]);
+ if (found)
+ result = pos;
+ else if ( (srclen - pos) < findlen)
+ return -1; //no more matches possible
+ else
+ pos++;
+ }
+ return result;
+ }
+
+
+ public static Serializable deserialize(byte[] data)
+ throws IOException, ClassNotFoundException, ClassCastException {
+ return deserialize(data,0,data.length);
+ }
+
+ public static Serializable deserialize(byte[] data, int offset, int length)
+ throws IOException, ClassNotFoundException, ClassCastException {
+ return deserialize(data,offset,length,null);
+ }
+ public static int invokecount = 0;
+ public static Serializable deserialize(byte[] data, int offset, int length, ClassLoader[] cls)
+ throws IOException, ClassNotFoundException, ClassCastException {
+ synchronized (XByteBuffer.class) { invokecount++;}
+ Object message = null;
+ if ( cls == null ) cls = new ClassLoader[0];
+ if (data != null) {
+ InputStream instream = new ByteArrayInputStream(data,offset,length);
+ ObjectInputStream stream = null;
+ stream = (cls.length>0)? new ReplicationStream(instream,cls):new ObjectInputStream(instream);
+ message = stream.readObject();
+ instream.close();
+ stream.close();
+ }
+ if ( message == null ) {
+ return null;
+ } else if (message instanceof Serializable)
+ return (Serializable) message;
+ else {
+ throw new ClassCastException("Message has the wrong class. It should implement Serializable, instead it is:"+message.getClass().getName());
+ }
+ }
+
+ /**
+ * Serializes a message into cluster data
+ * @param msg ClusterMessage
+ * @param compress boolean
+ * @return
+ * @throws IOException
+ */
+ public static byte[] serialize(Serializable msg) throws IOException {
+ ByteArrayOutputStream outs = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(outs);
+ out.writeObject(msg);
+ out.flush();
+ byte[] data = outs.toByteArray();
+ return data;
+ }
+
+ public void setDiscard(boolean discard) {
+ this.discard = discard;
+ }
+
+ public boolean getDiscard() {
+ return discard;
+ }
+
+}