View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ReadableByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.WritableByteChannel;
37  import java.util.List;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  import javax.net.ssl.SSLSession;
41  
42  import org.apache.hc.core5.http.ConnectionClosedException;
43  import org.apache.hc.core5.http.ContentLengthStrategy;
44  import org.apache.hc.core5.http.EndpointDetails;
45  import org.apache.hc.core5.http.EntityDetails;
46  import org.apache.hc.core5.http.Header;
47  import org.apache.hc.core5.http.HttpConnection;
48  import org.apache.hc.core5.http.HttpException;
49  import org.apache.hc.core5.http.HttpMessage;
50  import org.apache.hc.core5.http.Message;
51  import org.apache.hc.core5.http.ProtocolVersion;
52  import org.apache.hc.core5.http.config.CharCodingConfig;
53  import org.apache.hc.core5.http.config.H1Config;
54  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
55  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
56  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
57  import org.apache.hc.core5.http.impl.CharCodingSupport;
58  import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
59  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
60  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
61  import org.apache.hc.core5.http.nio.CapacityChannel;
62  import org.apache.hc.core5.http.nio.ContentDecoder;
63  import org.apache.hc.core5.http.nio.ContentEncoder;
64  import org.apache.hc.core5.http.nio.NHttpMessageParser;
65  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
66  import org.apache.hc.core5.http.nio.SessionInputBuffer;
67  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
68  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
69  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
70  import org.apache.hc.core5.io.CloseMode;
71  import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
72  import org.apache.hc.core5.reactor.Command;
73  import org.apache.hc.core5.reactor.EventMask;
74  import org.apache.hc.core5.reactor.ProtocolIOSession;
75  import org.apache.hc.core5.reactor.ssl.TlsDetails;
76  import org.apache.hc.core5.util.Args;
77  import org.apache.hc.core5.util.Identifiable;
78  import org.apache.hc.core5.util.Timeout;
79  
80  abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
81          implements Identifiable, HttpConnection {
82  
        /**
         * Connection life-cycle states. Declaration order is significant: the class
         * compares states with {@code compareTo}, so later constants are treated as
         * being "at or beyond" the earlier ones on the way to shutdown.
         */
83      private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
84  
        // Underlying I/O session; required to be non-null by the constructor.
85      private final ProtocolIOSession ioSession;
        // HTTP/1 settings; the constructor substitutes H1Config.DEFAULT for null.
86      private final H1Config h1Config;
        // Session buffers, both sized from h1Config.getBufferSize().
87      private final SessionInputBufferImpl inbuf;
88      private final SessionOutputBufferImpl outbuf;
        // Byte counters: incremented in onInput() (bytes filled) and onOutput() (bytes flushed).
89      private final BasicHttpTransportMetrics inTransportMetrics;
90      private final BasicHttpTransportMetrics outTransportMetrics;
        // Connection metrics built on top of the two transport metrics above.
91      private final BasicHttpConnectionMetrics connMetrics;
92      private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
93      private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
        // Content length strategies; default to DefaultContentLengthStrategy.INSTANCE when null is given.
94      private final ContentLengthStrategy incomingContentStrategy;
95      private final ContentLengthStrategy outgoingContentStrategy;
        // Input window: reset to h1Config.getInitialWindowSize() after each message head,
        // decreased by the bytes handed to consumeData(), increased via CapacityChannel#update.
96      private final AtomicInteger inputWindow;
        // Scratch buffer used by onInput() to move decoded content to consumeData().
97      private final ByteBuffer contentBuffer;
        // Counts requestSessionOutput() calls; consumed by onOutput() when deciding
        // whether the write interest can be cleared.
98      private final AtomicInteger outputRequests;
99  
        // Non-null while the body of an incoming message is being read (set and cleared in onInput()).
100     private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
        // Non-null while an outgoing message body is being written
        // (set in commitMessageHead(), cleared in endOutputStream()).
101     private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
102     private volatile ConnectionState connState;
103 
        // Protocol version taken from the most recently parsed incoming message head.
104     private volatile ProtocolVersion version;
        // Created lazily by getEndpointDetails().
105     private volatile EndpointDetails endpointDetails;
106 
107     AbstractHttp1StreamDuplexer(
108             final ProtocolIOSession ioSession,
109             final H1Config h1Config,
110             final CharCodingConfig charCodingConfig,
111             final NHttpMessageParser<IncomingMessage> incomingMessageParser,
112             final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
113             final ContentLengthStrategy incomingContentStrategy,
114             final ContentLengthStrategy outgoingContentStrategy) {
115         this.ioSession = Args.notNull(ioSession, "I/O session");
116         this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
117         final int bufferSize = this.h1Config.getBufferSize();
118         this.inbuf = new SessionInputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
119                 this.h1Config.getMaxLineLength(),
120                 CharCodingSupport.createDecoder(charCodingConfig));
121         this.outbuf = new SessionOutputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
122                 CharCodingSupport.createEncoder(charCodingConfig));
123         this.inTransportMetrics = new BasicHttpTransportMetrics();
124         this.outTransportMetrics = new BasicHttpTransportMetrics();
125         this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
126         this.incomingMessageParser = incomingMessageParser;
127         this.outgoingMessageWriter = outgoingMessageWriter;
128         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
129                 DefaultContentLengthStrategy.INSTANCE;
130         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
131                 DefaultContentLengthStrategy.INSTANCE;
132         this.inputWindow = new AtomicInteger(0);
133         this.contentBuffer = ByteBuffer.allocate(this.h1Config.getBufferSize());
134         this.outputRequests = new AtomicInteger(0);
135         this.connState = ConnectionState.READY;
136     }
137 
138     @Override
139     public String getId() {
140         return ioSession.getId();
141     }
142 
143     void shutdownSession(final CloseMode closeMode) {
144         if (closeMode == CloseMode.GRACEFUL) {
145             connState = ConnectionState.GRACEFUL_SHUTDOWN;
146             ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
147         } else {
148             connState = ConnectionState.SHUTDOWN;
149             ioSession.close();
150         }
151     }
152 
153     void shutdownSession(final Exception exception) {
154         connState = ConnectionState.SHUTDOWN;
155         try {
156             terminate(exception);
157         } finally {
158             ioSession.close();
159         }
160     }
161 
        // ------------------------------------------------------------------
        // Hooks implemented by concrete duplexers. The notes below describe how
        // this class invokes them; their full contract is defined by subclasses.
        // ------------------------------------------------------------------

        /** Invoked by {@link #onDisconnect()} before pending commands are failed or cancelled. */
