View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import org.apache.hc.core5.http.ConnectionClosedException;
39  import org.apache.hc.core5.http.ConnectionReuseStrategy;
40  import org.apache.hc.core5.http.ContentLengthStrategy;
41  import org.apache.hc.core5.http.EntityDetails;
42  import org.apache.hc.core5.http.Header;
43  import org.apache.hc.core5.http.HttpException;
44  import org.apache.hc.core5.http.HttpRequest;
45  import org.apache.hc.core5.http.HttpResponse;
46  import org.apache.hc.core5.http.config.CharCodingConfig;
47  import org.apache.hc.core5.http.config.H1Config;
48  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
49  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
50  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
51  import org.apache.hc.core5.http.impl.Http1StreamListener;
52  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
53  import org.apache.hc.core5.http.nio.CapacityChannel;
54  import org.apache.hc.core5.http.nio.ContentDecoder;
55  import org.apache.hc.core5.http.nio.ContentEncoder;
56  import org.apache.hc.core5.http.nio.HandlerFactory;
57  import org.apache.hc.core5.http.nio.NHttpMessageParser;
58  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
59  import org.apache.hc.core5.http.nio.SessionInputBuffer;
60  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
61  import org.apache.hc.core5.http.nio.command.ExecutionCommand;
62  import org.apache.hc.core5.http.protocol.HttpCoreContext;
63  import org.apache.hc.core5.http.protocol.HttpProcessor;
64  import org.apache.hc.core5.io.ShutdownType;
65  import org.apache.hc.core5.net.InetAddressUtils;
66  import org.apache.hc.core5.reactor.TlsCapableIOSession;
67  import org.apache.hc.core5.util.Args;
68  import org.apache.hc.core5.util.Asserts;
69  
70  public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpRequest, HttpResponse> {
71  
72      private final String scheme;
73      private final HttpProcessor httpProcessor;
74      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
75      private final H1Config h1Config;
76      private final ConnectionReuseStrategy connectionReuseStrategy;
77      private final Http1StreamListener streamListener;
78      private final Queue<ServerHttp1StreamHandler> pipeline;
79      private final Http1StreamChannel<HttpResponse> outputChannel;
80  
81      private volatile ServerHttp1StreamHandler outgoing;
82      private volatile ServerHttp1StreamHandler incoming;
83  
84      public ServerHttp1StreamDuplexer(
85              final TlsCapableIOSession ioSession,
86              final HttpProcessor httpProcessor,
87              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
88              final String scheme,
89              final H1Config h1Config,
90              final CharCodingConfig charCodingConfig,
91              final ConnectionReuseStrategy connectionReuseStrategy,
92              final NHttpMessageParser<HttpRequest> incomingMessageParser,
93              final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
94              final ContentLengthStrategy incomingContentStrategy,
95              final ContentLengthStrategy outgoingContentStrategy,
96              final Http1StreamListener streamListener) {
97          super(ioSession, h1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
98          this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
99          this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
100         this.scheme = scheme;
101         this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
102         this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
103                 DefaultConnectionReuseStrategy.INSTANCE;
104         this.streamListener = streamListener;
105         this.pipeline = new ConcurrentLinkedQueue<>();
106         this.outputChannel = new Http1StreamChannel<HttpResponse>() {
107 
108             @Override
109             public void close() {
110                 shutdown(ShutdownType.IMMEDIATE);
111             }
112 
113             @Override
114             public void submit(final HttpResponse response, final boolean endStream) throws HttpException, IOException {
115                 if (streamListener != null) {
116                     streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
117                 }
118                 commitMessageHead(response, endStream);
119             }
120 
121             @Override
122             public void requestOutput() {
123                 requestSessionOutput();
124             }
125 
126             @Override
127             public void suspendOutput() {
128                 suspendSessionOutput();
129             }
130 
131             @Override
132             public int getSocketTimeout() {
133                 return getSessionTimeout();
134             }
135 
136             @Override
137             public void setSocketTimeout(final int timeout) {
138                 setSessionTimeout(timeout);
139             }
140 
141             @Override
142             public int write(final ByteBuffer src) throws IOException {
143                 return streamOutput(src);
144             }
145 
146             @Override
147             public void complete(final List<? extends Header> trailers) throws IOException {
148                 endOutputStream(trailers);
149             }
150 
151             @Override
152             public boolean isCompleted() {
153                 return isOutputCompleted();
154             }
155 
156             @Override
157             public boolean abortGracefully() throws IOException {
158                 final MessageDelineation messageDelineation = endOutputStream(null);
159                 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
160             }
161 
162             @Override
163             public void activate() throws HttpException, IOException {
164             }
165 
166         };
167     }
168 
169     @Override
170     public void releaseResources() {
171         if (incoming != null) {
172             incoming.releaseResources();
173             incoming = null;
174         }
175         if (outgoing != null) {
176             outgoing.releaseResources();
177             outgoing = null;
178         }
179         for (;;) {
180             final ServerHttp1StreamHandler handler = pipeline.poll();
181             if (handler != null) {
182                 handler.releaseResources();
183             } else {
184                 break;
185             }
186         }
187     }
188 
189     @Override
190     void terminate(final Exception exception) {
191         if (incoming != null) {
192             incoming.failed(exception);
193             incoming.releaseResources();
194             incoming = null;
195         }
196         if (outgoing != null) {
197             outgoing.failed(exception);
198             outgoing.releaseResources();
199             outgoing = null;
200         }
201         for (;;) {
202             final ServerHttp1StreamHandler handler = pipeline.poll();
203             if (handler != null) {
204                 handler.failed(exception);
205                 handler.releaseResources();
206             } else {
207                 break;
208             }
209         }
210     }
211 
212     @Override
213     void disconnected() {
214         if (incoming != null) {
215             if (!incoming.isCompleted()) {
216                 incoming.failed(new ConnectionClosedException("Connection closed"));
217             }
218             incoming.releaseResources();
219             incoming = null;
220         }
221         if (outgoing != null) {
222             if (!outgoing.isCompleted()) {
223                 outgoing.failed(new ConnectionClosedException("Connection closed"));
224             }
225             outgoing.releaseResources();
226             outgoing = null;
227         }
228         for (;;) {
229             final ServerHttp1StreamHandler handler = pipeline.poll();
230             if (handler != null) {
231                 handler.failed(new ConnectionClosedException("Connection closed"));
232                 handler.releaseResources();
233             } else {
234                 break;
235             }
236         }
237     }
238 
239     @Override
240     void updateInputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
241         connMetrics.incrementRequestCount();
242     }
243 
244     @Override
245     void updateOutputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
246         if (response.getCode() >= 200) {
247             connMetrics.incrementRequestCount();
248         }
249     }
250 
251     @Override
252     protected boolean handleIncomingMessage(final HttpRequest request) throws HttpException {
253         return true;
254     }
255 
256     @Override
257     protected ContentDecoder createContentDecoder(
258             final long len,
259             final ReadableByteChannel channel,
260             final SessionInputBuffer buffer,
261             final BasicHttpTransportMetrics metrics) throws HttpException {
262         if (len >= 0) {
263             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
264         } else if (len == ContentLengthStrategy.CHUNKED) {
265             return new ChunkDecoder(channel, buffer, h1Config, metrics);
266         } else {
267             return null;
268         }
269     }
270 
271     @Override
272     protected boolean handleOutgoingMessage(final HttpResponse response) throws HttpException {
273         return true;
274     }
275 
276     @Override
277     protected ContentEncoder createContentEncoder(
278             final long len,
279             final WritableByteChannel channel,
280             final SessionOutputBuffer buffer,
281             final BasicHttpTransportMetrics metrics) throws HttpException {
282         if (len >= 0) {
283             return new LengthDelimitedEncoder(channel, buffer, metrics, len, h1Config.getChunkSizeHint());
284         } else if (len == ContentLengthStrategy.CHUNKED) {
285             final int chunkSizeHint = h1Config.getChunkSizeHint() >= 0 ? h1Config.getChunkSizeHint() : 2048;
286             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
287         } else {
288             return new IdentityEncoder(channel, buffer, metrics, h1Config.getChunkSizeHint());
289         }
290     }
291 
292     @Override
293     boolean inputIdle() {
294         return incoming == null;
295     }
296 
297     @Override
298     boolean outputIdle() {
299         return outgoing == null && pipeline.isEmpty();
300     }
301 
302     @Override
303     void consumeHeader(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
304         if (streamListener != null) {
305             streamListener.onRequestHead(this, request);
306         }
307         final ServerHttp1StreamHandler streamHandler;
308         final HttpCoreContext context = HttpCoreContext.create();
309         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
310         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
311         if (outgoing == null) {
312             streamHandler = new ServerHttp1StreamHandler(
313                     outputChannel,
314                     httpProcessor,
315                     connectionReuseStrategy,
316                     exchangeHandlerFactory,
317                     context);
318             outgoing = streamHandler;
319         } else {
320             streamHandler = new ServerHttp1StreamHandler(
321                     new DelayedOutputChannel(outputChannel),
322                     httpProcessor,
323                     connectionReuseStrategy,
324                     exchangeHandlerFactory,
325                     context);
326             pipeline.add(streamHandler);
327         }
328         request.setScheme(scheme);
329         streamHandler.consumeHeader(request, entityDetails);
330         incoming = streamHandler;
331     }
332 
333     @Override
334     int consumeData(final ByteBuffer src) throws HttpException, IOException {
335         Asserts.notNull(incoming, "Request stream handler");
336         return incoming.consumeData(src);
337     }
338 
339     @Override
340     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
341         Asserts.notNull(incoming, "Request stream handler");
342         incoming.updateCapacity(capacityChannel);
343     }
344 
345     @Override
346     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
347         Asserts.notNull(incoming, "Request stream handler");
348         incoming.dataEnd(trailers);
349     }
350 
351     @Override
352     void inputEnd() throws HttpException, IOException {
353         if (incoming != null) {
354             if (incoming.isCompleted()) {
355                 incoming.releaseResources();
356             }
357             incoming = null;
358         }
359     }
360 
361     @Override
362     void execute(final ExecutionCommand executionCommand) throws HttpException {
363         throw new HttpException("Illegal command: " + executionCommand.getClass());
364     }
365 
366     @Override
367     boolean isOutputReady() {
368         return outgoing != null && outgoing.isOutputReady();
369     }
370 
371     @Override
372     void produceOutput() throws HttpException, IOException {
373         Asserts.notNull(outgoing, "Response stream handler");
374         outgoing.produceOutput();
375     }
376 
377     @Override
378     void outputEnd() throws HttpException, IOException {
379         if (outgoing != null && outgoing.isResponseFinal()) {
380             if (streamListener != null) {
381                 streamListener.onExchangeComplete(this, isOpen());
382             }
383             if (outgoing.isCompleted()) {
384                 outgoing.releaseResources();
385             }
386             outgoing = null;
387         }
388         if (outgoing == null && isOpen()) {
389             final ServerHttp1StreamHandler handler = pipeline.poll();
390             if (handler != null) {
391                 outgoing = handler;
392                 handler.activateChannel();
393                 if (handler.isOutputReady()) {
394                     handler.produceOutput();
395                 }
396             }
397         }
398     }
399 
400     @Override
401     boolean handleTimeout() {
402         return false;
403     }
404 
405     @Override
406     public String toString() {
407         final StringBuilder buffer = new StringBuilder();
408         InetAddressUtils.formatAddress(buffer, getRemoteAddress());
409         buffer.append("->");
410         InetAddressUtils.formatAddress(buffer, getLocalAddress());
411         return buffer.toString();
412     }
413 
414     private static class DelayedOutputChannel implements Http1StreamChannel<HttpResponse> {
415 
416         private final Http1StreamChannel<HttpResponse> channel;
417 
418         private volatile boolean direct;
419         private volatile HttpResponse delayedResponse;
420         private volatile boolean completed;
421 
422         private DelayedOutputChannel(final Http1StreamChannel<HttpResponse> channel) {
423             this.channel = channel;
424         }
425 
426         @Override
427         public void close() {
428             channel.close();
429         }
430 
431         @Override
432         public void submit(final HttpResponse response, final boolean endStream) throws HttpException, IOException {
433             synchronized (this) {
434                 if (direct) {
435                     channel.submit(response, endStream);
436                 } else {
437                     delayedResponse = response;
438                     completed = endStream;
439                 }
440             }
441         }
442 
443         @Override
444         public void suspendOutput() {
445             channel.suspendOutput();
446         }
447 
448         @Override
449         public void requestOutput() {
450             channel.requestOutput();
451         }
452 
453         @Override
454         public int getSocketTimeout() {
455             return channel.getSocketTimeout();
456         }
457 
458         @Override
459         public void setSocketTimeout(final int timeout) {
460             channel.setSocketTimeout(timeout);
461         }
462 
463         @Override
464         public int write(final ByteBuffer src) throws IOException {
465             synchronized (this) {
466                 if (direct) {
467                     return channel.write(src);
468                 } else {
469                     return 0;
470                 }
471             }
472         }
473 
474         @Override
475         public void complete(final List<? extends Header> trailers) throws IOException {
476             synchronized (this) {
477                 if (direct) {
478                     channel.complete(trailers);
479                 } else {
480                     completed = true;
481                 }
482             }
483         }
484 
485         @Override
486         public boolean abortGracefully() throws IOException {
487             synchronized (this) {
488                 if (direct) {
489                     return channel.abortGracefully();
490                 } else {
491                     completed = true;
492                     return true;
493                 }
494             }
495         }
496 
497         @Override
498         public boolean isCompleted() {
499             synchronized (this) {
500                 if (direct) {
501                     return channel.isCompleted();
502                 } else {
503                     return completed;
504                 }
505             }
506         }
507 
508         @Override
509         public void activate() throws IOException, HttpException {
510             synchronized (this) {
511                 direct = true;
512                 if (delayedResponse != null) {
513                     channel.submit(delayedResponse, completed);
514                     delayedResponse = null;
515                 }
516             }
517         }
518 
519     }
520 
521 }