Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 82a522fc4d6584bb85af919a0aa9112d2b7bd635 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
//
//  ========================================================================
//  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.servlets;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.server.HttpOutput;

/**
 * A servlet that uses the Servlet 3.1 asynchronous IO API to serve
 * static content at a limited data rate.
 * <p>
 * Two implementations are supported: <ul>
 * <li>The <code>StandardDataStream</code> impl uses only standard
 * APIs, but produces more garbage due to the byte[] nature of the API.  
 * <li>the <code>JettyDataStream</code> impl uses a Jetty API to write a ByteBuffer
 * and thus allow the efficient use of file mapped buffers without any
 * temporary buffer copies (I did tell the JSR that this was a good idea to 
 * have in the standard!).
 * </ul>
 * <p>
 * The data rate is controlled by setting init parameters:
 * <dl>
 * <dt>buffersize</dt><dd>The amount of data in bytes written per write</dd>
 * <dt>pause</dt><dd>The period in ms to wait after a write before attempting another</dd>
 * <dt>pool</dt><dd>The size of the thread pool used to service the writes (defaults to available processors)</dd>
 * </dl>
 * Thus if buffersize = 1024 and pause = 100, the data rate will be limited to 10KB per second.
 */
public class DataRateLimitedServlet extends HttpServlet
{
    private static final long serialVersionUID = -4771757707068097025L;

    /** Maximum number of bytes handed to the output stream per write. Always positive. */
    private int buffersize=8192;

    /** Delay, in nanoseconds, scheduled after each write before the next one is attempted. */
    private long pauseNS=TimeUnit.MILLISECONDS.toNanos(100);

    /** Shared timer used to resume paused responses; created in {@link #init()}, shut down in {@link #destroy()}. */
    ScheduledThreadPoolExecutor scheduler;

    /** Read-only file mappings keyed by translated file path. Entries are never evicted while the servlet is in service. */
    private final ConcurrentHashMap<String, ByteBuffer> cache=new ConcurrentHashMap<>();

    /**
     * Reads the <code>buffersize</code>, <code>pause</code> and <code>pool</code> init parameters
     * and creates the shared scheduler.
     *
     * @throws ServletException if a parameter is not an integer, if <code>buffersize</code> is not
     *         positive, or if <code>pool</code> is negative
     */
    @Override
    public void init() throws ServletException
    {
        // read the init params
        String tmp = getInitParameter("buffersize");
        if (tmp!=null)
        {
            int size=parseIntParameter("buffersize",tmp);
            // A zero length chunk never advances through the content, so a response
            // could never be delivered with such a setting.
            if (size<=0)
                throw new ServletException("Init parameter 'buffersize' must be positive: "+tmp);
            buffersize=size;
        }

        tmp = getInitParameter("pause");
        if (tmp!=null)
            pauseNS=TimeUnit.MILLISECONDS.toNanos(parseIntParameter("pause",tmp));

        tmp = getInitParameter("pool");
        int pool=tmp==null?Runtime.getRuntime().availableProcessors():parseIntParameter("pool",tmp);
        if (pool<0)
            throw new ServletException("Init parameter 'pool' must not be negative: "+tmp);

        // Create and start a shared scheduler.
        scheduler=new ScheduledThreadPoolExecutor(pool);
    }

    /**
     * Shuts down the scheduler and drops the cached file mappings.
     */
    @Override
    public void destroy()
    {
        // init() may have failed before the scheduler was created
        if (scheduler!=null)
            scheduler.shutdown();

        // Release our references to the mapped buffers
        cache.clear();
    }

