*/
package org.apache.juli;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import java.util.logging.LogRecord;
/**
*
public static final int OVERFLOW_DROP_LAST = 1;
public static final int OVERFLOW_DROP_FIRST = 2;
public static final int OVERFLOW_DROP_FLUSH = 3;
+ public static final int OVERFLOW_DROP_CURRENT = 4;
public static final int OVERFLOW_DROP_TYPE = Integer.parseInt(System.getProperty("org.apache.juli.AsyncOverflowDropType","1"));
- public static final int DEFAULT_MAX_RECORDS = Integer.parseInt(System.getProperty("org.apache.juli.AsyncMaxRecordCount","1000"));
- public static final int RECORD_BATCH_COUNT = Integer.parseInt(System.getProperty("org.apache.juli.AsyncRecordBatchCount","100"));
+ public static final int DEFAULT_MAX_RECORDS = Integer.parseInt(System.getProperty("org.apache.juli.AsyncMaxRecordCount","10000"));
+ public static final int LOGGER_SLEEP_TIME = Integer.parseInt(System.getProperty("org.apache.juli.AsyncLoggerPollInterval","1000"));
+
+ protected static LinkedBlockingDeque<LogEntry> queue = new LinkedBlockingDeque<LogEntry>(DEFAULT_MAX_RECORDS);
- protected static ConcurrentLinkedQueue<FileHandler> handlers = new ConcurrentLinkedQueue<FileHandler>();
- protected static SignalAtomicLong recordCounter = new SignalAtomicLong();
protected static LoggerThread logger = new LoggerThread();
static {
logger.start();
}
- protected LogQueue<LogRecord> queue = new LogQueue<LogRecord>();
protected volatile boolean closed = false;
public AsyncFileHandler() {
closed = true;
// TODO Auto-generated method stub
super.close();
- handlers.remove(this);
- //empty the queue of log entries for this log
- while (queue.poll()!=null) recordCounter.addAndGet(-1);
}
@Override
closed = false;
// TODO Auto-generated method stub
super.open();
- handlers.add(this);
}
if (!isLoggable(record)) {
return;
}
- this.queue.offer(record);
+ LogEntry entry = new LogEntry(record,this);
+ boolean added = false;
+ try {
+ while (!added && !queue.offer(entry)) {
+ switch (OVERFLOW_DROP_TYPE) {
+ case OVERFLOW_DROP_LAST: {
+ //remove the last added element
+ queue.pollLast();
+ break;
+ }
+ case OVERFLOW_DROP_FIRST: {
+ //remove the first element in the queue
+ queue.pollFirst();
+ break;
+ }
+ case OVERFLOW_DROP_FLUSH: {
+ added = queue.offer(entry,1000,TimeUnit.MILLISECONDS);
+ break;
+ }
+ case OVERFLOW_DROP_CURRENT: {
+ added = true;
+ break;
+ }
+ }//switch
+ }//while
+ }catch (InterruptedException x) {
+ //allow thread to be interrupted and back out of the publish operation
+ //after this we clear the flag
+ Thread.interrupted();
+ }
+
}
protected void publishInternal(LogRecord record) {
- recordCounter.addAndGet(-1);
super.publish(record);
}
- @Override
- protected void finalize() throws Throwable {
- // TODO Auto-generated method stub
- super.finalize();
- }
-
- public int getMaxRecords() {
- return this.queue.max;
- }
-
- public void setMaxRecords(int maxRecords) {
- this.queue.max = maxRecords;
- }
-
- public int getOverflowAction() {
- return this.queue.type;
- }
-
- public void setOverflowAction(int type) {
- this.queue.type = type;
- }
-
- protected static class SignalAtomicLong {
- AtomicLong delegate = new AtomicLong(0);
- ReentrantLock lock = new ReentrantLock();
- Condition sleepUntilPositiveCond = lock.newCondition();
- Condition sleepUntilEmpty = lock.newCondition();
-
- public long addAndGet(long i) {
- long prevValue = delegate.getAndAdd(i);
- if (prevValue<=0 && i>0) {
- lock.lock();
- try {
- sleepUntilPositiveCond.signalAll();
- } finally {
- lock.unlock();
- }
- } else if (prevValue>0 && delegate.get()<=0) {
- lock.lock();
- try {
- sleepUntilEmpty.signalAll();
- } finally {
- lock.unlock();
- }
- }
- return delegate.get();
- }
-
- public void sleepUntilPositive() throws InterruptedException {
- if (delegate.get()>0) return;
- lock.lock();
- try {
- if (delegate.get()>0) return;
- sleepUntilPositiveCond.await();
- } finally {
- lock.unlock();
- }
- }
-
- public void sleepUntilEmpty() throws InterruptedException {
- if (delegate.get()<=0) return;
- lock.lock();
- try {
- if (delegate.get()<=0) return;
- sleepUntilPositiveCond.await();
- } finally {
- lock.unlock();
- }
-
- }
-
- public long get() {
- return delegate.get();
- }
-
- }
-
protected static class LoggerThread extends Thread {
protected boolean run = true;
public LoggerThread() {
public void run() {
while (run) {
try {
- AsyncFileHandler.recordCounter.sleepUntilPositive();
- } catch (InterruptedException x) {
+ LogEntry entry = queue.poll(LOGGER_SLEEP_TIME, TimeUnit.MILLISECONDS);
+ entry.flush();
+ }catch (InterruptedException x) {
Thread.interrupted();
- continue;
+ }catch (Exception x) {
+ x.printStackTrace();
}
- AsyncFileHandler[] handlers = AsyncFileHandler.handlers.toArray(new AsyncFileHandler[0]);
- for (int i=0; run && i<handlers.length; i++) {
- int counter = 0;
- while (run && (counter++)<RECORD_BATCH_COUNT) {
- if (handlers[i].closed) break;
- LogRecord record = handlers[i].queue.poll();
- if (record==null) break;
- handlers[i].publishInternal(record);
- }//while
- }//for
}//while
}
}
- protected static class LogQueue<E> {
- protected int max = DEFAULT_MAX_RECORDS;
- protected int type = OVERFLOW_DROP_TYPE;
- protected ConcurrentLinkedQueue<E> delegate = new ConcurrentLinkedQueue<E>();
+ protected static class LogEntry {
+ private LogRecord record;
+ private AsyncFileHandler handler;
+ public LogEntry(LogRecord record, AsyncFileHandler handler) {
+ super();
+ this.record = record;
+ this.handler = handler;
+ }
- public boolean offer(E e) {
- if (delegate.size()>=max) {
- switch (type) {
- case OVERFLOW_DROP_LAST:
- return false;
- case OVERFLOW_DROP_FIRST: {
- this.poll();
- if (delegate.offer(e)) {
- recordCounter.addAndGet(1);
- return true;
- } else {
- return false;
- }
- }
- case OVERFLOW_DROP_FLUSH: {
- try {
- recordCounter.sleepUntilEmpty();
- }catch (InterruptedException x) {
- //no op - simply continue the operation
- }
- if (delegate.offer(e)) {
- recordCounter.addAndGet(1);
- return true;
- } else {
- return false;
- }
- }
- default:
- return false;
- }
+ public boolean flush() {
+ if (handler.closed) {
+ return false;
} else {
- if (delegate.offer(e)) {
- recordCounter.addAndGet(1);
- return true;
- } else {
- return false;
- }
-
+ handler.publishInternal(record);
+ return true;
}
}
-
- public E peek() {
- return delegate.peek();
- }
-
- public E poll() {
- // TODO Auto-generated method stub
- return delegate.poll();
- }
}
-
+
}