View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.net.SocketTimeoutException;
33  import java.nio.ByteBuffer;
34  import java.nio.channels.ClosedChannelException;
35  import java.nio.channels.ReadableByteChannel;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.WritableByteChannel;
38  import java.util.List;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import javax.net.ssl.SSLContext;
44  import javax.net.ssl.SSLSession;
45  
46  import org.apache.hc.core5.http.ConnectionClosedException;
47  import org.apache.hc.core5.http.ContentLengthStrategy;
48  import org.apache.hc.core5.http.EndpointDetails;
49  import org.apache.hc.core5.http.EntityDetails;
50  import org.apache.hc.core5.http.Header;
51  import org.apache.hc.core5.http.HttpException;
52  import org.apache.hc.core5.http.HttpMessage;
53  import org.apache.hc.core5.http.Message;
54  import org.apache.hc.core5.http.ProtocolVersion;
55  import org.apache.hc.core5.http.config.CharCodingConfig;
56  import org.apache.hc.core5.http.config.H1Config;
57  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
58  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
59  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
60  import org.apache.hc.core5.http.impl.CharCodingSupport;
61  import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
62  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
63  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
64  import org.apache.hc.core5.http.nio.CapacityChannel;
65  import org.apache.hc.core5.http.nio.ContentDecoder;
66  import org.apache.hc.core5.http.nio.ContentEncoder;
67  import org.apache.hc.core5.http.nio.NHttpMessageParser;
68  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
69  import org.apache.hc.core5.http.nio.ResourceHolder;
70  import org.apache.hc.core5.http.nio.SessionInputBuffer;
71  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
72  import org.apache.hc.core5.http.nio.command.ExecutionCommand;
73  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
74  import org.apache.hc.core5.io.ShutdownType;
75  import org.apache.hc.core5.reactor.Command;
76  import org.apache.hc.core5.reactor.EventMask;
77  import org.apache.hc.core5.reactor.IOEventHandler;
78  import org.apache.hc.core5.reactor.TlsCapableIOSession;
79  import org.apache.hc.core5.reactor.ssl.SSLBufferManagement;
80  import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
81  import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
82  import org.apache.hc.core5.reactor.ssl.TlsDetails;
83  import org.apache.hc.core5.util.Args;
84  import org.apache.hc.core5.util.Identifiable;
85  
86  abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
87          implements Identifiable, ResourceHolder, UpgradeableHttpConnection {
88  
    /**
     * Connection life-cycle states. Declaration order matters: other members of
     * this class compare states with {@code compareTo}, so a constant declared
     * later is treated as being further along towards shutdown.
     */
    private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}

    // Underlying I/O session; all event-mask changes and command polling go through it.
    private final TlsCapableIOSession ioSession;
    // Never null: the constructor substitutes H1Config.DEFAULT when none is given.
    private final H1Config h1Config;
    // Session-level buffers for inbound and outbound bytes.
    private final SessionInputBufferImpl inbuf;
    private final SessionOutputBufferImpl outbuf;
    private final BasicHttpTransportMetrics inTransportMetrics;
    private final BasicHttpTransportMetrics outTransportMetrics;
    // Aggregates the two transport metrics above.
    private final BasicHttpConnectionMetrics connMetrics;
    private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
    private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
    // Never null: default to DefaultContentLengthStrategy.INSTANCE.
    private final ContentLengthStrategy incomingContentStrategy;
    private final ContentLengthStrategy outgoingContentStrategy;
    // Scratch buffer that onInput() fills from the content decoder and hands to consumeData().
    private final ByteBuffer contentBuffer;
    // Held whenever outbuf or outgoingMessage is touched on the output path.
    private final Lock outputLock;
    // Incremented by requestSessionOutput(); read and reset by onOutput().
    private final AtomicInteger outputRequests;

    // Incoming message whose body is currently being decoded; null when no body is in progress.
    private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
    // Outgoing message whose body is still being written; cleared by endOutputStream().
    private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
    private volatile ConnectionState connState = ConnectionState.READY;

    // Version taken from the most recently parsed incoming message head.
    private volatile ProtocolVersion version;
    // Created on the first call to getEndpointDetails().
    private volatile EndpointDetails endpointDetails;
