1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.compat;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.locks.ReentrantLock;
34
35 import org.apache.hc.core5.annotation.Internal;
36 import org.apache.hc.core5.http.nio.CapacityChannel;
37 import org.apache.hc.core5.http.nio.support.classic.ContentInputBuffer;
38 import org.apache.hc.core5.util.Timeout;
39
40
41
42
43 @Internal
44 final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
45
46 private final int initialBufferSize;
47 private final AtomicInteger capacityIncrement;
48
49 private volatile CapacityChannel capacityChannel;
50
51 public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
52 super(lock, initialBufferSize);
53 this.initialBufferSize = initialBufferSize;
54 this.capacityIncrement = new AtomicInteger(0);
55 }
56
57 public SharedInputBuffer(final int bufferSize) {
58 this(new ReentrantLock(), bufferSize);
59 }
60
61 public int fill(final ByteBuffer src) {
62 lock.lock();
63 try {
64 setInputMode();
65 ensureAdjustedCapacity(buffer().position() + src.remaining());
66 buffer().put(src);
67 final int remaining = buffer().remaining();
68 condition.signalAll();
69 return remaining;
70 } finally {
71 lock.unlock();
72 }
73 }
74
75 private void incrementCapacity() throws IOException {
76 if (capacityChannel != null) {
77 final int increment = capacityIncrement.getAndSet(0);
78 if (increment > 0) {
79 capacityChannel.update(increment);
80 }
81 }
82 }
83
84 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
85 lock.lock();
86 try {
87 this.capacityChannel = capacityChannel;
88 setInputMode();
89 if (buffer().position() == 0) {
90 capacityChannel.update(initialBufferSize);
91 }
92 } finally {
93 lock.unlock();
94 }
95 }
96
97 private void awaitInput(final Timeout timeout) throws InterruptedIOException {
98 if (!buffer().hasRemaining()) {
99 setInputMode();
100 while (buffer().position() == 0 && !endStream && !aborted) {
101 try {
102 if (timeout == null) {
103 condition.await();
104 } else {
105 if (!condition.await(timeout.getDuration(), timeout.getTimeUnit())) {
106 throw new InterruptedIOException("Timeout blocked waiting for input (" + timeout + ")");
107 }
108 }
109 } catch (final InterruptedException ex) {
110 Thread.currentThread().interrupt();
111 throw new InterruptedIOException(ex.getMessage());
112 }
113 }
114 setOutputMode();
115 }
116 }
117
118 private void ensureNotAborted() throws InterruptedIOException {
119 if (aborted) {
120 throw new InterruptedIOException("Operation aborted");
121 }
122 }
123
124 @Override
125 public int read() throws IOException {
126 return read(null);
127 }
128
129
130
131
132 public int read(final Timeout timeout) throws IOException {
133 lock.lock();
134 try {
135 setOutputMode();
136 awaitInput(timeout);
137 ensureNotAborted();
138 if (!buffer().hasRemaining() && endStream) {
139 return -1;
140 }
141 final int b = buffer().get() & 0xff;
142 capacityIncrement.incrementAndGet();
143 if (!buffer().hasRemaining()) {
144 incrementCapacity();
145 }
146 return b;
147 } finally {
148 lock.unlock();
149 }
150 }
151
152 @Override
153 public int read(final byte[] b, final int off, final int len) throws IOException {
154 return read(b, off, len, null);
155 }
156
157
158
159
160 public int read(final byte[] b, final int off, final int len, final Timeout timeout) throws IOException {
161 if (len == 0) {
162 return 0;
163 }
164 lock.lock();
165 try {
166 setOutputMode();
167 awaitInput(timeout);
168 ensureNotAborted();
169 if (!buffer().hasRemaining() && endStream) {
170 return -1;
171 }
172 final int chunk = Math.min(buffer().remaining(), len);
173 buffer().get(b, off, chunk);
174 capacityIncrement.addAndGet(chunk);
175 if (!buffer().hasRemaining()) {
176 incrementCapacity();
177 }
178 return chunk;
179 } finally {
180 lock.unlock();
181 }
182 }
183
184 public void markEndStream() {
185 if (endStream) {
186 return;
187 }
188 lock.lock();
189 try {
190 if (!endStream) {
191 endStream = true;
192 capacityChannel = null;
193 condition.signalAll();
194 }
195 } finally {
196 lock.unlock();
197 }
198 }
199
200 }