View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ReadableByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.WritableByteChannel;
37  import java.util.List;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.locks.Lock;
40  import java.util.concurrent.locks.ReentrantLock;
41  
42  import javax.net.ssl.SSLSession;
43  
44  import org.apache.hc.core5.http.ConnectionClosedException;
45  import org.apache.hc.core5.http.ContentLengthStrategy;
46  import org.apache.hc.core5.http.EndpointDetails;
47  import org.apache.hc.core5.http.EntityDetails;
48  import org.apache.hc.core5.http.Header;
49  import org.apache.hc.core5.http.HttpConnection;
50  import org.apache.hc.core5.http.HttpException;
51  import org.apache.hc.core5.http.HttpMessage;
52  import org.apache.hc.core5.http.Message;
53  import org.apache.hc.core5.http.ProtocolVersion;
54  import org.apache.hc.core5.http.config.CharCodingConfig;
55  import org.apache.hc.core5.http.config.H1Config;
56  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
57  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
58  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
59  import org.apache.hc.core5.http.impl.CharCodingSupport;
60  import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
61  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
62  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
63  import org.apache.hc.core5.http.nio.CapacityChannel;
64  import org.apache.hc.core5.http.nio.ContentDecoder;
65  import org.apache.hc.core5.http.nio.ContentEncoder;
66  import org.apache.hc.core5.http.nio.NHttpMessageParser;
67  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
68  import org.apache.hc.core5.http.nio.SessionInputBuffer;
69  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
70  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
71  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
72  import org.apache.hc.core5.io.CloseMode;
73  import org.apache.hc.core5.reactor.Command;
74  import org.apache.hc.core5.reactor.EventMask;
75  import org.apache.hc.core5.reactor.ProtocolIOSession;
76  import org.apache.hc.core5.reactor.ssl.TlsDetails;
77  import org.apache.hc.core5.util.Args;
78  import org.apache.hc.core5.util.Identifiable;
79  import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
80  
81  abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
82          implements Identifiable, HttpConnection {
83  
    /**
     * Life-cycle states of the connection. The declaration order is significant: other
     * members of this class compare states with {@code compareTo}, so a constant declared
     * later is treated as being further along towards shutdown.
     */
    private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}

    // Underlying I/O session: provides the channel, the command queue and the event mask.
    private final ProtocolIOSession ioSession;
    // HTTP/1.1 settings; the constructor substitutes H1Config.DEFAULT when none is given.
    private final H1Config h1Config;
    // Session input buffer, filled from the session channel by onInput().
    private final SessionInputBufferImpl inbuf;
    // Session output buffer, flushed to the session channel by onOutput().
    private final SessionOutputBufferImpl outbuf;
    // Byte counters for the inbound and outbound directions.
    private final BasicHttpTransportMetrics inTransportMetrics;
    private final BasicHttpTransportMetrics outTransportMetrics;
    // Connection-level metrics built on top of the two transport metrics above.
    private final BasicHttpConnectionMetrics connMetrics;
    // Parser for incoming message heads and writer for outgoing message heads.
    private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
    private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
    // Strategies used to determine the content length of incoming / outgoing messages.
    private final ContentLengthStrategy incomingContentStrategy;
    private final ContentLengthStrategy outgoingContentStrategy;
    // Scratch buffer that onInput() reads decoded content into before handing it to consumeData().
    private final ByteBuffer contentBuffer;
    // Lock taken by the output-side methods around access to outbuf and outgoingMessage.
    private final Lock outputLock;
    // Number of output requests recorded by requestSessionOutput() and settled by onOutput().
    private final AtomicInteger outputRequests;

    // Incoming message whose body is currently being read; null when no body is in progress.
    private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
    // Outgoing message whose body is currently being written; null when no body is in progress.
    private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
    // Current life-cycle state; starts as READY and becomes ACTIVE in onConnect().
    private volatile ConnectionState connState;

    // Protocol version taken from the most recently parsed incoming message head.
    private volatile ProtocolVersion version;
    // Lazily created by getEndpointDetails().
    private volatile EndpointDetails endpointDetails;
