1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.util;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.locks.Condition;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 import org.apache.http.annotation.ThreadSafe;
35 import org.apache.http.nio.ContentDecoder;
36 import org.apache.http.nio.IOControl;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 @ThreadSafe
55 public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
56
57 private final IOControl ioctrl;
58 private final ReentrantLock lock;
59 private final Condition condition;
60
61 private volatile boolean shutdown = false;
62 private volatile boolean endOfStream = false;
63
64 public SharedInputBuffer(int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
65 super(buffersize, allocator);
66 if (ioctrl == null) {
67 throw new IllegalArgumentException("I/O content control may not be null");
68 }
69 this.ioctrl = ioctrl;
70 this.lock = new ReentrantLock();
71 this.condition = this.lock.newCondition();
72 }
73
74 public void reset() {
75 if (this.shutdown) {
76 return;
77 }
78 this.lock.lock();
79 try {
80 clear();
81 this.endOfStream = false;
82 } finally {
83 this.lock.unlock();
84 }
85 }
86
87 public int consumeContent(final ContentDecoder decoder) throws IOException {
88 if (this.shutdown) {
89 return -1;
90 }
91 this.lock.lock();
92 try {
93 setInputMode();
94 int totalRead = 0;
95 int bytesRead;
96 while ((bytesRead = decoder.read(this.buffer)) > 0) {
97 totalRead += bytesRead;
98 }
99 if (bytesRead == -1 || decoder.isCompleted()) {
100 this.endOfStream = true;
101 }
102 if (!this.buffer.hasRemaining()) {
103 this.ioctrl.suspendInput();
104 }
105 this.condition.signalAll();
106
107 if (totalRead > 0) {
108 return totalRead;
109 } else {
110 if (this.endOfStream) {
111 return -1;
112 } else {
113 return 0;
114 }
115 }
116 } finally {
117 this.lock.unlock();
118 }
119 }
120
121 @Override
122 public boolean hasData() {
123 this.lock.lock();
124 try {
125 return super.hasData();
126 } finally {
127 this.lock.unlock();
128 }
129 }
130
131 @Override
132 public int available() {
133 this.lock.lock();
134 try {
135 return super.available();
136 } finally {
137 this.lock.unlock();
138 }
139 }
140
141 @Override
142 public int capacity() {
143 this.lock.lock();
144 try {
145 return super.capacity();
146 } finally {
147 this.lock.unlock();
148 }
149 }
150
151 @Override
152 public int length() {
153 this.lock.lock();
154 try {
155 return super.length();
156 } finally {
157 this.lock.unlock();
158 }
159 }
160
161 protected void waitForData() throws IOException {
162 this.lock.lock();
163 try {
164 try {
165 while (!super.hasData() && !this.endOfStream) {
166 if (this.shutdown) {
167 throw new InterruptedIOException("Input operation aborted");
168 }
169 this.ioctrl.requestInput();
170 this.condition.await();
171 }
172 } catch (InterruptedException ex) {
173 throw new IOException("Interrupted while waiting for more data");
174 }
175 } finally {
176 this.lock.unlock();
177 }
178 }
179
180 public void close() {
181 if (this.shutdown) {
182 return;
183 }
184 this.endOfStream = true;
185 this.lock.lock();
186 try {
187 this.condition.signalAll();
188 } finally {
189 this.lock.unlock();
190 }
191 }
192
193 public void shutdown() {
194 if (this.shutdown) {
195 return;
196 }
197 this.shutdown = true;
198 this.lock.lock();
199 try {
200 this.condition.signalAll();
201 } finally {
202 this.lock.unlock();
203 }
204 }
205
206 protected boolean isShutdown() {
207 return this.shutdown;
208 }
209
210 protected boolean isEndOfStream() {
211 return this.shutdown || (!hasData() && this.endOfStream);
212 }
213
214 public int read() throws IOException {
215 if (this.shutdown) {
216 return -1;
217 }
218 this.lock.lock();
219 try {
220 if (!hasData()) {
221 waitForData();
222 }
223 if (isEndOfStream()) {
224 return -1;
225 }
226 return this.buffer.get() & 0xff;
227 } finally {
228 this.lock.unlock();
229 }
230 }
231
232 public int read(final byte[] b, int off, int len) throws IOException {
233 if (this.shutdown) {
234 return -1;
235 }
236 if (b == null) {
237 return 0;
238 }
239 this.lock.lock();
240 try {
241 if (!hasData()) {
242 waitForData();
243 }
244 if (isEndOfStream()) {
245 return -1;
246 }
247 setOutputMode();
248 int chunk = len;
249 if (chunk > this.buffer.remaining()) {
250 chunk = this.buffer.remaining();
251 }
252 this.buffer.get(b, off, chunk);
253 return chunk;
254 } finally {
255 this.lock.unlock();
256 }
257 }
258
259 public int read(final byte[] b) throws IOException {
260 if (this.shutdown) {
261 return -1;
262 }
263 if (b == null) {
264 return 0;
265 }
266 return read(b, 0, b.length);
267 }
268
269 }