112 
113     AbstractHttp1StreamDuplexer(
114             final TlsCapableIOSession ioSession,
115             final H1Config h1Config,
116             final CharCodingConfig charCodingConfig,
117             final NHttpMessageParser<IncomingMessage> incomingMessageParser,
118             final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
119             final ContentLengthStrategy incomingContentStrategy,
120             final ContentLengthStrategy outgoingContentStrategy) {
121         this.ioSession = Args.notNull(ioSession, "I/O session");
122         this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
123         final int bufferSize = this.h1Config.getBufferSize();
124         this.inbuf = new SessionInputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
125                 this.h1Config.getMaxLineLength(),
126                 CharCodingSupport.createDecoder(charCodingConfig));
127         this.outbuf = new SessionOutputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
128                 CharCodingSupport.createEncoder(charCodingConfig));
129         this.inTransportMetrics = new BasicHttpTransportMetrics();
130         this.outTransportMetrics = new BasicHttpTransportMetrics();
131         this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
132         this.incomingMessageParser = incomingMessageParser;
133         this.outgoingMessageWriter = outgoingMessageWriter;
134         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
135                 DefaultContentLengthStrategy.INSTANCE;
136         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
137                 DefaultContentLengthStrategy.INSTANCE;
138         this.contentBuffer = ByteBuffer.allocate(this.h1Config.getBufferSize());
139         this.outputLock = new ReentrantLock();
140         this.outputRequests = new AtomicInteger(0);
141         this.connState = ConnectionState.READY;
142     }
143 
144     @Override
145     public String getId() {
146         return ioSession.getId();
147     }
148 
149     void shutdownSession(final ShutdownType shutdownType) {
150         if (shutdownType == ShutdownType.GRACEFUL) {
151             connState = ConnectionState.GRACEFUL_SHUTDOWN;
152             ioSession.addLast(new ShutdownCommand(ShutdownType.GRACEFUL));
153         } else {
154             connState = ConnectionState.SHUTDOWN;
155             ioSession.close();
156         }
157     }
158 
    /**
     * Shuts the session down because of an error.
     * <p>
     * Marks the connection as {@code SHUTDOWN}, passes the exception to
     * {@link #terminate(Exception)} and then closes the I/O session.
     *
     * @param exception the cause of the shutdown.
     */
    void shutdownSession(final Exception exception) {
        connState = ConnectionState.SHUTDOWN;
        try {
            terminate(exception);
        } finally {
            // Close the session even if terminate() itself throws.
            ioSession.close();
        }
    }