107 
108     AbstractHttp1StreamDuplexer(
109             final ProtocolIOSession ioSession,
110             final H1Config h1Config,
111             final CharCodingConfig charCodingConfig,
112             final NHttpMessageParser<IncomingMessage> incomingMessageParser,
113             final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
114             final ContentLengthStrategy incomingContentStrategy,
115             final ContentLengthStrategy outgoingContentStrategy) {
116         this.ioSession = Args.notNull(ioSession, "I/O session");
117         this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
118         final int bufferSize = this.h1Config.getBufferSize();
119         this.inbuf = new SessionInputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
120                 this.h1Config.getMaxLineLength(),
121                 CharCodingSupport.createDecoder(charCodingConfig));
122         this.outbuf = new SessionOutputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
123                 CharCodingSupport.createEncoder(charCodingConfig));
124         this.inTransportMetrics = new BasicHttpTransportMetrics();
125         this.outTransportMetrics = new BasicHttpTransportMetrics();
126         this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
127         this.incomingMessageParser = incomingMessageParser;
128         this.outgoingMessageWriter = outgoingMessageWriter;
129         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
130                 DefaultContentLengthStrategy.INSTANCE;
131         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
132                 DefaultContentLengthStrategy.INSTANCE;
133         this.contentBuffer = ByteBuffer.allocate(this.h1Config.getBufferSize());
134         this.outputLock = new ReentrantLock();
135         this.outputRequests = new AtomicInteger(0);
136         this.connState = ConnectionState.READY;
137     }
138 
    /**
     * Returns the identifier of the underlying I/O session.
     */
    @Override
    public String getId() {
        return ioSession.getId();
    }
143 
144     void shutdownSession(final CloseMode closeMode) {
145         if (closeMode == CloseMode.GRACEFUL) {
146             connState = ConnectionState.GRACEFUL_SHUTDOWN;
147             ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL);
148         } else {
149             connState = ConnectionState.SHUTDOWN;
150             ioSession.close();
151         }
152     }
153 
    /**
     * Shuts the session down because of an error: marks the connection as
     * {@code SHUTDOWN}, passes the exception to {@link #terminate(Exception)} and then
     * closes the I/O session.
     *
     * @param exception the cause of the shutdown.
     */
    void shutdownSession(final Exception exception) {
        connState = ConnectionState.SHUTDOWN;
        try {
            terminate(exception);
        } finally {
            // Close the session even if terminate() itself throws.
            ioSession.close();
        }
    }
162 
    /** Invoked by {@link #onDisconnect()} before pending commands are failed or cancelled. */
    abstract void disconnected();

    /** Invoked by {@link #shutdownSession(Exception)} with the cause, before the I/O session is closed. */
    abstract void terminate(final Exception exception);

    /** Invoked by {@link #onInput()} for every parsed incoming message head. */
    abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);

    /** Invoked by {@code commitMessageHead} after the message head has been written to the output buffer. */
    abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);

    /**
     * Invoked by {@link #onInput()} with a parsed message head. {@code entityDetails} is
     * {@code null} when no content decoder was set up for the message.
     */
    abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;

    /**
     * Invoked by {@link #onInput()} for every parsed message head. When this returns
     * {@code false} no content decoder is created for the message.
     */
    abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;

    /**
     * Invoked by {@code commitMessageHead} when the stream is not ended by the message head.
     * When this returns {@code false} no content encoder is created for the message.
     */
    abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;

    /**
     * Creates the decoder for an incoming message body. A {@code null} result is treated
     * by {@link #onInput()} as a message without a body.
     */
    abstract ContentDecoder createContentDecoder(
            long contentLength,
            ReadableByteChannel channel,
            SessionInputBuffer buffer,
            BasicHttpTransportMetrics metrics) throws HttpException;

    /**
     * Creates the encoder for an outgoing message body. A {@code null} result means no
     * outgoing message is recorded by {@code commitMessageHead}.
     */
    abstract ContentEncoder createContentEncoder(
            long contentLength,
            WritableByteChannel channel,
            SessionOutputBuffer buffer,
            BasicHttpTransportMetrics metrics) throws HttpException;

    /**
     * Consumes decoded content. {@link #onInput()} treats the result as remaining
     * capacity: a value of zero or less makes it stop reading the current body.
     */
    abstract int consumeData(ByteBuffer src) throws HttpException, IOException;

    /**
     * Invoked by {@link #onInput()} when {@link #consumeData(ByteBuffer)} reported no
     * capacity and the body is not yet complete. The supplied channel re-requests session
     * input when given a positive increment.
     */
    abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;

    /** Invoked by {@link #onInput()} with the decoder's trailers once the body is completed. */
    abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;

    /** Consulted by {@link #onOutput()}; when {@code true}, {@link #produceOutput()} is invoked. */
    abstract boolean isOutputReady();

    /** Invoked by {@link #onOutput()} when {@link #isOutputReady()} returns {@code true}. */
    abstract void produceOutput() throws HttpException, IOException;

    /** Invoked by {@code processCommands} for a request execution command while the connection is not shutting down. */
    abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;

    /** Invoked by {@link #onInput()} after a message without a body, or after a message body has been completed. */
    abstract void inputEnd() throws HttpException, IOException;

    /** Invoked by {@link #onOutput()} when there is no outgoing message and the output buffer is empty. */
    abstract void outputEnd() throws HttpException, IOException;

    /** Consulted by {@link #onInput()} and {@link #onOutput()} when deciding whether the connection can shut down. */
    abstract boolean inputIdle();

    /** Consulted by {@link #onInput()} and {@link #onOutput()} when deciding whether the connection can shut down. */
    abstract boolean outputIdle();

    /**
     * Invoked by {@link #onTimeout(int)}. When this returns {@code false} a socket timeout
     * exception is raised through {@link #onException(Exception)}.
     */
    abstract boolean handleTimeout();
