View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.ConnectionReuseStrategy;
35  import org.apache.hc.core5.http.EntityDetails;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.HttpException;
38  import org.apache.hc.core5.http.HttpRequest;
39  import org.apache.hc.core5.http.HttpResponse;
40  import org.apache.hc.core5.http.HttpStatus;
41  import org.apache.hc.core5.http.HttpVersion;
42  import org.apache.hc.core5.http.MisdirectedRequestException;
43  import org.apache.hc.core5.http.ProtocolException;
44  import org.apache.hc.core5.http.ProtocolVersion;
45  import org.apache.hc.core5.http.UnsupportedHttpVersionException;
46  import org.apache.hc.core5.http.nio.AsyncPushProducer;
47  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
48  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
49  import org.apache.hc.core5.http.nio.CapacityChannel;
50  import org.apache.hc.core5.http.nio.DataStreamChannel;
51  import org.apache.hc.core5.http.nio.HandlerFactory;
52  import org.apache.hc.core5.http.nio.ResourceHolder;
53  import org.apache.hc.core5.http.nio.ResponseChannel;
54  import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
55  import org.apache.hc.core5.http.protocol.HttpCoreContext;
56  import org.apache.hc.core5.http.protocol.HttpProcessor;
57  import org.apache.hc.core5.util.Asserts;
58  
59  class ServerHttp1StreamHandler implements ResourceHolder {
60  
61      private final Http1StreamChannel<HttpResponse> outputChannel;
62      private final DataStreamChannel internalDataChannel;
63      private final HttpProcessor httpProcessor;
64      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
65      private final ConnectionReuseStrategy connectionReuseStrategy;
66      private final HttpCoreContext context;
67      private final AtomicBoolean responseCommitted;
68      private final AtomicBoolean done;
69  
70      private volatile boolean keepAlive;
71      private volatile AsyncServerExchangeHandler exchangeHandler;
72      private volatile HttpRequest receivedRequest;
73      private volatile MessageState requestState;
74      private volatile MessageState responseState;
75  
76      ServerHttp1StreamHandler(
77              final Http1StreamChannel<HttpResponse> outputChannel,
78              final HttpProcessor httpProcessor,
79              final ConnectionReuseStrategy connectionReuseStrategy,
80              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
81              final HttpCoreContext context) {
82          this.outputChannel = outputChannel;
83          this.internalDataChannel = new DataStreamChannel() {
84  
85              @Override
86              public void requestOutput() {
87                  outputChannel.requestOutput();
88              }
89  
90              @Override
91              public void endStream(final List<? extends Header> trailers) throws IOException {
92                  outputChannel.complete(trailers);
93                  if (!keepAlive) {
94                      outputChannel.close();
95                  }
96                  responseState = MessageState.COMPLETE;
97              }
98  
99              @Override
100             public int write(final ByteBuffer src) throws IOException {
101                 return outputChannel.write(src);
102             }
103 
104             @Override
105             public void endStream() throws IOException {
106                 endStream(null);
107             }
108 
109         };
110 
111         this.httpProcessor = httpProcessor;
112         this.connectionReuseStrategy = connectionReuseStrategy;
113         this.exchangeHandlerFactory = exchangeHandlerFactory;
114         this.context = context;
115         this.responseCommitted = new AtomicBoolean(false);
116         this.done = new AtomicBoolean(false);
117         this.keepAlive = true;
118         this.requestState = MessageState.HEADERS;
119         this.responseState = MessageState.IDLE;
120     }
121 
122     private void commitResponse(
123             final HttpResponse response,
124             final EntityDetails responseEntityDetails) throws HttpException, IOException {
125         if (responseCommitted.compareAndSet(false, true)) {
126 
127             final ProtocolVersion transportVersion = response.getVersion();
128             if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
129                 throw new UnsupportedHttpVersionException("Unsupported version: " + transportVersion);
130             }
131 
132             final int status = response.getCode();
133             if (status < HttpStatus.SC_SUCCESS) {
134                 throw new HttpException("Invalid response: " + status);
135             }
136 
137             Asserts.notNull(receivedRequest, "Received request");
138             final String method = receivedRequest.getMethod();
139             context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
140             httpProcessor.process(response, responseEntityDetails, context);
141 
142             final boolean endStream = responseEntityDetails == null || method.equalsIgnoreCase("HEAD");
143 
144             if (!connectionReuseStrategy.keepAlive(receivedRequest, response, context)) {
145                 keepAlive = false;
146             }
147 
148             outputChannel.submit(response, endStream);
149             if (endStream) {
150                 if (!keepAlive) {
151                     outputChannel.close();
152                 }
153                 responseState = MessageState.COMPLETE;
154             } else {
155                 responseState = MessageState.BODY;
156                 exchangeHandler.produce(internalDataChannel);
157             }
158         } else {
159             throw new HttpException("Response already committed");
160         }
161     }
162 
163     private void commitInformation(final HttpResponse response) throws IOException, HttpException {
164         if (responseCommitted.get()) {
165             throw new HttpException("Response already committed");
166         }
167         final int status = response.getCode();
168         if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
169             throw new HttpException("Invalid intermediate response: " + status);
170         }
171         outputChannel.submit(response, true);
172     }
173 
174     private void commitPromise() throws HttpException {
175         throw new HttpException("HTTP/1.1 does not support server push");
176     }
177 
178     void activateChannel() throws IOException, HttpException {
179         outputChannel.activate();
180     }
181 
182     boolean isResponseFinal() {
183         return responseState == MessageState.COMPLETE;
184     }
185 
186     boolean isCompleted() {
187         return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
188     }
189 
190     void consumeHeader(final HttpRequest request, final EntityDetails requestEntityDetails) throws HttpException, IOException {
191         if (done.get() || requestState != MessageState.HEADERS) {
192             throw new ProtocolException("Unexpected message head");
193         }
194         receivedRequest = request;
195         requestState = requestEntityDetails == null ? MessageState.COMPLETE : MessageState.BODY;
196 
197         AsyncServerExchangeHandler handler;
198         try {
199             handler = exchangeHandlerFactory.create(request, context);
200         } catch (final MisdirectedRequestException ex) {
201             handler =  new ImmediateResponseExchangeHandler(HttpStatus.SC_MISDIRECTED_REQUEST, ex.getMessage());
202         } catch (final HttpException ex) {
203             handler =  new ImmediateResponseExchangeHandler(HttpStatus.SC_INTERNAL_SERVER_ERROR, ex.getMessage());
204         }
205         if (handler == null) {
206             handler = new ImmediateResponseExchangeHandler(HttpStatus.SC_NOT_FOUND, "Cannot handle request");
207         }
208 
209         exchangeHandler = handler;
210 
211         final ProtocolVersion transportVersion = request.getVersion();
212         if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
213             throw new UnsupportedHttpVersionException("Unsupported version: " + transportVersion);
214         }
215         context.setProtocolVersion(transportVersion);
216         context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
217 
218         final ResponseChannel responseChannel = new ResponseChannel() {
219 
220             @Override
221             public void sendInformation(final HttpResponse response) throws HttpException, IOException {
222                 commitInformation(response);
223             }
224 
225             @Override
226             public void sendResponse(
227                     final HttpResponse response, final EntityDetails responseEntityDetails) throws HttpException, IOException {
228                 ServerSupport.validateResponse(response, responseEntityDetails);
229                 commitResponse(response, responseEntityDetails);
230             }
231 
232             @Override
233             public void pushPromise(
234                     final HttpRequest promise, final AsyncPushProducer pushProducer) throws HttpException, IOException {
235                 commitPromise();
236             }
237 
238         };
239         try {
240             httpProcessor.process(request, requestEntityDetails, context);
241             exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
242         } catch (final HttpException ex) {
243             if (!responseCommitted.get()) {
244                 final AsyncResponseProducer responseProducer = ServerSupport.handleException(ex);
245                 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
246                 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
247             } else {
248                 throw ex;
249             }
250         }
251 
252     }
253 
254     boolean isOutputReady() {
255         switch (responseState) {
256             case BODY:
257                 return exchangeHandler.available() > 0;
258             default:
259                 return false;
260         }
261     }
262 
263     void produceOutput() throws HttpException, IOException {
264         switch (responseState) {
265             case BODY:
266                 exchangeHandler.produce(internalDataChannel);
267                 break;
268         }
269     }
270 
271     int consumeData(final ByteBuffer src) throws HttpException, IOException {
272         if (done.get() || requestState != MessageState.BODY) {
273             throw new ProtocolException("Unexpected message data");
274         }
275         if (responseState == MessageState.ACK) {
276             outputChannel.requestOutput();
277         }
278         return exchangeHandler.consume(src);
279     }
280 
281     void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
282         exchangeHandler.updateCapacity(capacityChannel);
283     }
284 
285     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
286         if (done.get() || requestState != MessageState.BODY) {
287             throw new ProtocolException("Unexpected message data");
288         }
289         requestState = MessageState.COMPLETE;
290         exchangeHandler.streamEnd(trailers);
291     }
292 
293     void failed(final Exception cause) {
294         exchangeHandler.failed(cause);
295     }
296 
297     @Override
298     public void releaseResources() {
299         if (done.compareAndSet(false, true)) {
300             requestState = MessageState.COMPLETE;
301             responseState = MessageState.COMPLETE;
302             exchangeHandler.releaseResources();
303         }
304     }
305 
306     @Override
307     public String toString() {
308         return "[" +
309                 "requestState=" + requestState +
310                 ", responseState=" + responseState +
311                 ']';
312     }
313 
314 }
315