View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.ConnectionReuseStrategy;
35  import org.apache.hc.core5.http.EntityDetails;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.HttpException;
38  import org.apache.hc.core5.http.HttpHeaders;
39  import org.apache.hc.core5.http.HttpRequest;
40  import org.apache.hc.core5.http.HttpResponse;
41  import org.apache.hc.core5.http.HttpStatus;
42  import org.apache.hc.core5.http.HttpVersion;
43  import org.apache.hc.core5.http.ProtocolException;
44  import org.apache.hc.core5.http.ProtocolVersion;
45  import org.apache.hc.core5.http.UnsupportedHttpVersionException;
46  import org.apache.hc.core5.http.config.H1Config;
47  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
48  import org.apache.hc.core5.http.nio.CapacityChannel;
49  import org.apache.hc.core5.http.nio.DataStreamChannel;
50  import org.apache.hc.core5.http.nio.RequestChannel;
51  import org.apache.hc.core5.http.nio.ResourceHolder;
52  import org.apache.hc.core5.http.protocol.HttpCoreContext;
53  import org.apache.hc.core5.http.protocol.HttpProcessor;
54  
55  class ClientHttp1StreamHandler implements ResourceHolder {
56  
57      private final Http1StreamChannel<HttpRequest> outputChannel;
58      private final DataStreamChannel internalDataChannel;
59      private final HttpProcessor httpProcessor;
60      private final H1Config h1Config;
61      private final ConnectionReuseStrategy connectionReuseStrategy;
62      private final AsyncClientExchangeHandler exchangeHandler;
63      private final HttpCoreContext context;
64      private final AtomicBoolean requestCommitted;
65      private final AtomicBoolean done;
66  
67      private volatile boolean keepAlive;
68      private volatile int timeout;
69      private volatile HttpRequest committedRequest;
70      private volatile MessageState requestState;
71      private volatile MessageState responseState;
72  
73      ClientHttp1StreamHandler(
74              final Http1StreamChannel<HttpRequest> outputChannel,
75              final HttpProcessor httpProcessor,
76              final H1Config h1Config,
77              final ConnectionReuseStrategy connectionReuseStrategy,
78              final AsyncClientExchangeHandler exchangeHandler,
79              final HttpCoreContext context) {
80          this.outputChannel = outputChannel;
81          this.internalDataChannel = new DataStreamChannel() {
82  
83              @Override
84              public void requestOutput() {
85                  outputChannel.requestOutput();
86              }
87  
88              @Override
89              public void endStream(final List<? extends Header> trailers) throws IOException {
90                  outputChannel.complete(trailers);
91                  requestState = MessageState.COMPLETE;
92              }
93  
94              @Override
95              public int write(final ByteBuffer src) throws IOException {
96                  return outputChannel.write(src);
97              }
98  
99              @Override
100             public void endStream() throws IOException {
101                 endStream(null);
102             }
103 
104         };
105 
106         this.httpProcessor = httpProcessor;
107         this.h1Config = h1Config;
108         this.connectionReuseStrategy = connectionReuseStrategy;
109         this.exchangeHandler = exchangeHandler;
110         this.context = context;
111         this.requestCommitted = new AtomicBoolean(false);
112         this.done = new AtomicBoolean(false);
113         this.keepAlive = true;
114         this.requestState = MessageState.IDLE;
115         this.responseState = MessageState.HEADERS;
116     }
117 
118     boolean isResponseFinal() {
119         return responseState == MessageState.COMPLETE;
120     }
121 
122     boolean isCompleted() {
123         return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
124     }
125 
126     String getRequestMethod() {
127         return committedRequest != null ? committedRequest.getMethod() : null;
128     }
129 
130     boolean isOutputReady() {
131         switch (requestState) {
132             case IDLE:
133             case ACK:
134                 return true;
135             case BODY:
136                 return exchangeHandler.available() > 0;
137             default:
138                 return false;
139         }
140     }
141 
142     private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws IOException, HttpException {
143         if (requestCommitted.compareAndSet(false, true)) {
144             final ProtocolVersion transportVersion = request.getVersion();
145             if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
146                 throw new UnsupportedHttpVersionException("Unsupported version: " + transportVersion);
147             }
148             context.setProtocolVersion(transportVersion);
149             context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
150 
151             httpProcessor.process(request, entityDetails, context);
152 
153             final boolean endStream = entityDetails == null;
154             outputChannel.submit(request, endStream);
155             committedRequest = request;
156             if (endStream) {
157                 requestState = MessageState.COMPLETE;
158             } else {
159                 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
160                 final boolean expectContinue = h != null && "100-continue".equalsIgnoreCase(h.getValue());
161                 if (expectContinue) {
162                     requestState = MessageState.ACK;
163                     timeout = outputChannel.getSocketTimeout();
164                     outputChannel.setSocketTimeout(h1Config.getWaitForContinueTimeout());
165                 } else {
166                     requestState = MessageState.BODY;
167                     exchangeHandler.produce(internalDataChannel);
168                 }
169             }
170         } else {
171             throw new HttpException("Request already committed");
172         }
173     }
174 
175     void produceOutput() throws HttpException, IOException {
176         switch (requestState) {
177             case IDLE:
178                 requestState = MessageState.HEADERS;
179                 exchangeHandler.produceRequest(new RequestChannel() {
180 
181                     @Override
182                     public void sendRequest(
183                             final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
184                         commitRequest(request, entityDetails);
185                     }
186 
187                 });
188                 break;
189             case ACK:
190                 outputChannel.suspendOutput();
191                 break;
192             case BODY:
193                 exchangeHandler.produce(internalDataChannel);
194                 break;
195         }
196     }
197 
198     void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
199         if (done.get() || responseState != MessageState.HEADERS) {
200             throw new ProtocolException("Unexpected message head");
201         }
202         final ProtocolVersion transportVersion = response.getVersion();
203         if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
204             throw new UnsupportedHttpVersionException("Unsupported version: " + transportVersion);
205         }
206 
207         final int status = response.getCode();
208         if (status < HttpStatus.SC_INFORMATIONAL) {
209             throw new ProtocolException("Invalid response: " + status);
210         }
211         if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
212             exchangeHandler.consumeInformation(response);
213         } else {
214             if (!connectionReuseStrategy.keepAlive(committedRequest, response, context)) {
215                 keepAlive = false;
216             }
217         }
218         if (requestState == MessageState.ACK) {
219             if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
220                 outputChannel.setSocketTimeout(timeout);
221                 requestState = MessageState.BODY;
222                 if (status < HttpStatus.SC_CLIENT_ERROR) {
223                     exchangeHandler.produce(internalDataChannel);
224                 }
225             }
226         }
227         if (status < HttpStatus.SC_SUCCESS) {
228             return;
229         }
230         if (requestState == MessageState.BODY) {
231             if (keepAlive && status >= HttpStatus.SC_CLIENT_ERROR) {
232                 requestState = MessageState.COMPLETE;
233                 if (!outputChannel.abortGracefully()) {
234                     keepAlive = false;
235                 }
236             }
237         }
238 
239         context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
240         httpProcessor.process(response, entityDetails, context);
241 
242         exchangeHandler.consumeResponse(response, entityDetails);
243         if (entityDetails == null) {
244             if (!keepAlive) {
245                 outputChannel.close();
246             }
247             responseState = MessageState.COMPLETE;
248         } else {
249             responseState = MessageState.BODY;
250         }
251     }
252 
253     int consumeData(final ByteBuffer src) throws HttpException, IOException {
254         if (done.get() || responseState != MessageState.BODY) {
255             throw new ProtocolException("Unexpected message data");
256         }
257         return exchangeHandler.consume(src);
258     }
259 
260     void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
261         exchangeHandler.updateCapacity(capacityChannel);
262     }
263 
264     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
265         if (done.get() || responseState != MessageState.BODY) {
266             throw new ProtocolException("Unexpected message data");
267         }
268         if (!keepAlive) {
269             outputChannel.close();
270         }
271         responseState = MessageState.COMPLETE;
272         exchangeHandler.streamEnd(trailers);
273     }
274 
275     boolean handleTimeout() {
276         if (requestState == MessageState.ACK) {
277             requestState = MessageState.BODY;
278             outputChannel.setSocketTimeout(timeout);
279             outputChannel.requestOutput();
280             return true;
281         } else {
282             return false;
283         }
284     }
285 
286     void failed(final Exception cause) {
287         exchangeHandler.failed(cause);
288     }
289 
290     @Override
291     public void releaseResources() {
292         if (done.compareAndSet(false, true)) {
293             responseState = MessageState.COMPLETE;
294             requestState = MessageState.COMPLETE;
295             exchangeHandler.releaseResources();
296         }
297     }
298 
299     @Override
300     public String toString() {
301         return "[" +
302                 "requestState=" + requestState +
303                 ", responseState=" + responseState +
304                 ']';
305     }
306 
307 }
308