1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.util;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.locks.Condition;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 import org.apache.http.annotation.ThreadSafe;
35 import org.apache.http.nio.ContentEncoder;
36 import org.apache.http.nio.IOControl;
37 import org.apache.http.util.Args;
38 import org.apache.http.util.Asserts;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 @ThreadSafe
57 public class SharedOutputBuffer extends ExpandableBuffer implements ContentOutputBuffer {
58
59 private final ReentrantLock lock;
60 private final Condition condition;
61
62 private volatile IOControl ioctrl;
63 private volatile boolean shutdown = false;
64 private volatile boolean endOfStream = false;
65
66
67
68
69 @Deprecated
70 public SharedOutputBuffer(final int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
71 super(buffersize, allocator);
72 Args.notNull(ioctrl, "I/O content control");
73 this.ioctrl = ioctrl;
74 this.lock = new ReentrantLock();
75 this.condition = this.lock.newCondition();
76 }
77
78
79
80
81 public SharedOutputBuffer(final int buffersize, final ByteBufferAllocator allocator) {
82 super(buffersize, allocator);
83 this.lock = new ReentrantLock();
84 this.condition = this.lock.newCondition();
85 }
86
87
88
89
90 public SharedOutputBuffer(final int buffersize) {
91 this(buffersize, HeapByteBufferAllocator.INSTANCE);
92 }
93
94 public void reset() {
95 if (this.shutdown) {
96 return;
97 }
98 this.lock.lock();
99 try {
100 clear();
101 this.endOfStream = false;
102 } finally {
103 this.lock.unlock();
104 }
105 }
106
107 @Override
108 public boolean hasData() {
109 this.lock.lock();
110 try {
111 return super.hasData();
112 } finally {
113 this.lock.unlock();
114 }
115 }
116
117 @Override
118 public int available() {
119 this.lock.lock();
120 try {
121 return super.available();
122 } finally {
123 this.lock.unlock();
124 }
125 }
126
127 @Override
128 public int capacity() {
129 this.lock.lock();
130 try {
131 return super.capacity();
132 } finally {
133 this.lock.unlock();
134 }
135 }
136
137 @Override
138 public int length() {
139 this.lock.lock();
140 try {
141 return super.length();
142 } finally {
143 this.lock.unlock();
144 }
145 }
146
147
148
149
150 @Deprecated
151 public int produceContent(final ContentEncoder encoder) throws IOException {
152 return produceContent(encoder, null);
153 }
154
155
156
157
158 public int produceContent(final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
159 if (this.shutdown) {
160 return -1;
161 }
162 this.lock.lock();
163 try {
164 if (ioctrl != null) {
165 this.ioctrl = ioctrl;
166 }
167 setOutputMode();
168 int bytesWritten = 0;
169 if (super.hasData()) {
170 bytesWritten = encoder.write(this.buffer);
171 if (encoder.isCompleted()) {
172 this.endOfStream = true;
173 }
174 }
175 if (!super.hasData()) {
176
177
178 if (this.endOfStream && !encoder.isCompleted()) {
179 encoder.complete();
180 }
181 if (!this.endOfStream) {
182
183 if (this.ioctrl != null) {
184 this.ioctrl.suspendOutput();
185 }
186 }
187 }
188 this.condition.signalAll();
189 return bytesWritten;
190 } finally {
191 this.lock.unlock();
192 }
193 }
194
195 public void close() {
196 shutdown();
197 }
198
199 public void shutdown() {
200 if (this.shutdown) {
201 return;
202 }
203 this.shutdown = true;
204 this.lock.lock();
205 try {
206 this.condition.signalAll();
207 } finally {
208 this.lock.unlock();
209 }
210 }
211
212 public void write(final byte[] b, int off, final int len) throws IOException {
213 if (b == null) {
214 return;
215 }
216 this.lock.lock();
217 try {
218 Asserts.check(!this.shutdown && !this.endOfStream, "Buffer already closed for writing");
219 setInputMode();
220 int remaining = len;
221 while (remaining > 0) {
222 if (!this.buffer.hasRemaining()) {
223 flushContent();
224 setInputMode();
225 }
226 final int chunk = Math.min(remaining, this.buffer.remaining());
227 this.buffer.put(b, off, chunk);
228 remaining -= chunk;
229 off += chunk;
230 }
231 } finally {
232 this.lock.unlock();
233 }
234 }
235
236 public void write(final byte[] b) throws IOException {
237 if (b == null) {
238 return;
239 }
240 write(b, 0, b.length);
241 }
242
243 public void write(final int b) throws IOException {
244 this.lock.lock();
245 try {
246 Asserts.check(!this.shutdown && !this.endOfStream, "Buffer already closed for writing");
247 setInputMode();
248 if (!this.buffer.hasRemaining()) {
249 flushContent();
250 setInputMode();
251 }
252 this.buffer.put((byte)b);
253 } finally {
254 this.lock.unlock();
255 }
256 }
257
258 public void flush() throws IOException {
259 }
260
261 private void flushContent() throws IOException {
262 this.lock.lock();
263 try {
264 try {
265 while (super.hasData()) {
266 if (this.shutdown) {
267 throw new InterruptedIOException("Output operation aborted");
268 }
269 if (this.ioctrl != null) {
270 this.ioctrl.requestOutput();
271 }
272 this.condition.await();
273 }
274 } catch (final InterruptedException ex) {
275 throw new IOException("Interrupted while flushing the content buffer");
276 }
277 } finally {
278 this.lock.unlock();
279 }
280 }
281
282 public void writeCompleted() throws IOException {
283 this.lock.lock();
284 try {
285 if (this.endOfStream) {
286 return;
287 }
288 this.endOfStream = true;
289 if (this.ioctrl != null) {
290 this.ioctrl.requestOutput();
291 }
292 } finally {
293 this.lock.unlock();
294 }
295 }
296
297 }