View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.ConnectionReuseStrategy;
35  import org.apache.hc.core5.http.EntityDetails;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.HttpException;
38  import org.apache.hc.core5.http.HttpRequest;
39  import org.apache.hc.core5.http.HttpResponse;
40  import org.apache.hc.core5.http.HttpStatus;
41  import org.apache.hc.core5.http.HttpVersion;
42  import org.apache.hc.core5.http.MisdirectedRequestException;
43  import org.apache.hc.core5.http.ProtocolException;
44  import org.apache.hc.core5.http.ProtocolVersion;
45  import org.apache.hc.core5.http.UnsupportedHttpVersionException;
46  import org.apache.hc.core5.http.nio.AsyncPushProducer;
47  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
48  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
49  import org.apache.hc.core5.http.nio.CapacityChannel;
50  import org.apache.hc.core5.http.nio.DataStreamChannel;
51  import org.apache.hc.core5.http.nio.HandlerFactory;
52  import org.apache.hc.core5.http.nio.ResourceHolder;
53  import org.apache.hc.core5.http.nio.ResponseChannel;
54  import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
55  import org.apache.hc.core5.http.protocol.HttpContext;
56  import org.apache.hc.core5.http.protocol.HttpCoreContext;
57  import org.apache.hc.core5.http.protocol.HttpProcessor;
58  import org.apache.hc.core5.util.Asserts;
59  
60  class ServerHttp1StreamHandler implements ResourceHolder {
61  
62      private final Http1StreamChannel<HttpResponse> outputChannel;
63      private final DataStreamChannel internalDataChannel;
64      private final HttpProcessor httpProcessor;
65      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
66      private final ConnectionReuseStrategy connectionReuseStrategy;
67      private final HttpCoreContext context;
68      private final AtomicBoolean responseCommitted;
69      private final AtomicBoolean done;
70  
71      private volatile boolean keepAlive;
72      private volatile AsyncServerExchangeHandler exchangeHandler;
73      private volatile HttpRequest receivedRequest;
74      private volatile MessageState requestState;
75      private volatile MessageState responseState;
76  
77      ServerHttp1StreamHandler(
78              final Http1StreamChannel<HttpResponse> outputChannel,
79              final HttpProcessor httpProcessor,
80              final ConnectionReuseStrategy connectionReuseStrategy,
81              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
82              final HttpCoreContext context) {
83          this.outputChannel = outputChannel;
84          this.internalDataChannel = new DataStreamChannel() {
85  
86              @Override
87              public void requestOutput() {
88                  outputChannel.requestOutput();
89              }
90  
91              @Override
92              public void endStream(final List<? extends Header> trailers) throws IOException {
93                  outputChannel.complete(trailers);
94                  if (!keepAlive) {
95                      outputChannel.close();
96                  }
97                  responseState = MessageState.COMPLETE;
98              }
99  
100             @Override
101             public int write(final ByteBuffer src) throws IOException {
102                 return outputChannel.write(src);
103             }
104 
105             @Override
106             public void endStream() throws IOException {
107                 endStream(null);
108             }
109 
110         };
111 
112         this.httpProcessor = httpProcessor;
113         this.connectionReuseStrategy = connectionReuseStrategy;
114         this.exchangeHandlerFactory = exchangeHandlerFactory;
115         this.context = context;
116         this.responseCommitted = new AtomicBoolean(false);
117         this.done = new AtomicBoolean(false);
118         this.keepAlive = true;
119         this.requestState = MessageState.HEADERS;
120         this.responseState = MessageState.IDLE;
121     }
122 
123     private void commitResponse(
124             final HttpResponse response,
125             final EntityDetails responseEntityDetails) throws HttpException, IOException {
126         if (responseCommitted.compareAndSet(false, true)) {
127 
128             final ProtocolVersion transportVersion = response.getVersion();
129             if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
130                 throw new UnsupportedHttpVersionException(transportVersion);
131             }
132 
133             final int status = response.getCode();
134             if (status < HttpStatus.SC_SUCCESS) {
135                 throw new HttpException("Invalid response: " + status);
136             }
137 
138             Asserts.notNull(receivedRequest, "Received request");
139             final String method = receivedRequest.getMethod();
140             context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
141             httpProcessor.process(response, responseEntityDetails, context);
142 
143             final boolean endStream = responseEntityDetails == null || method.equalsIgnoreCase("HEAD");
144 
145             if (!connectionReuseStrategy.keepAlive(receivedRequest, response, context)) {
146                 keepAlive = false;
147             }
148 
149             outputChannel.submit(response, endStream, endStream ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
150             if (endStream) {
151                 if (!keepAlive) {
152                     outputChannel.close();
153                 }
154                 responseState = MessageState.COMPLETE;
155             } else {
156                 responseState = MessageState.BODY;
157                 exchangeHandler.produce(internalDataChannel);
158             }
159         } else {
160             throw new HttpException("Response already committed");
161         }
162     }
163 
164     private void commitInformation(final HttpResponse response) throws IOException, HttpException {
165         if (responseCommitted.get()) {
166             throw new HttpException("Response already committed");
167         }
168         final int status = response.getCode();
169         if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
170             throw new HttpException("Invalid intermediate response: " + status);
171         }
172         outputChannel.submit(response, true, FlushMode.IMMEDIATE);
173     }
174 
175     private void commitPromise() throws HttpException {
176         throw new HttpException("HTTP/1.1 does not support server push");
177     }
178 
179     void activateChannel() throws IOException, HttpException {
180         outputChannel.activate();
181     }
182 
183     boolean isResponseFinal() {
184         return responseState == MessageState.COMPLETE;
185     }
186 
187     boolean isCompleted() {
188         return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
189     }
190 
191     void consumeHeader(final HttpRequest request, final EntityDetails requestEntityDetails) throws HttpException, IOException {
192         if (done.get() || requestState != MessageState.HEADERS) {
193             throw new ProtocolException("Unexpected message head");
194         }
195         receivedRequest = request;
196         requestState = requestEntityDetails == null ? MessageState.COMPLETE : MessageState.BODY;
197 
198         AsyncServerExchangeHandler handler;
199         try {
200             handler = exchangeHandlerFactory.create(request, context);
201         } catch (final MisdirectedRequestException ex) {
202             handler =  new ImmediateResponseExchangeHandler(HttpStatus.SC_MISDIRECTED_REQUEST, ex.getMessage());
203         } catch (final HttpException ex) {
204             handler =  new ImmediateResponseExchangeHandler(HttpStatus.SC_INTERNAL_SERVER_ERROR, ex.getMessage());
205         }
206         if (handler == null) {
207             handler = new ImmediateResponseExchangeHandler(HttpStatus.SC_NOT_FOUND, "Cannot handle request");
208         }
209 
210         exchangeHandler = handler;
211 
212         final ProtocolVersion transportVersion = request.getVersion();
213         if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
214             throw new UnsupportedHttpVersionException(transportVersion);
215         }
216         context.setProtocolVersion(transportVersion);
217         context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
218 
219         final ResponseChannel responseChannel = new ResponseChannel() {
220 
221             @Override
222             public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
223                 commitInformation(response);
224             }
225 
226             @Override
227             public void sendResponse(
228                     final HttpResponse response, final EntityDetails responseEntityDetails, final HttpContext httpContext) throws HttpException, IOException {
229                 ServerSupport.validateResponse(response, responseEntityDetails);
230                 commitResponse(response, responseEntityDetails);
231             }
232 
233             @Override
234             public void pushPromise(
235                     final HttpRequest promise, final HttpContext httpContext, final AsyncPushProducer pushProducer) throws HttpException, IOException {
236                 commitPromise();
237             }
238 
239             @Override
240             public String toString() {
241                 return super.toString() + " " + ServerHttp1StreamHandler.this.toString();
242             }
243 
244         };
245         try {
246             httpProcessor.process(request, requestEntityDetails, context);
247             exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
248         } catch (final HttpException ex) {
249             if (!responseCommitted.get()) {
250                 final AsyncResponseProducer responseProducer = ServerSupport.handleException(ex);
251                 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
252                 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
253             } else {
254                 throw ex;
255             }
256         }
257 
258     }
259 
260     boolean isOutputReady() {
261         switch (responseState) {
262             case BODY:
263                 return exchangeHandler.available() > 0;
264             default:
265                 return false;
266         }
267     }
268 
269     void produceOutput() throws HttpException, IOException {
270         switch (responseState) {
271             case BODY:
272                 exchangeHandler.produce(internalDataChannel);
273                 break;
274         }
275     }
276 
277     int consumeData(final ByteBuffer src) throws HttpException, IOException {
278         if (done.get() || requestState != MessageState.BODY) {
279             throw new ProtocolException("Unexpected message data");
280         }
281         if (responseState == MessageState.ACK) {
282             outputChannel.requestOutput();
283         }
284         return exchangeHandler.consume(src);
285     }
286 
287     void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
288         exchangeHandler.updateCapacity(capacityChannel);
289     }
290 
291     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
292         if (done.get() || requestState != MessageState.BODY) {
293             throw new ProtocolException("Unexpected message data");
294         }
295         requestState = MessageState.COMPLETE;
296         exchangeHandler.streamEnd(trailers);
297     }
298 
299     void failed(final Exception cause) {
300         if (!done.get()) {
301             exchangeHandler.failed(cause);
302         }
303     }
304 
305     @Override
306     public void releaseResources() {
307         if (done.compareAndSet(false, true)) {
308             requestState = MessageState.COMPLETE;
309             responseState = MessageState.COMPLETE;
310             exchangeHandler.releaseResources();
311         }
312     }
313 
314     @Override
315     public String toString() {
316         return "[" +
317                 "requestState=" + requestState +
318                 ", responseState=" + responseState +
319                 ", outputChannel=" + outputChannel +
320                 ']';
321     }
322 
323 }
324