import org.apache.juli.logging.Log;
import org.apache.tomcat.util.IntrospectionUtils;
import org.apache.tomcat.util.res.StringManager;
-import org.apache.tomcat.util.threads.CounterLatch;
+import org.apache.tomcat.util.threads.LimitLatch;
import org.apache.tomcat.util.threads.ResizableExecutor;
import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
/**
* counter for nr of connections handled by an endpoint
*/
- private volatile CounterLatch connectionCounterLatch = null;
+ private volatile LimitLatch connectionLimitLatch = null;
/**
* Socket properties
// ----------------------------------------------------------------- Properties
private int maxConnections = 10000;
- public void setMaxConnections(int maxCon) { this.maxConnections = maxCon; }
+ public void setMaxConnections(int maxCon) {
+ this.maxConnections = maxCon;
+ LimitLatch latch = this.connectionLimitLatch;
+ // Update the latch that enforces this
+ latch.setLimit(maxCon);
+ }
+
public int getMaxConnections() { return this.maxConnections; }
/**
* External Executor based thread pool.
protected abstract Log getLog();
public abstract boolean getUseSendfile();
- protected CounterLatch initializeConnectionLatch() {
- if (connectionCounterLatch==null) {
- connectionCounterLatch = new CounterLatch(0,getMaxConnections());
+ protected LimitLatch initializeConnectionLatch() {
+ if (connectionLimitLatch==null) {
+ connectionLimitLatch = new LimitLatch(getMaxConnections());
}
- return connectionCounterLatch;
+ return connectionLimitLatch;
}
protected void releaseConnectionLatch() {
- CounterLatch latch = connectionCounterLatch;
+ LimitLatch latch = connectionLimitLatch;
if (latch!=null) latch.releaseAll();
- connectionCounterLatch = null;
- }
-
- protected void awaitConnection() throws InterruptedException {
- CounterLatch latch = connectionCounterLatch;
- if (latch!=null) latch.await();
+ connectionLimitLatch = null;
}
- protected long countUpConnection() {
- CounterLatch latch = connectionCounterLatch;
- if (latch!=null) return latch.countUp();
- else return -1;
+ protected void countUpOrAwaitConnection() throws InterruptedException {
+ LimitLatch latch = connectionLimitLatch;
+ if (latch!=null) latch.countUpOrAwait();
}
protected long countDownConnection() {
- CounterLatch latch = connectionCounterLatch;
+ LimitLatch latch = connectionLimitLatch;
if (latch!=null) {
long result = latch.countDown();
if (result<0) {
}
try {
//if we have reached max connections, wait
- awaitConnection();
+ countUpOrAwaitConnection();
long socket = 0;
try {
// Successful accept, reset the error delay
errorDelay = 0;
- //increment socket count
- countUpConnection();
/*
* In the case of a deferred accept unlockAccept needs to
* send data. This data will be rubbish, so destroy the
}
try {
//if we have reached max connections, wait
- awaitConnection();
-
+ countUpOrAwaitConnection();
+
Socket socket = null;
try {
// Accept the next incoming connection from the server
} catch (IOException e) {
// Ignore
}
- } else {
- countUpConnection();
}
} else {
// Close socket right away
}
try {
//if we have reached max connections, wait
- awaitConnection();
+ countUpOrAwaitConnection();
SocketChannel socket = null;
try {
if (log.isDebugEnabled())
log.debug("", ix);
}
- } else {
- countUpConnection();
}
}
} catch (SocketTimeoutException sx) {
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.util.threads;
-
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-/**
- * Simple counter latch that allows code to keep an up and down counter, and waits while the latch holds a certain wait value.
- * and threads using the latch to wait if the count has reached a certain value.
- * The counter latch can be used to keep track of an atomic counter, since the operations {@link #countDown()}
- * and {@link #countUp()} are atomic.
- * When the latch reaches the wait value, threads will block. The counter latch can hence act like a
- * count down latch or a count up latch, while letting you keep track of the counter as well.
- * This counter latch works opposite as the java.util.concurrent.CountDownLatch, since the CounterLatch only blocks on a single value and releases the threads on all other values.
- * @author fhanik
- * @see <a href="http://download.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html">CountDownLatch</a>
- *
- */
-public class CounterLatch {
-
- private class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 1L;
-
- public Sync() {
- }
-
- @Override
- protected int tryAcquireShared(int arg) {
- return ((!released) && count.get() == signal) ? -1 : 1;
- }
-
- @Override
- protected boolean tryReleaseShared(int arg) {
- return true;
- }
- }
-
- private final Sync sync;
- private final AtomicLong count;
- private volatile long signal;
- private volatile boolean released = false;
-
- /**
- * Instantiates a CounterLatch object with an initial value and a wait value.
- * @param initial - initial value of the counter
- * @param waitValue - when the counter holds this value,
- * threads calling {@link #await()} or {@link #await(long, TimeUnit)}
- * will wait until the counter changes value or until they are interrupted.
- */
- public CounterLatch(long initial, long waitValue) {
- this.signal = waitValue;
- this.count = new AtomicLong(initial);
- this.sync = new Sync();
- }
-
- /**
- * Causes the calling thread to wait if the counter holds the waitValue.
- * If the counter holds any other value, the thread will return
- * If the thread is interrupted or becomes interrupted an InterruptedException is thrown
- * @throws InterruptedException
- */
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
-
- /**
- * Causes the calling thread to wait if the counter holds the waitValue.
- * If the counter holds any other value, the thread will return
- * If the thread is interrupted or becomes interrupted an InterruptedException is thrown
- * @return true if the value changed, false if the timeout has elapsed
- * @throws InterruptedException
- */
- public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
-
- /**
- * Increments the counter
- * @return the previous counter value
- */
- public long countUp() {
- long previous = count.getAndIncrement();
- if (previous == signal) {
- sync.releaseShared(0);
- }
- return previous;
- }
-
- /**
- * Decrements the counter
- * @return the previous counter value
- */
- public long countDown() {
- long previous = count.getAndDecrement();
- if (previous == signal) {
- sync.releaseShared(0);
- }
- return previous;
- }
-
- /**
- * Returns the current counter value
- * @return the current counter value
- */
- public long getCount() {
- return count.get();
- }
-
- /**
- * Performs an atomic update of the counter
- * If the operation is successful and {@code expect==waitValue && expect!=update} waiting threads will be released.
- * @param expect - the expected counter value
- * @param update - the new counter value
- * @return <code>true</code> if successful, <code>false</code> if the
- * current value wasn't as expected
- */
- public boolean compareAndSet(long expect, long update) {
- boolean result = count.compareAndSet(expect, update);
- if (result && expect==signal && expect != update) {
- sync.releaseShared(0);
- }
- return result;
- }
-
- /**
- * returns true if there are threads blocked by this latch
- * @return true if there are threads blocked by this latch
- */
- public boolean hasQueuedThreads() {
- return sync.hasQueuedThreads();
- }
-
- /**
- * Returns a collection of the blocked threads
- * @return a collection of the blocked threads
- */
- public Collection<Thread> getQueuedThreads() {
- return sync.getQueuedThreads();
- }
-
- /**
- * releases all waiting threads. This operation is permanent, and no threads will block,
- * even if the counter hits the {@code waitValue} until {@link #reset(long)} has been called.
- * @return <code>true</code> if this release of shared mode may permit a
- * waiting acquire (shared or exclusive) to succeed; and
- * <code>false</code> otherwise
- */
- public boolean releaseAll() {
- released = true;
- return sync.releaseShared(0);
- }
-
- /**
- * Resets the latch and initializes the counter with the new value.
- * @param value the new counter value
- * @see #releaseAll()
- */
- public void reset(long value) {
- this.count.set(value);
- released = false;
- }
-
-}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * Shared latch that allows the latch to be acquired a limited number of times
+ * after which all subsequent requests to acquire the latch will be placed in a
+ * FIFO queue until one of the shares is returned.
+ */
+public class LimitLatch {
+
+ private class Sync extends AbstractQueuedSynchronizer {
+ private static final long serialVersionUID = 1L;
+
+ public Sync() {
+ }
+
+ @Override
+ protected int tryAcquireShared(int ignored) {
+ long newCount = count.incrementAndGet();
+ if (!released && newCount > limit) {
+ // Limit exceeded
+ count.decrementAndGet();
+ return -1;
+ } else {
+ return 1;
+ }
+ }
+
+ @Override
+ protected boolean tryReleaseShared(int arg) {
+ count.decrementAndGet();
+ return true;
+ }
+ }
+
+ private final Sync sync;
+ private final AtomicLong count;
+ private volatile long limit;
+ private volatile boolean released = false;
+
+ /**
+ * Instantiates a LimitLatch object with an initial limit.
+ * @param limit - maximum number of concurrent acquisitions of this latch
+ */
+ public LimitLatch(long limit) {
+ this.limit = limit;
+ this.count = new AtomicLong(0);
+ this.sync = new Sync();
+ }
+
+ /**
+ * Obtain the current limit.
+ */
+ public long getLimit() {
+ return limit;
+ }
+
+
+ /**
+ * Sets a new limit. If the limit is decreased there may be a period where
+ * more shares of the latch are acquired than the limit. In this case no
+ * more shares of the latch will be issued until sufficient shares have been
+ * returned to reduce the number of acquired shares of the latch to below
+ * the new limit. If the limit is increased, threads currently in the queue
+ * may not be issued one of the newly available shares until the next
+ * request is made for a latch.
+ *
+ * @param limit The new limit
+ */
+ public void setLimit(long limit) {
+ this.limit = limit;
+ }
+
+
+ /**
+ * Acquires a shared latch if one is available or waits for one if no shared
+ * latch is current available.
+ */
+ public void countUpOrAwait() throws InterruptedException {
+ sync.acquireSharedInterruptibly(1);
+ }
+
+ /**
+ * Releases a shared latch, making it available for another thread to use.
+ * @return the previous counter value
+ */
+ public long countDown() {
+ sync.releaseShared(0);
+ return count.get();
+ }
+
+ /**
+ * Releases all waiting threads and causes the {@link #limit} to be ignored
+ * until {@link #reset()} is called.
+ */
+ public boolean releaseAll() {
+ released = true;
+ return sync.releaseShared(0);
+ }
+
+ /**
+ * Resets the latch and initializes the shared acquisition counter to zero.
+ * @see #releaseAll()
+ */
+ public void reset() {
+ this.count.set(0);
+ released = false;
+ }
+
+ /**
+ * Returns <code>true</code> if there is at least one thread waiting to
+ * acquire the shared lock, otherwise returns <code>false</code>.
+ */
+ public boolean hasQueuedThreads() {
+ return sync.hasQueuedThreads();
+ }
+
+ /**
+ * Provide access to the list of threads waiting to acquire this limited
+ * shared latch.
+ */
+ public Collection<Thread> getQueuedThreads() {
+ return sync.getQueuedThreads();
+ }
+}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.util.threads;
-
-import junit.framework.TestCase;
-
-public class TestCounterLatch extends TestCase {
-
- private volatile CounterLatch latch = null;
-
- @Override
- public void tearDown() {
- CounterLatch temp = latch;
- if (temp!=null) temp.releaseAll();
- latch = null;
- }
-
- public void testNoThreads() throws Exception {
- latch = new CounterLatch(0,0);
- assertEquals("No threads should be waiting", false, latch.hasQueuedThreads());
- }
-
- public void testOneThreadNoWait() throws Exception {
- latch = new CounterLatch(0,1);
- assertEquals("No threads should be waiting", false, latch.hasQueuedThreads());
- Thread testThread = new Thread() {
- @Override
- public void run() {
- try {
- latch.await();
- } catch (InterruptedException x) {
- x.printStackTrace();
- }
- }
- };
- testThread.start();
- Thread.sleep(50);
- assertEquals("0 threads should be waiting", 0, latch.getQueuedThreads().size());
- latch.countUp();
- Thread.sleep(50);
- assertEquals("No threads should be waiting", false, latch.hasQueuedThreads());
- }
-
- public void testOneThreadWaitCountUp() throws Exception {
- latch = new CounterLatch(0,1);
- assertEquals("No threads should be waiting", false, latch.hasQueuedThreads());
- Thread testThread = new Thread() {
- @Override
- public void run() {
- try {
- latch.await();
- } catch (InterruptedException x) {
- x.printStackTrace();
- }
- }
- };
- latch.countUp();
- testThread.start();
- Thread.sleep(50);
- assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size());
- latch.countUp();
- Thread.sleep(50);
- assertEquals("No threads should be waiting", false, latch.hasQueuedThreads());
- }
-
- public void testOneThreadWaitCountDown() throws Exception {
- latch = new CounterLatch(1,0);
- assertEquals("No threads should be waiting", false, latch.hasQueuedThreads());
- Thread testThread = new Thread() {
- @Override
- public void run() {
- try {
- //System.out.println("Entering ["+Thread.currentThread().getName()+"]");
- latch.await();
- } catch (InterruptedException x) {
- x.printStackTrace();
- }
- //System.out.println("Exiting ["+Thread.currentThread().getName()+"]");
- }
- };
- latch.countDown();
- testThread.start();
- Thread.sleep(50);
- assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size());
- latch.countDown();
- Thread.sleep(50);
- assertEquals("No threads should be waiting", false, latch.hasQueuedThreads());
- }
-
- public void testOneRelease() throws Exception {
- latch = new CounterLatch(1,0);
- assertEquals("No threads should be waiting", false, latch.hasQueuedThreads());
- Thread testThread = new Thread() {
- @Override
- public void run() {
- try {
- latch.await();
- } catch (InterruptedException x) {
- x.printStackTrace();
- }
- }
- };
- latch.countDown();
- testThread.start();
- Thread.sleep(50);
- assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size());
- latch.releaseAll();
- Thread.sleep(50);
- assertEquals("No threads should be waiting", false, latch.hasQueuedThreads());
- }
-}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import junit.framework.TestCase;
+
+public class TestLimitLatch extends TestCase {
+
+ private volatile LimitLatch latch = null;
+
+ @Override
+ public void tearDown() {
+ LimitLatch temp = latch;
+ if (temp!=null) temp.releaseAll();
+ latch = null;
+ }
+
+ public void testNoThreads() throws Exception {
+ latch = new LimitLatch(0);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ }
+
+ public void testOneThreadNoWait() throws Exception {
+ latch = new LimitLatch(1);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ Thread testThread = new TestThread();
+ testThread.start();
+ Thread.sleep(50);
+ assertEquals("0 threads should be waiting", 0,
+ latch.getQueuedThreads().size());
+ latch.countUpOrAwait();
+ Thread.sleep(50);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ }
+
+ public void testOneThreadWaitCountUp() throws Exception {
+ latch = new LimitLatch(1);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ Thread testThread = new TestThread();
+ latch.countUpOrAwait();
+ testThread.start();
+ Thread.sleep(50);
+ assertEquals("1 threads should be waiting", 1,
+ latch.getQueuedThreads().size());
+ latch.countDown();
+ Thread.sleep(50);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ }
+
+ public void testOneRelease() throws Exception {
+ latch = new LimitLatch(1);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ Thread testThread = new TestThread();
+ latch.countUpOrAwait();
+ testThread.start();
+ Thread.sleep(50);
+ assertEquals("1 threads should be waiting", 1,
+ latch.getQueuedThreads().size());
+ latch.releaseAll();
+ Thread.sleep(50);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ }
+
+ public void testTenWait() throws Exception {
+ latch = new LimitLatch(10);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ Thread[] testThread = new TestThread[30];
+ for (int i = 0; i < 30; i++) {
+ testThread[i] = new TestThread(1000);
+ testThread[i].start();
+ }
+ Thread.sleep(50);
+ assertEquals("20 threads should be waiting", 20,
+ latch.getQueuedThreads().size());
+ Thread.sleep(1000);
+ assertEquals("10 threads should be waiting", 10,
+ latch.getQueuedThreads().size());
+ Thread.sleep(1000);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ }
+
+ private class TestThread extends Thread {
+
+ private int holdTime;
+
+ public TestThread() {
+ this(100);
+ }
+
+ public TestThread(int holdTime) {
+ this.holdTime = holdTime;
+ }
+
+ @Override
+ public void run() {
+ try {
+ latch.countUpOrAwait();
+ Thread.sleep(holdTime);
+ latch.countDown();
+ } catch (InterruptedException x) {
+ x.printStackTrace();
+ }
+ }
+ }
+}
Include a comment header in generated java files that indicates when the
file was generated and which version of Tomcat generated it. (markt)
</add>
+ <fix>
+ <bug>51240</bug>: Ensure that maxConnections limit is enforced when
+ multiple acceptor threads are configured. (markt)
+ </fix>
</changelog>
</subsection>
<subsection name="Cluster">