View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.compat;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.nio.ByteBuffer;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.locks.ReentrantLock;
34  
35  import org.apache.hc.core5.annotation.Internal;
36  import org.apache.hc.core5.http.nio.CapacityChannel;
37  import org.apache.hc.core5.http.nio.support.classic.ContentInputBuffer;
38  import org.apache.hc.core5.util.Timeout;
39  
40  /**
41   * TODO: to be replaced by core functionality
42   */
43  @Internal
44  final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
45  
46      private final int initialBufferSize;
47      private final AtomicInteger capacityIncrement;
48  
49      private volatile CapacityChannel capacityChannel;
50  
51      public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
52          super(lock, initialBufferSize);
53          this.initialBufferSize = initialBufferSize;
54          this.capacityIncrement = new AtomicInteger(0);
55      }
56  
57      public SharedInputBuffer(final int bufferSize) {
58          this(new ReentrantLock(), bufferSize);
59      }
60  
61      public int fill(final ByteBuffer src) {
62          lock.lock();
63          try {
64              setInputMode();
65              ensureAdjustedCapacity(buffer().position() + src.remaining());
66              buffer().put(src);
67              final int remaining = buffer().remaining();
68              condition.signalAll();
69              return remaining;
70          } finally {
71              lock.unlock();
72          }
73      }
74  
75      private void incrementCapacity() throws IOException {
76          if (capacityChannel != null) {
77              final int increment = capacityIncrement.getAndSet(0);
78              if (increment > 0) {
79                  capacityChannel.update(increment);
80              }
81          }
82      }
83  
84      public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
85          lock.lock();
86          try {
87              this.capacityChannel = capacityChannel;
88              setInputMode();
89              if (buffer().position() == 0) {
90                  capacityChannel.update(initialBufferSize);
91              }
92          } finally {
93              lock.unlock();
94          }
95      }
96  
97      private void awaitInput(final Timeout timeout) throws InterruptedIOException {
98          if (!buffer().hasRemaining()) {
99              setInputMode();
100             while (buffer().position() == 0 && !endStream && !aborted) {
101                 try {
102                     if (timeout == null) {
103                         condition.await();
104                     } else {
105                         if (!condition.await(timeout.getDuration(), timeout.getTimeUnit())) {
106                             throw new InterruptedIOException("Timeout blocked waiting for input (" + timeout + ")");
107                         }
108                     }
109                 } catch (final InterruptedException ex) {
110                     Thread.currentThread().interrupt();
111                     throw new InterruptedIOException(ex.getMessage());
112                 }
113             }
114             setOutputMode();
115         }
116     }
117 
118     private void ensureNotAborted() throws InterruptedIOException {
119         if (aborted) {
120             throw new InterruptedIOException("Operation aborted");
121         }
122     }
123 
124     @Override
125     public int read() throws IOException {
126         return read(null);
127     }
128 
129     /**
130      * @since 5.4
131      */
132     public int read(final Timeout timeout) throws IOException {
133         lock.lock();
134         try {
135             setOutputMode();
136             awaitInput(timeout);
137             ensureNotAborted();
138             if (!buffer().hasRemaining() && endStream) {
139                 return -1;
140             }
141             final int b = buffer().get() & 0xff;
142             capacityIncrement.incrementAndGet();
143             if (!buffer().hasRemaining()) {
144                 incrementCapacity();
145             }
146             return b;
147         } finally {
148             lock.unlock();
149         }
150     }
151 
152     @Override
153     public int read(final byte[] b, final int off, final int len) throws IOException {
154         return read(b, off, len, null);
155     }
156 
157     /**
158      * @since 5.4
159      */
160     public int read(final byte[] b, final int off, final int len, final Timeout timeout) throws IOException {
161         if (len == 0) {
162             return 0;
163         }
164         lock.lock();
165         try {
166             setOutputMode();
167             awaitInput(timeout);
168             ensureNotAborted();
169             if (!buffer().hasRemaining() && endStream) {
170                 return -1;
171             }
172             final int chunk = Math.min(buffer().remaining(), len);
173             buffer().get(b, off, chunk);
174             capacityIncrement.addAndGet(chunk);
175             if (!buffer().hasRemaining()) {
176                 incrementCapacity();
177             }
178             return chunk;
179         } finally {
180             lock.unlock();
181         }
182     }
183 
184     public void markEndStream() {
185         if (endStream) {
186             return;
187         }
188         lock.lock();
189         try {
190             if (!endStream) {
191                 endStream = true;
192                 capacityChannel = null;
193                 condition.signalAll();
194             }
195         } finally {
196             lock.unlock();
197         }
198     }
199 
200 }