/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ReadableByteChannel;
34  import java.nio.channels.SelectionKey;
35  import java.nio.channels.WritableByteChannel;
36  import java.util.List;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import javax.net.ssl.SSLHandshakeException;
41  import javax.net.ssl.SSLSession;
42  
43  import org.apache.hc.core5.http.ConnectionClosedException;
44  import org.apache.hc.core5.http.ContentLengthStrategy;
45  import org.apache.hc.core5.http.EndpointDetails;
46  import org.apache.hc.core5.http.EntityDetails;
47  import org.apache.hc.core5.http.Header;
48  import org.apache.hc.core5.http.HttpConnection;
49  import org.apache.hc.core5.http.HttpException;
50  import org.apache.hc.core5.http.HttpMessage;
51  import org.apache.hc.core5.http.Message;
52  import org.apache.hc.core5.http.ProtocolVersion;
53  import org.apache.hc.core5.http.config.CharCodingConfig;
54  import org.apache.hc.core5.http.config.Http1Config;
55  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
56  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
57  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
58  import org.apache.hc.core5.http.impl.CharCodingSupport;
59  import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
60  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
61  import org.apache.hc.core5.http.nio.CapacityChannel;
62  import org.apache.hc.core5.http.nio.ContentDecoder;
63  import org.apache.hc.core5.http.nio.ContentEncoder;
64  import org.apache.hc.core5.http.nio.NHttpMessageParser;
65  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
66  import org.apache.hc.core5.http.nio.SessionInputBuffer;
67  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
68  import org.apache.hc.core5.http.nio.command.CommandSupport;
69  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
70  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
71  import org.apache.hc.core5.io.CloseMode;
72  import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
73  import org.apache.hc.core5.reactor.Command;
74  import org.apache.hc.core5.reactor.EventMask;
75  import org.apache.hc.core5.reactor.IOSession;
76  import org.apache.hc.core5.reactor.ProtocolIOSession;
77  import org.apache.hc.core5.reactor.ssl.TlsDetails;
78  import org.apache.hc.core5.util.Args;
79  import org.apache.hc.core5.util.Identifiable;
80  import org.apache.hc.core5.util.Timeout;
81  
82  abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
83          implements Identifiable, HttpConnection {
84  
    /**
     * Lifecycle states of the connection. The declaration order is significant:
     * this class compares states with {@code compareTo}, so any state at or after
     * {@code GRACEFUL_SHUTDOWN} is treated as "shutting down" and any state at or
     * before {@code ACTIVE} is treated as "open".
     */
    private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
86  
    // I/O session this duplexer reads from, writes to and polls commands from.
    private final ProtocolIOSession ioSession;
    // HTTP/1.1 settings; never null (the constructor falls back to Http1Config.DEFAULT).
    private final Http1Config http1Config;
    // Session buffers for raw incoming and outgoing bytes.
    private final SessionInputBufferImpl inbuf;
    private final SessionOutputBufferImpl outbuf;
    // Byte counters for each direction, aggregated by connMetrics.
    private final BasicHttpTransportMetrics inTransportMetrics;
    private final BasicHttpTransportMetrics outTransportMetrics;
    private final BasicHttpConnectionMetrics connMetrics;
    private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
    private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
    // Strategies used to determine the body length of incoming / outgoing messages.
    private final ContentLengthStrategy incomingContentStrategy;
    private final ContentLengthStrategy outgoingContentStrategy;
    // Scratch buffer that decoded body content is read into before being passed to consumeData.
    private final ByteBuffer contentBuffer;
    // Count of requestSessionOutput() calls that onOutput() has not yet accounted for.
    private final AtomicInteger outputRequests;

    // Incoming message whose body is currently being decoded; null when none is in progress.
    private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
    // Outgoing message whose body is currently being encoded; null when none is in progress.
    private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
    private volatile ConnectionState connState;
    // Input capacity window; replaced each time onInput parses a new message head.
    private volatile CapacityWindow capacityWindow;

    // Protocol version of the most recently parsed incoming message head.
    private volatile ProtocolVersion version;
    // Lazily created by getEndpointDetails().
    private volatile EndpointDetails endpointDetails;
108 
109     AbstractHttp1StreamDuplexer(
110             final ProtocolIOSession ioSession,
111             final Http1Config http1Config,
112             final CharCodingConfig charCodingConfig,
113             final NHttpMessageParser<IncomingMessage> incomingMessageParser,
114             final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
115             final ContentLengthStrategy incomingContentStrategy,
116             final ContentLengthStrategy outgoingContentStrategy) {
117         this.ioSession = Args.notNull(ioSession, "I/O session");
118         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
119         final int bufferSize = this.http1Config.getBufferSize();
120         this.inbuf = new SessionInputBufferImpl(bufferSize, Math.min(bufferSize, 512),
121                 this.http1Config.getMaxLineLength(),
122                 CharCodingSupport.createDecoder(charCodingConfig));
123         this.outbuf = new SessionOutputBufferImpl(bufferSize, Math.min(bufferSize, 512),
124                 CharCodingSupport.createEncoder(charCodingConfig));
125         this.inTransportMetrics = new BasicHttpTransportMetrics();
126         this.outTransportMetrics = new BasicHttpTransportMetrics();
127         this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
128         this.incomingMessageParser = incomingMessageParser;
129         this.outgoingMessageWriter = outgoingMessageWriter;
130         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
131                 DefaultContentLengthStrategy.INSTANCE;
132         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
133                 DefaultContentLengthStrategy.INSTANCE;
134         this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
135         this.outputRequests = new AtomicInteger(0);
136         this.connState = ConnectionState.READY;
137     }
138 
139     @Override
140     public String getId() {
141         return ioSession.getId();
142     }
143 
144     boolean isActive() {
145         return connState == ConnectionState.ACTIVE;
146     }
147 
148     boolean isShuttingDown() {
149         return connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0;
150     }
151 
152     void shutdownSession(final CloseMode closeMode) {
153         if (closeMode == CloseMode.GRACEFUL) {
154             connState = ConnectionState.GRACEFUL_SHUTDOWN;
155             ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
156         } else {
157             connState = ConnectionState.SHUTDOWN;
158             ioSession.close();
159         }
160     }
161 
162     void shutdownSession(final Exception cause) {
163         connState = ConnectionState.SHUTDOWN;
164         try {
165             terminate(cause);
166         } finally {
167             final CloseMode closeMode;
168             if (cause instanceof ConnectionClosedException) {
169                 closeMode = CloseMode.GRACEFUL;
170             } else if (cause instanceof SSLHandshakeException) {
171                 closeMode = CloseMode.GRACEFUL;
172             } else if (cause instanceof IOException) {
173                 closeMode = CloseMode.IMMEDIATE;
174             } else {
175                 closeMode = CloseMode.GRACEFUL;
176             }
177             ioSession.close(closeMode);
178         }
179     }
180 
    /**
     * Invoked from {@link #onDisconnect()} before pending commands are cancelled.
     */
    abstract void disconnected();

    /**
     * Invoked from {@link #shutdownSession(Exception)} before the I/O session is closed.
     *
     * @param exception the cause of the shutdown
     */
    abstract void terminate(final Exception exception);

    /**
     * Invoked from {@link #onInput(ByteBuffer)} for every parsed incoming message head.
     */
    abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);

    /**
     * Invoked from {@code commitMessageHead} after the message head has been
     * written to the session output buffer.
     */
    abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);

    /**
     * Invoked from {@link #onInput(ByteBuffer)} with a parsed message head.
     *
     * @param messageHead the parsed message head
     * @param entityDetails details of the message body, or {@code null} when no
     *        content decoder was created for the message
     */
    abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;

    /**
     * Invoked from {@link #onInput(ByteBuffer)} for a parsed message head. When
     * this returns {@code true} the content length is determined and a content
     * decoder is requested; otherwise the message is handled as having no body.
     */
    abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;

    /**
     * Invoked from {@code commitMessageHead} when the stream is not being ended.
     * When this returns {@code true} the content length is determined and a
     * content encoder is requested.
     */
    abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;

    /**
     * Creates the decoder for an incoming message body. A {@code null} result is
     * handled by {@link #onInput(ByteBuffer)} as a message without a body.
     */
    abstract ContentDecoder createContentDecoder(
            long contentLength,
            ReadableByteChannel channel,
            SessionInputBuffer buffer,
            BasicHttpTransportMetrics metrics) throws HttpException;

    /**
     * Creates the encoder for an outgoing message body. When the result is
     * {@code null}, {@code commitMessageHead} does not register an outgoing message.
     */
    abstract ContentEncoder createContentEncoder(
            long contentLength,
            WritableByteChannel channel,
            SessionOutputBuffer buffer,
            BasicHttpTransportMetrics metrics) throws HttpException;

    /**
     * Invoked from {@link #onInput(ByteBuffer)} with decoded body content; the
     * buffer has been flipped for reading and is cleared after this call returns.
     */
    abstract void consumeData(ByteBuffer src) throws HttpException, IOException;

    /**
     * Invoked from {@link #onInput(ByteBuffer)} when the capacity window has
     * dropped to zero or below and the content decoder is not yet completed.
     */
    abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;

    /**
     * Invoked from {@link #onInput(ByteBuffer)} once the content decoder reports
     * completion.
     *
     * @param trailers the trailers reported by the content decoder
     */
    abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;

    /**
     * Queried by {@link #onOutput()} after {@link #produceOutput()}; while it
     * returns {@code true} the write event is not cleared.
     */
    abstract boolean isOutputReady();

    /**
     * Queried by {@link #onOutput()}; while it returns {@code true} the output is
     * not considered ended.
     */
    abstract boolean isRequestInitiated();

    /**
     * Invoked from {@link #onOutput()} as long as the connection has not reached
     * the {@code SHUTDOWN} state.
     */
    abstract void produceOutput() throws HttpException, IOException;

    /**
     * Invoked when a {@link RequestExecutionCommand} is polled from the session
     * while the connection is not shutting down.
     */
    abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;

    /**
     * Invoked from {@link #onInput(ByteBuffer)} after a message head without a
     * body has been consumed, or after a message body has been fully decoded.
     */
    abstract void inputEnd() throws HttpException, IOException;

    /**
     * Invoked from {@link #onOutput()} when no outgoing message is in progress,
     * the output buffer is empty and no request has been initiated.
     */
    abstract void outputEnd() throws HttpException, IOException;

    /**
     * Queried by {@link #onInput(ByteBuffer)} and {@link #onOutput()} when
     * deciding whether the connection can be shut down.
     */
    abstract boolean inputIdle();

    /**
     * Queried by {@link #onOutput()} during graceful shutdown.
     */
    abstract boolean outputIdle();

    /**
     * Invoked from {@link #onTimeout(Timeout)}; when it returns {@code false} a
     * socket timeout exception is raised through {@link #onException(Exception)}.
     */
    abstract boolean handleTimeout();