    /**
     * Serves the static resource named by the request path info at a limited data rate.
     * <p>
     * When the resource translates to a readable file and the response output is a Jetty
     * {@link HttpOutput}, the file is memory mapped and written with a {@link JettyDataStream}.
     * Otherwise the resource is looked up through the servlet context and written with a
     * {@link StandardDataStream}. In both cases the request is put into async mode and the
     * content is written from {@link WriteListener} callbacks.
     *
     * @param request the request, whose path info names the resource
     * @param response the response the content is written to
     * @throws ServletException declared by the overridden method
     * @throws IOException if the error response cannot be sent or the file cannot be mapped
     */
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
    {
        // Get the path of the static resource to serve.
        String info=request.getPathInfo();

        // getPathInfo() is null when the request has no extra path information,
        // in which case no resource has been named at all.
        if (info==null)
        {
            response.sendError(404);
            return;
        }

        // We don't handle directories
        if (info.endsWith("/"))
        {
            response.sendError(503,"directories not supported");
            return;
        }

        // Set the mime type of the response
        String content_type=getServletContext().getMimeType(info);
        response.setContentType(content_type==null?"application/x-data":content_type);

        // Look for a matching file path
        String path = request.getPathTranslated();

        // If we have a file path and this is a jetty response, we can use the JettyStream impl
        ServletOutputStream out = response.getOutputStream();
        if (path != null && out instanceof HttpOutput)
        {
            File file = new File(path);

            // Only regular files can be mapped, and a single mapping cannot exceed
            // Integer.MAX_VALUE bytes; anything else falls through to the stream approach.
            if (file.isFile() && file.canRead() && file.length()<=Integer.MAX_VALUE)
            {
                ByteBuffer mapped=mappedContent(path,file);

                // Take the content length from the buffer that will actually be written, so the
                // header always agrees with the body even if the file changed after it was cached.
                response.setContentLengthLong(mapped.remaining());

                // start async request handling
                AsyncContext async=request.startAsync();

                // Set a JettyStream as the write listener to write the content asynchronously.
                out.setWriteListener(new JettyDataStream(mapped,async,out));
                return;
            }
        }

        // Jetty API was not used, so lets try the standards approach

        // Can we find the content as an input stream
        InputStream content = getServletContext().getResourceAsStream(info);
        if (content==null)
        {
            response.sendError(404);
            return;
        }

        // Set a StandardStream as the write listener to write the content asynchronously.
        // Until the listener is installed nobody else owns the stream, so close it on failure.
        boolean handedOver=false;
        try
        {
            out.setWriteListener(new StandardDataStream(content,request.startAsync(),out));
            handedOver=true;
        }
        finally
        {
            if (!handedOver)
                closeQuietly(content);
        }
    }

    /**
     * Returns the cached read-only mapping for a file, creating and caching it on a miss.
     *
     * @param path the cache key (the translated file path)
     * @param file the file to map on a cache miss
     * @return the shared mapping; callers must not change its position or limit
     * @throws IOException if the file cannot be opened or mapped
     */
    private ByteBuffer mappedContent(String path, File file) throws IOException
    {
        // Look for a file mapped buffer in the cache
        ByteBuffer mapped=cache.get(path);
        if (mapped!=null)
            return mapped;

        // Handle cache miss
        // TODO implement LRU cache flush
        try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
        {
            ByteBuffer buf = raf.getChannel().map(MapMode.READ_ONLY,0,raf.length());

            // Another request may have mapped the same file concurrently; keep whichever won.
            ByteBuffer existing=cache.putIfAbsent(path,buf);
            return existing==null?buf:existing;
        }
    }

    /**
     * Parses an init parameter value as an integer.
     *
     * @param name the parameter name, used only in the error message
     * @param value the non-null parameter value
     * @return the parsed value
     * @throws ServletException if the value is not a valid integer
     */
    private static int parseIntParameter(String name, String value) throws ServletException
    {
        try
        {
            return Integer.parseInt(value.trim());
        }
        catch (NumberFormatException e)
        {
            throw new ServletException("Init parameter '"+name+"' is not an integer: "+value,e);
        }
    }

    /**
     * Closes a content stream, logging rather than propagating any failure.
     *
     * @param in the stream to close
     */
    private void closeQuietly(InputStream in)
    {
        try
        {
            in.close();
        }
        catch (IOException e)
        {
            getServletContext().log("Failed to close content",e);
        }
    }

    /**
     * A standard API Stream writer.
     * <p>
     * Copies the content stream to the response one <code>buffersize</code> chunk at a time,
     * scheduling itself on the shared scheduler after each write to pause before the next chunk.
     * The content stream is closed at end of content and on error.
     */
    private final class StandardDataStream implements WriteListener, Runnable
    {
        private final InputStream content;
        private final AsyncContext async;
        private final ServletOutputStream out;

        private StandardDataStream(InputStream content, AsyncContext async, ServletOutputStream out)
        {
            this.content = content;
            this.async = async;
            this.out = out;
        }