210 
211     private void processCommands() throws HttpException, IOException {
212         for (;;) {
213             final Command command = ioSession.poll();
214             if (command == null) {
215                 return;
216             }
217             if (command instanceof ShutdownCommand) {
218                 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
219                 requestShutdown(shutdownCommand.getType());
220             } else if (command instanceof RequestExecutionCommand) {
221                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
222                     command.cancel();
223                 } else {
224                     execute((RequestExecutionCommand) command);
225                     return;
226                 }
227             } else {
228                 throw new HttpException("Unexpected command: " + command.getClass());
229             }
230         }
231     }
232 
    /**
     * Handles the connect event: optionally seeds the input buffer, marks the connection
     * as {@code ACTIVE} and processes any queued commands.
     *
     * @param prefeed data to place into the session input buffer before anything is read
     *                from the channel; may be {@code null}.
     * @throws HttpException if raised while processing queued commands.
     * @throws IOException if raised while processing queued commands.
     */
    public final void onConnect(final ByteBuffer prefeed) throws HttpException, IOException {
        if (prefeed != null) {
            inbuf.put(prefeed);
        }
        connState = ConnectionState.ACTIVE;
        processCommands();
    }
240 
    /**
     * Handles the input-ready event.
     * <p>
     * Each pass of the outer loop has two phases. While no message body is in progress,
     * the session input buffer is filled from the channel and message heads are parsed
     * and dispatched. While a message body is in progress, content is decoded into the
     * scratch buffer and handed to {@link #consumeData(ByteBuffer)}. The loop ends when
     * the connection reaches {@code SHUTDOWN} or when a pass neither read any bytes nor
     * received a message.
     *
     * @throws HttpException if raised by message parsing or by one of the handler hooks.
     * @throws IOException if raised by channel I/O or by one of the handler hooks.
     */
    public final void onInput() throws HttpException, IOException {
        while (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
            int totalBytesRead = 0;
            int messagesReceived = 0;
            if (incomingMessage == null) {

                // Shutting down and nothing more is expected: stop reading.
                if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle()) {
                    ioSession.clearEvent(SelectionKey.OP_READ);
                    return;
                }

                int bytesRead;
                do {
                    bytesRead = inbuf.fill(ioSession.channel());
                    if (bytesRead > 0) {
                        totalBytesRead += bytesRead;
                        inTransportMetrics.incrementBytesTransferred(bytesRead);
                    }
                    // A fill result of -1 is passed to the parser as the end-of-stream flag.
                    final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, bytesRead == -1);
                    if (messageHead != null) {
                        messagesReceived++;
                        incomingMessageParser.reset();

                        this.version = messageHead.getVersion();

                        updateInputMetrics(messageHead, connMetrics);
                        final ContentDecoder contentDecoder;
                        if (handleIncomingMessage(messageHead)) {
                            final long len = incomingContentStrategy.determineLength(messageHead);
                            contentDecoder = createContentDecoder(len, ioSession.channel(), inbuf, inTransportMetrics);
                            consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
                        } else {
                            consumeHeader(messageHead, null);
                            contentDecoder = null;
                        }
                        if (contentDecoder != null) {
                            // A body follows: switch to the content phase below.
                            incomingMessage = new Message<>(messageHead, contentDecoder);
                            break;
                        }
                        // Message without a body: it is complete already.
                        inputEnd();
                        if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
                            ioSession.setEvent(SelectionKey.OP_READ);
                        } else {
                            break;
                        }
                    }
                } while (bytesRead > 0);

                // End of stream with nothing left in the buffer: shut down quietly if both
                // directions are idle, otherwise report the premature close as an error.
                if (bytesRead == -1 && !inbuf.hasData()) {
                    if (outputIdle() && inputIdle()) {
                        requestShutdown(CloseMode.IMMEDIATE);
                    } else {
                        shutdownSession(new ConnectionClosedException("Connection closed by peer"));
                    }
                    return;
                }
            }

            if (incomingMessage != null) {
                final ContentDecoder contentDecoder = incomingMessage.getBody();

                int bytesRead;
                while ((bytesRead = contentDecoder.read(contentBuffer)) > 0) {
                    // NOTE(review): this check is always true here, since the loop condition
                    // already requires bytesRead > 0.
                    if (bytesRead > 0) {
                        totalBytesRead += bytesRead;
                    }
                    contentBuffer.flip();
                    final int capacity = consumeData(contentBuffer);
                    contentBuffer.clear();
                    if (capacity <= 0) {
                        if (!contentDecoder.isCompleted()) {
                            // The consumer has no capacity left: suspend input until it
                            // reports a positive capacity increment.
                            ioSession.clearEvent(SelectionKey.OP_READ);
                            updateCapacity(new CapacityChannel() {

                                @Override
                                public void update(final int increment) throws IOException {
                                    if (increment > 0) {
                                        requestSessionInput();
                                    }
                                }

                            });
                        }
                        break;
                    }
                }
                if (contentDecoder.isCompleted()) {
                    dataEnd(contentDecoder.getTrailers());
                    incomingMessage = null;
                    ioSession.setEvent(SelectionKey.OP_READ);
                    inputEnd();
                }
            }
            // No progress was made in this pass: wait for the next input event.
            if (totalBytesRead == 0 && messagesReceived == 0) {
                break;
            }
        }
    }
