From: fhanik Date: Thu, 16 Dec 2010 21:09:49 +0000 (+0000) Subject: Starting to work on maxConnections attribute for BIO/NIO connectors to allow administ... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=768a4050de983ef0f5b51c0c95ed19657a591b74;p=tomcat7.0 Starting to work on maxConnections attribute for BIO/NIO connectors to allow administrators to throttle how accepting connections is handled. Implement a CounterLatch to keep track of the connection count while also allowing the acceptor thread block while the max has been reached git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1050161 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/coyote/AbstractProtocolHandler.java b/java/org/apache/coyote/AbstractProtocolHandler.java index 304011b4a..e645d4efa 100644 --- a/java/org/apache/coyote/AbstractProtocolHandler.java +++ b/java/org/apache/coyote/AbstractProtocolHandler.java @@ -122,6 +122,11 @@ public abstract class AbstractProtocolHandler implements ProtocolHandler, public void setMaxThreads(int maxThreads) { endpoint.setMaxThreads(maxThreads); } + + public int getMaxConnections() { return endpoint.getMaxConnections(); } + public void setMaxConnections(int maxConnections) { + endpoint.setMaxConnections(maxConnections); + } public int getMinSpareThreads() { return endpoint.getMinSpareThreads(); } diff --git a/java/org/apache/tomcat/util/threads/CounterLatch.java b/java/org/apache/tomcat/util/threads/CounterLatch.java new file mode 100644 index 000000000..b034b692a --- /dev/null +++ b/java/org/apache/tomcat/util/threads/CounterLatch.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.threads; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; +/** + * Simple counter latch that allows code to keep an up and down counter, and waits while the latch holds a certain wait value. + * and threads using the latch to wait if the count has reached a certain value. + * The counter latch can be used to keep track of an atomic counter, since the operations {@link #countDown()} + * and {@link #countUp()} are atomic. + * When the latch reaches the wait value, threads will block. The counter latch can hence act like a + * count down latch or a count up latch, while letting you keep track of the counter as well. + * This counter latch works opposite as the java.util.concurrent.CountDownLatch, since the CounterLatch only blocks on a single value and releases the threads on all other values. + * @author fhanik + * @see CountDownLatch + * + */ +public class CounterLatch { + + private class Sync extends AbstractQueuedSynchronizer { + public Sync() { + } + + protected int tryAcquireShared(int arg) { + return ((!released) && count.get() == signal) ? -1 : 1; + } + + protected boolean tryReleaseShared(int arg) { + return true; + } + } + + private final Sync sync; + private final AtomicLong count; + private long signal; + private volatile boolean released = false; + + /** + * Instantiates a CounterLatch object with an initial value and a wait value. + * @param initial - initial value of the counter + * @param waitValue - when the counter holds this value, + * threads calling {@link #await()} or {@link #await(long, TimeUnit)} + * will wait until the counter changes value or until they are interrupted. + */ + public CounterLatch(long initial, long waitValue) { + this.signal = waitValue; + this.count = new AtomicLong(initial); + this.sync = new Sync(); + } + + /** + * Causes the calling thread to wait if the counter holds the waitValue. + * If the counter holds any other value, the thread will return + * If the thread is interrupted or becomes interrupted an InterruptedException is thrown + * @throws InterruptedException + */ + public void await() throws InterruptedException { + sync.acquireSharedInterruptibly(1); + } + + /** + * Causes the calling thread to wait if the counter holds the waitValue. + * If the counter holds any other value, the thread will return + * If the thread is interrupted or becomes interrupted an InterruptedException is thrown + * @return true if the value changed, false if the timeout has elapsed + * @throws InterruptedException + */ + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + } + + /** + * Increments the counter + * @return the previous counter value + */ + public long countUp() { + long previous = count.getAndIncrement(); + if (previous == signal) { + sync.releaseShared(0); + } + return previous; + } + + /** + * Decrements the counter + * @return the previous counter value + */ + public long countDown() { + long previous = count.getAndDecrement(); + if (previous == signal) { + sync.releaseShared(0); + } + return previous; + } + + /** + * Returns the current counter value + * @return the current counter value + */ + public long getCount() { + return count.get(); + } + + /** + * Performs an atomic update of the counter + * If the operation is successful and {@code expect==waitValue && expect!=update} waiting threads will be released. + * @param expect - the expected counter value + * @param update - the new counter value + * @return + */ + public boolean compareAndSet(long expect, long update) { + boolean result = count.compareAndSet(expect, update); + if (result && expect==signal && expect != update) { + sync.releaseShared(0); + } + return result; + } + + /** + * returns true if there are threads blocked by this latch + * @return true if there are threads blocked by this latch + */ + public boolean hasQueuedThreads() { + return sync.hasQueuedThreads(); + } + + /** + * Returns a collection of the blocked threads + * @return a collection of the blocked threads + */ + public Collection getQueuedThreads() { + return sync.getQueuedThreads(); + } + + /** + * releases all waiting threads. This operation is permanent, and no threads will block, + * even if the counter hits the {@code waitValue} until {@link #reset(long)} has been called. + * @return + */ + public boolean releaseAll() { + released = true; + return sync.releaseShared(0); + } + + /** + * Resets the latch and initializes the counter with the new value. + * @param value the new counter value + * @see {@link #releaseAll()} + */ + public void reset(long value) { + this.count.set(value); + released = false; + } + +} diff --git a/test/org/apache/tomcat/util/threads/TestCounterLatch.java b/test/org/apache/tomcat/util/threads/TestCounterLatch.java new file mode 100644 index 000000000..e5110c791 --- /dev/null +++ b/test/org/apache/tomcat/util/threads/TestCounterLatch.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.threads; + +import junit.framework.TestCase; + +public class TestCounterLatch extends TestCase { + + private volatile CounterLatch latch = null; + + public void setUp() { + + } + + public void tearDown() { + CounterLatch temp = latch; + if (temp!=null) temp.releaseAll(); + latch = null; + } + + public void testNoThreads() throws Exception { + latch = new CounterLatch(0,0); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + } + + public void testOneThreadNoWait() throws Exception { + latch = new CounterLatch(0,1); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + Thread testThread = new Thread() { + public void run() { + try { + latch.await(); + } catch (InterruptedException x) { + x.printStackTrace(); + } + } + }; + testThread.start(); + Thread.sleep(50); + assertEquals("0 threads should be waiting", 0, latch.getQueuedThreads().size()); + latch.countUp(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + } + + public void testOneThreadWaitCountUp() throws Exception { + latch = new CounterLatch(0,1); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + Thread testThread = new Thread() { + public void run() { + try { + latch.await(); + } catch (InterruptedException x) { + x.printStackTrace(); + } + } + }; + latch.countUp(); + testThread.start(); + Thread.sleep(50); + assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); + latch.countUp(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + } + + public void testOneThreadWaitCountDown() throws Exception { + latch = new CounterLatch(1,0); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + Thread testThread = new Thread() { + public void run() { + try { + //System.out.println("Entering ["+Thread.currentThread().getName()+"]"); + latch.await(); + } catch (InterruptedException x) { + x.printStackTrace(); + } + //System.out.println("Exiting ["+Thread.currentThread().getName()+"]"); + } + }; + latch.countDown(); + testThread.start(); + Thread.sleep(50); + assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); + latch.countDown(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + } + + public void testOneRelease() throws Exception { + latch = new CounterLatch(1,0); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + Thread testThread = new Thread() { + public void run() { + try { + latch.await(); + } catch (InterruptedException x) { + x.printStackTrace(); + } + } + }; + latch.countDown(); + testThread.start(); + Thread.sleep(50); + assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); + latch.releaseAll(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + } +}