1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.util;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.locks.Condition;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 import org.apache.http.annotation.ThreadSafe;
35 import org.apache.http.nio.ContentDecoder;
36 import org.apache.http.nio.IOControl;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 @ThreadSafe
55 public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
56
57 private final ReentrantLock lock;
58 private final Condition condition;
59
60 private volatile IOControl ioctrl;
61 private volatile boolean shutdown = false;
62 private volatile boolean endOfStream = false;
63
64
65
66
67 @Deprecated
68 public SharedInputBuffer(final int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
69 super(buffersize, allocator);
70 this.ioctrl = ioctrl;
71 this.lock = new ReentrantLock();
72 this.condition = this.lock.newCondition();
73 }
74
75
76
77
78 public SharedInputBuffer(final int buffersize, final ByteBufferAllocator allocator) {
79 super(buffersize, allocator);
80 this.lock = new ReentrantLock();
81 this.condition = this.lock.newCondition();
82 }
83
84
85
86
87 public SharedInputBuffer(final int buffersize) {
88 this(buffersize, HeapByteBufferAllocator.INSTANCE);
89 }
90
91 public void reset() {
92 if (this.shutdown) {
93 return;
94 }
95 this.lock.lock();
96 try {
97 clear();
98 this.endOfStream = false;
99 } finally {
100 this.lock.unlock();
101 }
102 }
103
104
105
106
107 @Deprecated
108 public int consumeContent(final ContentDecoder decoder) throws IOException {
109 return consumeContent(decoder, null);
110 }
111
112
113
114
115 public int consumeContent(final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
116 if (this.shutdown) {
117 return -1;
118 }
119 this.lock.lock();
120 try {
121 if (ioctrl != null) {
122 this.ioctrl = ioctrl;
123 }
124 setInputMode();
125 int totalRead = 0;
126 int bytesRead;
127 while ((bytesRead = decoder.read(this.buffer)) > 0) {
128 totalRead += bytesRead;
129 }
130 if (bytesRead == -1 || decoder.isCompleted()) {
131 this.endOfStream = true;
132 }
133 if (!this.buffer.hasRemaining()) {
134 if (this.ioctrl != null) {
135 this.ioctrl.suspendInput();
136 }
137 }
138 this.condition.signalAll();
139
140 if (totalRead > 0) {
141 return totalRead;
142 } else {
143 if (this.endOfStream) {
144 return -1;
145 } else {
146 return 0;
147 }
148 }
149 } finally {
150 this.lock.unlock();
151 }
152 }
153
154 @Override
155 public boolean hasData() {
156 this.lock.lock();
157 try {
158 return super.hasData();
159 } finally {
160 this.lock.unlock();
161 }
162 }
163
164 @Override
165 public int available() {
166 this.lock.lock();
167 try {
168 return super.available();
169 } finally {
170 this.lock.unlock();
171 }
172 }
173
174 @Override
175 public int capacity() {
176 this.lock.lock();
177 try {
178 return super.capacity();
179 } finally {
180 this.lock.unlock();
181 }
182 }
183
184 @Override
185 public int length() {
186 this.lock.lock();
187 try {
188 return super.length();
189 } finally {
190 this.lock.unlock();
191 }
192 }
193
194 protected void waitForData() throws IOException {
195 this.lock.lock();
196 try {
197 try {
198 while (!super.hasData() && !this.endOfStream) {
199 if (this.shutdown) {
200 throw new InterruptedIOException("Input operation aborted");
201 }
202 if (this.ioctrl != null) {
203 this.ioctrl.requestInput();
204 }
205 this.condition.await();
206 }
207 } catch (final InterruptedException ex) {
208 throw new IOException("Interrupted while waiting for more data");
209 }
210 } finally {
211 this.lock.unlock();
212 }
213 }
214
215 public void close() {
216 if (this.shutdown) {
217 return;
218 }
219 this.endOfStream = true;
220 this.lock.lock();
221 try {
222 this.condition.signalAll();
223 } finally {
224 this.lock.unlock();
225 }
226 }
227
228 public void shutdown() {
229 if (this.shutdown) {
230 return;
231 }
232 this.shutdown = true;
233 this.lock.lock();
234 try {
235 this.condition.signalAll();
236 } finally {
237 this.lock.unlock();
238 }
239 }
240
241 protected boolean isShutdown() {
242 return this.shutdown;
243 }
244
245 protected boolean isEndOfStream() {
246 return this.shutdown || (!hasData() && this.endOfStream);
247 }
248
249 public int read() throws IOException {
250 if (this.shutdown) {
251 return -1;
252 }
253 this.lock.lock();
254 try {
255 if (!hasData()) {
256 waitForData();
257 }
258 if (isEndOfStream()) {
259 return -1;
260 }
261 return this.buffer.get() & 0xff;
262 } finally {
263 this.lock.unlock();
264 }
265 }
266
267 public int read(final byte[] b, final int off, final int len) throws IOException {
268 if (this.shutdown) {
269 return -1;
270 }
271 if (b == null) {
272 return 0;
273 }
274 this.lock.lock();
275 try {
276 if (!hasData()) {
277 waitForData();
278 }
279 if (isEndOfStream()) {
280 return -1;
281 }
282 setOutputMode();
283 int chunk = len;
284 if (chunk > this.buffer.remaining()) {
285 chunk = this.buffer.remaining();
286 }
287 this.buffer.get(b, off, chunk);
288 return chunk;
289 } finally {
290 this.lock.unlock();
291 }
292 }
293
294 public int read(final byte[] b) throws IOException {
295 if (this.shutdown) {
296 return -1;
297 }
298 if (b == null) {
299 return 0;
300 }
301 return read(b, 0, b.length);
302 }
303
304 }