230 
231     private void processCommands() throws HttpException, IOException {
232         for (;;) {
233             final Command command = ioSession.poll();
234             if (command == null) {
235                 return;
236             }
237             if (command instanceof ShutdownCommand) {
238                 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
239                 requestShutdown(shutdownCommand.getType());
240             } else if (command instanceof RequestExecutionCommand) {
241                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
242                     command.cancel();
243                 } else {
244                     execute((RequestExecutionCommand) command);
245                     return;
246                 }
247             } else {
248                 throw new HttpException("Unexpected command: " + command.getClass());
249             }
250         }
251     }
252 
253     public final void onConnect() throws HttpException, IOException {
254         if (connState == ConnectionState.READY) {
255             connState = ConnectionState.ACTIVE;
256             processCommands();
257         }
258     }
259 
260     IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
261         final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
262         if (messageHead != null) {
263             incomingMessageParser.reset();
264         }
265         return messageHead;
266     }
267 
268     public final void onInput(final ByteBuffer src) throws HttpException, IOException {
269         if (src != null) {
270             final int n = src.remaining();
271             inbuf.put(src);
272             inTransportMetrics.incrementBytesTransferred(n);
273         }
274 
275         if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && !inbuf.hasData() && inputIdle()) {
276             ioSession.clearEvent(SelectionKey.OP_READ);
277             return;
278         }
279 
280         boolean endOfStream = false;
281         if (incomingMessage == null) {
282             final int bytesRead = inbuf.fill(ioSession);
283             if (bytesRead > 0) {
284                 inTransportMetrics.incrementBytesTransferred(bytesRead);
285             }
286             endOfStream = bytesRead == -1;
287         }
288 
289         do {
290             if (incomingMessage == null) {
291 
292                 final IncomingMessage messageHead = parseMessageHead(endOfStream);
293                 if (messageHead != null) {
294                     this.version = messageHead.getVersion();
295 
296                     updateInputMetrics(messageHead, connMetrics);
297                     final ContentDecoder contentDecoder;
298                     if (handleIncomingMessage(messageHead)) {
299                         final long len = incomingContentStrategy.determineLength(messageHead);
300                         contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
301                         consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
302                     } else {
303                         consumeHeader(messageHead, null);
304                         contentDecoder = null;
305                     }
306                     capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
307                     if (contentDecoder != null) {
308                         incomingMessage = new Message<>(messageHead, contentDecoder);
309                     } else {
310                         inputEnd();
311                         if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
312                             ioSession.setEvent(SelectionKey.OP_READ);
313                         }
314                     }
315                 } else {
316                     break;
317                 }
318             }
319 
320             if (incomingMessage != null) {
321                 final ContentDecoder contentDecoder = incomingMessage.getBody();
322 
323                 // At present the consumer can be forced to consume data
324                 // over its declared capacity in order to avoid having
325                 // unprocessed message body content stuck in the session
326                 // input buffer
327                 final int bytesRead = contentDecoder.read(contentBuffer);
328                 if (bytesRead > 0) {
329                     contentBuffer.flip();
330                     consumeData(contentBuffer);
331                     contentBuffer.clear();
332                     final int capacity = capacityWindow.removeCapacity(bytesRead);
333                     if (capacity <= 0) {
334                         if (!contentDecoder.isCompleted()) {
335                             updateCapacity(capacityWindow);
336                         }
337                     }
338                 }
339                 if (contentDecoder.isCompleted()) {
340                     dataEnd(contentDecoder.getTrailers());
341                     capacityWindow.close();
342                     incomingMessage = null;
343                     ioSession.setEvent(SelectionKey.OP_READ);
344                     inputEnd();
345                 } else if (bytesRead == 0) {
346                     break;
347                 }
348             }
349         } while (inbuf.hasData());
350 
351         if (endOfStream && !inbuf.hasData()) {
352             if (inputIdle()) {
353                 requestShutdown(CloseMode.GRACEFUL);
354             } else {
355                 shutdownSession(new ConnectionClosedException("Connection closed by peer"));
356             }
357         }
358     }
359 
360     public final void onOutput() throws IOException, HttpException {
361         ioSession.getLock().lock();
362         try {
363             if (outbuf.hasData()) {
364                 final int bytesWritten = outbuf.flush(ioSession);
365                 if (bytesWritten > 0) {
366                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
367                 }
368             }
369         } finally {
370             ioSession.getLock().unlock();
371         }
372         if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
373             final int pendingOutputRequests = outputRequests.get();
374             produceOutput();
375             final boolean outputPending = isOutputReady();
376             final boolean outputEnd;
377             ioSession.getLock().lock();
378             try {
379                 if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
380                     ioSession.clearEvent(SelectionKey.OP_WRITE);
381                 } else {
382                     outputRequests.addAndGet(-pendingOutputRequests);
383                 }
384                 outputEnd = outgoingMessage == null && !outbuf.hasData() && !isRequestInitiated();
385             } finally {
386                 ioSession.getLock().unlock();
387             }
388             if (outputEnd) {
389                 outputEnd();
390                 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
391                     processCommands();
392                 } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
393                     connState = ConnectionState.SHUTDOWN;
394                 }
395             }
396         }
397         if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
398             ioSession.close();
399         }
400     }
401 
402     public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
403         if (!handleTimeout()) {
404             onException(SocketTimeoutExceptionFactory.create(timeout));
405         }
406     }
407 
408     public final void onException(final Exception ex) {
409         shutdownSession(ex);
410         CommandSupport.failCommands(ioSession, ex);
411     }
412 
413     public final void onDisconnect() {
414         disconnected();
415         CommandSupport.cancelCommands(ioSession);
416     }
417 
418     void requestShutdown(final CloseMode closeMode) {
419         switch (closeMode) {
420             case GRACEFUL:
421                 if (connState == ConnectionState.ACTIVE) {
422                     connState = ConnectionState.GRACEFUL_SHUTDOWN;
423                 }
424                 break;
425             case IMMEDIATE:
426                 connState = ConnectionState.SHUTDOWN;
427                 break;
428         }
429         ioSession.setEvent(SelectionKey.OP_WRITE);
430     }
431 
432     void commitMessageHead(
433             final OutgoingMessage messageHead,
434             final boolean endStream,
435             final FlushMode flushMode) throws HttpException, IOException {
436         ioSession.getLock().lock();
437         try {
438             outgoingMessageWriter.write(messageHead, outbuf);
439             updateOutputMetrics(messageHead, connMetrics);
440             if (!endStream) {
441                 final ContentEncoder contentEncoder;
442                 if (handleOutgoingMessage(messageHead)) {
443                     final long len = outgoingContentStrategy.determineLength(messageHead);
444                     contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
445                 } else {
446                     contentEncoder = null;
447                 }
448                 if (contentEncoder != null) {
449                     outgoingMessage = new Message<>(messageHead, contentEncoder);
450                 }
451             }
452             outgoingMessageWriter.reset();
453             if (flushMode == FlushMode.IMMEDIATE) {
454                 final int bytesWritten = outbuf.flush(ioSession);
455                 if (bytesWritten > 0) {
456                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
457                 }
458             }
459             ioSession.setEvent(EventMask.WRITE);
460         } finally {
461             ioSession.getLock().unlock();
462         }
463     }
464 
465     void requestSessionInput() {
466         ioSession.setEvent(SelectionKey.OP_READ);
467     }
468 
469     void requestSessionOutput() {
470         outputRequests.incrementAndGet();
471         ioSession.setEvent(SelectionKey.OP_WRITE);
472     }
473 
474     Timeout getSessionTimeout() {
475         return ioSession.getSocketTimeout();
476     }
477 
478     void setSessionTimeout(final Timeout timeout) {
479         ioSession.setSocketTimeout(timeout);
480     }
481 
482     void suspendSessionInput() {
483         ioSession.clearEvent(SelectionKey.OP_READ);
484     }
485 
486     void suspendSessionOutput() throws IOException {
487         ioSession.getLock().lock();
488         try {
489             if (outbuf.hasData()) {
490                 final int bytesWritten = outbuf.flush(ioSession);
491                 if (bytesWritten > 0) {
492                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
493                 }
494             } else {
495                 ioSession.clearEvent(SelectionKey.OP_WRITE);
496             }
497         } finally {
498             ioSession.getLock().unlock();
499         }
500     }
501 
502     int streamOutput(final ByteBuffer src) throws IOException {
503         ioSession.getLock().lock();
504         try {
505             if (outgoingMessage == null) {
506                 throw new ConnectionClosedException();
507             }
508             final ContentEncoder contentEncoder = outgoingMessage.getBody();
509             final int bytesWritten = contentEncoder.write(src);
510             if (bytesWritten > 0) {
511                 ioSession.setEvent(SelectionKey.OP_WRITE);
512             }
513             return bytesWritten;
514         } finally {
515             ioSession.getLock().unlock();
516         }
517     }
518 
    /**
     * Result of {@code endOutputStream}: {@code NONE} when no outgoing message
     * was in progress, {@code CHUNK_CODED} when the completed content encoder
     * was a {@code ChunkEncoder}, and {@code MESSAGE_HEAD} otherwise.
     */
    enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