        /**
         * Writes at most one chunk of content if the output is ready, then schedules the next attempt.
         *
         * @throws IOException if the content cannot be read or written
         */
        @Override
        public void onWritePossible() throws IOException
        {
            // If we are able to write
            if(out.isReady())
            {
                // Allocate a copy buffer for each write, so as to not hold one while paused
                // TODO put these buffers into a pool
                byte[] buffer = new byte[buffersize];

                // read some content into the copy buffer
                int len=content.read(buffer);

                // If we are at EOF
                if (len<0)
                {
                    // release the content and complete the async lifecycle
                    closeQuietly(content);
                    async.complete();
                    return;
                }

                // write out the copy buffer.  This will be an asynchronous write
                // and will always return immediately without blocking.  If a subsequent
                // call to out.isReady() returns false, then this onWritePossible method
                // will be called back when a write is possible.
                out.write(buffer,0,len);

                // Schedule a timer callback to pause writing.  Because isReady() is not called,
                // an onWritePossible callback is not scheduled.
                scheduler.schedule(this,pauseNS,TimeUnit.NANOSECONDS);
            }
        }

        /**
         * Timer callback: attempts the next write, routing any failure to {@link #onError(Throwable)}.
         */
        @Override
        public void run()
        {
            try
            {
                // When the pause timer wakes up, call onWritePossible.  Either isReady() will return
                // true and another chunk of content will be written, or it will return false and the
                // onWritePossible() callback will be scheduled when a write is next possible.
                onWritePossible();
            }
            catch(Exception e)
            {
                onError(e);
            }
        }

        /**
         * Logs the failure, releases the content stream and completes the async lifecycle.
         *
         * @param t the failure
         */
        @Override
        public void onError(Throwable t)
        {
            getServletContext().log("Async Error",t);
            closeQuietly(content);
            async.complete();
        }
    }


    /**
     * A Jetty API DataStream.
     * <p>
     * Writes a private read-only view of a shared buffer to a Jetty {@link HttpOutput} one
     * <code>buffersize</code> chunk at a time, scheduling itself on the shared scheduler after
     * each write to pause before the next chunk.
     */
    private final class JettyDataStream implements WriteListener, Runnable
    {
        private final ByteBuffer content;
        private final int limit;
        private final AsyncContext async;
        private final HttpOutput out;

        private JettyDataStream(ByteBuffer content, AsyncContext async, ServletOutputStream out)
        {
            // Make a readonly copy of the passed buffer. This uses the same underlying content
            // without a copy, but gives this instance its own position and limit.
            this.content = content.asReadOnlyBuffer();
            // remember the ultimate limit.
            this.limit=this.content.limit();
            this.async = async;
            this.out = (HttpOutput)out;
        }

        /**
         * Writes at most one chunk of content if the output is ready, then schedules the next attempt.
         *
         * @throws IOException if the content cannot be written
         */
        @Override
        public void onWritePossible() throws IOException
        {
            // If we are able to write
            if(out.isReady())
            {
                // Size the next chunk from what is left rather than adding buffersize to the
                // position: near the 2GiB mapping ceiling that sum overflows int and would
                // produce a negative, rejected limit.
                int position=content.position();
                int chunk=Math.min(buffersize,limit-position);

                // Position our buffer's limit to allow only that chunk to be written
                content.limit(position+chunk);

                // if all content has been written
                if (!content.hasRemaining())
                {
                    // complete the async lifecycle
                    async.complete();
                    return;
                }

                // write our limited buffer.  This will be an asynchronous write
                // and will always return immediately without blocking.  If a subsequent
                // call to out.isReady() returns false, then this onWritePossible method
                // will be called back when a write is possible.
                out.write(content);

                // Schedule a timer callback to pause writing.  Because isReady() is not called,
                // an onWritePossible callback is not scheduled.
                scheduler.schedule(this,pauseNS,TimeUnit.NANOSECONDS);
            }
        }

        /**
         * Timer callback: attempts the next write, routing any failure to {@link #onError(Throwable)}.
         */
        @Override
        public void run()
        {
            try
            {
                // When the pause timer wakes up, call onWritePossible.  Either isReady() will return
                // true and another chunk of content will be written, or it will return false and the
                // onWritePossible() callback will be scheduled when a write is next possible.
                onWritePossible();
            }
            catch(Exception e)
            {
                onError(e);
            }
        }

        /**
         * Logs the failure and completes the async lifecycle.
         *
         * @param t the failure
         */
        @Override
        public void onError(Throwable t)
        {
            getServletContext().log("Async Error",t);
            async.complete();
        }
    }
}

Back to the top