1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.compat;
28
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.InterruptedIOException;
32 import java.io.OutputStream;
33 import java.nio.ByteBuffer;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Set;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import org.apache.hc.core5.annotation.Experimental;
41 import org.apache.hc.core5.annotation.Internal;
42 import org.apache.hc.core5.concurrent.FutureCallback;
43 import org.apache.hc.core5.function.Supplier;
44 import org.apache.hc.core5.http.ClassicHttpResponse;
45 import org.apache.hc.core5.http.EntityDetails;
46 import org.apache.hc.core5.http.Header;
47 import org.apache.hc.core5.http.HttpEntity;
48 import org.apache.hc.core5.http.HttpException;
49 import org.apache.hc.core5.http.HttpResponse;
50 import org.apache.hc.core5.http.io.entity.AbstractHttpEntity;
51 import org.apache.hc.core5.http.io.support.ClassicResponseBuilder;
52 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
53 import org.apache.hc.core5.http.nio.CapacityChannel;
54 import org.apache.hc.core5.http.protocol.HttpContext;
55 import org.apache.hc.core5.io.Closer;
56 import org.apache.hc.core5.util.Args;
57 import org.apache.hc.core5.util.Asserts;
58 import org.apache.hc.core5.util.Timeout;
59
60
61
62
63 @Experimental
64 @Internal
65 class ClassicToAsyncResponseConsumer implements AsyncResponseConsumer<Void> {
66
67 static class ResponseData {
68
69 final HttpResponse head;
70 final EntityDetails entityDetails;
71
72 ResponseData(final HttpResponse head,
73 final EntityDetails entityDetails) {
74 this.head = head;
75 this.entityDetails = entityDetails;
76 }
77
78 }
79
80 private final int initialBufferSize;
81 private final Timeout timeout;
82 private final CountDownLatch countDownLatch;
83 private final AtomicReference<ResponseData> responseRef;
84 private final AtomicReference<FutureCallback<Void>> callbackRef;
85 private final AtomicReference<SharedInputBuffer> bufferRef;
86 private final AtomicReference<Exception> exceptionRef;
87
88 public ClassicToAsyncResponseConsumer(final int initialBufferSize, final Timeout timeout) {
89 this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
90 this.timeout = timeout;
91 this.countDownLatch = new CountDownLatch(1);
92 this.responseRef = new AtomicReference<>();
93 this.callbackRef = new AtomicReference<>();
94 this.bufferRef = new AtomicReference<>();
95 this.exceptionRef = new AtomicReference<>();
96 }
97
98 public ClassicToAsyncResponseConsumer(final Timeout timeout) {
99 this(ClassicToAsyncSupport.INITIAL_BUF_SIZE, timeout);
100 }
101
102 void propagateException() throws IOException {
103 final Exception ex = exceptionRef.getAndSet(null);
104 if (ex != null) {
105 ClassicToAsyncSupport.rethrow(ex);
106 }
107 }
108
109 void fireComplete() throws IOException {
110 final FutureCallback<Void> callback = callbackRef.getAndSet(null);
111 if (callback != null) {
112 callback.completed(null);
113 }
114 }
115
116 public ClassicHttpResponse blockWaiting() throws IOException, InterruptedException {
117 if (timeout == null) {
118 countDownLatch.await();
119 } else {
120 if (!countDownLatch.await(timeout.getDuration(), timeout.getTimeUnit())) {
121 throw new InterruptedIOException("Timeout blocked waiting for input (" + timeout + ")");
122 }
123 }
124 propagateException();
125 final ResponseData r = responseRef.getAndSet(null);
126 Asserts.notNull(r, "HTTP response is missing");
127 final SharedInputBuffer inputBuffer = bufferRef.get();
128 return ClassicResponseBuilder.create(r.head.getCode())
129 .setHeaders(r.head.getHeaders())
130 .setVersion(r.head.getVersion())
131 .setEntity(r.entityDetails != null ?
132 new IncomingHttpEntity(new InternalInputStream(inputBuffer), r.entityDetails) :
133 null)
134 .build();
135 }
136
137 @Override
138 public void consumeResponse(final HttpResponse asyncResponse,
139 final EntityDetails entityDetails,
140 final HttpContext context,
141 final FutureCallback<Void> resultCallback) throws HttpException, IOException {
142 callbackRef.set(resultCallback);
143 final ResponseData responseData = new ResponseData(asyncResponse, entityDetails);
144 responseRef.set(responseData);
145 if (entityDetails != null) {
146 bufferRef.set(new SharedInputBuffer(initialBufferSize));
147 } else {
148 fireComplete();
149 }
150 countDownLatch.countDown();
151 }
152
153 @Override
154 public void informationResponse(final HttpResponse response,
155 final HttpContext context) throws HttpException, IOException {
156 }
157
158 @Override
159 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
160 final SharedInputBuffer buffer = bufferRef.get();
161 if (buffer != null) {
162 buffer.updateCapacity(capacityChannel);
163 }
164 }
165
166 @Override
167 public final void consume(final ByteBuffer src) throws IOException {
168 final SharedInputBuffer buffer = bufferRef.get();
169 if (buffer != null) {
170 buffer.fill(src);
171 }
172 }
173
174 @Override
175 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
176 final SharedInputBuffer buffer = bufferRef.get();
177 if (buffer != null) {
178 buffer.markEndStream();
179 }
180 }
181
182 @Override
183 public final void failed(final Exception cause) {
184 try {
185 exceptionRef.set(cause);
186 } finally {
187 countDownLatch.countDown();
188 }
189 }
190
191 @Override
192 public void releaseResources() {
193 }
194
195 class InternalInputStream extends InputStream {
196
197 private final SharedInputBuffer buffer;
198
199 InternalInputStream(final SharedInputBuffer buffer) {
200 super();
201 Args.notNull(buffer, "Input buffer");
202 this.buffer = buffer;
203 }
204
205 @Override
206 public int available() throws IOException {
207 propagateException();
208 return this.buffer.length();
209 }
210
211 @Override
212 public int read(final byte[] b, final int off, final int len) throws IOException {
213 propagateException();
214 if (len == 0) {
215 return 0;
216 }
217 final int bytesRead = this.buffer.read(b, off, len, timeout);
218 if (bytesRead == -1) {
219 fireComplete();
220 }
221 return bytesRead;
222 }
223
224 @Override
225 public int read(final byte[] b) throws IOException {
226 propagateException();
227 if (b == null) {
228 return 0;
229 }
230 final int bytesRead = this.buffer.read(b, 0, b.length, timeout);
231 if (bytesRead == -1) {
232 fireComplete();
233 }
234 return bytesRead;
235 }
236
237 @Override
238 public int read() throws IOException {
239 propagateException();
240 final int b = this.buffer.read(timeout);
241 if (b == -1) {
242 fireComplete();
243 }
244 return b;
245 }
246
247 @Override
248 public void close() throws IOException {
249
250 final byte[] tmp = new byte[1024];
251 do {
252
253 } while (read(tmp) >= 0);
254 super.close();
255 }
256
257 }
258
259 static class IncomingHttpEntity implements HttpEntity {
260
261 private final InputStream content;
262 private final EntityDetails entityDetails;
263
264 IncomingHttpEntity(final InputStream content, final EntityDetails entityDetails) {
265 this.content = content;
266 this.entityDetails = entityDetails;
267 }
268
269 @Override
270 public boolean isRepeatable() {
271 return false;
272 }
273
274 @Override
275 public boolean isChunked() {
276 return entityDetails.isChunked();
277 }
278
279 @Override
280 public long getContentLength() {
281 return entityDetails.getContentLength();
282 }
283
284 @Override
285 public String getContentType() {
286 return entityDetails.getContentType();
287 }
288
289 @Override
290 public String getContentEncoding() {
291 return entityDetails.getContentEncoding();
292 }
293
294 @Override
295 public InputStream getContent() throws IOException, IllegalStateException {
296 return content;
297 }
298
299 @Override
300 public boolean isStreaming() {
301 return content != null;
302 }
303
304 @Override
305 public void writeTo(final OutputStream outStream) throws IOException {
306 AbstractHttpEntity.writeTo(this, outStream);
307 }
308
309 @Override
310 public Supplier<List<? extends Header>> getTrailers() {
311 return null;
312 }
313
314 @Override
315 public Set<String> getTrailerNames() {
316 return Collections.emptySet();
317 }
318
319 @Override
320 public void close() throws IOException {
321 Closer.close(content);
322 }
323
324 @Override
325 public String toString() {
326 return entityDetails.toString();
327 }
328
329 }
330
331 }