167 
    /**
     * Invoked by {@link #onDisconnect()} before pending commands are failed or cancelled.
     */
    abstract void disconnected();

    /**
     * Invoked by {@link #shutdownSession(Exception)} before the I/O session is closed.
     *
     * @param exception the cause of the shutdown.
     */
    abstract void terminate(final Exception exception);

    /**
     * Invoked by {@link #onInput()} for every incoming message head that has been parsed.
     */
    abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);

    /**
     * Invoked by {@code commitMessageHead} after the message head has been
     * written to the session output buffer.
     */
    abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);

    /**
     * Invoked by {@link #onInput()} with a freshly parsed message head.
     *
     * @param messageHead the parsed message head.
     * @param entityDetails details of the message body, or {@code null} when
     *                      no content decoder was created for the message.
     */
    abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;

    /**
     * Inspects an incoming message head.
     *
     * @return {@code true} to make {@link #onInput()} determine the content
     *         length and ask for a content decoder; {@code false} to treat the
     *         message as having no body.
     */
    abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;

    /**
     * Inspects an outgoing message head.
     *
     * @return {@code true} to make {@code commitMessageHead} determine the
     *         content length and ask for a content encoder; {@code false} to
     *         treat the message as having no body.
     */
    abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;

    /**
     * Creates the decoder for an incoming message body.
     *
     * @return the decoder; callers in this class treat {@code null} as
     *         "no message body".
     */
    abstract ContentDecoder createContentDecoder(
            long contentLength,
            ReadableByteChannel channel,
            SessionInputBuffer buffer,
            BasicHttpTransportMetrics metrics) throws HttpException;

    /**
     * Creates the encoder for an outgoing message body.
     *
     * @return the encoder; callers in this class treat {@code null} as
     *         "no message body".
     */
    abstract ContentEncoder createContentEncoder(
            long contentLength,
            WritableByteChannel channel,
            SessionOutputBuffer buffer,
            BasicHttpTransportMetrics metrics) throws HttpException;

    /**
     * Consumes a chunk of decoded message content.
     *
     * @param src decoded content, flipped for reading.
     * @return a value that {@link #onInput()} treats as remaining capacity:
     *         when it is zero or negative, reading is paused.
     */
    abstract int consumeData(ByteBuffer src) throws HttpException, IOException;

    /**
     * Invoked by {@link #onInput()} after reading was paused for lack of
     * capacity. A positive increment passed to the given channel re-enables
     * session input.
     */
    abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;

    /**
     * Invoked by {@link #onInput()} once the content decoder reports completion.
     *
     * @param trailers the trailers reported by the content decoder.
     */
    abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;

    /**
     * Consulted by {@link #onOutput()}: when {@code true}, {@link #produceOutput()} is invoked.
     */
    abstract boolean isOutputReady();

    /**
     * Invoked by {@link #onOutput()} when {@link #isOutputReady()} returns {@code true}.
     */
    abstract void produceOutput() throws HttpException, IOException;

    /**
     * Invoked for an {@link ExecutionCommand} polled from the session's command
     * queue while the connection is not shutting down.
     */
    abstract void execute(ExecutionCommand executionCommand) throws HttpException, IOException;

    /**
     * Invoked by {@link #onInput()} after a message without a body has been
     * consumed, or after a message body has been fully decoded.
     */
    abstract void inputEnd() throws HttpException, IOException;

    /**
     * Invoked by {@link #onOutput()} when there is no outgoing message in
     * progress and the session output buffer is empty.
     */
    abstract void outputEnd() throws HttpException, IOException;

    /**
     * Consulted together with {@link #outputIdle()} when deciding whether the
     * connection can be shut down.
     */
    abstract boolean inputIdle();

    /**
     * Consulted together with {@link #inputIdle()} when deciding whether the
     * connection can be shut down.
     */
    abstract boolean outputIdle();

    /**
     * Invoked by {@link #onTimeout()}.
     *
     * @return {@code false} to have the timeout reported to
     *         {@link #onException(Exception)} as a {@link SocketTimeoutException}.
     */
    abstract boolean handleTimeout();