520 
521     MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
522         ioSession.getLock().lock();
523         try {
524             if (outgoingMessage == null) {
525                 return MessageDelineation.NONE;
526             }
527             final ContentEncoder contentEncoder = outgoingMessage.getBody();
528             contentEncoder.complete(trailers);
529             ioSession.setEvent(SelectionKey.OP_WRITE);
530             outgoingMessage = null;
531             return contentEncoder instanceof ChunkEncoder
532                             ? MessageDelineation.CHUNK_CODED
533                             : MessageDelineation.MESSAGE_HEAD;
534         } finally {
535             ioSession.getLock().unlock();
536         }
537     }
538 
539     boolean isOutputCompleted() {
540         ioSession.getLock().lock();
541         try {
542             if (outgoingMessage == null) {
543                 return true;
544             }
545             final ContentEncoder contentEncoder = outgoingMessage.getBody();
546             return contentEncoder.isCompleted();
547         } finally {
548             ioSession.getLock().unlock();
549         }
550     }
551 
552     @Override
553     public void close() throws IOException {
554         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
555     }
556 
557     @Override
558     public void close(final CloseMode closeMode) {
559         ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
560     }
561 
562     @Override
563     public boolean isOpen() {
564         return connState.compareTo(ConnectionState.ACTIVE) <= 0;
565     }
566 
567     @Override
568     public Timeout getSocketTimeout() {
569         return ioSession.getSocketTimeout();
570     }
571 
572     @Override
573     public void setSocketTimeout(final Timeout timeout) {
574         ioSession.setSocketTimeout(timeout);
575     }
576 
577     @Override
578     public EndpointDetails getEndpointDetails() {
579         if (endpointDetails == null) {
580             endpointDetails = new BasicEndpointDetails(
581                     ioSession.getRemoteAddress(),
582                     ioSession.getLocalAddress(),
583                     connMetrics,
584                     ioSession.getSocketTimeout());
585         }
586         return endpointDetails;
587     }
588 
589     @Override
590     public ProtocolVersion getProtocolVersion() {
591         return version;
592     }
593 
594     @Override
595     public SocketAddress getRemoteAddress() {
596         return ioSession.getRemoteAddress();
597     }
598 
599     @Override
600     public SocketAddress getLocalAddress() {
601         return ioSession.getLocalAddress();
602     }
603 
604     @Override
605     public SSLSession getSSLSession() {
606         final TlsDetails tlsDetails = ioSession.getTlsDetails();
607         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
608     }
609 
610     void appendState(final StringBuilder buf) {
611         buf.append("connState=").append(connState)
612                 .append(", inbuf=").append(inbuf)
613                 .append(", outbuf=").append(outbuf)
614                 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
615     }
616 
617     static class CapacityWindow implements CapacityChannel {
618         private final IOSession ioSession;
619         private final ReentrantLock lock;
620         private int window;
621         private boolean closed;
622 
623         CapacityWindow(final int window, final IOSession ioSession) {
624             this.window = window;
625             this.ioSession = ioSession;
626             this.lock = new ReentrantLock();
627         }
628 
629         @Override
630         public void update(final int increment) throws IOException {
631             lock.lock();
632             try {
633                 if (closed) {
634                     return;
635                 }
636                 if (increment > 0) {
637                     updateWindow(increment);
638                     ioSession.setEvent(SelectionKey.OP_READ);
639                 }
640             } finally {
641                 lock.unlock();
642             }
643         }
644 
645         /**
646          * Internal method for removing capacity. We don't need to check
647          * if this channel is closed in it.
648          */
649         int removeCapacity(final int delta) {
650             lock.lock();
651             try {
652                 updateWindow(-delta);
653                 if (window <= 0) {
654                     ioSession.clearEvent(SelectionKey.OP_READ);
655                 }
656                 return window;
657             } finally {
658                 lock.unlock();
659             }
660         }
661 
662         private void updateWindow(final int delta) {
663             int newValue = window + delta;
664             // Math.addExact
665             if (((window ^ newValue) & (delta ^ newValue)) < 0) {
666                 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
667             }
668             window = newValue;
669         }
670 
671         /**
672          * Closes the capacity channel, preventing user code from accidentally requesting
673          * read events outside of the context of the request the channel was created for
674          */
675         void close() {
676             lock.lock();
677             try {
678                 closed = true;
679             } finally {
680                 lock.unlock();
681             }
682         }
683 
684         // visible for testing
685         int getWindow() {
686             return window;
687         }
688     }
689 }