View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.InetAddress;
32  import java.net.InetSocketAddress;
33  import java.net.Socket;
34  import java.net.SocketAddress;
35  import java.nio.channels.ReadableByteChannel;
36  import java.nio.channels.WritableByteChannel;
37  
38  import org.apache.http.ConnectionClosedException;
39  import org.apache.http.Header;
40  import org.apache.http.HttpConnectionMetrics;
41  import org.apache.http.HttpEntity;
42  import org.apache.http.HttpException;
43  import org.apache.http.HttpInetConnection;
44  import org.apache.http.HttpMessage;
45  import org.apache.http.HttpRequest;
46  import org.apache.http.HttpResponse;
47  import org.apache.http.annotation.NotThreadSafe;
48  import org.apache.http.entity.BasicHttpEntity;
49  import org.apache.http.entity.ContentLengthStrategy;
50  import org.apache.http.impl.HttpConnectionMetricsImpl;
51  import org.apache.http.impl.entity.LaxContentLengthStrategy;
52  import org.apache.http.impl.entity.StrictContentLengthStrategy;
53  import org.apache.http.impl.io.HttpTransportMetricsImpl;
54  import org.apache.http.nio.ContentDecoder;
55  import org.apache.http.nio.ContentEncoder;
56  import org.apache.http.nio.NHttpConnection;
57  import org.apache.http.impl.nio.codecs.ChunkDecoder;
58  import org.apache.http.impl.nio.codecs.ChunkEncoder;
59  import org.apache.http.impl.nio.codecs.IdentityDecoder;
60  import org.apache.http.impl.nio.codecs.IdentityEncoder;
61  import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
62  import org.apache.http.impl.nio.codecs.LengthDelimitedEncoder;
63  import org.apache.http.impl.nio.reactor.SessionInputBufferImpl;
64  import org.apache.http.impl.nio.reactor.SessionOutputBufferImpl;
65  import org.apache.http.io.HttpTransportMetrics;
66  import org.apache.http.nio.reactor.EventMask;
67  import org.apache.http.nio.reactor.IOSession;
68  import org.apache.http.nio.reactor.SessionBufferStatus;
69  import org.apache.http.nio.reactor.SessionInputBuffer;
70  import org.apache.http.nio.reactor.SessionOutputBuffer;
71  import org.apache.http.nio.reactor.SocketAccessor;
72  import org.apache.http.nio.util.ByteBufferAllocator;
73  import org.apache.http.params.HttpConnectionParams;
74  import org.apache.http.params.HttpParams;
75  import org.apache.http.protocol.HTTP;
76  import org.apache.http.protocol.HttpContext;
77  
78  /**
79   * This class serves as a base for all {@link NHttpConnection} implementations
80   * and implements functionality common to both client and server
81   * HTTP connections.
82   * <p>
83   * The following parameters can be used to customize the behavior of this
84   * class:
85   * <ul>
86   *  <li>{@link org.apache.http.params.CoreProtocolPNames#HTTP_ELEMENT_CHARSET}</li>
87   *  <li>{@link org.apache.http.params.CoreConnectionPNames#SOCKET_BUFFER_SIZE}</li>
88   * </ul>
89   *
90   * @since 4.0
91   */
92  @NotThreadSafe
93  public class NHttpConnectionBase
94          implements NHttpConnection, HttpInetConnection, SessionBufferStatus, SocketAccessor {
95  
96      protected final ContentLengthStrategy incomingContentStrategy;
97      protected final ContentLengthStrategy outgoingContentStrategy;
98  
99      protected final SessionInputBufferImpl inbuf;
100     protected final SessionOutputBufferImpl outbuf;
101 
102     protected final HttpTransportMetricsImpl inTransportMetrics;
103     protected final HttpTransportMetricsImpl outTransportMetrics;
104     protected final HttpConnectionMetricsImpl connMetrics;
105 
106     protected HttpContext context;
107     protected IOSession session;
108     protected SocketAddress remote;
109     protected volatile ContentDecoder contentDecoder;
110     protected volatile boolean hasBufferedInput;
111     protected volatile ContentEncoder contentEncoder;
112     protected volatile boolean hasBufferedOutput;
113     protected volatile HttpRequest request;
114     protected volatile HttpResponse response;
115 
116     protected volatile int status;
117 
118     /**
119      * Creates a new instance of this class given the underlying I/O session.
120      *
121      * @param session the underlying I/O session.
122      * @param allocator byte buffer allocator.
123      * @param params HTTP parameters.
124      */
125     public NHttpConnectionBase(
126             final IOSession session,
127             final ByteBufferAllocator allocator,
128             final HttpParams params) {
129         super();
130         if (session == null) {
131             throw new IllegalArgumentException("I/O session may not be null");
132         }
133         if (params == null) {
134             throw new IllegalArgumentException("HTTP params may not be null");
135         }
136 
137         int buffersize = HttpConnectionParams.getSocketBufferSize(params);
138         if (buffersize <= 0) {
139             buffersize = 4096;
140         }
141         int linebuffersize = buffersize;
142         if (linebuffersize > 512) {
143             linebuffersize = 512;
144         }
145 
146         this.inbuf = new SessionInputBufferImpl(buffersize, linebuffersize, allocator, params);
147         this.outbuf = new SessionOutputBufferImpl(buffersize, linebuffersize, allocator, params);
148 
149         this.incomingContentStrategy = createIncomingContentStrategy();
150         this.outgoingContentStrategy = createOutgoingContentStrategy();
151 
152         this.inTransportMetrics = createTransportMetrics();
153         this.outTransportMetrics = createTransportMetrics();
154         this.connMetrics = createConnectionMetrics(
155                 this.inTransportMetrics,
156                 this.outTransportMetrics);
157 
158         setSession(session);
159         this.status = ACTIVE;
160     }
161 
162     private void setSession(final IOSession session) {
163         this.session = session;
164         this.context = new SessionHttpContext(this.session);
165         this.session.setBufferStatus(this);
166         this.remote = this.session.getRemoteAddress();
167     }
168 
169     /**
170      * Binds the connection to a different {@link IOSession}. This may be necessary
171      * when the underlying I/O session gets upgraded with SSL/TLS encryption.
172      *
173      * @since 4.2
174      */
175     protected void bind(final IOSession session) {
176         if (session == null) {
177             throw new IllegalArgumentException("I/O session may not be null");
178         }
179         this.session.setBufferStatus(null);
180         setSession(session);
181     }
182 
183     /**
184      * @since 4.2
185      */
186     protected ContentLengthStrategy createIncomingContentStrategy() {
187         return new LaxContentLengthStrategy();
188     }
189 
190     /**
191      * @since 4.2
192      */
193     protected ContentLengthStrategy createOutgoingContentStrategy() {
194         return new StrictContentLengthStrategy();
195     }
196 
197     /**
198      * @since 4.1
199      */
200     protected HttpTransportMetricsImpl createTransportMetrics() {
201         return new HttpTransportMetricsImpl();
202     }
203 
204     /**
205      * @since 4.1
206      */
207     protected HttpConnectionMetricsImpl createConnectionMetrics(
208             final HttpTransportMetrics inTransportMetric,
209             final HttpTransportMetrics outTransportMetric) {
210         return new HttpConnectionMetricsImpl(inTransportMetric, outTransportMetric);
211     }
212 
213     public int getStatus() {
214         return this.status;
215     }
216 
217     public HttpContext getContext() {
218         return this.context;
219     }
220 
221     public HttpRequest getHttpRequest() {
222         return this.request;
223     }
224 
225     public HttpResponse getHttpResponse() {
226         return this.response;
227     }
228 
229     public void requestInput() {
230         this.session.setEvent(EventMask.READ);
231     }
232 
233     public void requestOutput() {
234         this.session.setEvent(EventMask.WRITE);
235     }
236 
237     public void suspendInput() {
238         this.session.clearEvent(EventMask.READ);
239     }
240 
241     public void suspendOutput() {
242         this.session.clearEvent(EventMask.WRITE);
243     }
244 
245     /**
246      * Initializes a specific {@link ContentDecoder} implementation based on the
247      * properties of the given {@link HttpMessage} and generates an instance of
248      * {@link HttpEntity} matching the properties of the content decoder.
249      *
250      * @param message the HTTP message.
251      * @return HTTP entity.
252      * @throws HttpException in case of an HTTP protocol violation.
253      */
254     protected HttpEntity prepareDecoder(final HttpMessage message) throws HttpException {
255         BasicHttpEntity entity = new BasicHttpEntity();
256         long len = this.incomingContentStrategy.determineLength(message);
257         this.contentDecoder = createContentDecoder(
258                 len,
259                 this.session.channel(),
260                 this.inbuf,
261                 this.inTransportMetrics);
262         if (len == ContentLengthStrategy.CHUNKED) {
263             entity.setChunked(true);
264             entity.setContentLength(-1);
265         } else if (len == ContentLengthStrategy.IDENTITY) {
266             entity.setChunked(false);
267             entity.setContentLength(-1);
268         } else {
269             entity.setChunked(false);
270             entity.setContentLength(len);
271         }
272 
273         Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
274         if (contentTypeHeader != null) {
275             entity.setContentType(contentTypeHeader);
276         }
277         Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
278         if (contentEncodingHeader != null) {
279             entity.setContentEncoding(contentEncodingHeader);
280         }
281         return entity;
282     }
283 
284     /**
285      * Factory method for {@link ContentDecoder} instances.
286      *
287      * @param len content length, if known, {@link ContentLengthStrategy#CHUNKED} or
288      *   {@link ContentLengthStrategy#IDENTITY}, if unknown.
289      * @param channel the session channel.
290      * @param buffer the session buffer.
291      * @param metrics transport metrics.
292      *
293      * @return content decoder.
294      *
295      * @since 4.1
296      */
297     protected ContentDecoder createContentDecoder(
298             final long len,
299             final ReadableByteChannel channel,
300             final SessionInputBuffer buffer,
301             final HttpTransportMetricsImpl metrics) {
302         if (len == ContentLengthStrategy.CHUNKED) {
303             return new ChunkDecoder(channel, buffer, metrics);
304         } else if (len == ContentLengthStrategy.IDENTITY) {
305             return new IdentityDecoder(channel, buffer, metrics);
306         } else {
307             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
308         }
309     }
310 
311     /**
312      * Initializes a specific {@link ContentEncoder} implementation based on the
313      * properties of the given {@link HttpMessage}.
314      *
315      * @param message the HTTP message.
316      * @throws HttpException in case of an HTTP protocol violation.
317      */
318     protected void prepareEncoder(final HttpMessage message) throws HttpException {
319         long len = this.outgoingContentStrategy.determineLength(message);
320         this.contentEncoder = createContentEncoder(
321                 len,
322                 this.session.channel(),
323                 this.outbuf,
324                 this.outTransportMetrics);
325     }
326 
327     /**
328      * Factory method for {@link ContentEncoder} instances.
329      *
330      * @param len content length, if known, {@link ContentLengthStrategy#CHUNKED} or
331      *   {@link ContentLengthStrategy#IDENTITY}, if unknown.
332      * @param channel the session channel.
333      * @param buffer the session buffer.
334      * @param metrics transport metrics.
335      *
336      * @return content encoder.
337      *
338      * @since 4.1
339      */
340     protected ContentEncoder createContentEncoder(
341             final long len,
342             final WritableByteChannel channel,
343             final SessionOutputBuffer buffer,
344             final HttpTransportMetricsImpl metrics) {
345         if (len == ContentLengthStrategy.CHUNKED) {
346             return new ChunkEncoder(channel, buffer, metrics);
347         } else if (len == ContentLengthStrategy.IDENTITY) {
348             return new IdentityEncoder(channel, buffer, metrics);
349         } else {
350             return new LengthDelimitedEncoder(channel, buffer, metrics, len);
351         }
352     }
353 
354     public boolean hasBufferedInput() {
355         return this.hasBufferedInput;
356     }
357 
358     public boolean hasBufferedOutput() {
359         return this.hasBufferedOutput;
360     }
361 
362     /**
363      * Assets if the connection is still open.
364      *
365      * @throws ConnectionClosedException in case the connection has already
366      *   been closed.
367      */
368     protected void assertNotClosed() throws ConnectionClosedException {
369         if (this.status != ACTIVE) {
370             throw new ConnectionClosedException("Connection is closed");
371         }
372     }
373 
374     public void close() throws IOException {
375         if (this.status != ACTIVE) {
376             return;
377         }
378         this.status = CLOSING;
379         if (this.outbuf.hasData()) {
380             this.session.setEvent(EventMask.WRITE);
381         } else {
382             this.session.close();
383             this.status = CLOSED;
384         }
385     }
386 
387     public boolean isOpen() {
388         return this.status == ACTIVE && !this.session.isClosed();
389     }
390 
391     public boolean isStale() {
392         return this.session.isClosed();
393     }
394 
395     public InetAddress getLocalAddress() {
396         SocketAddress address = this.session.getLocalAddress();
397         if (address instanceof InetSocketAddress) {
398             return ((InetSocketAddress) address).getAddress();
399         } else {
400             return null;
401         }
402     }
403 
404     public int getLocalPort() {
405         SocketAddress address = this.session.getLocalAddress();
406         if (address instanceof InetSocketAddress) {
407             return ((InetSocketAddress) address).getPort();
408         } else {
409             return -1;
410         }
411     }
412 
413     public InetAddress getRemoteAddress() {
414         SocketAddress address = this.session.getRemoteAddress();
415         if (address instanceof InetSocketAddress) {
416             return ((InetSocketAddress) address).getAddress();
417         } else {
418             return null;
419         }
420     }
421 
422     public int getRemotePort() {
423         SocketAddress address = this.session.getRemoteAddress();
424         if (address instanceof InetSocketAddress) {
425             return ((InetSocketAddress) address).getPort();
426         } else {
427             return -1;
428         }
429     }
430 
431     public void setSocketTimeout(int timeout) {
432         this.session.setSocketTimeout(timeout);
433     }
434 
435     public int getSocketTimeout() {
436         return this.session.getSocketTimeout();
437     }
438 
439     public void shutdown() throws IOException {
440         this.status = CLOSED;
441         this.session.shutdown();
442     }
443 
444     public HttpConnectionMetrics getMetrics() {
445         return this.connMetrics;
446     }
447 
448     private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
449         if (socketAddress instanceof InetSocketAddress) {
450             InetSocketAddress addr = ((InetSocketAddress) socketAddress);
451             buffer.append(addr.getAddress() != null ? addr.getAddress().getHostAddress() :
452                 addr.getAddress())
453             .append(':')
454             .append(addr.getPort());
455         } else {
456             buffer.append(socketAddress);
457         }
458     }
459 
460     @Override
461     public String toString() {
462         StringBuilder buffer = new StringBuilder();
463         SocketAddress remoteAddress = this.session.getRemoteAddress();
464         SocketAddress localAddress = this.session.getLocalAddress();
465         if (remoteAddress != null && localAddress != null) {
466             formatAddress(buffer, localAddress);
467             buffer.append("<->");
468             formatAddress(buffer, remoteAddress);
469         }
470         buffer.append("[");
471         switch (this.status) {
472         case ACTIVE:
473             buffer.append("ACTIVE");
474             break;
475         case CLOSING:
476             buffer.append("CLOSING");
477             break;
478         case CLOSED:
479             buffer.append("CLOSED");
480             break;
481         }
482         buffer.append("]");
483         return buffer.toString();
484     }
485     
486     public Socket getSocket() {
487         if (this.session instanceof SocketAccessor) {
488             return ((SocketAccessor) this.session).getSocket();
489         } else {
490             return null;
491         }
492     }
493 
494 }