162     abstract void disconnected();
163 
        /** Invoked by {@code shutdownSession(Exception)} before the I/O session is closed. */
164     abstract void terminate(final Exception exception);
165 
        /** Invoked by {@link #onInput()} for every parsed incoming message head. */
166     abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
167 
        /** Invoked by {@code commitMessageHead} after the message head has been written to the output buffer. */
168     abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
169 
        /**
         * Invoked by {@link #onInput()} with the parsed message head; {@code entityDetails}
         * is {@code null} when no content decoder was created for the message.
         */
170     abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;
171 
        /**
         * Return value decides whether {@link #onInput()} determines the content length
         * and asks for a content decoder ({@code true}) or treats the message as having
         * no body ({@code false}).
         */
172     abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;
173 
        /**
         * Return value decides whether {@code commitMessageHead} determines the content
         * length and asks for a content encoder ({@code true}) or not ({@code false}).
         */
174     abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;
175 
        /** May return {@code null}; the caller then treats the incoming message as having no body. */
176     abstract ContentDecoder createContentDecoder(
177             long contentLength,
178             ReadableByteChannel channel,
179             SessionInputBuffer buffer,
180             BasicHttpTransportMetrics metrics) throws HttpException;
181 
        /** May return {@code null}; the caller then does not track an outgoing message body. */
182     abstract ContentEncoder createContentEncoder(
183             long contentLength,
184             WritableByteChannel channel,
185             SessionOutputBuffer buffer,
186             BasicHttpTransportMetrics metrics) throws HttpException;
187 
        /** Invoked by {@link #onInput()} with decoded content; the buffer is cleared and reused after the call returns. */
