1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.util;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.locks.Condition;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 import org.apache.http.annotation.ThreadSafe;
35 import org.apache.http.nio.ContentEncoder;
36 import org.apache.http.nio.IOControl;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 @ThreadSafe
55 public class SharedOutputBuffer extends ExpandableBuffer implements ContentOutputBuffer {
56
57 private final IOControl ioctrl;
58 private final ReentrantLock lock;
59 private final Condition condition;
60
61 private volatile boolean shutdown = false;
62 private volatile boolean endOfStream = false;
63
64 public SharedOutputBuffer(int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
65 super(buffersize, allocator);
66 if (ioctrl == null) {
67 throw new IllegalArgumentException("I/O content control may not be null");
68 }
69 this.ioctrl = ioctrl;
70 this.lock = new ReentrantLock();
71 this.condition = this.lock.newCondition();
72 }
73
74 public void reset() {
75 if (this.shutdown) {
76 return;
77 }
78 this.lock.lock();
79 try {
80 clear();
81 this.endOfStream = false;
82 } finally {
83 this.lock.unlock();
84 }
85 }
86
87 @Override
88 public boolean hasData() {
89 this.lock.lock();
90 try {
91 return super.hasData();
92 } finally {
93 this.lock.unlock();
94 }
95 }
96
97 @Override
98 public int available() {
99 this.lock.lock();
100 try {
101 return super.available();
102 } finally {
103 this.lock.unlock();
104 }
105 }
106
107 @Override
108 public int capacity() {
109 this.lock.lock();
110 try {
111 return super.capacity();
112 } finally {
113 this.lock.unlock();
114 }
115 }
116
117 @Override
118 public int length() {
119 this.lock.lock();
120 try {
121 return super.length();
122 } finally {
123 this.lock.unlock();
124 }
125 }
126
127 public int produceContent(final ContentEncoder encoder) throws IOException {
128 if (this.shutdown) {
129 return -1;
130 }
131 this.lock.lock();
132 try {
133 setOutputMode();
134 int bytesWritten = 0;
135 if (super.hasData()) {
136 bytesWritten = encoder.write(this.buffer);
137 if (encoder.isCompleted()) {
138 this.endOfStream = true;
139 }
140 }
141 if (!super.hasData()) {
142
143
144 if (this.endOfStream && !encoder.isCompleted()) {
145 encoder.complete();
146 }
147 if (!this.endOfStream) {
148
149 this.ioctrl.suspendOutput();
150 }
151 }
152 this.condition.signalAll();
153 return bytesWritten;
154 } finally {
155 this.lock.unlock();
156 }
157 }
158
159 public void close() {
160 shutdown();
161 }
162
163 public void shutdown() {
164 if (this.shutdown) {
165 return;
166 }
167 this.shutdown = true;
168 this.lock.lock();
169 try {
170 this.condition.signalAll();
171 } finally {
172 this.lock.unlock();
173 }
174 }
175
176 public void write(final byte[] b, int off, int len) throws IOException {
177 if (b == null) {
178 return;
179 }
180 this.lock.lock();
181 try {
182 if (this.shutdown || this.endOfStream) {
183 throw new IllegalStateException("Buffer already closed for writing");
184 }
185 setInputMode();
186 int remaining = len;
187 while (remaining > 0) {
188 if (!this.buffer.hasRemaining()) {
189 flushContent();
190 setInputMode();
191 }
192 int chunk = Math.min(remaining, this.buffer.remaining());
193 this.buffer.put(b, off, chunk);
194 remaining -= chunk;
195 off += chunk;
196 }
197 } finally {
198 this.lock.unlock();
199 }
200 }
201
202 public void write(final byte[] b) throws IOException {
203 if (b == null) {
204 return;
205 }
206 write(b, 0, b.length);
207 }
208
209 public void write(int b) throws IOException {
210 this.lock.lock();
211 try {
212 if (this.shutdown || this.endOfStream) {
213 throw new IllegalStateException("Buffer already closed for writing");
214 }
215 setInputMode();
216 if (!this.buffer.hasRemaining()) {
217 flushContent();
218 setInputMode();
219 }
220 this.buffer.put((byte)b);
221 } finally {
222 this.lock.unlock();
223 }
224 }
225
226 public void flush() throws IOException {
227 }
228
229 private void flushContent() throws IOException {
230 this.lock.lock();
231 try {
232 try {
233 while (super.hasData()) {
234 if (this.shutdown) {
235 throw new InterruptedIOException("Output operation aborted");
236 }
237 this.ioctrl.requestOutput();
238 this.condition.await();
239 }
240 } catch (InterruptedException ex) {
241 throw new IOException("Interrupted while flushing the content buffer");
242 }
243 } finally {
244 this.lock.unlock();
245 }
246 }
247
248 public void writeCompleted() throws IOException {
249 this.lock.lock();
250 try {
251 if (this.endOfStream) {
252 return;
253 }
254 this.endOfStream = true;
255 this.ioctrl.requestOutput();
256 } finally {
257 this.lock.unlock();
258 }
259 }
260
261 }