import java.util.Iterator;
import javax.servlet.ServletContextAttributeListener;
import javax.servlet.ServletContextAttributeEvent;
+import org.apache.catalina.LifecycleEvent;
+import org.apache.catalina.LifecycleListener;
+import java.util.Enumeration;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.catalina.util.Enumerator;
/**
* @author Filip Hanik
* @version 1.0
*/
-public class ReplicatedContext extends StandardContext {
+public class ReplicatedContext extends StandardContext implements LifecycleListener {
private int mapSendOptions = Channel.SEND_OPTIONS_DEFAULT;
public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( ReplicatedContext.class );
-
+ protected boolean startComplete = false;
protected static long DEFAULT_REPL_TIMEOUT = 15000;//15 seconds
-
+ public void lifecycleEvent(LifecycleEvent event) {
+ if ( event.getType() == AFTER_START_EVENT )
+ startComplete = true;
+ }
public synchronized void start() throws LifecycleException {
if ( this.started ) return;
+ super.addLifecycleListener(this);
try {
CatalinaCluster catclust = (CatalinaCluster)this.getCluster();
if (this.context == null) this.context = new ReplApplContext(this.getBasePath(), this);
}
if ( !this.started ) return;
try {
+ super.lifecycle.removeLifecycleListener(this);
} catch ( Exception x ){
log.error("Unable to stop ReplicatedContext",x);
throw new LifecycleException("Failed to stop ReplicatedContext",x);
} finally {
+ this.startComplete = false;
super.stop();
}
+
}
protected static class ReplApplContext extends ApplicationContext {
- public ReplApplContext(String basePath, StandardContext context) {
+ protected ConcurrentHashMap tomcatAttributes = new ConcurrentHashMap();
+
+ public ReplApplContext(String basePath, ReplicatedContext context) {
super(basePath,context);
}
- protected ServletContext getFacade() {
+ protected ReplicatedContext getParent() {
+ return (ReplicatedContext)getContext();
+ }
+
+ protected ServletContext getFacade() {
return super.getFacade();
}
}
public void removeAttribute(String name) {
+ tomcatAttributes.remove(name);
//do nothing
super.removeAttribute(name);
}
public void setAttribute(String name, Object value) {
- //do nothing
- super.setAttribute(name,value);
+ if ( (!getParent().startComplete) || "org.apache.jasper.runtime.JspApplicationContextImpl".equals(name) ){
+ tomcatAttributes.put(name,value);
+ } else
+ super.setAttribute(name,value);
+ }
+
+ public Object getAttribute(String name) {
+ if (tomcatAttributes.containsKey(name) )
+ return tomcatAttributes.get(name);
+ else
+ return super.getAttribute(name);
+ }
+
+ public Enumeration getAttributeNames() {
+ return new MultiEnumeration(new Enumeration[] {super.getAttributeNames(),new Enumerator(tomcatAttributes.keySet(), true)});
}
}
+ protected static class MultiEnumeration implements Enumeration {
+ Enumeration[] e=null;
+ public MultiEnumeration(Enumeration[] lists) {
+ e = lists;
+ }
+ public boolean hasMoreElements() {
+ for ( int i=0; i<e.length; i++ ) {
+ if ( e[i].hasMoreElements() ) return true;
+ }
+ return false;
+ }
+ public Object nextElement() {
+ for ( int i=0; i<e.length; i++ ) {
+ if ( e[i].hasMoreElements() ) return e[i].nextElement();
+ }
+ return null;
+ }
+ }
}
\ No newline at end of file
//------------------------------------------------------------------------------
// INSTANCE VARIABLES
//------------------------------------------------------------------------------
+ protected abstract int getStateMessageType();
+
/**
* Timeout for RPC messages, how long we will wait for a reply
Member[] members = getMapMembers();
Member backup = members.length > 0 ? (Member) members[0] : null;
if (backup != null) {
- MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,
+ MapMessage msg = new MapMessage(mapContextName, getStateMessageType(), false,
null, null, null, null);
Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
if (resp.length > 0) {
}
//state transfer request
- if (mapmsg.getMsgType() == mapmsg.MSG_STATE) {
+ if (mapmsg.getMsgType() == mapmsg.MSG_STATE || mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY) {
synchronized (stateMutex) { //make sure we dont do two things at the same time
ArrayList list = new ArrayList();
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
+ MapEntry entry = (MapEntry) super.get(e.getKey());
if ( entry.isSerializable() ) {
- MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY,
- false, (Serializable) entry.getKey(), null, null, entry.getBackupNodes());
+ boolean copy = (mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY);
+ MapMessage me = new MapMessage(mapContextName,
+ copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY,
+ false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getBackupNodes());
list.add(me);
}
}
super.remove(mapmsg.getKey());
}
- if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
+ if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP || mapmsg.getMsgType() == MapMessage.MSG_COPY) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if (entry == null) {
entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
- entry.setBackup(true);
+ entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
}
} else {
- entry.setBackup(true);
+ entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
if (entry.getValue() instanceof ReplicatedMapEntry) {
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
+ MapEntry entry = (MapEntry) super.get(e.getKey());
if ( entry == null ) continue;
if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
try {
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
+ MapEntry entry = (MapEntry) super.get(e.getKey());
if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) {
try {
Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
- System.out.println( (++cnt) + ". " + e.getValue());
+ System.out.println( (++cnt) + ". " + super.get(e.getKey()));
}
System.out.println("EndMap]\n\n");
}catch ( Exception ignore) {
return super.containsKey(key);
}
-
public Object put(Object key, Object value) {
+ return put(key,value,true);
+ }
+
+ public Object put(Object key, Object value, boolean notify) {
MapEntry entry = new MapEntry(key,value);
entry.setBackup(false);
entry.setProxy(false);
//make sure that any old values get removed
if ( containsKey(key) ) old = remove(key);
try {
- Member[] backup = publishEntryInfo(key, value);
- entry.setBackupNodes(backup);
+ if ( notify ) {
+ Member[] backup = publishEntryInfo(key, value);
+ entry.setBackupNodes(backup);
+ }
} catch (ChannelException x) {
log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
}
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
+ MapEntry entry = (MapEntry) super.get(e.getKey());
if (entry.isPrimary() && value.equals(entry.getValue())) return true;
}//while
return false;
while (it.hasNext() ) {
Map.Entry e = (Map.Entry) it.next();
if ( e != null ) {
- MapEntry entry = (MapEntry) e.getValue();
+ MapEntry entry = (MapEntry) super.get(e.getKey());
if (entry.isPrimary() && entry.getValue() != null) counter++;
}
}
Iterator i = super.entrySet().iterator();
while ( i.hasNext() ) {
Map.Entry e = (Map.Entry)i.next();
- MapEntry entry = (MapEntry)e.getValue();
+ MapEntry entry = (MapEntry)super.get(e.getKey());
if ( entry.isPrimary() && entry.getValue()!=null) values.add(entry.getValue());
}
return Collections.unmodifiableCollection(values);
public static final int MSG_START = 6;
public static final int MSG_STOP = 7;
public static final int MSG_INIT = 8;
+ public static final int MSG_COPY = 9;
+ public static final int MSG_STATE_COPY = 10;
private byte[] mapId;
private int msgtype;
case MSG_START: return "MSG_START";
case MSG_STOP: return "MSG_STOP";
case MSG_INIT: return "MSG_INIT";
+ case MSG_STATE_COPY: return "MSG_STATE_COPY";
+ case MSG_COPY: return "MSG_COPY";
default : return "UNKNOWN";
}
}