188     abstract void consumeData(ByteBuffer src) throws HttpException, IOException;
189 
        /**
         * Invoked by {@link #onInput()} once the input window has dropped to zero or below
         * and the message body is not complete; the given channel re-opens the window.
         */
190     abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
191 
        /** Invoked by {@link #onInput()} with the decoder's trailers once the content decoder reports completion. */
192     abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
193 
        /** Consulted by {@link #onOutput()}; while {@code true} the write interest is not cleared. */
194     abstract boolean isOutputReady();
195 
        /** Invoked by {@link #onOutput()} after buffered output has been flushed, unless the connection is shut down. */
196     abstract void produceOutput() throws HttpException, IOException;
197 
        /** Invoked by {@code processCommands} for a {@link RequestExecutionCommand} taken from the session queue. */
198     abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
199 
        /** Invoked by {@link #onInput()} when an incoming message has been fully received. */
200     abstract void inputEnd() throws HttpException, IOException;
201 
        /** Invoked by {@link #onOutput()} when there is no outgoing message body in progress and the output buffer is empty. */
202     abstract void outputEnd() throws HttpException, IOException;
203 
        /** Consulted when deciding whether the connection may stop reading or shut down. */
204     abstract boolean inputIdle();
205 
        /** Consulted when deciding whether the connection may shut down. */
206     abstract boolean outputIdle();
207 
        /** Return {@code false} to have {@link #onTimeout(Timeout)} fail the connection with a socket timeout exception. */
208     abstract boolean handleTimeout();
209 
210     private void processCommands() throws HttpException, IOException {
211         for (;;) {
212             final Command command = ioSession.poll();
213             if (command == null) {
214                 return;
215             }
216             if (command instanceof ShutdownCommand) {
217                 final ShutdownCommand../org/apache/hc/core5/http/nio/command/ShutdownCommand.html#ShutdownCommand">ShutdownCommand shutdownCommand = (ShutdownCommand) command;
218                 requestShutdown(shutdownCommand.getType());
219             } else if (command instanceof RequestExecutionCommand) {
220                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
221                     command.cancel();
222                 } else {
223                     execute((RequestExecutionCommand) command);
224                     return;
225                 }
226             } else {
227                 throw new HttpException("Unexpected command: " + command.getClass());
228             }
229         }
230     }
231 
232     public final void onConnect(final ByteBuffer prefeed) throws HttpException, IOException {
233         if (prefeed != null) {
234             inbuf.put(prefeed);
235         }
236         connState = ConnectionState.ACTIVE;
237         processCommands();
238     }
239 
240     private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
241         for (;;) {
242             final int current = window.get();
243             final long newValue = (long) current + delta;
244             if (Math.abs(newValue) > 0x7fffffffL) {
245                 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
246             }
247             if (window.compareAndSet(current, (int) newValue)) {
248                 return (int) newValue;
249             }
250         }
251     }
252 
        /**
         * Handles an input-ready event. Repeats the following until the connection is
         * shut down or a pass makes no progress (no bytes read and no message head
         * received):
         * <ol>
         * <li>if no message body is in progress, fill the session input buffer from the
         * channel and try to parse message heads;</li>
         * <li>if a message body is in progress, decode its content and pass it to
         * {@link #consumeData(ByteBuffer)}, keeping the input window up to date.</li>
         * </ol>
         *
         * @throws HttpException in case of an HTTP protocol violation.
         * @throws IOException in case of an I/O error.
         */
