1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ReadableByteChannel;
34 import java.nio.channels.SelectionKey;
35 import java.nio.channels.WritableByteChannel;
36 import java.util.List;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import javax.net.ssl.SSLHandshakeException;
41 import javax.net.ssl.SSLSession;
42
43 import org.apache.hc.core5.http.ConnectionClosedException;
44 import org.apache.hc.core5.http.ContentLengthStrategy;
45 import org.apache.hc.core5.http.EndpointDetails;
46 import org.apache.hc.core5.http.EntityDetails;
47 import org.apache.hc.core5.http.Header;
48 import org.apache.hc.core5.http.HttpConnection;
49 import org.apache.hc.core5.http.HttpException;
50 import org.apache.hc.core5.http.HttpMessage;
51 import org.apache.hc.core5.http.Message;
52 import org.apache.hc.core5.http.ProtocolVersion;
53 import org.apache.hc.core5.http.config.CharCodingConfig;
54 import org.apache.hc.core5.http.config.Http1Config;
55 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
56 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
57 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
58 import org.apache.hc.core5.http.impl.CharCodingSupport;
59 import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
60 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
61 import org.apache.hc.core5.http.nio.CapacityChannel;
62 import org.apache.hc.core5.http.nio.ContentDecoder;
63 import org.apache.hc.core5.http.nio.ContentEncoder;
64 import org.apache.hc.core5.http.nio.NHttpMessageParser;
65 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
66 import org.apache.hc.core5.http.nio.SessionInputBuffer;
67 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
68 import org.apache.hc.core5.http.nio.command.CommandSupport;
69 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
70 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
71 import org.apache.hc.core5.io.CloseMode;
72 import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
73 import org.apache.hc.core5.reactor.Command;
74 import org.apache.hc.core5.reactor.EventMask;
75 import org.apache.hc.core5.reactor.IOSession;
76 import org.apache.hc.core5.reactor.ProtocolIOSession;
77 import org.apache.hc.core5.reactor.ssl.TlsDetails;
78 import org.apache.hc.core5.util.Args;
79 import org.apache.hc.core5.util.Identifiable;
80 import org.apache.hc.core5.util.Timeout;
81
82 abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
83 implements Identifiable, HttpConnection {
84
85 private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
86
87 private final ProtocolIOSession ioSession;
88 private final Http1Config http1Config;
89 private final SessionInputBufferImpl inbuf;
90 private final SessionOutputBufferImpl outbuf;
91 private final BasicHttpTransportMetrics inTransportMetrics;
92 private final BasicHttpTransportMetrics outTransportMetrics;
93 private final BasicHttpConnectionMetrics connMetrics;
94 private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
95 private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
96 private final ContentLengthStrategy incomingContentStrategy;
97 private final ContentLengthStrategy outgoingContentStrategy;
98 private final ByteBuffer contentBuffer;
99 private final AtomicInteger outputRequests;
100
101 private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
102 private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
103 private volatile ConnectionState connState;
104 private volatile CapacityWindow capacityWindow;
105
106 private volatile ProtocolVersion version;
107 private volatile EndpointDetails endpointDetails;
108
109 AbstractHttp1StreamDuplexer(
110 final ProtocolIOSession ioSession,
111 final Http1Config http1Config,
112 final CharCodingConfig charCodingConfig,
113 final NHttpMessageParser<IncomingMessage> incomingMessageParser,
114 final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
115 final ContentLengthStrategy incomingContentStrategy,
116 final ContentLengthStrategy outgoingContentStrategy) {
117 this.ioSession = Args.notNull(ioSession, "I/O session");
118 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
119 final int bufferSize = this.http1Config.getBufferSize();
120 this.inbuf = new SessionInputBufferImpl(bufferSize, Math.min(bufferSize, 512),
121 this.http1Config.getMaxLineLength(),
122 CharCodingSupport.createDecoder(charCodingConfig));
123 this.outbuf = new SessionOutputBufferImpl(bufferSize, Math.min(bufferSize, 512),
124 CharCodingSupport.createEncoder(charCodingConfig));
125 this.inTransportMetrics = new BasicHttpTransportMetrics();
126 this.outTransportMetrics = new BasicHttpTransportMetrics();
127 this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
128 this.incomingMessageParser = incomingMessageParser;
129 this.outgoingMessageWriter = outgoingMessageWriter;
130 this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
131 DefaultContentLengthStrategy.INSTANCE;
132 this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
133 DefaultContentLengthStrategy.INSTANCE;
134 this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
135 this.outputRequests = new AtomicInteger(0);
136 this.connState = ConnectionState.READY;
137 }
138
139 @Override
140 public String getId() {
141 return ioSession.getId();
142 }
143
144 boolean isActive() {
145 return connState == ConnectionState.ACTIVE;
146 }
147
148 boolean isShuttingDown() {
149 return connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0;
150 }
151
152 void shutdownSession(final CloseMode closeMode) {
153 if (closeMode == CloseMode.GRACEFUL) {
154 connState = ConnectionState.GRACEFUL_SHUTDOWN;
155 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
156 } else {
157 connState = ConnectionState.SHUTDOWN;
158 ioSession.close();
159 }
160 }
161
162 void shutdownSession(final Exception cause) {
163 connState = ConnectionState.SHUTDOWN;
164 try {
165 terminate(cause);
166 } finally {
167 final CloseMode closeMode;
168 if (cause instanceof ConnectionClosedException) {
169 closeMode = CloseMode.GRACEFUL;
170 } else if (cause instanceof SSLHandshakeException) {
171 closeMode = CloseMode.GRACEFUL;
172 } else if (cause instanceof IOException) {
173 closeMode = CloseMode.IMMEDIATE;
174 } else {
175 closeMode = CloseMode.GRACEFUL;
176 }
177 ioSession.close(closeMode);
178 }
179 }
180
181 abstract void disconnected();
182
183 abstract void terminate(final Exception exception);
184
185 abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
186
187 abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
188
189 abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;
190
191 abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;
192
193 abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;
194
195 abstract ContentDecoder createContentDecoder(
196 long contentLength,
197 ReadableByteChannel channel,
198 SessionInputBuffer buffer,
199 BasicHttpTransportMetrics metrics) throws HttpException;
200
201 abstract ContentEncoder createContentEncoder(
202 long contentLength,
203 WritableByteChannel channel,
204 SessionOutputBuffer buffer,
205 BasicHttpTransportMetrics metrics) throws HttpException;
206
207 abstract void consumeData(ByteBuffer src) throws HttpException, IOException;
208
209 abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
210
211 abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
212
213 abstract boolean isOutputReady();
214
215 abstract boolean isRequestInitiated();
216
217 abstract void produceOutput() throws HttpException, IOException;
218
219 abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
220
221 abstract void inputEnd() throws HttpException, IOException;
222
223 abstract void outputEnd() throws HttpException, IOException;
224
225 abstract boolean inputIdle();
226
227 abstract boolean outputIdle();
228
229 abstract boolean handleTimeout();
230
231 private void processCommands() throws HttpException, IOException {
232 for (;;) {
233 final Command command = ioSession.poll();
234 if (command == null) {
235 return;
236 }
237 if (command instanceof ShutdownCommand) {
238 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
239 requestShutdown(shutdownCommand.getType());
240 } else if (command instanceof RequestExecutionCommand) {
241 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
242 command.cancel();
243 } else {
244 execute((RequestExecutionCommand) command);
245 return;
246 }
247 } else {
248 throw new HttpException("Unexpected command: " + command.getClass());
249 }
250 }
251 }
252
253 public final void onConnect() throws HttpException, IOException {
254 if (connState == ConnectionState.READY) {
255 connState = ConnectionState.ACTIVE;
256 processCommands();
257 }
258 }
259
260 IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
261 final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
262 if (messageHead != null) {
263 incomingMessageParser.reset();
264 }
265 return messageHead;
266 }
267
268 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
269 if (src != null) {
270 final int n = src.remaining();
271 inbuf.put(src);
272 inTransportMetrics.incrementBytesTransferred(n);
273 }
274
275 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && !inbuf.hasData() && inputIdle()) {
276 ioSession.clearEvent(SelectionKey.OP_READ);
277 return;
278 }
279
280 boolean endOfStream = false;
281 if (incomingMessage == null) {
282 final int bytesRead = inbuf.fill(ioSession);
283 if (bytesRead > 0) {
284 inTransportMetrics.incrementBytesTransferred(bytesRead);
285 }
286 endOfStream = bytesRead == -1;
287 }
288
289 do {
290 if (incomingMessage == null) {
291
292 final IncomingMessage messageHead = parseMessageHead(endOfStream);
293 if (messageHead != null) {
294 this.version = messageHead.getVersion();
295
296 updateInputMetrics(messageHead, connMetrics);
297 final ContentDecoder contentDecoder;
298 if (handleIncomingMessage(messageHead)) {
299 final long len = incomingContentStrategy.determineLength(messageHead);
300 contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
301 consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
302 } else {
303 consumeHeader(messageHead, null);
304 contentDecoder = null;
305 }
306 capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
307 if (contentDecoder != null) {
308 incomingMessage = new Message<>(messageHead, contentDecoder);
309 } else {
310 inputEnd();
311 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
312 ioSession.setEvent(SelectionKey.OP_READ);
313 }
314 }
315 } else {
316 break;
317 }
318 }
319
320 if (incomingMessage != null) {
321 final ContentDecoder contentDecoder = incomingMessage.getBody();
322
323
324
325
326
327 final int bytesRead = contentDecoder.read(contentBuffer);
328 if (bytesRead > 0) {
329 contentBuffer.flip();
330 consumeData(contentBuffer);
331 contentBuffer.clear();
332 final int capacity = capacityWindow.removeCapacity(bytesRead);
333 if (capacity <= 0) {
334 if (!contentDecoder.isCompleted()) {
335 updateCapacity(capacityWindow);
336 }
337 }
338 }
339 if (contentDecoder.isCompleted()) {
340 dataEnd(contentDecoder.getTrailers());
341 capacityWindow.close();
342 incomingMessage = null;
343 ioSession.setEvent(SelectionKey.OP_READ);
344 inputEnd();
345 } else if (bytesRead == 0) {
346 break;
347 }
348 }
349 } while (inbuf.hasData());
350
351 if (endOfStream && !inbuf.hasData()) {
352 if (inputIdle()) {
353 requestShutdown(CloseMode.GRACEFUL);
354 } else {
355 shutdownSession(new ConnectionClosedException("Connection closed by peer"));
356 }
357 }
358 }
359
360 public final void onOutput() throws IOException, HttpException {
361 ioSession.getLock().lock();
362 try {
363 if (outbuf.hasData()) {
364 final int bytesWritten = outbuf.flush(ioSession);
365 if (bytesWritten > 0) {
366 outTransportMetrics.incrementBytesTransferred(bytesWritten);
367 }
368 }
369 } finally {
370 ioSession.getLock().unlock();
371 }
372 if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
373 final int pendingOutputRequests = outputRequests.get();
374 produceOutput();
375 final boolean outputPending = isOutputReady();
376 final boolean outputEnd;
377 ioSession.getLock().lock();
378 try {
379 if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
380 ioSession.clearEvent(SelectionKey.OP_WRITE);
381 } else {
382 outputRequests.addAndGet(-pendingOutputRequests);
383 }
384 outputEnd = outgoingMessage == null && !outbuf.hasData() && !isRequestInitiated();
385 } finally {
386 ioSession.getLock().unlock();
387 }
388 if (outputEnd) {
389 outputEnd();
390 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
391 processCommands();
392 } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
393 connState = ConnectionState.SHUTDOWN;
394 }
395 }
396 }
397 if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
398 ioSession.close();
399 }
400 }
401
402 public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
403 if (!handleTimeout()) {
404 onException(SocketTimeoutExceptionFactory.create(timeout));
405 }
406 }
407
408 public final void onException(final Exception ex) {
409 shutdownSession(ex);
410 CommandSupport.failCommands(ioSession, ex);
411 }
412
413 public final void onDisconnect() {
414 disconnected();
415 CommandSupport.cancelCommands(ioSession);
416 }
417
418 void requestShutdown(final CloseMode closeMode) {
419 switch (closeMode) {
420 case GRACEFUL:
421 if (connState == ConnectionState.ACTIVE) {
422 connState = ConnectionState.GRACEFUL_SHUTDOWN;
423 }
424 break;
425 case IMMEDIATE:
426 connState = ConnectionState.SHUTDOWN;
427 break;
428 }
429 ioSession.setEvent(SelectionKey.OP_WRITE);
430 }
431
432 void commitMessageHead(
433 final OutgoingMessage messageHead,
434 final boolean endStream,
435 final FlushMode flushMode) throws HttpException, IOException {
436 ioSession.getLock().lock();
437 try {
438 outgoingMessageWriter.write(messageHead, outbuf);
439 updateOutputMetrics(messageHead, connMetrics);
440 if (!endStream) {
441 final ContentEncoder contentEncoder;
442 if (handleOutgoingMessage(messageHead)) {
443 final long len = outgoingContentStrategy.determineLength(messageHead);
444 contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
445 } else {
446 contentEncoder = null;
447 }
448 if (contentEncoder != null) {
449 outgoingMessage = new Message<>(messageHead, contentEncoder);
450 }
451 }
452 outgoingMessageWriter.reset();
453 if (flushMode == FlushMode.IMMEDIATE) {
454 final int bytesWritten = outbuf.flush(ioSession);
455 if (bytesWritten > 0) {
456 outTransportMetrics.incrementBytesTransferred(bytesWritten);
457 }
458 }
459 ioSession.setEvent(EventMask.WRITE);
460 } finally {
461 ioSession.getLock().unlock();
462 }
463 }
464
465 void requestSessionInput() {
466 ioSession.setEvent(SelectionKey.OP_READ);
467 }
468
469 void requestSessionOutput() {
470 outputRequests.incrementAndGet();
471 ioSession.setEvent(SelectionKey.OP_WRITE);
472 }
473
474 Timeout getSessionTimeout() {
475 return ioSession.getSocketTimeout();
476 }
477
478 void setSessionTimeout(final Timeout timeout) {
479 ioSession.setSocketTimeout(timeout);
480 }
481
482 void suspendSessionInput() {
483 ioSession.clearEvent(SelectionKey.OP_READ);
484 }
485
486 void suspendSessionOutput() throws IOException {
487 ioSession.getLock().lock();
488 try {
489 if (outbuf.hasData()) {
490 final int bytesWritten = outbuf.flush(ioSession);
491 if (bytesWritten > 0) {
492 outTransportMetrics.incrementBytesTransferred(bytesWritten);
493 }
494 } else {
495 ioSession.clearEvent(SelectionKey.OP_WRITE);
496 }
497 } finally {
498 ioSession.getLock().unlock();
499 }
500 }
501
502 int streamOutput(final ByteBuffer src) throws IOException {
503 ioSession.getLock().lock();
504 try {
505 if (outgoingMessage == null) {
506 throw new ConnectionClosedException();
507 }
508 final ContentEncoder contentEncoder = outgoingMessage.getBody();
509 final int bytesWritten = contentEncoder.write(src);
510 if (bytesWritten > 0) {
511 ioSession.setEvent(SelectionKey.OP_WRITE);
512 }
513 return bytesWritten;
514 } finally {
515 ioSession.getLock().unlock();
516 }
517 }
518
519 enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
520
521 MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
522 ioSession.getLock().lock();
523 try {
524 if (outgoingMessage == null) {
525 return MessageDelineation.NONE;
526 }
527 final ContentEncoder contentEncoder = outgoingMessage.getBody();
528 contentEncoder.complete(trailers);
529 ioSession.setEvent(SelectionKey.OP_WRITE);
530 outgoingMessage = null;
531 return contentEncoder instanceof ChunkEncoder
532 ? MessageDelineation.CHUNK_CODED
533 : MessageDelineation.MESSAGE_HEAD;
534 } finally {
535 ioSession.getLock().unlock();
536 }
537 }
538
539 boolean isOutputCompleted() {
540 ioSession.getLock().lock();
541 try {
542 if (outgoingMessage == null) {
543 return true;
544 }
545 final ContentEncoder contentEncoder = outgoingMessage.getBody();
546 return contentEncoder.isCompleted();
547 } finally {
548 ioSession.getLock().unlock();
549 }
550 }
551
552 @Override
553 public void close() throws IOException {
554 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
555 }
556
557 @Override
558 public void close(final CloseMode closeMode) {
559 ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
560 }
561
562 @Override
563 public boolean isOpen() {
564 return connState.compareTo(ConnectionState.ACTIVE) <= 0;
565 }
566
567 @Override
568 public Timeout getSocketTimeout() {
569 return ioSession.getSocketTimeout();
570 }
571
572 @Override
573 public void setSocketTimeout(final Timeout timeout) {
574 ioSession.setSocketTimeout(timeout);
575 }
576
577 @Override
578 public EndpointDetails getEndpointDetails() {
579 if (endpointDetails == null) {
580 endpointDetails = new BasicEndpointDetails(
581 ioSession.getRemoteAddress(),
582 ioSession.getLocalAddress(),
583 connMetrics,
584 ioSession.getSocketTimeout());
585 }
586 return endpointDetails;
587 }
588
589 @Override
590 public ProtocolVersion getProtocolVersion() {
591 return version;
592 }
593
594 @Override
595 public SocketAddress getRemoteAddress() {
596 return ioSession.getRemoteAddress();
597 }
598
599 @Override
600 public SocketAddress getLocalAddress() {
601 return ioSession.getLocalAddress();
602 }
603
604 @Override
605 public SSLSession getSSLSession() {
606 final TlsDetails tlsDetails = ioSession.getTlsDetails();
607 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
608 }
609
610 void appendState(final StringBuilder buf) {
611 buf.append("connState=").append(connState)
612 .append(", inbuf=").append(inbuf)
613 .append(", outbuf=").append(outbuf)
614 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
615 }
616
617 static class CapacityWindow implements CapacityChannel {
618 private final IOSession ioSession;
619 private final ReentrantLock lock;
620 private int window;
621 private boolean closed;
622
623 CapacityWindow(final int window, final IOSession ioSession) {
624 this.window = window;
625 this.ioSession = ioSession;
626 this.lock = new ReentrantLock();
627 }
628
629 @Override
630 public void update(final int increment) throws IOException {
631 lock.lock();
632 try {
633 if (closed) {
634 return;
635 }
636 if (increment > 0) {
637 updateWindow(increment);
638 ioSession.setEvent(SelectionKey.OP_READ);
639 }
640 } finally {
641 lock.unlock();
642 }
643 }
644
645
646
647
648
649 int removeCapacity(final int delta) {
650 lock.lock();
651 try {
652 updateWindow(-delta);
653 if (window <= 0) {
654 ioSession.clearEvent(SelectionKey.OP_READ);
655 }
656 return window;
657 } finally {
658 lock.unlock();
659 }
660 }
661
662 private void updateWindow(final int delta) {
663 int newValue = window + delta;
664
665 if (((window ^ newValue) & (delta ^ newValue)) < 0) {
666 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
667 }
668 window = newValue;
669 }
670
671
672
673
674
675 void close() {
676 lock.lock();
677 try {
678 closed = true;
679 } finally {
680 lock.unlock();
681 }
682 }
683
684
685 int getWindow() {
686 return window;
687 }
688 }
689 }