339 
    /**
     * Handles the output-ready event.
     * <p>
     * Flushes buffered output to the channel, lets the handler produce more output when
     * it is ready, and otherwise clears write interest when nothing is pending. When the
     * current outgoing message is finished and the buffer is empty, signals the end of
     * output and either processes further commands or completes a pending shutdown.
     * Finally closes the I/O session if the connection has reached {@code SHUTDOWN}.
     *
     * @throws IOException if raised by channel I/O or by one of the handler hooks.
     * @throws HttpException if raised by one of the handler hooks or by command processing.
     */
    public final void onOutput() throws IOException, HttpException {
        outputLock.lock();
        try {
            if (outbuf.hasData()) {
                final int bytesWritten = outbuf.flush(ioSession.channel());
                if (bytesWritten > 0) {
                    outTransportMetrics.incrementBytesTransferred(bytesWritten);
                }
            }
        } finally {
            outputLock.unlock();
        }
        if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
            if (isOutputReady()) {
                produceOutput();
            } else {
                // Snapshot the number of output requests seen so far.
                final int pendingOutputRequests = outputRequests.get();
                final boolean outputPending;
                outputLock.lock();
                try {
                    outputPending = outbuf.hasData();
                } finally {
                    outputLock.unlock();
                }
                // Clear write interest only if the buffer is empty and no new output request
                // arrived since the snapshot; otherwise subtract the snapshot so that only
                // requests made after it remain counted.
                if (!outputPending && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
                    ioSession.clearEvent(SelectionKey.OP_WRITE);
                } else {
                    outputRequests.addAndGet(-pendingOutputRequests);
                }
            }

            outputLock.lock();
            final boolean outputEnd;
            try {
                outputEnd = outgoingMessage == null && !outbuf.hasData();
            } finally {
                outputLock.unlock();
            }
            if (outputEnd) {
                outputEnd();
                if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
                    processCommands();
                } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
                    // Graceful shutdown can complete once both directions are idle.
                    connState = ConnectionState.SHUTDOWN;
                }
            }
        }
        if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
            ioSession.close();
        }
    }