215 
216     private void processCommands() throws HttpException, IOException {
217         for (;;) {
218             final Command command = ioSession.getCommand();
219             if (command == null) {
220                 return;
221             }
222             if (command instanceof ShutdownCommand) {
223                 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
224                 requestShutdown(shutdownCommand.getType());
225             } else if (command instanceof ExecutionCommand) {
226                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
227                     command.cancel();
228                 } else {
229                     execute((ExecutionCommand) command);
230                     return;
231                 }
232             } else {
233                 throw new HttpException("Unexpected command: " + command.getClass());
234             }
235         }
236     }
237 
238     public final void onConnect(final ByteBuffer prefeed) throws HttpException, IOException {
239         if (prefeed != null) {
240             inbuf.put(prefeed);
241         }
242         connState = ConnectionState.ACTIVE;
243         processCommands();
244     }
245 
246     public final void onInput() throws HttpException, IOException {
247         while (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
248             int totalBytesRead = 0;
249             int messagesReceived = 0;
250             if (incomingMessage == null) {
251 
252                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle()) {
253                     ioSession.clearEvent(SelectionKey.OP_READ);
254                     return;
255                 }
256 
257                 int bytesRead;
258                 do {
259                     bytesRead = inbuf.fill(ioSession.channel());
260                     if (bytesRead > 0) {
261                         totalBytesRead += bytesRead;
262                         inTransportMetrics.incrementBytesTransferred(bytesRead);
263                     }
264                     final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, bytesRead == -1);
265                     if (messageHead != null) {
266                         messagesReceived++;
267                         incomingMessageParser.reset();
268 
269                         this.version = messageHead.getVersion();
270 
271                         updateInputMetrics(messageHead, connMetrics);
272                         final ContentDecoder contentDecoder;
273                         if (handleIncomingMessage(messageHead)) {
274                             final long len = incomingContentStrategy.determineLength(messageHead);
275                             contentDecoder = createContentDecoder(len, ioSession.channel(), inbuf, inTransportMetrics);
276                             consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
277                         } else {
278                             consumeHeader(messageHead, null);
279                             contentDecoder = null;
280                         }
281                         if (contentDecoder != null) {
282                             incomingMessage = new Message<>(messageHead, contentDecoder);
283                             break;
284                         } else {
285                             inputEnd();
286                             if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
287                                 ioSession.setEvent(SelectionKey.OP_READ);
288                             } else {
289                                 break;
290                             }
291                         }
292                     }
293                 } while (bytesRead > 0);
294 
295                 if (bytesRead == -1 && !inbuf.hasData()) {
296                     if (outputIdle() && inputIdle()) {
297                         requestShutdown(ShutdownType.IMMEDIATE);
298                     } else {
299                         shutdownSession(new ConnectionClosedException("Connection closed by peer"));
300                     }
301                     return;
302                 }
303             }
304 
305             if (incomingMessage != null) {
306                 final ContentDecoder contentDecoder = incomingMessage.getBody();
307 
308                 int bytesRead;
309                 while ((bytesRead = contentDecoder.read(contentBuffer)) > 0) {
310                     if (bytesRead > 0) {
311                         totalBytesRead += bytesRead;
312                     }
313                     contentBuffer.flip();
314                     final int capacity = consumeData(contentBuffer);
315                     contentBuffer.clear();
316                     if (capacity <= 0) {
317                         if (!contentDecoder.isCompleted()) {
318                             ioSession.clearEvent(SelectionKey.OP_READ);
319                             updateCapacity(new CapacityChannel() {
320 
321                                 @Override
322                                 public void update(final int increment) throws IOException {
323                                     if (increment > 0) {
324                                         requestSessionInput();
325                                     }
326                                 }
327 
328                             });
329                         }
330                         break;
331                     }
332                 }
333                 if (contentDecoder.isCompleted()) {
334                     dataEnd(contentDecoder.getTrailers());
335                     incomingMessage = null;
336                     ioSession.setEvent(SelectionKey.OP_READ);
337                     inputEnd();
338                 }
339             }
340             if (totalBytesRead == 0 && messagesReceived == 0) {
341                 break;
342             }
343         }
344     }
345 
    /**
     * Handles an output-ready event.
     * <p>
     * Flushes buffered output to the channel, lets the subclass produce more
     * output when it is ready, clears write interest when nothing is pending,
     * signals {@link #outputEnd()} once the current message is fully written,
     * and finally closes the session when the connection has reached the
     * {@code SHUTDOWN} state.
     */
    public final void onOutput() throws IOException, HttpException {
        // Step 1: flush whatever is already buffered.
        outputLock.lock();
        try {
            if (outbuf.hasData()) {
                final int bytesWritten = outbuf.flush(ioSession.channel());
                if (bytesWritten > 0) {
                    outTransportMetrics.incrementBytesTransferred(bytesWritten);
                }
            }
        } finally {
            outputLock.unlock();
        }
        if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
            if (isOutputReady()) {
                produceOutput();
            } else {
                // Snapshot the request counter first, so that a request made
                // concurrently with the check below makes the compareAndSet
                // fail and write interest stays set.
                final int pendingOutputRequests = outputRequests.get();
                final boolean outputPending;
                outputLock.lock();
                try {
                    outputPending = outbuf.hasData();
                } finally {
                    outputLock.unlock();
                }
                if (!outputPending && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
                    ioSession.clearEvent(SelectionKey.OP_WRITE);
                } else {
                    // Discount only the requests seen in the snapshot.
                    outputRequests.addAndGet(-pendingOutputRequests);
                }
            }

            // Step 2: detect the end of the current outgoing message.
            outputLock.lock();
            final boolean outputEnd;
            try {
                outputEnd = outgoingMessage == null && !outbuf.hasData();
            } finally {
                outputLock.unlock();
            }
            if (outputEnd) {
                outputEnd();
                if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
                    // Still active: pick up the next queued command, if any.
                    processCommands();
                } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
                    // Graceful shutdown finishes once both directions are idle.
                    connState = ConnectionState.SHUTDOWN;
                }
            }
        }
        // Step 3: tear the session down once the SHUTDOWN state is reached.
        if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
            ioSession.close();
            releaseResources();
        }
    }
