Refactoring of async logging handler. Much cleaner code achieves the exact same funct...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 14 Sep 2009 20:06:03 +0000 (20:06 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 14 Sep 2009 20:06:03 +0000 (20:06 +0000)
The fact that the queue is bounded means that we never have to worry about the size

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@814823 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/juli/AsyncFileHandler.java
webapps/docs/config/systemprops.xml

index 8668b53..ece00e9 100644 (file)
  */
 package org.apache.juli;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.LogRecord;
 /**
  * 
@@ -31,20 +29,20 @@ public class AsyncFileHandler extends FileHandler {
     public static final int OVERFLOW_DROP_LAST = 1;
     public static final int OVERFLOW_DROP_FIRST = 2;
     public static final int OVERFLOW_DROP_FLUSH = 3;
+    public static final int OVERFLOW_DROP_CURRENT = 4;
     
     public static final int OVERFLOW_DROP_TYPE = Integer.parseInt(System.getProperty("org.apache.juli.AsyncOverflowDropType","1"));
-    public static final int DEFAULT_MAX_RECORDS = Integer.parseInt(System.getProperty("org.apache.juli.AsyncMaxRecordCount","1000"));
-    public static final int RECORD_BATCH_COUNT = Integer.parseInt(System.getProperty("org.apache.juli.AsyncRecordBatchCount","100"));
+    public static final int DEFAULT_MAX_RECORDS = Integer.parseInt(System.getProperty("org.apache.juli.AsyncMaxRecordCount","10000"));
+    public static final int LOGGER_SLEEP_TIME = Integer.parseInt(System.getProperty("org.apache.juli.AsyncLoggerPollInterval","1000"));
+   
+    protected static LinkedBlockingDeque<LogEntry> queue = new LinkedBlockingDeque<LogEntry>(DEFAULT_MAX_RECORDS);
     
-    protected static ConcurrentLinkedQueue<FileHandler> handlers = new ConcurrentLinkedQueue<FileHandler>();
-    protected static SignalAtomicLong recordCounter = new SignalAtomicLong();
     protected static LoggerThread logger = new LoggerThread();
     
     static {
         logger.start();
     }
     
-    protected LogQueue<LogRecord> queue = new LogQueue<LogRecord>();
     protected volatile boolean closed = false;
     
     public AsyncFileHandler() {
@@ -62,9 +60,6 @@ public class AsyncFileHandler extends FileHandler {
         closed = true;
         // TODO Auto-generated method stub
         super.close();
-        handlers.remove(this);
-        //empty the queue of log entries for this log
-        while (queue.poll()!=null) recordCounter.addAndGet(-1);
     }
     
     @Override
@@ -73,7 +68,6 @@ public class AsyncFileHandler extends FileHandler {
         closed = false;
         // TODO Auto-generated method stub
         super.open();
-        handlers.add(this);
     }
     
 
@@ -82,91 +76,43 @@ public class AsyncFileHandler extends FileHandler {
         if (!isLoggable(record)) {
             return;
         }
-        this.queue.offer(record);
+        LogEntry entry = new LogEntry(record,this);
+        boolean added = false;
+        try {
+            while (!added && !queue.offer(entry)) {
+                switch (OVERFLOW_DROP_TYPE) {
+                    case OVERFLOW_DROP_LAST: {
+                        //remove the last added element
+                        queue.pollLast(); 
+                        break;
+                    }
+                    case OVERFLOW_DROP_FIRST: {
+                        //remove the first element in the queue
+                        queue.pollFirst();
+                        break;
+                    }
+                    case OVERFLOW_DROP_FLUSH: {
+                        added = queue.offer(entry,1000,TimeUnit.MILLISECONDS);
+                        break;
+                    }
+                    case OVERFLOW_DROP_CURRENT: {
+                        added = true;
+                        break;
+                    }
+                }//switch
+            }//while
+        }catch (InterruptedException x) {
+            //allow thread to be interrupted and back out of the publish operation
+            //after this we clear the flag
+            Thread.interrupted();
+        }
+        
     }
     
     protected void publishInternal(LogRecord record) {
-        recordCounter.addAndGet(-1);
         super.publish(record);
     }
 
-    @Override
-    protected void finalize() throws Throwable {
-        // TODO Auto-generated method stub
-        super.finalize();
-    }
-
-    public int getMaxRecords() {
-        return this.queue.max;
-    }
-
-    public void setMaxRecords(int maxRecords) {
-        this.queue.max = maxRecords;
-    }
-
-    public int getOverflowAction() {
-        return this.queue.type;
-    }
-
-    public void setOverflowAction(int type) {
-        this.queue.type = type;
-    }
-    
-    protected static class SignalAtomicLong {
-        AtomicLong delegate = new AtomicLong(0);
-        ReentrantLock lock = new ReentrantLock();
-        Condition sleepUntilPositiveCond = lock.newCondition();
-        Condition sleepUntilEmpty = lock.newCondition();
-        
-        public long addAndGet(long i) {
-            long prevValue = delegate.getAndAdd(i);
-            if (prevValue<=0 && i>0) {
-                lock.lock();
-                try {
-                    sleepUntilPositiveCond.signalAll();
-                } finally {
-                    lock.unlock();
-                }
-            } else if (prevValue>0 && delegate.get()<=0) {
-                lock.lock();
-                try {
-                    sleepUntilEmpty.signalAll();
-                } finally {
-                    lock.unlock();
-                }
-            }
-            return delegate.get();
-        }
-        
-        public void sleepUntilPositive() throws InterruptedException {
-            if (delegate.get()>0) return;
-            lock.lock();
-            try {
-                if (delegate.get()>0) return;
-                sleepUntilPositiveCond.await();
-            } finally {
-                lock.unlock();
-            }
-        }
-        
-        public void sleepUntilEmpty() throws InterruptedException {
-            if (delegate.get()<=0) return;
-            lock.lock();
-            try {
-                if (delegate.get()<=0) return;
-                sleepUntilPositiveCond.await();
-            } finally {
-                lock.unlock();
-            }
-            
-        }
-        
-        public long get() {
-            return delegate.get();
-        }
-        
-    }
-    
     protected static class LoggerThread extends Thread {
         protected boolean run = true;
         public LoggerThread() {
@@ -177,81 +123,36 @@ public class AsyncFileHandler extends FileHandler {
         public void run() {
             while (run) {
                 try {
-                    AsyncFileHandler.recordCounter.sleepUntilPositive();
-                } catch (InterruptedException x) {
+                    LogEntry entry = queue.poll(LOGGER_SLEEP_TIME, TimeUnit.MILLISECONDS);
+                    entry.flush();
+                }catch (InterruptedException x) {
                     Thread.interrupted();
-                    continue;
+                }catch (Exception x) {
+                    x.printStackTrace();
                 }
-                AsyncFileHandler[] handlers = AsyncFileHandler.handlers.toArray(new AsyncFileHandler[0]);
-                for (int i=0; run && i<handlers.length; i++) {
-                    int counter = 0;
-                    while (run && (counter++)<RECORD_BATCH_COUNT) {
-                        if (handlers[i].closed) break;
-                        LogRecord record = handlers[i].queue.poll();
-                        if (record==null) break;
-                        handlers[i].publishInternal(record);
-                    }//while
-                }//for
             }//while
         }
     }
     
-    protected static class LogQueue<E> {
-        protected int max = DEFAULT_MAX_RECORDS;
-        protected int type = OVERFLOW_DROP_TYPE;
-        protected ConcurrentLinkedQueue<E> delegate = new ConcurrentLinkedQueue<E>(); 
+    protected static class LogEntry {
+        private LogRecord record;
+        private AsyncFileHandler handler;
+        public LogEntry(LogRecord record, AsyncFileHandler handler) {
+            super();
+            this.record = record;
+            this.handler = handler;
+        }
         
-        public boolean offer(E e) {
-            if (delegate.size()>=max) {
-                switch (type) {
-                    case OVERFLOW_DROP_LAST:
-                        return false;
-                    case OVERFLOW_DROP_FIRST: {
-                        this.poll();
-                        if (delegate.offer(e)) {
-                            recordCounter.addAndGet(1);
-                            return true;
-                        } else {
-                            return false;
-                        }
-                    }
-                    case OVERFLOW_DROP_FLUSH: {
-                        try {
-                            recordCounter.sleepUntilEmpty();
-                        }catch (InterruptedException x) {
-                            //no op - simply continue the operation
-                        }
-                        if (delegate.offer(e)) {
-                            recordCounter.addAndGet(1);
-                            return true;
-                        } else {
-                            return false;
-                        }
-                    }
-                    default:
-                        return false;
-                }
+        public boolean flush() {
+            if (handler.closed) {
+                return false;
             } else {
-                if (delegate.offer(e)) {
-                    recordCounter.addAndGet(1);
-                    return true;
-                } else {
-                    return false;
-                }
-
+                handler.publishInternal(record);
+                return true;
             }
         }
-
-        public E peek() {
-            return delegate.peek();
-        }
-
-        public E poll() {
-            // TODO Auto-generated method stub
-            return delegate.poll();
-        }
         
     }
-
+    
     
 }
index d32d19e..d5a8f92 100644 (file)
            <li><code>int OVERFLOW_DROP_LAST = 1</code> - the record that caused the overflow will be dropped and not logged</li>
            <li><code>int OVERFLOW_DROP_FIRST = 2</code> - the record that is next in line to be logged will be dropped to make room for the latest record on the queue</li>
            <li><code>int OVERFLOW_DROP_FLUSH = 3</code> - suspend the thread while the queue empties out and flushes the entries to the write buffer</li>
+           <li><code>int OVERFLOW_DROP_CURRENT = 4</code> - drop the current log entry</li>
          </ul>
          Default value is <code>1</code> (OVERFLOW_DROP_LAST).
       </p>
     <property name="org.apache.juli.AsyncMaxRecordCount">
       <p>The max number of log records that the async logger will keep in memory. When this limit is reached and a new record is being logged by the 
          JULI framework the system will take an action based on the <code>org.apache.juli.AsyncOverflowDropType</code> setting.
-         The default value is <code>1000</code> records
+         The default value is <code>10000</code> records.<br/>
+         This number represents the global number of records, not on a per handler basis.
       </p>
     </property>
 
-    <property name="org.apache.juli.AsyncRecordBatchCount">
-      <p>
+    <property name="org.apache.juli.AsyncLoggerPollInterval">
+      <p>The poll interval in milliseconds for the asynchronous logger thread in milliseconds.
+         If the log queue is empty, the async thread will issue a poll(poll interval)
+         in order to not wake up to often.
+         The default value is <code>1000</code> milliseconds.
       </p>
     </property>