View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.nio.support;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.io.OutputStream;
32  import java.nio.ByteBuffer;
33  import java.util.List;
34  import java.util.Locale;
35  import java.util.Set;
36  import java.util.concurrent.Executor;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.apache.hc.core5.http.EntityDetails;
41  import org.apache.hc.core5.http.Header;
42  import org.apache.hc.core5.http.HttpException;
43  import org.apache.hc.core5.http.HttpHeaders;
44  import org.apache.hc.core5.http.HttpRequest;
45  import org.apache.hc.core5.http.HttpResponse;
46  import org.apache.hc.core5.http.HttpStatus;
47  import org.apache.hc.core5.http.ProtocolVersion;
48  import org.apache.hc.core5.http.message.BasicHttpResponse;
49  import org.apache.hc.core5.http.message.HttpResponseWrapper;
50  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
51  import org.apache.hc.core5.http.nio.CapacityChannel;
52  import org.apache.hc.core5.http.nio.DataStreamChannel;
53  import org.apache.hc.core5.http.nio.ResponseChannel;
54  import org.apache.hc.core5.http.nio.entity.ContentInputStream;
55  import org.apache.hc.core5.http.nio.entity.ContentOutputStream;
56  import org.apache.hc.core5.http.nio.entity.SharedInputBuffer;
57  import org.apache.hc.core5.http.nio.entity.SharedOutputBuffer;
58  import org.apache.hc.core5.http.protocol.HttpContext;
59  import org.apache.hc.core5.util.Args;
60  import org.apache.hc.core5.util.Asserts;
61  
62  /**
63   * @since 5.0
64   */
65  public abstract class AbstractClassicServerExchangeHandler implements AsyncServerExchangeHandler {
66  
67      private enum State { IDLE, ACTIVE, COMPLETED }
68  
69      private final int initialBufferSize;
70      private final Executor executor;
71      private final AtomicReference<State> state;
72      private final AtomicReference<Exception> exception;
73  
74      private volatile SharedInputBuffer inputBuffer;
75      private volatile SharedOutputBuffer outputBuffer;
76  
77      public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
78          this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
79          this.executor = Args.notNull(executor, "Executor");
80          this.exception = new AtomicReference<>(null);
81          this.state = new AtomicReference<>(State.IDLE);
82      }
83  
84      public Exception getException() {
85          return exception.get();
86      }
87  
88      protected abstract void handle(
89              HttpRequest request, InputStream requestStream,
90              HttpResponse response, OutputStream responseStream,
91              HttpContext context) throws IOException, HttpException;
92  
93      @Override
94      public final void handleRequest(
95              final HttpRequest request,
96              final EntityDetails entityDetails,
97              final ResponseChannel responseChannel,
98              final HttpContext context) throws HttpException, IOException {
99          final AtomicBoolean responseCommitted = new AtomicBoolean(false);
100 
101         final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
102         final HttpResponse responseWrapper = new HttpResponseWrapper(response){
103 
104             private void ensureNotCommitted() {
105                 Asserts.check(!responseCommitted.get(), "Response already committed");
106             }
107 
108             @Override
109             public void addHeader(final String name, final Object value) {
110                 ensureNotCommitted();
111                 super.addHeader(name, value);
112             }
113 
114             @Override
115             public void setHeader(final String name, final Object value) {
116                 ensureNotCommitted();
117                 super.setHeader(name, value);
118             }
119 
120             @Override
121             public void setVersion(final ProtocolVersion version) {
122                 ensureNotCommitted();
123                 super.setVersion(version);
124             }
125 
126             @Override
127             public void setCode(final int code) {
128                 ensureNotCommitted();
129                 super.setCode(code);
130             }
131 
132             @Override
133             public void setReasonPhrase(final String reason) {
134                 ensureNotCommitted();
135                 super.setReasonPhrase(reason);
136             }
137 
138             @Override
139             public void setLocale(final Locale locale) {
140                 ensureNotCommitted();
141                 super.setLocale(locale);
142             }
143 
144         };
145 
146         final InputStream inputStream;
147         if (entityDetails != null) {
148             inputBuffer = new SharedInputBuffer(initialBufferSize);
149             inputStream = new ContentInputStream(inputBuffer);
150         } else {
151             inputStream = null;
152         }
153         outputBuffer = new SharedOutputBuffer(initialBufferSize);
154 
155         final OutputStream outputStream = new ContentOutputStream(outputBuffer) {
156 
157             private void triggerResponse() throws IOException {
158                 try {
159                     if (responseCommitted.compareAndSet(false, true)) {
160                         responseChannel.sendResponse(response, new EntityDetails() {
161 
162                             @Override
163                             public long getContentLength() {
164                                 return -1;
165                             }
166 
167                             @Override
168                             public String getContentType() {
169                                 final Header h = response.getFirstHeader(HttpHeaders.CONTENT_TYPE);
170                                 return h != null ? h.getValue() : null;
171                             }
172 
173                             @Override
174                             public String getContentEncoding() {
175                                 final Header h = response.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
176                                 return h != null ? h.getValue() : null;
177                             }
178 
179                             @Override
180                             public boolean isChunked() {
181                                 return false;
182                             }
183 
184                             @Override
185                             public Set<String> getTrailerNames() {
186                                 return null;
187                             }
188 
189                         });
190                     }
191                 } catch (final HttpException ex) {
192                     throw new IOException(ex.getMessage(), ex);
193                 }
194             }
195 
196             @Override
197             public void close() throws IOException {
198                 triggerResponse();
199                 super.close();
200             }
201 
202             @Override
203             public void write(final byte[] b, final int off, final int len) throws IOException {
204                 triggerResponse();
205                 super.write(b, off, len);
206             }
207 
208             @Override
209             public void write(final byte[] b) throws IOException {
210                 triggerResponse();
211                 super.write(b);
212             }
213 
214             @Override
215             public void write(final int b) throws IOException {
216                 triggerResponse();
217                 super.write(b);
218             }
219 
220         };
221 
222         if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
223             executor.execute(new Runnable() {
224 
225                 @Override
226                 public void run() {
227                     try {
228                         handle(request, inputStream, responseWrapper, outputStream, context);
229                         if (inputStream != null) {
230                             inputStream.close();
231                         }
232                         outputStream.close();
233                     } catch (final Exception ex) {
234                         exception.compareAndSet(null, ex);
235                         if (inputBuffer != null) {
236                             inputBuffer.abort();
237                         }
238                         outputBuffer.abort();
239                     } finally {
240                         state.set(State.COMPLETED);
241                     }
242                 }
243 
244             });
245         }
246     }
247 
248     @Override
249     public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
250         if (inputBuffer != null) {
251             inputBuffer.updateCapacity(capacityChannel);
252         }
253     }
254 
255     @Override
256     public final int consume(final ByteBuffer src) throws IOException {
257         Asserts.notNull(inputBuffer, "Input buffer");
258         return inputBuffer.fill(src);
259     }
260 
261     @Override
262     public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
263         Asserts.notNull(inputBuffer, "Input buffer");
264         inputBuffer.markEndStream();
265     }
266 
267     @Override
268     public final int available() {
269         Asserts.notNull(outputBuffer, "Output buffer");
270         return outputBuffer.length();
271     }
272 
273     @Override
274     public final void produce(final DataStreamChannel channel) throws IOException {
275         Asserts.notNull(outputBuffer, "Output buffer");
276         outputBuffer.flush(channel);
277     }
278 
279     @Override
280     public final void failed(final Exception cause) {
281         exception.compareAndSet(null, cause);
282         releaseResources();
283     }
284 
285     @Override
286     public void releaseResources() {
287     }
288 
289 }