1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.compat;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.locks.ReentrantLock;
34
35 import org.apache.hc.core5.annotation.Internal;
36 import org.apache.hc.core5.http.nio.DataStreamChannel;
37 import org.apache.hc.core5.http.nio.support.classic.ContentOutputBuffer;
38 import org.apache.hc.core5.util.Timeout;
39
40
41
42
43 @Internal
44 final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
45
46 private final AtomicBoolean endStreamPropagated;
47 private volatile DataStreamChannel dataStreamChannel;
48 private volatile boolean hasCapacity;
49
50 public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
51 super(lock, initialBufferSize);
52 this.hasCapacity = false;
53 this.endStreamPropagated = new AtomicBoolean();
54 }
55
56 public SharedOutputBuffer(final int bufferSize) {
57 this(new ReentrantLock(), bufferSize);
58 }
59
60 public void flush(final DataStreamChannel channel) throws IOException {
61 lock.lock();
62 try {
63 dataStreamChannel = channel;
64 hasCapacity = true;
65 setOutputMode();
66 if (buffer().hasRemaining()) {
67 dataStreamChannel.write(buffer());
68 }
69 if (!buffer().hasRemaining() && endStream) {
70 propagateEndStream();
71 }
72 condition.signalAll();
73 } finally {
74 lock.unlock();
75 }
76 }
77
78 private void ensureNotAborted() throws InterruptedIOException {
79 if (aborted) {
80 throw new InterruptedIOException("Operation aborted");
81 }
82 }
83
84
85
86
87 public void write(final byte[] b, final int off, final int len, final Timeout timeout) throws IOException {
88 final ByteBuffer src = ByteBuffer.wrap(b, off, len);
89 lock.lock();
90 try {
91 ensureNotAborted();
92 setInputMode();
93 while (src.hasRemaining()) {
94
95 if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
96 buffer().put(src);
97 } else {
98 if (buffer().position() > 0 || dataStreamChannel == null) {
99 waitFlush(timeout);
100 }
101 if (buffer().position() == 0 && dataStreamChannel != null) {
102 final int bytesWritten = dataStreamChannel.write(src);
103 if (bytesWritten == 0) {
104 hasCapacity = false;
105 waitFlush(timeout);
106 }
107 }
108 }
109 }
110 } finally {
111 lock.unlock();
112 }
113 }
114
115 @Override
116 public void write(final byte[] b, final int off, final int len) throws IOException {
117 write(b, off, len, null);
118 }
119
120
121
122
123 public void write(final int b, final Timeout timeout) throws IOException {
124 lock.lock();
125 try {
126 ensureNotAborted();
127 setInputMode();
128 if (!buffer().hasRemaining()) {
129 waitFlush(timeout);
130 }
131 buffer().put((byte)b);
132 } finally {
133 lock.unlock();
134 }
135 }
136
137 @Override
138 public void write(final int b) throws IOException {
139 write(b, null);
140 }
141
142
143
144
145 public void writeCompleted(final Timeout timeout) throws IOException {
146 if (endStream) {
147 return;
148 }
149 lock.lock();
150 try {
151 if (!endStream) {
152 endStream = true;
153 if (dataStreamChannel != null) {
154 setOutputMode();
155 if (buffer().hasRemaining()) {
156 dataStreamChannel.requestOutput();
157 waitEndStream(timeout);
158 } else {
159 propagateEndStream();
160 }
161 }
162 }
163 } finally {
164 lock.unlock();
165 }
166 }
167
168 @Override
169 public void writeCompleted() throws IOException {
170 writeCompleted(null);
171 }
172
173 private void waitFlush(final Timeout timeout) throws InterruptedIOException {
174 if (dataStreamChannel != null) {
175 dataStreamChannel.requestOutput();
176 }
177 setOutputMode();
178 while (buffer().hasRemaining() || !hasCapacity) {
179 ensureNotAborted();
180 waitForSignal(timeout);
181 }
182 setInputMode();
183 }
184
185 private void waitEndStream(final Timeout timeout) throws InterruptedIOException {
186 if (dataStreamChannel != null) {
187 dataStreamChannel.requestOutput();
188 }
189 while (!endStreamPropagated.get() && !aborted) {
190 waitForSignal(timeout);
191 }
192 }
193
194 private void waitForSignal(final Timeout timeout) throws InterruptedIOException {
195 try {
196 if (timeout == null) {
197 condition.await();
198 } else {
199 if (!condition.await(timeout.getDuration(), timeout.getTimeUnit())) {
200 aborted = true;
201 throw new InterruptedIOException("Timeout blocked waiting for output (" + timeout + ")");
202 }
203 }
204 } catch (final InterruptedException ex) {
205 Thread.currentThread().interrupt();
206 throw new InterruptedIOException(ex.getMessage());
207 }
208 }
209
210 private void propagateEndStream() throws IOException {
211 if (endStreamPropagated.compareAndSet(false, true)) {
212 dataStreamChannel.endStream();
213 }
214 }
215
216 }