391 
392     public final void onTimeout(final int timeoutMillis) throws IOException, HttpException {
393         if (!handleTimeout()) {
394             onException(SocketTimeoutExceptionFactory.create(timeoutMillis));
395         }
396     }
397 
398     public final void onException(final Exception ex) {
399         shutdownSession(ex);
400         for (;;) {
401             final Command command = ioSession.poll();
402             if (command != null) {
403                 if (command instanceof RequestExecutionCommand) {
404                     final AsyncClientExchangeHandler exchangeHandler = ((RequestExecutionCommand) command).getExchangeHandler();
405                     exchangeHandler.failed(ex);
406                     exchangeHandler.releaseResources();
407                 } else {
408                     command.cancel();
409                 }
410             } else {
411                 break;
412             }
413         }
414     }
415 
416     public final void onDisconnect() {
417         disconnected();
418         for (;;) {
419             final Command command = ioSession.poll();
420             if (command != null) {
421                 if (command instanceof RequestExecutionCommand) {
422                     final AsyncClientExchangeHandler exchangeHandler = ((RequestExecutionCommand) command).getExchangeHandler();
423                     exchangeHandler.failed(new ConnectionClosedException());
424                     exchangeHandler.releaseResources();
425                 } else {
426                     command.cancel();
427                 }
428             } else {
429                 break;
430             }
431         }
432     }
433 
434     void requestShutdown(final CloseMode closeMode) {
435         switch (closeMode) {
436             case GRACEFUL:
437                 if (connState == ConnectionState.ACTIVE) {
438                     connState = ConnectionState.GRACEFUL_SHUTDOWN;
439                 }
440                 break;
441             case IMMEDIATE:
442                 connState = ConnectionState.SHUTDOWN;
443                 break;
444         }
445         ioSession.setEvent(SelectionKey.OP_WRITE);
446     }
447 
448     void commitMessageHead(final OutgoingMessage messageHead, final boolean endStream) throws HttpException, IOException {
449         outputLock.lock();
450         try {
451             outgoingMessageWriter.write(messageHead, outbuf);
452             updateOutputMetrics(messageHead, connMetrics);
453             if (!endStream) {
454                 final ContentEncoder contentEncoder;
455                 if (handleOutgoingMessage(messageHead)) {
456                     final long len = outgoingContentStrategy.determineLength(messageHead);
457                     contentEncoder = createContentEncoder(len, ioSession.channel(), outbuf, outTransportMetrics);
458                 } else {
459                     contentEncoder = null;
460                 }
461                 if (contentEncoder != null) {
462                     outgoingMessage = new Message<>(messageHead, contentEncoder);
463                 }
464             }
465             outgoingMessageWriter.reset();
466             ioSession.setEvent(EventMask.WRITE);
467         } finally {
468             outputLock.unlock();
469         }
470     }
471 
    /** Raises read interest ({@code OP_READ}) on the I/O session. */
    void requestSessionInput() {
        ioSession.setEvent(SelectionKey.OP_READ);
    }
475 
    /** Clears read interest ({@code OP_READ}) on the I/O session. */
    void suspendSessionInput() {
        ioSession.clearEvent(SelectionKey.OP_READ);
    }
479 
    /**
     * Records an output request and raises write interest ({@code OP_WRITE}) on the I/O
     * session. The request counter is what keeps {@link #onOutput()} from clearing write
     * interest while a request made in the meantime is still unserved.
     */
    void requestSessionOutput() {
        outputRequests.incrementAndGet();
        ioSession.setEvent(SelectionKey.OP_WRITE);
    }
484 
    /** Returns the socket timeout of the I/O session, in milliseconds. */
    int getSessionTimeoutMillis() {
        return ioSession.getSocketTimeoutMillis();
    }
488 
    /**
     * Sets the socket timeout of the I/O session.
     *
     * @param timeout the timeout in milliseconds.
     */
    void setSessionTimeoutMillis(final int timeout) {
        ioSession.setSocketTimeoutMillis(timeout);
    }
492 
    /** Clears write interest ({@code OP_WRITE}) on the I/O session. */
    void suspendSessionOutput() {
        ioSession.clearEvent(SelectionKey.OP_WRITE);
    }
