From: markt Date: Thu, 26 May 2011 15:27:52 +0000 (+0000) Subject: Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=51240 X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=1debefbeabc034be2070736292a2b42d8639e2f3;p=tomcat7.0 Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=51240 Replace the more generic CounterLatch (that has concurrency issues) with a more specific LimitLatch that (mostly) only provides the functionality required by the connectors to implement maxConnections. It also adds support for dynamically modifying maxConnections. git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1127962 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java b/java/org/apache/tomcat/util/net/AbstractEndpoint.java index b31f3f38c..bbc7c0a04 100644 --- a/java/org/apache/tomcat/util/net/AbstractEndpoint.java +++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java @@ -31,7 +31,7 @@ import javax.net.ssl.KeyManagerFactory; import org.apache.juli.logging.Log; import org.apache.tomcat.util.IntrospectionUtils; import org.apache.tomcat.util.res.StringManager; -import org.apache.tomcat.util.threads.CounterLatch; +import org.apache.tomcat.util.threads.LimitLatch; import org.apache.tomcat.util.threads.ResizableExecutor; import org.apache.tomcat.util.threads.TaskQueue; import org.apache.tomcat.util.threads.TaskThreadFactory; @@ -97,7 +97,7 @@ public abstract class AbstractEndpoint { /** * counter for nr of connections handled by an endpoint */ - private volatile CounterLatch connectionCounterLatch = null; + private volatile LimitLatch connectionLimitLatch = null; /** * Socket properties @@ -111,7 +111,13 @@ public abstract class AbstractEndpoint { // ----------------------------------------------------------------- Properties private int maxConnections = 10000; - public void setMaxConnections(int maxCon) { this.maxConnections = maxCon; } + public void setMaxConnections(int maxCon) { + this.maxConnections = maxCon; + LimitLatch latch = this.connectionLimitLatch; + // Update the latch that enforces this + latch.setLimit(maxCon); + } + public int getMaxConnections() { return this.maxConnections; } /** * External Executor based thread pool. @@ -550,32 +556,26 @@ public abstract class AbstractEndpoint { protected abstract Log getLog(); public abstract boolean getUseSendfile(); - protected CounterLatch initializeConnectionLatch() { - if (connectionCounterLatch==null) { - connectionCounterLatch = new CounterLatch(0,getMaxConnections()); + protected LimitLatch initializeConnectionLatch() { + if (connectionLimitLatch==null) { + connectionLimitLatch = new LimitLatch(getMaxConnections()); } - return connectionCounterLatch; + return connectionLimitLatch; } protected void releaseConnectionLatch() { - CounterLatch latch = connectionCounterLatch; + LimitLatch latch = connectionLimitLatch; if (latch!=null) latch.releaseAll(); - connectionCounterLatch = null; - } - - protected void awaitConnection() throws InterruptedException { - CounterLatch latch = connectionCounterLatch; - if (latch!=null) latch.await(); + connectionLimitLatch = null; } - protected long countUpConnection() { - CounterLatch latch = connectionCounterLatch; - if (latch!=null) return latch.countUp(); - else return -1; + protected void countUpOrAwaitConnection() throws InterruptedException { + LimitLatch latch = connectionLimitLatch; + if (latch!=null) latch.countUpOrAwait(); } protected long countDownConnection() { - CounterLatch latch = connectionCounterLatch; + LimitLatch latch = connectionLimitLatch; if (latch!=null) { long result = latch.countDown(); if (result<0) { diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java index 5b806e425..ffa5c73cf 100644 --- a/java/org/apache/tomcat/util/net/AprEndpoint.java +++ b/java/org/apache/tomcat/util/net/AprEndpoint.java @@ -949,7 +949,7 @@ public class AprEndpoint extends AbstractEndpoint { } try { //if we have reached max connections, wait - awaitConnection(); + countUpOrAwaitConnection(); long socket = 0; try { @@ -965,8 +965,6 @@ public class AprEndpoint extends AbstractEndpoint { // Successful accept, reset the error delay errorDelay = 0; - //increment socket count - countUpConnection(); /* * In the case of a deferred accept unlockAccept needs to * send data. This data will be rubbish, so destroy the diff --git a/java/org/apache/tomcat/util/net/JIoEndpoint.java b/java/org/apache/tomcat/util/net/JIoEndpoint.java index 440b7e64c..cbff57b0a 100644 --- a/java/org/apache/tomcat/util/net/JIoEndpoint.java +++ b/java/org/apache/tomcat/util/net/JIoEndpoint.java @@ -211,8 +211,8 @@ public class JIoEndpoint extends AbstractEndpoint { } try { //if we have reached max connections, wait - awaitConnection(); - + countUpOrAwaitConnection(); + Socket socket = null; try { // Accept the next incoming connection from the server @@ -237,8 +237,6 @@ public class JIoEndpoint extends AbstractEndpoint { } catch (IOException e) { // Ignore } - } else { - countUpConnection(); } } else { // Close socket right away diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 775b8bb4e..6e1e8508a 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -761,7 +761,7 @@ public class NioEndpoint extends AbstractEndpoint { } try { //if we have reached max connections, wait - awaitConnection(); + countUpOrAwaitConnection(); SocketChannel socket = null; try { @@ -791,8 +791,6 @@ public class NioEndpoint extends AbstractEndpoint { if (log.isDebugEnabled()) log.debug("", ix); } - } else { - countUpConnection(); } } } catch (SocketTimeoutException sx) { diff --git a/java/org/apache/tomcat/util/threads/CounterLatch.java b/java/org/apache/tomcat/util/threads/CounterLatch.java deleted file mode 100644 index 2b3718c9d..000000000 --- a/java/org/apache/tomcat/util/threads/CounterLatch.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tomcat.util.threads; - -import java.util.Collection; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; -/** - * Simple counter latch that allows code to keep an up and down counter, and waits while the latch holds a certain wait value. - * and threads using the latch to wait if the count has reached a certain value. - * The counter latch can be used to keep track of an atomic counter, since the operations {@link #countDown()} - * and {@link #countUp()} are atomic. - * When the latch reaches the wait value, threads will block. The counter latch can hence act like a - * count down latch or a count up latch, while letting you keep track of the counter as well. - * This counter latch works opposite as the java.util.concurrent.CountDownLatch, since the CounterLatch only blocks on a single value and releases the threads on all other values. - * @author fhanik - * @see CountDownLatch - * - */ -public class CounterLatch { - - private class Sync extends AbstractQueuedSynchronizer { - private static final long serialVersionUID = 1L; - - public Sync() { - } - - @Override - protected int tryAcquireShared(int arg) { - return ((!released) && count.get() == signal) ? -1 : 1; - } - - @Override - protected boolean tryReleaseShared(int arg) { - return true; - } - } - - private final Sync sync; - private final AtomicLong count; - private volatile long signal; - private volatile boolean released = false; - - /** - * Instantiates a CounterLatch object with an initial value and a wait value. - * @param initial - initial value of the counter - * @param waitValue - when the counter holds this value, - * threads calling {@link #await()} or {@link #await(long, TimeUnit)} - * will wait until the counter changes value or until they are interrupted. - */ - public CounterLatch(long initial, long waitValue) { - this.signal = waitValue; - this.count = new AtomicLong(initial); - this.sync = new Sync(); - } - - /** - * Causes the calling thread to wait if the counter holds the waitValue. - * If the counter holds any other value, the thread will return - * If the thread is interrupted or becomes interrupted an InterruptedException is thrown - * @throws InterruptedException - */ - public void await() throws InterruptedException { - sync.acquireSharedInterruptibly(1); - } - - /** - * Causes the calling thread to wait if the counter holds the waitValue. - * If the counter holds any other value, the thread will return - * If the thread is interrupted or becomes interrupted an InterruptedException is thrown - * @return true if the value changed, false if the timeout has elapsed - * @throws InterruptedException - */ - public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); - } - - /** - * Increments the counter - * @return the previous counter value - */ - public long countUp() { - long previous = count.getAndIncrement(); - if (previous == signal) { - sync.releaseShared(0); - } - return previous; - } - - /** - * Decrements the counter - * @return the previous counter value - */ - public long countDown() { - long previous = count.getAndDecrement(); - if (previous == signal) { - sync.releaseShared(0); - } - return previous; - } - - /** - * Returns the current counter value - * @return the current counter value - */ - public long getCount() { - return count.get(); - } - - /** - * Performs an atomic update of the counter - * If the operation is successful and {@code expect==waitValue && expect!=update} waiting threads will be released. - * @param expect - the expected counter value - * @param update - the new counter value - * @return true if successful, false if the - * current value wasn't as expected - */ - public boolean compareAndSet(long expect, long update) { - boolean result = count.compareAndSet(expect, update); - if (result && expect==signal && expect != update) { - sync.releaseShared(0); - } - return result; - } - - /** - * returns true if there are threads blocked by this latch - * @return true if there are threads blocked by this latch - */ - public boolean hasQueuedThreads() { - return sync.hasQueuedThreads(); - } - - /** - * Returns a collection of the blocked threads - * @return a collection of the blocked threads - */ - public Collection getQueuedThreads() { - return sync.getQueuedThreads(); - } - - /** - * releases all waiting threads. This operation is permanent, and no threads will block, - * even if the counter hits the {@code waitValue} until {@link #reset(long)} has been called. - * @return true if this release of shared mode may permit a - * waiting acquire (shared or exclusive) to succeed; and - * false otherwise - */ - public boolean releaseAll() { - released = true; - return sync.releaseShared(0); - } - - /** - * Resets the latch and initializes the counter with the new value. - * @param value the new counter value - * @see #releaseAll() - */ - public void reset(long value) { - this.count.set(value); - released = false; - } - -} diff --git a/java/org/apache/tomcat/util/threads/LimitLatch.java b/java/org/apache/tomcat/util/threads/LimitLatch.java new file mode 100644 index 000000000..c2687de1c --- /dev/null +++ b/java/org/apache/tomcat/util/threads/LimitLatch.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.threads; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +/** + * Shared latch that allows the latch to be acquired a limited number of times + * after which all subsequent requests to acquire the latch will be placed in a + * FIFO queue until one of the shares is returned. + */ +public class LimitLatch { + + private class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = 1L; + + public Sync() { + } + + @Override + protected int tryAcquireShared(int ignored) { + long newCount = count.incrementAndGet(); + if (!released && newCount > limit) { + // Limit exceeded + count.decrementAndGet(); + return -1; + } else { + return 1; + } + } + + @Override + protected boolean tryReleaseShared(int arg) { + count.decrementAndGet(); + return true; + } + } + + private final Sync sync; + private final AtomicLong count; + private volatile long limit; + private volatile boolean released = false; + + /** + * Instantiates a LimitLatch object with an initial limit. + * @param limit - maximum number of concurrent acquisitions of this latch + */ + public LimitLatch(long limit) { + this.limit = limit; + this.count = new AtomicLong(0); + this.sync = new Sync(); + } + + /** + * Obtain the current limit. + */ + public long getLimit() { + return limit; + } + + + /** + * Sets a new limit. If the limit is decreased there may be a period where + * more shares of the latch are acquired than the limit. In this case no + * more shares of the latch will be issued until sufficient shares have been + * returned to reduce the number of acquired shares of the latch to below + * the new limit. If the limit is increased, threads currently in the queue + * may not be issued one of the newly available shares until the next + * request is made for a latch. + * + * @param limit The new limit + */ + public void setLimit(long limit) { + this.limit = limit; + } + + + /** + * Acquires a shared latch if one is available or waits for one if no shared + * latch is current available. + */ + public void countUpOrAwait() throws InterruptedException { + sync.acquireSharedInterruptibly(1); + } + + /** + * Releases a shared latch, making it available for another thread to use. + * @return the previous counter value + */ + public long countDown() { + sync.releaseShared(0); + return count.get(); + } + + /** + * Releases all waiting threads and causes the {@link #limit} to be ignored + * until {@link #reset()} is called. + */ + public boolean releaseAll() { + released = true; + return sync.releaseShared(0); + } + + /** + * Resets the latch and initializes the shared acquisition counter to zero. + * @see #releaseAll() + */ + public void reset() { + this.count.set(0); + released = false; + } + + /** + * Returns true if there is at least one thread waiting to + * acquire the shared lock, otherwise returns false. + */ + public boolean hasQueuedThreads() { + return sync.hasQueuedThreads(); + } + + /** + * Provide access to the list of threads waiting to acquire this limited + * shared latch. + */ + public Collection getQueuedThreads() { + return sync.getQueuedThreads(); + } +} diff --git a/test/org/apache/tomcat/util/threads/TestCounterLatch.java b/test/org/apache/tomcat/util/threads/TestCounterLatch.java deleted file mode 100644 index 34ac94cc4..000000000 --- a/test/org/apache/tomcat/util/threads/TestCounterLatch.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tomcat.util.threads; - -import junit.framework.TestCase; - -public class TestCounterLatch extends TestCase { - - private volatile CounterLatch latch = null; - - @Override - public void tearDown() { - CounterLatch temp = latch; - if (temp!=null) temp.releaseAll(); - latch = null; - } - - public void testNoThreads() throws Exception { - latch = new CounterLatch(0,0); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - } - - public void testOneThreadNoWait() throws Exception { - latch = new CounterLatch(0,1); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - Thread testThread = new Thread() { - @Override - public void run() { - try { - latch.await(); - } catch (InterruptedException x) { - x.printStackTrace(); - } - } - }; - testThread.start(); - Thread.sleep(50); - assertEquals("0 threads should be waiting", 0, latch.getQueuedThreads().size()); - latch.countUp(); - Thread.sleep(50); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - } - - public void testOneThreadWaitCountUp() throws Exception { - latch = new CounterLatch(0,1); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - Thread testThread = new Thread() { - @Override - public void run() { - try { - latch.await(); - } catch (InterruptedException x) { - x.printStackTrace(); - } - } - }; - latch.countUp(); - testThread.start(); - Thread.sleep(50); - assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); - latch.countUp(); - Thread.sleep(50); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - } - - public void testOneThreadWaitCountDown() throws Exception { - latch = new CounterLatch(1,0); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - Thread testThread = new Thread() { - @Override - public void run() { - try { - //System.out.println("Entering ["+Thread.currentThread().getName()+"]"); - latch.await(); - } catch (InterruptedException x) { - x.printStackTrace(); - } - //System.out.println("Exiting ["+Thread.currentThread().getName()+"]"); - } - }; - latch.countDown(); - testThread.start(); - Thread.sleep(50); - assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); - latch.countDown(); - Thread.sleep(50); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - } - - public void testOneRelease() throws Exception { - latch = new CounterLatch(1,0); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - Thread testThread = new Thread() { - @Override - public void run() { - try { - latch.await(); - } catch (InterruptedException x) { - x.printStackTrace(); - } - } - }; - latch.countDown(); - testThread.start(); - Thread.sleep(50); - assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); - latch.releaseAll(); - Thread.sleep(50); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - } -} diff --git a/test/org/apache/tomcat/util/threads/TestLimitLatch.java b/test/org/apache/tomcat/util/threads/TestLimitLatch.java new file mode 100644 index 000000000..783f36e90 --- /dev/null +++ b/test/org/apache/tomcat/util/threads/TestLimitLatch.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.threads; + +import junit.framework.TestCase; + +public class TestLimitLatch extends TestCase { + + private volatile LimitLatch latch = null; + + @Override + public void tearDown() { + LimitLatch temp = latch; + if (temp!=null) temp.releaseAll(); + latch = null; + } + + public void testNoThreads() throws Exception { + latch = new LimitLatch(0); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + } + + public void testOneThreadNoWait() throws Exception { + latch = new LimitLatch(1); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + Thread testThread = new TestThread(); + testThread.start(); + Thread.sleep(50); + assertEquals("0 threads should be waiting", 0, + latch.getQueuedThreads().size()); + latch.countUpOrAwait(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + } + + public void testOneThreadWaitCountUp() throws Exception { + latch = new LimitLatch(1); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + Thread testThread = new TestThread(); + latch.countUpOrAwait(); + testThread.start(); + Thread.sleep(50); + assertEquals("1 threads should be waiting", 1, + latch.getQueuedThreads().size()); + latch.countDown(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + } + + public void testOneRelease() throws Exception { + latch = new LimitLatch(1); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + Thread testThread = new TestThread(); + latch.countUpOrAwait(); + testThread.start(); + Thread.sleep(50); + assertEquals("1 threads should be waiting", 1, + latch.getQueuedThreads().size()); + latch.releaseAll(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + } + + public void testTenWait() throws Exception { + latch = new LimitLatch(10); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + Thread[] testThread = new TestThread[30]; + for (int i = 0; i < 30; i++) { + testThread[i] = new TestThread(1000); + testThread[i].start(); + } + Thread.sleep(50); + assertEquals("20 threads should be waiting", 20, + latch.getQueuedThreads().size()); + Thread.sleep(1000); + assertEquals("10 threads should be waiting", 10, + latch.getQueuedThreads().size()); + Thread.sleep(1000); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + } + + private class TestThread extends Thread { + + private int holdTime; + + public TestThread() { + this(100); + } + + public TestThread(int holdTime) { + this.holdTime = holdTime; + } + + @Override + public void run() { + try { + latch.countUpOrAwait(); + Thread.sleep(holdTime); + latch.countDown(); + } catch (InterruptedException x) { + x.printStackTrace(); + } + } + } +} diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index fd49f6c09..ba5bcf813 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -82,6 +82,10 @@ Include a comment header in generated java files that indicates when the file was generated and which version of Tomcat generated it. (markt) + + 51240: Ensure that maxConnections limit is enforced when + multiple acceptor threads are configured. (markt) +