253     public final void onInput() throws HttpException, IOException {
254         while (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
255             int totalBytesRead = 0;
256             int messagesReceived = 0;
257             if (incomingMessage == null) {
258 
                    // Shutting down with nothing in flight on the input side: stop reading.
259                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle()) {
260                     ioSession.clearEvent(SelectionKey.OP_READ);
261                     return;
262                 }
263 
264                 int bytesRead;
265                 do {
266                     bytesRead = inbuf.fill(ioSession.channel());
267                     if (bytesRead > 0) {
268                         totalBytesRead += bytesRead;
269                         inTransportMetrics.incrementBytesTransferred(bytesRead);
270                     }
                        // The second argument tells the parser whether the channel reported -1.
271                     final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, bytesRead == -1);
272                     if (messageHead != null) {
273                         messagesReceived++;
274                         incomingMessageParser.reset();
275 
276                         this.version = messageHead.getVersion();
277 
278                         updateInputMetrics(messageHead, connMetrics);
279                         final ContentDecoder contentDecoder;
280                         if (handleIncomingMessage(messageHead)) {
281                             final long len = incomingContentStrategy.determineLength(messageHead);
282                             contentDecoder = createContentDecoder(len, ioSession.channel(), inbuf, inTransportMetrics);
283                             consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
284                         } else {
285                             consumeHeader(messageHead, null);
286                             contentDecoder = null;
287                         }
                            // Every message starts with a fresh input window.
288                         inputWindow.set(h1Config.getInitialWindowSize());
289                         if (contentDecoder != null) {
                                // Message has a body: leave the head-parsing loop and decode content below.
290                             incomingMessage = new Message<>(messageHead, contentDecoder);
291                             break;
292                         }
                            // No body: the message is complete as soon as its head has been consumed.
293                         inputEnd();
294                         if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
295                             ioSession.setEvent(SelectionKey.OP_READ);
296                         } else {
297                             break;
298                         }
299                     }
300                 } while (bytesRead > 0);
301 
                    // Channel reported -1 and nothing is left buffered: shut down gracefully
                    // if both directions are idle, otherwise fail with a connection-closed error.
302                 if (bytesRead == -1 && !inbuf.hasData()) {
303                     if (outputIdle() && inputIdle()) {
304                         requestShutdown(CloseMode.GRACEFUL);
305                     } else {
306                         shutdownSession(new ConnectionClosedException("Connection closed by peer"));
307                     }
308                     return;
309                 }
310             }
311 
312             if (incomingMessage != null) {
313                 final ContentDecoder contentDecoder = incomingMessage.getBody();
314 
315                 // At present the consumer can be forced to consume data
316                 // over its declared capacity in order to avoid having
317                 // unprocessed message body content stuck in the session
318                 // input buffer
319                 int bytesRead;
320                 while ((bytesRead = contentDecoder.read(contentBuffer)) > 0) {
321                     totalBytesRead += bytesRead;
322                     contentBuffer.flip();
323                     consumeData(contentBuffer);
324                     contentBuffer.clear();
325                     final int capacity = updateWindow(inputWindow, -bytesRead);
326                     if (capacity <= 0) {
                            // Window exhausted: stop reading until the consumer signals more
                            // capacity through the channel handed to updateCapacity(...).
327                         if (!contentDecoder.isCompleted()) {
328                             ioSession.clearEvent(SelectionKey.OP_READ);
329                             updateCapacity(new CapacityChannel() {
330 
331                                 @Override
332                                 public void update(final int increment) throws IOException {
333                                     if (increment > 0) {
                                            // Clamp the increment so the window cannot grow past Integer.MAX_VALUE.
334                                         final int capacity = inputWindow.get();
335                                         final int remaining = capacity > 0 ? Integer.MAX_VALUE - capacity : Integer.MAX_VALUE;
336                                         final int chunk = Math.min(increment, remaining);
337                                         updateWindow(inputWindow, chunk);
338                                         requestSessionInput();
339                                     }
340                                 }
341 
342                             });
343                         }
344                         break;
345                     }
346                 }
347                 if (contentDecoder.isCompleted()) {
                        // Body fully received: deliver trailers, forget the message and resume reading.
348                     dataEnd(contentDecoder.getTrailers());
349                     incomingMessage = null;
350                     ioSession.setEvent(SelectionKey.OP_READ);
351                     inputEnd();
352                 }
353             }
                // Nothing was read and no message head arrived in this pass: wait for the next event.