496 
497     int streamOutput(final ByteBuffer src) throws IOException {
498         outputLock.lock();
499         try {
500             if (outgoingMessage == null) {
501                 throw new ClosedChannelException();
502             }
503             final ContentEncoder contentEncoder = outgoingMessage.getBody();
504             final int bytesWritten = contentEncoder.write(src);
505             if (bytesWritten > 0) {
506                 ioSession.setEvent(SelectionKey.OP_WRITE);
507             }
508             return bytesWritten;
509         } finally {
510             outputLock.unlock();
511         }
512     }
513 
    /**
     * Result of {@code endOutputStream}: {@code NONE} when there was no outgoing message
     * in progress, {@code CHUNK_CODED} when the body was written with a
     * {@code ChunkEncoder}, and {@code MESSAGE_HEAD} for any other content encoder.
     */
    enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
515 
516     MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
517         outputLock.lock();
518         try {
519             if (outgoingMessage == null) {
520                 return MessageDelineation.NONE;
521             }
522             final ContentEncoder contentEncoder = outgoingMessage.getBody();
523             contentEncoder.complete(trailers);
524             ioSession.setEvent(SelectionKey.OP_WRITE);
525             outgoingMessage = null;
526             return contentEncoder instanceof ChunkEncoder
527                             ? MessageDelineation.CHUNK_CODED
528                             : MessageDelineation.MESSAGE_HEAD;
529         } finally {
530             outputLock.unlock();
531         }
532     }
533 
534     boolean isOutputCompleted() {
535         outputLock.lock();
536         try {
537             if (outgoingMessage == null) {
538                 return true;
539             }
540             final ContentEncoder contentEncoder = outgoingMessage.getBody();
541             return contentEncoder.isCompleted();
542         } finally {
543             outputLock.unlock();
544         }
545     }
546 
    /**
     * Requests a graceful close by enqueuing a graceful {@link ShutdownCommand} with
     * normal priority. The session is not closed directly by this call.
     */
    @Override
    public void close() throws IOException {
        ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL);
    }
551 
    /**
     * Requests a close in the given mode by enqueuing a {@link ShutdownCommand} with
     * immediate priority. The session is not closed directly by this call.
     *
     * @param closeMode the requested close mode.
     */
    @Override
    public void close(final CloseMode closeMode) {
        ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
    }
556 
    /**
     * Tells whether the connection is open. Only the {@code ACTIVE} state counts as open;
     * {@code READY}, {@code GRACEFUL_SHUTDOWN} and {@code SHUTDOWN} all report {@code false}.
     */
    @Override
    public boolean isOpen() {
        return connState == ConnectionState.ACTIVE;
    }
561 
    /**
     * Sets the socket timeout of the underlying I/O session.
     *
     * @param timeout the timeout in milliseconds.
     */
    @Override
    public void setSocketTimeoutMillis(final int timeout) {
        ioSession.setSocketTimeoutMillis(timeout);
    }
566 
567     @Override
568     public EndpointDetails getEndpointDetails() {
569         if (endpointDetails == null) {
570             endpointDetails = new BasicEndpointDetails(ioSession.getRemoteAddress(),
571                             ioSession.getLocalAddress(), connMetrics, ioSession.getSocketTimeoutMillis());
572         }
573         return endpointDetails;
574     }
575 
    /** Returns the socket timeout of the underlying I/O session, in milliseconds. */
    @Override
    public int getSocketTimeoutMillis() {
        return ioSession.getSocketTimeoutMillis();
    }
580 
    /**
     * Returns the protocol version taken from the most recently parsed incoming message
     * head, or {@code null} if no message head has been parsed yet.
     */
    @Override
    public ProtocolVersion getProtocolVersion() {
        return version;
    }
585 
    /** Returns the remote address reported by the underlying I/O session. */
    @Override
    public SocketAddress getRemoteAddress() {
        return ioSession.getRemoteAddress();
    }
590 
    /** Returns the local address reported by the underlying I/O session. */
    @Override
    public SocketAddress getLocalAddress() {
        return ioSession.getLocalAddress();
    }
595 
596     @Override
597     public SSLSession getSSLSession() {
598         final TlsDetails tlsDetails = ioSession.getTlsDetails();
599         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
600     }
601 
602 }