398 
399     public final void onTimeout() throws IOException, HttpException {
400         if (!handleTimeout()) {
401             onException(new SocketTimeoutException());
402         }
403     }
404 
405     public final void onException(final Exception ex) {
406         shutdownSession(ex);
407         for (;;) {
408             final Command command = ioSession.getCommand();
409             if (command != null) {
410                 if (command instanceof ExecutionCommand) {
411                     final AsyncClientExchangeHandler exchangeHandler = ((ExecutionCommand) command).getExchangeHandler();
412                     exchangeHandler.failed(ex);
413                     exchangeHandler.releaseResources();
414                 } else {
415                     command.cancel();
416                 }
417             } else {
418                 break;
419             }
420         }
421     }
422 
423     public final void onDisconnect() {
424         disconnected();
425         for (;;) {
426             final Command command = ioSession.getCommand();
427             if (command != null) {
428                 if (command instanceof ExecutionCommand) {
429                     final AsyncClientExchangeHandler exchangeHandler = ((ExecutionCommand) command).getExchangeHandler();
430                     exchangeHandler.failed(new ConnectionClosedException("Connection closed"));
431                     exchangeHandler.releaseResources();
432                 } else {
433                     command.cancel();
434                 }
435             } else {
436                 break;
437             }
438         }
439         releaseResources();
440     }
441 
442     void requestShutdown(final ShutdownType shutdownType) {
443         switch (shutdownType) {
444             case GRACEFUL:
445                 if (connState == ConnectionState.ACTIVE) {
446                     connState = ConnectionState.GRACEFUL_SHUTDOWN;
447                 }
448                 break;
449             case IMMEDIATE:
450                 connState = ConnectionState.SHUTDOWN;
451                 break;
452         }
453         ioSession.setEvent(SelectionKey.OP_WRITE);
454     }
455 
456     void commitMessageHead(final OutgoingMessage messageHead, final boolean endStream) throws HttpException, IOException {
457         outputLock.lock();
458         try {
459             outgoingMessageWriter.write(messageHead, outbuf);
460             updateOutputMetrics(messageHead, connMetrics);
461             if (!endStream) {
462                 final ContentEncoder contentEncoder;
463                 if (handleOutgoingMessage(messageHead)) {
464                     final long len = outgoingContentStrategy.determineLength(messageHead);
465                     contentEncoder = createContentEncoder(len, ioSession.channel(), outbuf, outTransportMetrics);
466                 } else {
467                     contentEncoder = null;
468                 }
469                 if (contentEncoder != null) {
470                     outgoingMessage = new Message<>(messageHead, contentEncoder);
471                 }
472             }
473             outgoingMessageWriter.reset();
474             ioSession.setEvent(EventMask.WRITE);
475         } finally {
476             outputLock.unlock();
477         }
478     }
479 
480     void requestSessionInput() {
481         ioSession.setEvent(SelectionKey.OP_READ);
482     }
483 
484     void suspendSessionInput() {
485         ioSession.clearEvent(SelectionKey.OP_READ);
486     }
487 
488     void requestSessionOutput() {
489         outputRequests.incrementAndGet();
490         ioSession.setEvent(SelectionKey.OP_WRITE);
491     }
492 
493     int getSessionTimeout() {
494         return ioSession.getSocketTimeout();
495     }
496 
    /**
     * Sets the socket timeout of the I/O session.
     *
     * @param timeout the timeout value, passed to the session unchanged.
     */
    void setSessionTimeout(final int timeout) {
        ioSession.setSocketTimeout(timeout);
    }
