View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.ConnectionReuseStrategy;
35  import org.apache.hc.core5.http.EntityDetails;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.HttpException;
38  import org.apache.hc.core5.http.HttpHeaders;
39  import org.apache.hc.core5.http.HttpRequest;
40  import org.apache.hc.core5.http.HttpResponse;
41  import org.apache.hc.core5.http.HttpStatus;
42  import org.apache.hc.core5.http.HttpVersion;
43  import org.apache.hc.core5.http.ProtocolException;
44  import org.apache.hc.core5.http.ProtocolVersion;
45  import org.apache.hc.core5.http.UnsupportedHttpVersionException;
46  import org.apache.hc.core5.http.config.H1Config;
47  import org.apache.hc.core5.http.message.StatusLine;
48  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
49  import org.apache.hc.core5.http.nio.CapacityChannel;
50  import org.apache.hc.core5.http.nio.DataStreamChannel;
51  import org.apache.hc.core5.http.nio.RequestChannel;
52  import org.apache.hc.core5.http.nio.ResourceHolder;
53  import org.apache.hc.core5.http.protocol.HttpContext;
54  import org.apache.hc.core5.http.protocol.HttpCoreContext;
55  import org.apache.hc.core5.http.protocol.HttpProcessor;
56  import org.apache.hc.core5.util.Timeout;
57  
58  class ClientHttp1StreamHandler implements ResourceHolder {
59  
60      private final Http1StreamChannel<HttpRequest> outputChannel;
61      private final DataStreamChannel internalDataChannel;
62      private final HttpProcessor httpProcessor;
63      private final H1Config h1Config;
64      private final ConnectionReuseStrategy connectionReuseStrategy;
65      private final AsyncClientExchangeHandler exchangeHandler;
66      private final HttpCoreContext context;
67      private final AtomicBoolean requestCommitted;
68      private final AtomicBoolean done;
69  
70      private volatile boolean keepAlive;
71      private volatile Timeout timeout;
72      private volatile HttpRequest committedRequest;
73      private volatile MessageState requestState;
74      private volatile MessageState responseState;
75  
76      ClientHttp1StreamHandler(
77              final Http1StreamChannel<HttpRequest> outputChannel,
78              final HttpProcessor httpProcessor,
79              final H1Config h1Config,
80              final ConnectionReuseStrategy connectionReuseStrategy,
81              final AsyncClientExchangeHandler exchangeHandler,
82              final HttpCoreContext context) {
83          this.outputChannel = outputChannel;
84          this.internalDataChannel = new DataStreamChannel() {
85  
86              @Override
87              public void requestOutput() {
88                  outputChannel.requestOutput();
89              }
90  
91              @Override
92              public void endStream(final List<? extends Header> trailers) throws IOException {
93                  outputChannel.complete(trailers);
94                  requestState = MessageState.COMPLETE;
95              }
96  
97              @Override
98              public int write(final ByteBuffer src) throws IOException {
99                  return outputChannel.write(src);
100             }
101 
102             @Override
103             public void endStream() throws IOException {
104                 endStream(null);
105             }
106 
107         };
108 
109         this.httpProcessor = httpProcessor;
110         this.h1Config = h1Config;
111         this.connectionReuseStrategy = connectionReuseStrategy;
112         this.exchangeHandler = exchangeHandler;
113         this.context = context;
114         this.requestCommitted = new AtomicBoolean(false);
115         this.done = new AtomicBoolean(false);
116         this.keepAlive = true;
117         this.requestState = MessageState.IDLE;
118         this.responseState = MessageState.HEADERS;
119     }
120 
121     boolean isResponseFinal() {
122         return responseState == MessageState.COMPLETE;
123     }
124 
125     boolean isCompleted() {
126         return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
127     }
128 
129     String getRequestMethod() {
130         return committedRequest != null ? committedRequest.getMethod() : null;
131     }
132 
133     boolean isOutputReady() {
134         switch (requestState) {
135             case IDLE:
136             case ACK:
137                 return true;
138             case BODY:
139                 return exchangeHandler.available() > 0;
140             default:
141                 return false;
142         }
143     }
144 
145     private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws IOException, HttpException {
146         if (requestCommitted.compareAndSet(false, true)) {
147             final ProtocolVersion transportVersion = request.getVersion();
148             if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
149                 throw new UnsupportedHttpVersionException(transportVersion);
150             }
151             if (transportVersion != null) {
152                 context.setProtocolVersion(transportVersion);
153             }
154             context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
155 
156             httpProcessor.process(request, entityDetails, context);
157 
158             final boolean endStream = entityDetails == null;
159             if (endStream) {
160                 outputChannel.submit(request, endStream, FlushMode.IMMEDIATE);
161                 committedRequest = request;
162                 requestState = MessageState.COMPLETE;
163             } else {
164                 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
165                 final boolean expectContinue = h != null && "100-continue".equalsIgnoreCase(h.getValue());
166                 outputChannel.submit(request, endStream, expectContinue ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
167                 committedRequest = request;
168                 if (expectContinue) {
169                     requestState = MessageState.ACK;
170                     timeout = outputChannel.getSocketTimeout();
171                     outputChannel.setSocketTimeout(h1Config.getWaitForContinueTimeout());
172                 } else {
173                     requestState = MessageState.BODY;
174                     exchangeHandler.produce(internalDataChannel);
175                 }
176             }
177         } else {
178             throw new HttpException("Request already committed");
179         }
180     }
181 
182     void produceOutput() throws HttpException, IOException {
183         switch (requestState) {
184             case IDLE:
185                 requestState = MessageState.HEADERS;
186                 exchangeHandler.produceRequest(new RequestChannel() {
187 
188                     @Override
189                     public void sendRequest(
190                             final HttpRequest request, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
191                         commitRequest(request, entityDetails);
192                     }
193 
194                 }, context);
195                 break;
196             case ACK:
197                 outputChannel.suspendOutput();
198                 break;
199             case BODY:
200                 exchangeHandler.produce(internalDataChannel);
201                 break;
202         }
203     }
204 
205     void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
206         if (done.get() || responseState != MessageState.HEADERS) {
207             throw new ProtocolException("Unexpected message head");
208         }
209         final ProtocolVersion transportVersion = response.getVersion();
210         if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
211             throw new UnsupportedHttpVersionException(transportVersion);
212         }
213 
214         final int status = response.getCode();
215         if (status < HttpStatus.SC_INFORMATIONAL) {
216             throw new ProtocolException("Invalid response: " + new StatusLine(response));
217         }
218         if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
219             exchangeHandler.consumeInformation(response, context);
220         } else {
221             if (!connectionReuseStrategy.keepAlive(committedRequest, response, context)) {
222                 keepAlive = false;
223             }
224         }
225         if (requestState == MessageState.ACK) {
226             if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
227                 outputChannel.setSocketTimeout(timeout);
228                 requestState = MessageState.BODY;
229                 if (status < HttpStatus.SC_CLIENT_ERROR) {
230                     exchangeHandler.produce(internalDataChannel);
231                 }
232             }
233         }
234         if (status < HttpStatus.SC_SUCCESS) {
235             return;
236         }
237         if (requestState == MessageState.BODY) {
238             if (keepAlive && status >= HttpStatus.SC_CLIENT_ERROR) {
239                 requestState = MessageState.COMPLETE;
240                 if (!outputChannel.abortGracefully()) {
241                     keepAlive = false;
242                 }
243             }
244         }
245 
246         context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
247         httpProcessor.process(response, entityDetails, context);
248 
249         exchangeHandler.consumeResponse(response, entityDetails, context);
250         if (entityDetails == null) {
251             if (!keepAlive) {
252                 outputChannel.close();
253             }
254             responseState = MessageState.COMPLETE;
255         } else {
256             responseState = MessageState.BODY;
257         }
258     }
259 
260     void consumeData(final ByteBuffer src) throws HttpException, IOException {
261         if (done.get() || responseState != MessageState.BODY) {
262             throw new ProtocolException("Unexpected message data");
263         }
264         exchangeHandler.consume(src);
265     }
266 
267     void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
268         exchangeHandler.updateCapacity(capacityChannel);
269     }
270 
271     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
272         if (done.get() || responseState != MessageState.BODY) {
273             throw new ProtocolException("Unexpected message data");
274         }
275         if (!keepAlive) {
276             outputChannel.close();
277         }
278         responseState = MessageState.COMPLETE;
279         exchangeHandler.streamEnd(trailers);
280     }
281 
282     boolean handleTimeout() {
283         if (requestState == MessageState.ACK) {
284             requestState = MessageState.BODY;
285             outputChannel.setSocketTimeout(timeout);
286             outputChannel.requestOutput();
287             return true;
288         }
289         return false;
290     }
291 
292     void failed(final Exception cause) {
293         if (!done.get()) {
294             exchangeHandler.failed(cause);
295         }
296     }
297 
298     @Override
299     public void releaseResources() {
300         if (done.compareAndSet(false, true)) {
301             responseState = MessageState.COMPLETE;
302             requestState = MessageState.COMPLETE;
303             exchangeHandler.releaseResources();
304         }
305     }
306 
307     void appendState(final StringBuilder buf) {
308         buf.append("requestState=").append(requestState)
309                 .append(", responseState=").append(responseState)
310                 .append(", responseCommitted=").append(requestCommitted)
311                 .append(", keepAlive=").append(keepAlive)
312                 .append(", done=").append(done);
313     }
314 
315     @Override
316     public String toString() {
317         final StringBuilder buf = new StringBuilder();
318         buf.append("[");
319         appendState(buf);
320         buf.append("]");
321         return buf.toString();
322     }
323 
324 }
325