*/
package org.apache.tomcat.jdbc.pool.interceptor;
+import java.lang.management.ManagementFactory;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import javax.management.DynamicMBean;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
+import javax.management.ListenerNotFoundException;
import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.NotificationEmitter;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.RuntimeOperationsException;
import javax.management.openmbean.CompositeData;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.jdbc.pool.ConnectionPool;
import org.apache.tomcat.jdbc.pool.PooledConnection;
-import org.apache.tomcat.util.modeler.BaseModelMBean;
-import org.apache.tomcat.util.modeler.ManagedBean;
-import org.apache.tomcat.util.modeler.Registry;
+import org.apache.tomcat.jdbc.pool.PoolProperties.InterceptorProperty;
/**
* Publishes data to JMX and provides notifications
* when failures happen.
* @author fhanik
*
*/
-public class SlowQueryReportJmx extends SlowQueryReport {
+public class SlowQueryReportJmx extends SlowQueryReport implements NotificationEmitter, SlowQueryReportJmxMBean{
public static final String SLOW_QUERY_NOTIFICATION = "SLOW QUERY";
public static final String FAILED_QUERY_NOTIFICATION = "FAILED QUERY";
protected static Log log = LogFactory.getLog(SlowQueryReportJmx.class);
- protected static ConcurrentHashMap<String,DynamicMBean> mbeans =
- new ConcurrentHashMap<String,DynamicMBean>();
+ protected static ConcurrentHashMap<String,SlowQueryReportJmxMBean> mbeans =
+ new ConcurrentHashMap<String,SlowQueryReportJmxMBean>();
+
+
+ //==============================JMX STUFF========================
+ protected volatile NotificationBroadcasterSupport notifier = new NotificationBroadcasterSupport();
+
+ public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws IllegalArgumentException {
+ notifier.addNotificationListener(listener, filter, handback);
+ }
+
+
+ public MBeanNotificationInfo[] getNotificationInfo() {
+ return notifier.getNotificationInfo();
+ }
+
+ public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException {
+ notifier.removeNotificationListener(listener);
+
+ }
+
+ public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws ListenerNotFoundException {
+ notifier.removeNotificationListener(listener, filter, handback);
+
+ }
+
+
+
+ //==============================JMX STUFF========================
protected String poolName = null;
if (parent!=null) {
poolName = parent.getName();
pool = parent;
+ registerJmx();
}
}
this.pool = pool;
super.poolStarted(pool);
this.poolName = pool.getName();
- registerJmx();
}
@Override
protected void notifyJmx(String query, String type) {
try {
- DynamicMBean mbean = mbeans.get(poolName);
long sequence = notifySequence.incrementAndGet();
if (isNotifyPool()) {
this.pool.getJmxPool().notify(type, query);
}
} else {
- if (mbean!=null && mbean instanceof BaseModelMBean) {
+ if (notifier!=null) {
Notification notification =
new Notification(type,
- mbean,
+ this,
sequence,
System.currentTimeMillis(),
query);
- BaseModelMBean bmbean = (BaseModelMBean)mbean;
- bmbean.sendNotification(notification);
+
+ notifier.sendNotification(notification);
}
}
} catch (RuntimeOperationsException e) {
if (log.isDebugEnabled()) {
log.debug("Unable to send failed query notification.",e);
}
- } catch (MBeanException e) {
- if (log.isDebugEnabled()) {
- log.debug("Unable to send failed query notification.",e);
- }
}
}
protected void deregisterJmx() {
try {
- DynamicMBean mbean = null;
- if ((mbean=mbeans.remove(poolName))!=null) {
- Registry registry = Registry.getRegistry(null, null);
- ManagedBean managed = registry.findManagedBean(this.getClass().getName());
- if (managed!=null) {
- ObjectName oname = new ObjectName(ConnectionPool.POOL_JMX_TYPE_PREFIX+getClass().getName()+",name=" + poolName);
- registry.unregisterComponent(oname);
- registry.removeManagedBean(managed);
- }
+ if (mbeans.remove(poolName)!=null) {
+ ObjectName oname = getObjectName(getClass(),poolName);
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(oname);
}
+ } catch (MBeanRegistrationException e) {
+ log.debug("Jmx deregistration failed.",e);
+ } catch (InstanceNotFoundException e) {
+ log.debug("Jmx deregistration failed.",e);
} catch (MalformedObjectNameException e) {
log.warn("Jmx deregistration failed.",e);
} catch (RuntimeOperationsException e) {
}
}
+
+
+ public static ObjectName getObjectName(Class clazz, String poolName) throws MalformedObjectNameException {
+ ObjectName oname = new ObjectName(ConnectionPool.POOL_JMX_TYPE_PREFIX+clazz.getName()+",name=" + poolName);
+ return oname;
+ }
protected void registerJmx() {
try {
if (isNotifyPool()) {
} else if (getCompositeType()!=null) {
- ObjectName oname = new ObjectName(ConnectionPool.POOL_JMX_TYPE_PREFIX+"SlowQueryReport"+",name=" + poolName);
- Registry registry = Registry.getRegistry(null, null);
- registry.loadDescriptors(getClass().getPackage().getName(),getClass().getClassLoader());
- ManagedBean managed = registry.findManagedBean(this.getClass().getName());
- DynamicMBean mbean = managed!=null?managed.createMBean(this):null;
- if (mbean!=null && mbeans.putIfAbsent(poolName, mbean)==null) {
- registry.getMBeanServer().registerMBean( mbean, oname);
- } else if (mbean==null){
- log.warn(SlowQueryReport.class.getName()+ "- No JMX support, unable to initiate Tomcat JMX. This requires the system to run inside the Tomcat container.");
+ ObjectName oname = getObjectName(getClass(),poolName);
+ if (mbeans.putIfAbsent(poolName, this)==null) {
+ ManagementFactory.getPlatformMBeanServer().registerMBean(this, oname);
}
} else {
log.warn(SlowQueryReport.class.getName()+ "- No JMX support, composite type was not found.");
}
} catch (MalformedObjectNameException e) {
log.error("Jmx registration failed, no JMX data will be exposed for the query stats.",e);
- } catch (InstanceNotFoundException e) {
- log.error("Jmx registration failed, no JMX data will be exposed for the query stats.",e);
} catch (RuntimeOperationsException e) {
log.error("Jmx registration failed, no JMX data will be exposed for the query stats.",e);
} catch (MBeanException e) {
log.error("Jmx registration failed, no JMX data will be exposed for the query stats.",e);
}
}
+
+ @Override
+ public void setProperties(Map<String, InterceptorProperty> properties) {
+ super.setProperties(properties);
+ final String threshold = "notifyPool";
+ InterceptorProperty p1 = properties.get(threshold);
+ if (p1!=null) {
+ this.setNotifyPool(Boolean.parseBoolean(p1.getValue()));
+ }
+ }
+
+
}
*/
package org.apache.tomcat.jdbc.test;
+import java.lang.management.ManagementFactory;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.Map;
+import javax.management.AttributeChangeNotification;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+
import org.apache.tomcat.jdbc.pool.ConnectionPool;
import org.apache.tomcat.jdbc.pool.interceptor.SlowQueryReport;
+import org.apache.tomcat.jdbc.pool.interceptor.SlowQueryReportJmx;
public class TestSlowQueryReport extends DefaultTestCase {
assertNull(SlowQueryReport.getPoolStats(pool.getName()));
}
+ public void testSlowSqlJmx() throws Exception {
+ int count = 1;
+ this.init();
+ this.datasource.setMaxActive(1);
+ this.datasource.setJdbcInterceptors(SlowQueryReportJmx.class.getName()+"(threshold=50,notifyPool=false)");
+ Connection con = this.datasource.getConnection();
+ String slowSql = "select count(1) from test where val1 like 'ewq%eq'";
+ for (int i=0; i<count; i++) {
+ Statement st = con.createStatement();
+ ResultSet rs = st.executeQuery(slowSql);
+ rs.close();
+ st.close();
+ }
+ Map<String,SlowQueryReport.QueryStats> map = SlowQueryReport.getPoolStats(datasource.getPool().getName());
+ assertNotNull(map);
+ assertEquals(1,map.size());
+ String key = map.keySet().iterator().next();
+ SlowQueryReport.QueryStats stats = map.get(key);
+ System.out.println("Stats:"+stats);
+ ClientListener listener = new ClientListener();
+ ConnectionPool pool = datasource.getPool();
+ ManagementFactory.getPlatformMBeanServer().addNotificationListener(
+ SlowQueryReportJmx.getObjectName(SlowQueryReportJmx.class, pool.getName()),
+ listener,
+ null,
+ null);
+
+ for (int i=0; i<count; i++) {
+ PreparedStatement st = con.prepareStatement(slowSql);
+ ResultSet rs = st.executeQuery();
+ rs.close();
+ st.close();
+ }
+ System.out.println("Stats:"+stats);
+
+ for (int i=0; i<count; i++) {
+ CallableStatement st = con.prepareCall(slowSql);
+ ResultSet rs = st.executeQuery();
+ rs.close();
+ st.close();
+ }
+ System.out.println("Stats:"+stats);
+ assertEquals("Expecting to have received "+(2*count)+" notifications.",2*count, listener.notificationCount);
+ con.close();
+ tearDown();
+ //make sure we actually did clean up when the pool closed
+ assertNull(SlowQueryReport.getPoolStats(pool.getName()));
+ }
+
+
public void testFastSql() throws Exception {
int count = 3;
this.init();
con.close();
tearDown();
assertNull(SlowQueryReport.getPoolStats(pool.getName()));
- }
+ }
+
+
+ public class ClientListener implements NotificationListener {
+ volatile int notificationCount = 0;
+ public void handleNotification(Notification notification,
+ Object handback) {
+ notificationCount++;
+ System.out.println("\nReceived notification:");
+ System.out.println("\tClassName: " + notification.getClass().getName());
+ System.out.println("\tSource: " + notification.getSource());
+ System.out.println("\tType: " + notification.getType());
+ System.out.println("\tMessage: " + notification.getMessage());
+ if (notification instanceof AttributeChangeNotification) {
+ AttributeChangeNotification acn =
+ (AttributeChangeNotification) notification;
+ System.out.println("\tAttributeName: " + acn.getAttributeName());
+ System.out.println("\tAttributeType: " + acn.getAttributeType());
+ System.out.println("\tNewValue: " + acn.getNewValue());
+ System.out.println("\tOldValue: " + acn.getOldValue());
+ }
+ }
+ }
}