354             if (totalBytesRead == 0 && messagesReceived == 0) {
355                 break;
356             }
357         }
358     }
359 
360     public final void onOutput() throws IOException, HttpException {
361         ioSession.lock().lock();
362         try {
363             if (outbuf.hasData()) {
364                 final int bytesWritten = outbuf.flush(ioSession.channel());
365                 if (bytesWritten > 0) {
366                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
367                 }
368             }
369         } finally {
370             ioSession.lock().unlock();
371         }
372         if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
373             produceOutput();
374             final int pendingOutputRequests = outputRequests.get();
375             final boolean outputPending = isOutputReady();
376             final boolean outputEnd;
377             ioSession.lock().lock();
378             try {
379                 if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
380                     ioSession.clearEvent(SelectionKey.OP_WRITE);
381                 } else {
382                     outputRequests.addAndGet(-pendingOutputRequests);
383                 }
384                 outputEnd = outgoingMessage == null && !outbuf.hasData();
385             } finally {
386                 ioSession.lock().unlock();
387             }
388             if (outputEnd) {
389                 outputEnd();
390                 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
391                     processCommands();
392                 } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
393                     connState = ConnectionState.SHUTDOWN;
394                 }
395             }
396         }
397         if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
398             ioSession.close();
399         }
400     }
401 
402     public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
403         if (!handleTimeout()) {
404             onException(SocketTimeoutExceptionFactory.create(timeout));
405         }
406     }
407 
408     public final void onException(final Exception ex) {
409         shutdownSession(ex);
410         for (;;) {
411             final Command command = ioSession.poll();
412             if (command != null) {
413                 if (command instanceof RequestExecutionCommand) {
414                     final AsyncClientExchangeHandler exchangeHandler = ((RequestExecutionCommand) command).getExchangeHandler();
415                     exchangeHandler.failed(ex);
416                     exchangeHandler.releaseResources();
417                 } else {
418                     command.cancel();
419                 }
420             } else {
421                 break;
422             }
423         }
424     }
425 
426     public final void onDisconnect() {
427         disconnected();
428         for (;;) {
429             final Command command = ioSession.poll();
430             if (command != null) {
431                 if (command instanceof RequestExecutionCommand) {
432                     final AsyncClientExchangeHandler exchangeHandler = ((RequestExecutionCommand) command).getExchangeHandler();
433                     exchangeHandler.failed(new ConnectionClosedException());
434                     exchangeHandler.releaseResources();
435                 } else {
436                     command.cancel();
437                 }
438             } else {
439                 break;
440             }
441         }
442     }
443 
444     void requestShutdown(final CloseMode closeMode) {
445         switch (closeMode) {
446             case GRACEFUL:
447                 if (connState == ConnectionState.ACTIVE) {
448                     connState = ConnectionState.GRACEFUL_SHUTDOWN;
449                 }
450                 break;
451             case IMMEDIATE:
452                 connState = ConnectionState.SHUTDOWN;
453                 break;
454         }
455         ioSession.setEvent(SelectionKey.OP_WRITE);
456     }
457 
458     void commitMessageHead(
459             final OutgoingMessage messageHead,
460             final boolean endStream,
461             final FlushMode flushMode) throws HttpException, IOException {
462         ioSession.lock().lock();
463         try {
464             outgoingMessageWriter.write(messageHead, outbuf);
465             updateOutputMetrics(messageHead, connMetrics);
466             if (!endStream) {
467                 final ContentEncoder contentEncoder;
468                 if (handleOutgoingMessage(messageHead)) {
469                     final long len = outgoingContentStrategy.determineLength(messageHead);
470                     contentEncoder = createContentEncoder(len, ioSession.channel(), outbuf, outTransportMetrics);
471                 } else {
472                     contentEncoder = null;
473                 }
474                 if (contentEncoder != null) {
475                     outgoingMessage = new Message<>(messageHead, contentEncoder);
476                 }
477             }
478             outgoingMessageWriter.reset();
479             if (flushMode == FlushMode.IMMEDIATE) {
480                 outbuf.flush(ioSession.channel());
481             }
482             ioSession.setEvent(EventMask.WRITE);
483         } finally {
484             ioSession.lock().unlock();
485         }
486     }
487 
488     void requestSessionInput() {
489         ioSession.setEvent(SelectionKey.OP_READ);
490     }
491 
492     void requestSessionOutput() {
493         outputRequests.incrementAndGet();
494         ioSession.setEvent(SelectionKey.OP_WRITE);
495     }
496 
497     Timeout getSessionTimeout() {
498         return ioSession.getSocketTimeout();
499     }
500 
501     void setSessionTimeout(final Timeout timeout) {
502         ioSession.setSocketTimeout(timeout);
503     }
504 
505     void suspendSessionOutput() throws IOException {
506         ioSession.lock().lock();
507         try {
508             if (outbuf.hasData()) {
509                 outbuf.flush(ioSession.channel());
510             } else {
511                 ioSession.clearEvent(SelectionKey.OP_WRITE);
512             }
513         } finally {
514             ioSession.lock().unlock();
515         }
516     }
517 
518     int streamOutput(final ByteBuffer src) throws IOException {
519         ioSession.lock().lock();
520         try {
521             if (outgoingMessage == null) {
522                 throw new ClosedChannelException();
523             }
524             final ContentEncoder contentEncoder = outgoingMessage.getBody();
525             final int bytesWritten = contentEncoder.write(src);
526             if (bytesWritten > 0) {
527                 ioSession.setEvent(SelectionKey.OP_WRITE);
528             }
529             return bytesWritten;
530         } finally {
531             ioSession.lock().unlock();
532         }
533     }
534 
535     enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
536 
537     MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
538         ioSession.lock().lock();
539         try {
540             if (outgoingMessage == null) {
541                 return MessageDelineation.NONE;
542             }
543             final ContentEncoder contentEncoder = outgoingMessage.getBody();
544             contentEncoder.complete(trailers);
545             ioSession.setEvent(SelectionKey.OP_WRITE);
546             outgoingMessage = null;
547             return contentEncoder instanceof ChunkEncoder
548                             ? MessageDelineation.CHUNK_CODED
549                             : MessageDelineation.MESSAGE_HEAD;
550         } finally {
551             ioSession.lock().unlock();
552         }
553     }
554 
555     boolean isOutputCompleted() {
556         ioSession.lock().lock();
557         try {
558             if (outgoingMessage == null) {
559                 return true;
560             }
561             final ContentEncoder contentEncoder = outgoingMessage.getBody();
562             return contentEncoder.isCompleted();
563         } finally {
564             ioSession.lock().unlock();
565         }
566     }
567 
568     @Override
569     public void close() throws IOException {
570         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
571     }
572 
573     @Override
574     public void close(final CloseMode closeMode) {
575         ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
576     }
577 
578     @Override
579     public boolean isOpen() {
580         return connState == ConnectionState.ACTIVE;
581     }
582 
583     @Override
584     public Timeout getSocketTimeout() {
585         return ioSession.getSocketTimeout();
586     }
587 
588     @Override
589     public void setSocketTimeout(final Timeout timeout) {
590         ioSession.setSocketTimeout(timeout);
591     }
592 
593     @Override
594     public EndpointDetails getEndpointDetails() {
595         if (endpointDetails == null) {
596             endpointDetails = new BasicEndpointDetails(
597                     ioSession.getRemoteAddress(),
598                     ioSession.getLocalAddress(),
599                     connMetrics,
600                     ioSession.getSocketTimeout());
601         }
602         return endpointDetails;
603     }
604 
605     @Override
606     public ProtocolVersion getProtocolVersion() {
607         return version;
608     }
609 
610     @Override
611     public SocketAddress getRemoteAddress() {
612         return ioSession.getRemoteAddress();
613     }
614 
615     @Override
616     public SocketAddress getLocalAddress() {
617         return ioSession.getLocalAddress();
618     }
619 
620     @Override
621     public SSLSession getSSLSession() {
622         final TlsDetails tlsDetails = ioSession.getTlsDetails();
623         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
624     }
625 
626     void appendState(final StringBuilder buf) {
627         buf.append("connState=").append(connState)
628                 .append(", inbuf=").append(inbuf)
629                 .append(", outbuf=").append(outbuf)
630                 .append(", inputWindow=").append(inputWindow);
631     }
632 
633 }