1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.compat;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.io.OutputStream;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.core5.annotation.Experimental;
36 import org.apache.hc.core5.annotation.Internal;
37 import org.apache.hc.core5.http.ClassicHttpRequest;
38 import org.apache.hc.core5.http.HttpEntity;
39 import org.apache.hc.core5.http.HttpException;
40 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
41 import org.apache.hc.core5.http.nio.DataStreamChannel;
42 import org.apache.hc.core5.http.nio.RequestChannel;
43 import org.apache.hc.core5.http.protocol.HttpContext;
44 import org.apache.hc.core5.util.Args;
45 import org.apache.hc.core5.util.Asserts;
46 import org.apache.hc.core5.util.Timeout;
47
48
49
50
51 @Experimental
52 @Internal
53 class ClassicToAsyncRequestProducer implements AsyncRequestProducer {
54
55 private final ClassicHttpRequest request;
56 private final int initialBufferSize;
57 private final Timeout timeout;
58 private final CountDownLatch countDownLatch;
59 private final AtomicReference<SharedOutputBuffer> bufferRef;
60 private final AtomicReference<Exception> exceptionRef;
61
62 private volatile boolean repeatable;
63
64 public interface IORunnable {
65
66 void execute() throws IOException;
67
68 }
69
70 public ClassicToAsyncRequestProducer(final ClassicHttpRequest request, final int initialBufferSize, final Timeout timeout) {
71 this.request = Args.notNull(request, "HTTP request");
72 this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
73 this.timeout = timeout;
74 this.countDownLatch = new CountDownLatch(1);
75 this.bufferRef = new AtomicReference<>();
76 this.exceptionRef = new AtomicReference<>();
77 }
78
79 public ClassicToAsyncRequestProducer(final ClassicHttpRequest request, final Timeout timeout) {
80 this(request, ClassicToAsyncSupport.INITIAL_BUF_SIZE, timeout);
81 }
82
83 void propagateException() throws IOException {
84 final Exception ex = exceptionRef.getAndSet(null);
85 if (ex != null) {
86 ClassicToAsyncSupport.rethrow(ex);
87 }
88 }
89
90 public IORunnable blockWaiting() throws IOException, InterruptedException {
91 if (timeout == null) {
92 countDownLatch.await();
93 } else {
94 if (!countDownLatch.await(timeout.getDuration(), timeout.getTimeUnit())) {
95 throw new InterruptedIOException("Timeout blocked waiting for output (" + timeout + ")");
96 }
97 }
98 propagateException();
99 final SharedOutputBuffer outputBuffer = bufferRef.get();
100 return () -> {
101 final HttpEntity requestEntity = request.getEntity();
102 if (requestEntity != null) {
103 try (final InternalOutputStream outputStream = new InternalOutputStream(outputBuffer)) {
104 requestEntity.writeTo(outputStream);
105 }
106 }
107 };
108 }
109
110 @Override
111 public void sendRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
112 final HttpEntity requestEntity = request.getEntity();
113 final SharedOutputBuffer buffer = requestEntity != null ? new SharedOutputBuffer(initialBufferSize) : null;
114 bufferRef.set(buffer);
115 repeatable = requestEntity == null || requestEntity.isRepeatable();
116 channel.sendRequest(request, requestEntity, null);
117 countDownLatch.countDown();
118 }
119
120 @Override
121 public boolean isRepeatable() {
122 return repeatable;
123 }
124
125 @Override
126 public int available() {
127 final SharedOutputBuffer buffer = bufferRef.get();
128 if (buffer != null) {
129 return buffer.length();
130 }
131 return 0;
132 }
133
134 @Override
135 public void produce(final DataStreamChannel channel) throws IOException {
136 final SharedOutputBuffer buffer = bufferRef.get();
137 if (buffer != null) {
138 buffer.flush(channel);
139 }
140 }
141
142 @Override
143 public void failed(final Exception cause) {
144 try {
145 exceptionRef.set(cause);
146 } finally {
147 countDownLatch.countDown();
148 }
149 }
150
151 @Override
152 public void releaseResources() {
153 }
154
155 class InternalOutputStream extends OutputStream {
156
157 private final SharedOutputBuffer buffer;
158
159 public InternalOutputStream(final SharedOutputBuffer buffer) {
160 Asserts.notNull(buffer, "Shared buffer");
161 this.buffer = buffer;
162 }
163
164 @Override
165 public void close() throws IOException {
166 propagateException();
167 this.buffer.writeCompleted(timeout);
168 }
169
170 @Override
171 public void flush() throws IOException {
172 propagateException();
173 }
174
175 @Override
176 public void write(final byte[] b, final int off, final int len) throws IOException {
177 propagateException();
178 this.buffer.write(b, off, len, timeout);
179 }
180
181 @Override
182 public void write(final byte[] b) throws IOException {
183 propagateException();
184 if (b == null) {
185 return;
186 }
187 this.buffer.write(b, 0, b.length, timeout);
188 }
189
190 @Override
191 public void write(final int b) throws IOException {
192 propagateException();
193 this.buffer.write(b, timeout);
194 }
195
196 }
197
198 }