View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import org.apache.hc.core5.annotation.Internal;
39  import org.apache.hc.core5.http.ConnectionClosedException;
40  import org.apache.hc.core5.http.ConnectionReuseStrategy;
41  import org.apache.hc.core5.http.ContentLengthStrategy;
42  import org.apache.hc.core5.http.EntityDetails;
43  import org.apache.hc.core5.http.Header;
44  import org.apache.hc.core5.http.HttpException;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.config.CharCodingConfig;
48  import org.apache.hc.core5.http.config.H1Config;
49  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
50  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
51  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
52  import org.apache.hc.core5.http.impl.Http1StreamListener;
53  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
54  import org.apache.hc.core5.http.nio.CapacityChannel;
55  import org.apache.hc.core5.http.nio.ContentDecoder;
56  import org.apache.hc.core5.http.nio.ContentEncoder;
57  import org.apache.hc.core5.http.nio.HandlerFactory;
58  import org.apache.hc.core5.http.nio.NHttpMessageParser;
59  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
60  import org.apache.hc.core5.http.nio.SessionInputBuffer;
61  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
62  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
63  import org.apache.hc.core5.http.protocol.HttpCoreContext;
64  import org.apache.hc.core5.http.protocol.HttpProcessor;
65  import org.apache.hc.core5.io.CloseMode;
66  import org.apache.hc.core5.net.InetAddressUtils;
67  import org.apache.hc.core5.reactor.ProtocolIOSession;
68  import org.apache.hc.core5.util.Args;
69  import org.apache.hc.core5.util.Asserts;
70  import org.apache.hc.core5.util.Timeout;
71  
72  /**
73   * I/O event handler for events fired by {@link ProtocolIOSession} that implements
74   * server side HTTP/1.1 messaging protocol with full support for
75   * duplexed message transmission and message pipelining.
76   *
77   * @since 5.0
78   */
79  @Internal
80  public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpRequest, HttpResponse> {
81  
82      private final String scheme;
83      private final HttpProcessor httpProcessor;
84      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
85      private final H1Config h1Config;
86      private final ConnectionReuseStrategy connectionReuseStrategy;
87      private final Http1StreamListener streamListener;
88      private final Queue<ServerHttp1StreamHandler> pipeline;
89      private final Http1StreamChannel<HttpResponse> outputChannel;
90  
91      private volatile ServerHttp1StreamHandler outgoing;
92      private volatile ServerHttp1StreamHandler incoming;
93  
94      public ServerHttp1StreamDuplexer(
95              final ProtocolIOSession ioSession,
96              final HttpProcessor httpProcessor,
97              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
98              final String scheme,
99              final H1Config h1Config,
100             final CharCodingConfig charCodingConfig,
101             final ConnectionReuseStrategy connectionReuseStrategy,
102             final NHttpMessageParser<HttpRequest> incomingMessageParser,
103             final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
104             final ContentLengthStrategy incomingContentStrategy,
105             final ContentLengthStrategy outgoingContentStrategy,
106             final Http1StreamListener streamListener) {
107         super(ioSession, h1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
108         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
109         this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
110         this.scheme = scheme;
111         this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
112         this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
113                 DefaultConnectionReuseStrategy.INSTANCE;
114         this.streamListener = streamListener;
115         this.pipeline = new ConcurrentLinkedQueue<>();
116         this.outputChannel = new Http1StreamChannel<HttpResponse>() {
117 
118             @Override
119             public void close() {
120                 ServerHttp1StreamDuplexer.this.close(CloseMode.GRACEFUL);
121             }
122 
123             @Override
124             public void submit(
125                     final HttpResponse response,
126                     final boolean endStream,
127                     final FlushMode flushMode) throws HttpException, IOException {
128                 if (streamListener != null) {
129                     streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
130                 }
131                 commitMessageHead(response, endStream, flushMode);
132             }
133 
134             @Override
135             public void requestOutput() {
136                 requestSessionOutput();
137             }
138 
139             @Override
140             public void suspendOutput() throws IOException {
141                 suspendSessionOutput();
142             }
143 
144             @Override
145             public Timeout getSocketTimeout() {
146                 return getSessionTimeout();
147             }
148 
149             @Override
150             public void setSocketTimeout(final Timeout timeout) {
151                 setSessionTimeout(timeout);
152             }
153 
154             @Override
155             public int write(final ByteBuffer src) throws IOException {
156                 return streamOutput(src);
157             }
158 
159             @Override
160             public void complete(final List<? extends Header> trailers) throws IOException {
161                 endOutputStream(trailers);
162             }
163 
164             @Override
165             public boolean isCompleted() {
166                 return isOutputCompleted();
167             }
168 
169             @Override
170             public boolean abortGracefully() throws IOException {
171                 final MessageDelineation messageDelineation = endOutputStream(null);
172                 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
173             }
174 
175             @Override
176             public void activate() throws HttpException, IOException {
177                 // empty
178             }
179 
180             @Override
181             public String toString() {
182                 return "Http1StreamChannel[" + ServerHttp1StreamDuplexer.this.toString() + "]";
183             }
184 
185         };
186     }
187 
188     @Override
189     void terminate(final Exception exception) {
190         if (incoming != null) {
191             incoming.failed(exception);
192             incoming.releaseResources();
193             incoming = null;
194         }
195         if (outgoing != null) {
196             outgoing.failed(exception);
197             outgoing.releaseResources();
198             outgoing = null;
199         }
200         for (;;) {
201             final ServerHttp1StreamHandler handler = pipeline.poll();
202             if (handler != null) {
203                 handler.failed(exception);
204                 handler.releaseResources();
205             } else {
206                 break;
207             }
208         }
209     }
210 
211     @Override
212     void disconnected() {
213         if (incoming != null) {
214             if (!incoming.isCompleted()) {
215                 incoming.failed(new ConnectionClosedException());
216             }
217             incoming.releaseResources();
218             incoming = null;
219         }
220         if (outgoing != null) {
221             if (!outgoing.isCompleted()) {
222                 outgoing.failed(new ConnectionClosedException());
223             }
224             outgoing.releaseResources();
225             outgoing = null;
226         }
227         for (;;) {
228             final ServerHttp1StreamHandler handler = pipeline.poll();
229             if (handler != null) {
230                 handler.failed(new ConnectionClosedException());
231                 handler.releaseResources();
232             } else {
233                 break;
234             }
235         }
236     }
237 
238     @Override
239     void updateInputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
240         connMetrics.incrementRequestCount();
241     }
242 
243     @Override
244     void updateOutputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
245         if (response.getCode() >= 200) {
246             connMetrics.incrementRequestCount();
247         }
248     }
249 
250     @Override
251     protected boolean handleIncomingMessage(final HttpRequest request) throws HttpException {
252         return true;
253     }
254 
255     @Override
256     protected ContentDecoder createContentDecoder(
257             final long len,
258             final ReadableByteChannel channel,
259             final SessionInputBuffer buffer,
260             final BasicHttpTransportMetrics metrics) throws HttpException {
261         if (len >= 0) {
262             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
263         } else if (len == ContentLengthStrategy.CHUNKED) {
264             return new ChunkDecoder(channel, buffer, h1Config, metrics);
265         } else {
266             return null;
267         }
268     }
269 
270     @Override
271     protected boolean handleOutgoingMessage(final HttpResponse response) throws HttpException {
272         return true;
273     }
274 
275     @Override
276     protected ContentEncoder createContentEncoder(
277             final long len,
278             final WritableByteChannel channel,
279             final SessionOutputBuffer buffer,
280             final BasicHttpTransportMetrics metrics) throws HttpException {
281         final int chunkSizeHint = h1Config.getChunkSizeHint() >= 0 ? h1Config.getChunkSizeHint() : 2048;
282         if (len >= 0) {
283             return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
284         } else if (len == ContentLengthStrategy.CHUNKED) {
285             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
286         } else {
287             return new IdentityEncoder(channel, buffer, metrics, chunkSizeHint);
288         }
289     }
290 
291     @Override
292     boolean inputIdle() {
293         return incoming == null;
294     }
295 
296     @Override
297     boolean outputIdle() {
298         return outgoing == null && pipeline.isEmpty();
299     }
300 
301     @Override
302     void consumeHeader(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
303         if (streamListener != null) {
304             streamListener.onRequestHead(this, request);
305         }
306         final ServerHttp1StreamHandler streamHandler;
307         final HttpCoreContext context = HttpCoreContext.create();
308         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
309         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
310         if (outgoing == null) {
311             streamHandler = new ServerHttp1StreamHandler(
312                     outputChannel,
313                     httpProcessor,
314                     connectionReuseStrategy,
315                     exchangeHandlerFactory,
316                     context);
317             outgoing = streamHandler;
318         } else {
319             streamHandler = new ServerHttp1StreamHandler(
320                     new DelayedOutputChannel(outputChannel),
321                     httpProcessor,
322                     connectionReuseStrategy,
323                     exchangeHandlerFactory,
324                     context);
325             pipeline.add(streamHandler);
326         }
327         request.setScheme(scheme);
328         streamHandler.consumeHeader(request, entityDetails);
329         incoming = streamHandler;
330     }
331 
332     @Override
333     int consumeData(final ByteBuffer src) throws HttpException, IOException {
334         Asserts.notNull(incoming, "Request stream handler");
335         return incoming.consumeData(src);
336     }
337 
338     @Override
339     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
340         Asserts.notNull(incoming, "Request stream handler");
341         incoming.updateCapacity(capacityChannel);
342     }
343 
344     @Override
345     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
346         Asserts.notNull(incoming, "Request stream handler");
347         incoming.dataEnd(trailers);
348     }
349 
350     @Override
351     void inputEnd() throws HttpException, IOException {
352         if (incoming != null) {
353             if (incoming.isCompleted()) {
354                 incoming.releaseResources();
355             }
356             incoming = null;
357         }
358     }
359 
360     @Override
361     void execute(final RequestExecutionCommand executionCommand) throws HttpException {
362         throw new HttpException("Illegal command: " + executionCommand.getClass());
363     }
364 
365     @Override
366     boolean isOutputReady() {
367         return outgoing != null && outgoing.isOutputReady();
368     }
369 
370     @Override
371     void produceOutput() throws HttpException, IOException {
372         Asserts.notNull(outgoing, "Response stream handler");
373         outgoing.produceOutput();
374     }
375 
376     @Override
377     void outputEnd() throws HttpException, IOException {
378         if (outgoing != null && outgoing.isResponseFinal()) {
379             if (streamListener != null) {
380                 streamListener.onExchangeComplete(this, isOpen());
381             }
382             if (outgoing.isCompleted()) {
383                 outgoing.releaseResources();
384             }
385             outgoing = null;
386         }
387         if (outgoing == null && isOpen()) {
388             final ServerHttp1StreamHandler handler = pipeline.poll();
389             if (handler != null) {
390                 outgoing = handler;
391                 handler.activateChannel();
392                 if (handler.isOutputReady()) {
393                     handler.produceOutput();
394                 }
395             }
396         }
397     }
398 
399     @Override
400     boolean handleTimeout() {
401         return false;
402     }
403 
404     @Override
405     public String toString() {
406         final StringBuilder buffer = new StringBuilder();
407         InetAddressUtils.formatAddress(buffer, getRemoteAddress());
408         buffer.append("->");
409         InetAddressUtils.formatAddress(buffer, getLocalAddress());
410         return buffer.toString();
411     }
412 
413     private static class DelayedOutputChannel implements Http1StreamChannel<HttpResponse> {
414 
415         private final Http1StreamChannel<HttpResponse> channel;
416 
417         private volatile boolean direct;
418         private volatile HttpResponse delayedResponse;
419         private volatile boolean completed;
420 
421         private DelayedOutputChannel(final Http1StreamChannel<HttpResponse> channel) {
422             this.channel = channel;
423         }
424 
425         @Override
426         public void close() {
427             channel.close();
428         }
429 
430         @Override
431         public void submit(
432                 final HttpResponse response,
433                 final boolean endStream,
434                 final FlushMode flushMode) throws HttpException, IOException {
435             synchronized (this) {
436                 if (direct) {
437                     channel.submit(response, endStream, flushMode);
438                 } else {
439                     delayedResponse = response;
440                     completed = endStream;
441                 }
442             }
443         }
444 
445         @Override
446         public void suspendOutput() throws IOException {
447             channel.suspendOutput();
448         }
449 
450         @Override
451         public void requestOutput() {
452             channel.requestOutput();
453         }
454 
455         @Override
456         public Timeout getSocketTimeout() {
457             return channel.getSocketTimeout();
458         }
459 
460         @Override
461         public void setSocketTimeout(final Timeout timeout) {
462             channel.setSocketTimeout(timeout);
463         }
464 
465         @Override
466         public int write(final ByteBuffer src) throws IOException {
467             synchronized (this) {
468                 return direct ? channel.write(src) : 0;
469             }
470         }
471 
472         @Override
473         public void complete(final List<? extends Header> trailers) throws IOException {
474             synchronized (this) {
475                 if (direct) {
476                     channel.complete(trailers);
477                 } else {
478                     completed = true;
479                 }
480             }
481         }
482 
483         @Override
484         public boolean abortGracefully() throws IOException {
485             synchronized (this) {
486                 if (direct) {
487                     return channel.abortGracefully();
488                 }
489                 completed = true;
490                 return true;
491             }
492         }
493 
494         @Override
495         public boolean isCompleted() {
496             synchronized (this) {
497                 return direct ? channel.isCompleted() : completed;
498             }
499         }
500 
501         @Override
502         public void activate() throws IOException, HttpException {
503             synchronized (this) {
504                 direct = true;
505                 if (delayedResponse != null) {
506                     channel.submit(delayedResponse, completed, completed ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
507                     delayedResponse = null;
508                 }
509             }
510         }
511 
512     }
513 
514 }