500 
501     void suspendSessionOutput() {
502         ioSession.clearEvent(SelectionKey.OP_WRITE);
503     }
504 
505     int streamOutput(final ByteBuffer src) throws IOException {
506         outputLock.lock();
507         try {
508             if (outgoingMessage == null) {
509                 throw new ClosedChannelException();
510             }
511             final ContentEncoder contentEncoder = outgoingMessage.getBody();
512             final int bytesWritten = contentEncoder.write(src);
513             if (bytesWritten > 0) {
514                 ioSession.setEvent(SelectionKey.OP_WRITE);
515             }
516             return bytesWritten;
517         } finally {
518             outputLock.unlock();
519         }
520     }
521 
    /**
     * Result of {@code endOutputStream}: {@code NONE} when there was no
     * outgoing message in progress, {@code CHUNK_CODED} when the body was
     * written through a {@code ChunkEncoder}, {@code MESSAGE_HEAD} otherwise.
     */
    enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
523 
524     MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
525         outputLock.lock();
526         try {
527             if (outgoingMessage == null) {
528                 return MessageDelineation.NONE;
529             }
530             final ContentEncoder contentEncoder = outgoingMessage.getBody();
531             contentEncoder.complete(trailers);
532             ioSession.setEvent(SelectionKey.OP_WRITE);
533             outgoingMessage = null;
534             if (contentEncoder instanceof ChunkEncoder) {
535                 return MessageDelineation.CHUNK_CODED;
536             } else {
537                 return MessageDelineation.MESSAGE_HEAD;
538             }
539         } finally {
540             outputLock.unlock();
541         }
542     }
543 
544     boolean isOutputCompleted() {
545         outputLock.lock();
546         try {
547             if (outgoingMessage == null) {
548                 return true;
549             }
550             final ContentEncoder contentEncoder = outgoingMessage.getBody();
551             return contentEncoder.isCompleted();
552         } finally {
553             outputLock.unlock();
554         }
555     }
556 
557     @Override
558     public void close() throws IOException {
559         ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
560     }
561 
562     @Override
563     public void shutdown(final ShutdownType shutdownType) {
564         ioSession.addFirst(new ShutdownCommand(shutdownType));
565     }
566 
567     @Override
568     public boolean isOpen() {
569         return connState == ConnectionState.ACTIVE;
570     }
571 
    /**
     * Sets the socket timeout of the I/O session.
     *
     * @param timeout the timeout value, passed to the session unchanged.
     */
    @Override
    public void setSocketTimeout(final int timeout) {
        ioSession.setSocketTimeout(timeout);
    }
576 
577     @Override
578     public EndpointDetails getEndpointDetails() {
579         if (endpointDetails == null) {
580             endpointDetails = new BasicEndpointDetails(ioSession.getRemoteAddress(), ioSession.getLocalAddress(), connMetrics);
581         }
582         return endpointDetails;
583     }
584 
585     @Override
586     public int getSocketTimeout() {
587         return ioSession.getSocketTimeout();
588     }
589 
590     @Override
591     public ProtocolVersion getProtocolVersion() {
592         return version;
593     }
594 
595     @Override
596     public SocketAddress getRemoteAddress() {
597         return ioSession.getRemoteAddress();
598     }
599 
600     @Override
601     public SocketAddress getLocalAddress() {
602         return ioSession.getLocalAddress();
603     }
604 
605     @Override
606     public SSLSession getSSLSession() {
607         final TlsDetails tlsDetails = ioSession.getTlsDetails();
608         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
609     }
610 
611     @Override
612     public TlsDetails getTlsDetails() {
613         return ioSession.getTlsDetails();
614     }
615 
    /**
     * Starts TLS on the I/O session; all arguments are passed to the session
     * unchanged.
     *
     * @param sslContext the SSL context to use.
     * @param sslBufferManagement the SSL buffer management setting.
     * @param initializer the SSL session initializer.
     * @param verifier the SSL session verifier.
     * @throws UnsupportedOperationException as declared; presumably raised by
     *         the session when TLS cannot be started (not visible in this class).
     */
    @Override
    public void startTls(
            final SSLContext sslContext,
            final SSLBufferManagement sslBufferManagement,
            final SSLSessionInitializer initializer,
            final SSLSessionVerifier verifier) throws UnsupportedOperationException {
        ioSession.startTls(sslContext, sslBufferManagement, initializer, verifier);
    }
624 
    /**
     * Installs the given event handler on the I/O session.
     *
     * @param eventHandler the handler to set on the session.
     */
    @Override
    public void upgrade(final IOEventHandler eventHandler) {
        ioSession.setHandler(eventHandler);
    }
629 
630 }