diff options
author | Greg Wilkins | 2015-10-07 04:10:51 +0000 |
---|---|---|
committer | Greg Wilkins | 2015-10-07 04:11:53 +0000 |
commit | b2f8192c7b23166d592c8289fd89aa7ff09873f6 (patch) | |
tree | 793cd3f659bfc4900bc1fe22f1b91038e7eb7a22 | |
parent | 80675ce1d2b6f87591f04e6c3b443083fd412b2e (diff) | |
download | org.eclipse.jetty.project-b2f8192c7b23166d592c8289fd89aa7ff09873f6.tar.gz org.eclipse.jetty.project-b2f8192c7b23166d592c8289fd89aa7ff09873f6.tar.xz org.eclipse.jetty.project-b2f8192c7b23166d592c8289fd89aa7ff09873f6.zip |
478923 - threads stuck at SharedBlockingCallback$Blocker.block
Fixed the failure path so that it also checks for non blocking callbacks
and directly calls them
6 files changed, 478 insertions, 234 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index b213a5b028..1ae1a7a0c3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -84,29 +84,42 @@ public abstract class AbstractConnection implements Connection protected void failedCallback(final Callback callback, final Throwable x) { - // TODO always dispatch failure ? - try + if (callback.isNonBlocking()) { - getExecutor().execute(new Runnable() + try { - @Override - public void run() + callback.failed(x); + } + catch (Exception e) + { + LOG.warn(e); + } + } + else + { + try + { + getExecutor().execute(new Runnable() { - try - { - callback.failed(x); - } - catch (Exception e) + @Override + public void run() { - LOG.warn(e); + try + { + callback.failed(x); + } + catch (Exception e) + { + LOG.warn(e); + } } - } - }); - } - catch(RejectedExecutionException e) - { - LOG.debug(e); - callback.failed(x); + }); + } + catch(RejectedExecutionException e) + { + LOG.debug(e); + callback.failed(x); + } } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 73acb763aa..0444f788ab 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -216,8 +216,8 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint isOpen()?"Open":"CLOSED", isInputShutdown()?"ISHUT":"in", isOutputShutdown()?"OSHUT":"out", - _fillInterest.isInterested()?"R":"-", - _writeFlusher.isInProgress()?"W":"-", + _fillInterest.toStateString(), + _writeFlusher.toStateString(), getIdleFor(), getIdleTimeout(), getConnection()==null?null:getConnection().getClass().getSimpleName()); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index 4eff032a8a..b5c48c4b08 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -138,7 +138,13 @@ public abstract class FillInterest @Override public String toString() { - return String.format("FillInterest@%x{%b,%s}", hashCode(), _interested.get(), _interested.get()); + return String.format("FillInterest@%x{%b,%s}", hashCode(), _interested.get()!=null, _interested.get()); + } + + + public String toStateString() + { + return _interested.get()==null?"-":"FI"; } /** diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 612fa5603d..f2dea7d8c3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -522,4 +522,23 @@ abstract public class WriteFlusher { return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get()); } + + public String toStateString() + { + switch(_state.get().getType()) + { + case WRITING: + return "W"; + case PENDING: + return "P"; + case COMPLETING: + return "C"; + case IDLE: + return "-"; + case FAILED: + return "F"; + default: + return "?"; + } + } } diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DefaultServletStarvationTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DefaultServletStarvationTest.java deleted file mode 100644 index e090808579..0000000000 --- a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DefaultServletStarvationTest.java +++ /dev/null @@ -1,213 +0,0 @@ -//
-// ========================================================================
-// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
-// ------------------------------------------------------------------------
-// All rights reserved. This program and the accompanying materials
-// are made available under the terms of the Eclipse Public License v1.0
-// and Apache License v2.0 which accompanies this distribution.
-//
-// The Eclipse Public License is available at
-// http://www.eclipse.org/legal/epl-v10.html
-//
-// The Apache License v2.0 is available at
-// http://www.opensource.org/licenses/apache2.0.php
-//
-// You may elect to redistribute this code under either of these licenses.
-// ========================================================================
-//
-
-package org.eclipse.jetty.servlets;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.eclipse.jetty.io.ManagedSelector;
-import org.eclipse.jetty.io.SelectChannelEndPoint;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
-import org.eclipse.jetty.toolchain.test.TestTracker;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class DefaultServletStarvationTest
-{
- @Rule
- public TestTracker tracker = new TestTracker();
- private Server _server;
-
- @After
- public void dispose() throws Exception
- {
- if (_server != null)
- _server.stop();
- }
-
- @Test
- public void testDefaultServletStarvation() throws Exception
- {
- int maxThreads = 2;
- QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
- threadPool.setDetailedDump(true);
- _server = new Server(threadPool);
-
- // Prepare a big file to download.
- File directory = MavenTestingUtils.getTargetTestingDir();
- Files.createDirectories(directory.toPath());
- String resourceName = "resource.bin";
- Path resourcePath = Paths.get(directory.getPath(), resourceName);
- try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
- {
- byte[] chunk = new byte[1024];
- Arrays.fill(chunk,(byte)'X');
- chunk[chunk.length-2]='\r';
- chunk[chunk.length-1]='\n';
- for (int i = 0; i < 256 * 1024; ++i)
- output.write(chunk);
- }
-
- final CountDownLatch writePending = new CountDownLatch(1);
- ServerConnector connector = new ServerConnector(_server, 0, 1)
- {
- @Override
- protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
- {
- return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
- {
- @Override
- protected void onIncompleteFlush()
- {
- super.onIncompleteFlush();
- writePending.countDown();
- }
- };
- }
- };
- _server.addConnector(connector);
-
- ServletContextHandler context = new ServletContextHandler(_server, "/");
- context.setResourceBase(directory.toURI().toString());
- context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false);
- _server.setHandler(context);
-
- _server.start();
-
- List<Socket> sockets = new ArrayList<>();
- for (int i = 0; i < maxThreads; ++i)
- {
- Socket socket = new Socket("localhost", connector.getLocalPort());
- sockets.add(socket);
- OutputStream output = socket.getOutputStream();
- String request = "" +
- "GET /" + resourceName + " HTTP/1.1\r\n" +
- "Host: localhost\r\n" +
-// "Connection: close\r\n" +
- "\r\n";
- output.write(request.getBytes(StandardCharsets.UTF_8));
- output.flush();
- Thread.sleep(100);
- }
-
-
- // Wait for a the servlet to block.
- Assert.assertTrue(writePending.await(5, TimeUnit.SECONDS));
-
- Thread.sleep(1000);
- _server.dumpStdErr();
- Thread.sleep(1000);
-
-
- ScheduledFuture<?> dumper = Executors.newSingleThreadScheduledExecutor().schedule(new Runnable()
- {
- @Override
- public void run()
- {
- _server.dumpStdErr();
- }
- }, 10, TimeUnit.SECONDS);
-
-
- long expected = Files.size(resourcePath);
- byte[] buffer = new byte[48 * 1024];
- for (Socket socket : sockets)
- {
- String socketString = socket.toString();
- long total = 0;
- InputStream input = socket.getInputStream();
-
- // look for CRLFCRLF
- StringBuilder header = new StringBuilder();
- int state=0;
- while (state<4 && header.length()<2048)
- {
- int ch=input.read();
- if (ch<0)
- break;
- header.append((char)ch);
- switch(state)
- {
- case 0:
- if (ch=='\r')
- state=1;
- break;
- case 1:
- if (ch=='\n')
- state=2;
- else
- state=0;
- break;
- case 2:
- if (ch=='\r')
- state=3;
- else
- state=0;
- break;
- case 3:
- if (ch=='\n')
- state=4;
- else
- state=0;
- break;
- }
- }
-
- while (total<expected)
- {
- int read=input.read(buffer);
- if (read<0)
- break;
- total+=read;
- }
-
- Assert.assertEquals(expected,total);
- }
-
- dumper.cancel(false);
-
- // We could read everything, good.
- for (Socket socket : sockets)
- socket.close();
- }
-}
diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java new file mode 100644 index 0000000000..9243ca7ccb --- /dev/null +++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java @@ -0,0 +1,419 @@ +//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.servlets;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.io.ManagedSelector;
+import org.eclipse.jetty.io.SelectChannelEndPoint;
+import org.eclipse.jetty.server.HttpChannel;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
+import org.eclipse.jetty.toolchain.test.TestTracker;
+import org.eclipse.jetty.toolchain.test.annotation.Slow;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.StdErrLog;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ThreadStarvationTest
+{
+ @Rule
+ public TestTracker tracker = new TestTracker();
+ private Server _server;
+
+ @After
+ public void dispose() throws Exception
+ {
+ if (_server != null)
+ _server.stop();
+ }
+
+ @Test
+ @Slow
+ public void testDefaultServletSuccess() throws Exception
+ {
+ int maxThreads = 10;
+ QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
+ threadPool.setDetailedDump(true);
+ _server = new Server(threadPool);
+
+ // Prepare a big file to download.
+ File directory = MavenTestingUtils.getTargetTestingDir();
+ Files.createDirectories(directory.toPath());
+ String resourceName = "resource.bin";
+ Path resourcePath = Paths.get(directory.getPath(), resourceName);
+ try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
+ {
+ byte[] chunk = new byte[1024];
+ Arrays.fill(chunk,(byte)'X');
+ chunk[chunk.length-2]='\r';
+ chunk[chunk.length-1]='\n';
+ for (int i = 0; i < 256 * 1024; ++i)
+ output.write(chunk);
+ }
+
+ final CountDownLatch writePending = new CountDownLatch(1);
+ ServerConnector connector = new ServerConnector(_server, 0, 1)
+ {
+ @Override
+ protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
+ {
+ return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
+ {
+ @Override
+ protected void onIncompleteFlush()
+ {
+ super.onIncompleteFlush();
+ writePending.countDown();
+ }
+ };
+ }
+ };
+ connector.setIdleTimeout(Long.MAX_VALUE);
+ _server.addConnector(connector);
+
+ ServletContextHandler context = new ServletContextHandler(_server, "/");
+ context.setResourceBase(directory.toURI().toString());
+ context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false);
+ _server.setHandler(context);
+
+ _server.start();
+
+ List<Socket> sockets = new ArrayList<>();
+ for (int i = 0; i < maxThreads*2; ++i)
+ {
+ Socket socket = new Socket("localhost", connector.getLocalPort());
+ sockets.add(socket);
+ OutputStream output = socket.getOutputStream();
+ String request = "" +
+ "GET /" + resourceName + " HTTP/1.1\r\n" +
+ "Host: localhost\r\n" +
+ "\r\n";
+ output.write(request.getBytes(StandardCharsets.UTF_8));
+ output.flush();
+ Thread.sleep(100);
+ }
+
+ // Wait for a the servlet to block.
+ Assert.assertTrue(writePending.await(5, TimeUnit.SECONDS));
+
+ long expected = Files.size(resourcePath);
+ byte[] buffer = new byte[48 * 1024];
+ List<Exchanger<Long>> totals = new ArrayList<>();
+ for (Socket socket : sockets)
+ {
+ final Exchanger<Long> x = new Exchanger<>();
+ totals.add(x);
+ final InputStream input = socket.getInputStream();
+
+ new Thread()
+ {
+ @Override
+ public void run()
+ {
+ long total=0;
+ try
+ {
+ // look for CRLFCRLF
+ StringBuilder header = new StringBuilder();
+ int state=0;
+ while (state<4 && header.length()<2048)
+ {
+ int ch=input.read();
+ if (ch<0)
+ break;
+ header.append((char)ch);
+ switch(state)
+ {
+ case 0:
+ if (ch=='\r')
+ state=1;
+ break;
+ case 1:
+ if (ch=='\n')
+ state=2;
+ else
+ state=0;
+ break;
+ case 2:
+ if (ch=='\r')
+ state=3;
+ else
+ state=0;
+ break;
+ case 3:
+ if (ch=='\n')
+ state=4;
+ else
+ state=0;
+ break;
+ }
+ }
+
+ while (total<expected)
+ {
+ int read=input.read(buffer);
+ if (read<0)
+ break;
+ total+=read;
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ try
+ {
+ x.exchange(total);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }.start();
+ }
+
+ for (Exchanger<Long> x : totals)
+ {
+ Long total = x.exchange(-1L,10000,TimeUnit.SECONDS);
+ Assert.assertEquals(expected,total.longValue());
+ }
+
+ // We could read everything, good.
+ for (Socket socket : sockets)
+ socket.close();
+ }
+
+ @Test
+ public void testFailureStarvation() throws Exception
+ {
+ try
+ {
+ ((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(true);
+
+ int acceptors = 0;
+ int selectors = 1;
+ int maxThreads = 10;
+ final int barried=maxThreads-acceptors-selectors;
+ final CyclicBarrier barrier = new CyclicBarrier(barried);
+
+
+ QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
+ threadPool.setDetailedDump(true);
+ _server = new Server(threadPool);
+
+
+ ServerConnector connector = new ServerConnector(_server, acceptors, selectors)
+ {
+ @Override
+ protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
+ {
+ return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
+ {
+
+ @Override
+ public boolean flush(ByteBuffer... buffers) throws IOException
+ {
+ super.flush(buffers[0]);
+ throw new IOException("TEST FAILURE");
+ }
+
+ };
+ }
+ };
+ connector.setIdleTimeout(Long.MAX_VALUE);
+ _server.addConnector(connector);
+
+ final AtomicInteger count = new AtomicInteger(0);
+ _server.setHandler(new AbstractHandler()
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ int c=count.getAndIncrement();
+ try
+ {
+ if (c<barried)
+ {
+ barrier.await(10,TimeUnit.SECONDS);
+ }
+ }
+ catch (InterruptedException | BrokenBarrierException | TimeoutException e)
+ {
+ throw new ServletException(e);
+ }
+ baseRequest.setHandled(true);
+ response.setStatus(200);
+ response.setContentLength(13);
+ response.getWriter().print("Hello World!\n");
+ response.getWriter().flush();
+ }
+ });
+
+ _server.start();
+
+ List<Socket> sockets = new ArrayList<>();
+ for (int i = 0; i < maxThreads*2; ++i)
+ {
+ Socket socket = new Socket("localhost", connector.getLocalPort());
+ sockets.add(socket);
+ OutputStream output = socket.getOutputStream();
+ String request = "" +
+ "GET / HTTP/1.1\r\n" +
+ "Host: localhost\r\n" +
+ // "Connection: close\r\n" +
+ "\r\n";
+ output.write(request.getBytes(StandardCharsets.UTF_8));
+ output.flush();
+ }
+
+
+ byte[] buffer = new byte[48 * 1024];
+ List<Exchanger<Integer>> totals = new ArrayList<>();
+ for (Socket socket : sockets)
+ {
+ final Exchanger<Integer> x = new Exchanger<>();
+ totals.add(x);
+ final InputStream input = socket.getInputStream();
+
+ new Thread()
+ {
+ @Override
+ public void run()
+ {
+ int read=0;
+ try
+ {
+ // look for CRLFCRLF
+ StringBuilder header = new StringBuilder();
+ int state=0;
+ while (state<4 && header.length()<2048)
+ {
+ int ch=input.read();
+ if (ch<0)
+ break;
+ header.append((char)ch);
+ switch(state)
+ {
+ case 0:
+ if (ch=='\r')
+ state=1;
+ break;
+ case 1:
+ if (ch=='\n')
+ state=2;
+ else
+ state=0;
+ break;
+ case 2:
+ if (ch=='\r')
+ state=3;
+ else
+ state=0;
+ break;
+ case 3:
+ if (ch=='\n')
+ state=4;
+ else
+ state=0;
+ break;
+ }
+ }
+
+ read=input.read(buffer);
+ }
+ catch (IOException e)
+ {
+ // e.printStackTrace();
+ }
+ finally
+ {
+ try
+ {
+ x.exchange(read);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }.start();
+ }
+
+ for (Exchanger<Integer> x : totals)
+ {
+ Integer read = x.exchange(-1,10,TimeUnit.SECONDS);
+ Assert.assertEquals(-1,read.intValue());
+ }
+
+ // We could read everything, good.
+ for (Socket socket : sockets)
+ socket.close();
+
+ _server.stop();
+ }
+ finally
+ {
+ ((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(false);
+ }
+ }
+}
|