View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.InetAddress;
32  import java.net.InetSocketAddress;
33  import java.net.Socket;
34  import java.net.SocketAddress;
35  import java.nio.channels.ReadableByteChannel;
36  import java.nio.channels.WritableByteChannel;
37  import java.nio.charset.Charset;
38  import java.nio.charset.CharsetDecoder;
39  import java.nio.charset.CharsetEncoder;
40  import java.nio.charset.CodingErrorAction;
41  
42  import org.apache.http.ConnectionClosedException;
43  import org.apache.http.Consts;
44  import org.apache.http.Header;
45  import org.apache.http.HttpConnectionMetrics;
46  import org.apache.http.HttpEntity;
47  import org.apache.http.HttpException;
48  import org.apache.http.HttpInetConnection;
49  import org.apache.http.HttpMessage;
50  import org.apache.http.HttpRequest;
51  import org.apache.http.HttpResponse;
52  import org.apache.http.annotation.NotThreadSafe;
53  import org.apache.http.entity.BasicHttpEntity;
54  import org.apache.http.entity.ContentLengthStrategy;
55  import org.apache.http.impl.HttpConnectionMetricsImpl;
56  import org.apache.http.impl.entity.LaxContentLengthStrategy;
57  import org.apache.http.impl.entity.StrictContentLengthStrategy;
58  import org.apache.http.impl.io.HttpTransportMetricsImpl;
59  import org.apache.http.impl.nio.codecs.ChunkDecoder;
60  import org.apache.http.impl.nio.codecs.ChunkEncoder;
61  import org.apache.http.impl.nio.codecs.IdentityDecoder;
62  import org.apache.http.impl.nio.codecs.IdentityEncoder;
63  import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
64  import org.apache.http.impl.nio.codecs.LengthDelimitedEncoder;
65  import org.apache.http.impl.nio.reactor.SessionInputBufferImpl;
66  import org.apache.http.impl.nio.reactor.SessionOutputBufferImpl;
67  import org.apache.http.io.HttpTransportMetrics;
68  import org.apache.http.nio.ContentDecoder;
69  import org.apache.http.nio.ContentEncoder;
70  import org.apache.http.nio.NHttpConnection;
71  import org.apache.http.nio.reactor.EventMask;
72  import org.apache.http.nio.reactor.IOSession;
73  import org.apache.http.nio.reactor.SessionBufferStatus;
74  import org.apache.http.nio.reactor.SessionInputBuffer;
75  import org.apache.http.nio.reactor.SessionOutputBuffer;
76  import org.apache.http.nio.reactor.SocketAccessor;
77  import org.apache.http.nio.util.ByteBufferAllocator;
78  import org.apache.http.params.CoreConnectionPNames;
79  import org.apache.http.params.CoreProtocolPNames;
80  import org.apache.http.params.HttpParams;
81  import org.apache.http.protocol.HTTP;
82  import org.apache.http.protocol.HttpContext;
83  import org.apache.http.util.Args;
84  import org.apache.http.util.CharsetUtils;
85  import org.apache.http.util.NetUtils;
86  
87  /**
88   * This class serves as a base for all {@link NHttpConnection} implementations and provides
89   * functionality common to both client and server HTTP connections.
90   *
91   * @since 4.0
92   */
93  @SuppressWarnings("deprecation")
94  @NotThreadSafe
95  public class NHttpConnectionBase
96          implements NHttpConnection, HttpInetConnection, SessionBufferStatus, SocketAccessor {
97  
98      protected final ContentLengthStrategy incomingContentStrategy;
99      protected final ContentLengthStrategy outgoingContentStrategy;
100 
101     protected final SessionInputBufferImpl inbuf;
102     protected final SessionOutputBufferImpl outbuf;
103     private final int fragmentSizeHint;
104 
105     protected final HttpTransportMetricsImpl inTransportMetrics;
106     protected final HttpTransportMetricsImpl outTransportMetrics;
107     protected final HttpConnectionMetricsImpl connMetrics;
108 
109     protected HttpContext context;
110     protected IOSession session;
111     protected SocketAddress remote;
112     protected volatile ContentDecoder contentDecoder;
113     protected volatile boolean hasBufferedInput;
114     protected volatile ContentEncoder contentEncoder;
115     protected volatile boolean hasBufferedOutput;
116     protected volatile HttpRequest request;
117     protected volatile HttpResponse response;
118 
119     protected volatile int status;
120 
121     /**
122      * Creates a new instance of this class given the underlying I/O session.
123      *
124      * @param session the underlying I/O session.
125      * @param allocator byte buffer allocator.
126      * @param params HTTP parameters.
127      *
128      * @deprecated (4.3) use
129      *   {@link NHttpConnectionBase#NHttpConnectionBase(IOSession, int, int, ByteBufferAllocator,
130      *   CharsetDecoder, CharsetEncoder, ContentLengthStrategy, ContentLengthStrategy)}
131      */
132     @Deprecated
133     public NHttpConnectionBase(
134             final IOSession session,
135             final ByteBufferAllocator allocator,
136             final HttpParams params) {
137         super();
138         Args.notNull(session, "I/O session");
139         Args.notNull(params, "HTTP params");
140 
141         int buffersize = params.getIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, -1);
142         if (buffersize <= 0) {
143             buffersize = 4096;
144         }
145         int linebuffersize = buffersize;
146         if (linebuffersize > 512) {
147             linebuffersize = 512;
148         }
149 
150         CharsetDecoder decoder = null;
151         CharsetEncoder encoder = null;
152         Charset charset = CharsetUtils.lookup(
153                 (String) params.getParameter(CoreProtocolPNames.HTTP_ELEMENT_CHARSET));
154         if (charset != null) {
155             charset = Consts.ASCII;
156             decoder = charset.newDecoder();
157             encoder = charset.newEncoder();
158             final CodingErrorAction malformedCharAction = (CodingErrorAction) params.getParameter(
159                     CoreProtocolPNames.HTTP_MALFORMED_INPUT_ACTION);
160             final CodingErrorAction unmappableCharAction = (CodingErrorAction) params.getParameter(
161                     CoreProtocolPNames.HTTP_UNMAPPABLE_INPUT_ACTION);
162             decoder.onMalformedInput(malformedCharAction).onUnmappableCharacter(unmappableCharAction);
163             encoder.onMalformedInput(malformedCharAction).onUnmappableCharacter(unmappableCharAction);
164         }
165         this.inbuf = new SessionInputBufferImpl(buffersize, linebuffersize, decoder, allocator);
166         this.outbuf = new SessionOutputBufferImpl(buffersize, linebuffersize, encoder, allocator);
167         this.fragmentSizeHint = buffersize;
168 
169         this.incomingContentStrategy = createIncomingContentStrategy();
170         this.outgoingContentStrategy = createOutgoingContentStrategy();
171 
172         this.inTransportMetrics = createTransportMetrics();
173         this.outTransportMetrics = createTransportMetrics();
174         this.connMetrics = createConnectionMetrics(
175                 this.inTransportMetrics,
176                 this.outTransportMetrics);
177 
178         setSession(session);
179         this.status = ACTIVE;
180     }
181 
182     /**
183      * Creates new instance NHttpConnectionBase given the underlying I/O session.
184      *
185      * @param session the underlying I/O session.
186      * @param buffersize buffer size. Must be a positive number.
187      * @param fragmentSizeHint fragment size hint.
188      * @param allocator memory allocator.
189      *   If <code>null</code> {@link org.apache.http.nio.util.HeapByteBufferAllocator#INSTANCE}
190      *   will be used.
191      * @param chardecoder decoder to be used for decoding HTTP protocol elements.
192      *   If <code>null</code> simple type cast will be used for byte to char conversion.
193      * @param charencoder encoder to be used for encoding HTTP protocol elements.
194      *   If <code>null</code> simple type cast will be used for char to byte conversion.
195      * @param incomingContentStrategy incoming content length strategy. If <code>null</code>
196      *   {@link LaxContentLengthStrategy#INSTANCE} will be used.
197      * @param outgoingContentStrategy outgoing content length strategy. If <code>null</code>
198      *   {@link StrictContentLengthStrategy#INSTANCE} will be used.
199      *
200      * @since 4.3
201      */
202     protected NHttpConnectionBase(
203             final IOSession session,
204             final int buffersize,
205             final int fragmentSizeHint,
206             final ByteBufferAllocator allocator,
207             final CharsetDecoder chardecoder,
208             final CharsetEncoder charencoder,
209             final ContentLengthStrategy incomingContentStrategy,
210             final ContentLengthStrategy outgoingContentStrategy) {
211         Args.notNull(session, "I/O session");
212         Args.positive(buffersize, "Buffer size");
213         int linebuffersize = buffersize;
214         if (linebuffersize > 512) {
215             linebuffersize = 512;
216         }
217         this.inbuf = new SessionInputBufferImpl(buffersize, linebuffersize, chardecoder, allocator);
218         this.outbuf = new SessionOutputBufferImpl(buffersize, linebuffersize, charencoder, allocator);
219         this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : buffersize;
220 
221         this.inTransportMetrics = new HttpTransportMetricsImpl();
222         this.outTransportMetrics = new HttpTransportMetricsImpl();
223         this.connMetrics = new HttpConnectionMetricsImpl(this.inTransportMetrics, this.outTransportMetrics);
224         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
225             LaxContentLengthStrategy.INSTANCE;
226         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
227             StrictContentLengthStrategy.INSTANCE;
228 
229         setSession(session);
230         this.status = ACTIVE;
231     }
232 
233     private void setSession(final IOSession session) {
234         this.session = session;
235         this.context = new SessionHttpContext(this.session);
236         this.session.setBufferStatus(this);
237         this.remote = this.session.getRemoteAddress();
238     }
239 
240     /**
241      * Binds the connection to a different {@link IOSession}. This may be necessary
242      * when the underlying I/O session gets upgraded with SSL/TLS encryption.
243      *
244      * @since 4.2
245      */
246     protected void bind(final IOSession session) {
247         Args.notNull(session, "I/O session");
248         setSession(session);
249     }
250 
251     /**
252      * @since 4.2
253      *
254      * @deprecated (4.3) use constructor.
255      */
256     @Deprecated
257     protected ContentLengthStrategy createIncomingContentStrategy() {
258         return new LaxContentLengthStrategy();
259     }
260 
261     /**
262      * @since 4.2
263      *
264      * @deprecated (4.3) use constructor.
265      */
266     @Deprecated
267     protected ContentLengthStrategy createOutgoingContentStrategy() {
268         return new StrictContentLengthStrategy();
269     }
270 
271     /**
272      * @since 4.1
273      *
274      * @deprecated (4.3) no longer used.
275      */
276     @Deprecated
277     protected HttpTransportMetricsImpl createTransportMetrics() {
278         return new HttpTransportMetricsImpl();
279     }
280 
281     /**
282      * @since 4.1
283      *
284      * @deprecated (4.3) use decorator to add additional metrics.
285      */
286     @Deprecated
287     protected HttpConnectionMetricsImpl createConnectionMetrics(
288             final HttpTransportMetrics inTransportMetric,
289             final HttpTransportMetrics outTransportMetric) {
290         return new HttpConnectionMetricsImpl(inTransportMetric, outTransportMetric);
291     }
292 
293     public int getStatus() {
294         return this.status;
295     }
296 
297     public HttpContext getContext() {
298         return this.context;
299     }
300 
301     public HttpRequest getHttpRequest() {
302         return this.request;
303     }
304 
305     public HttpResponse getHttpResponse() {
306         return this.response;
307     }
308 
309     public void requestInput() {
310         this.session.setEvent(EventMask.READ);
311     }
312 
313     public void requestOutput() {
314         this.session.setEvent(EventMask.WRITE);
315     }
316 
317     public void suspendInput() {
318         this.session.clearEvent(EventMask.READ);
319     }
320 
321     public void suspendOutput() {
322         this.session.clearEvent(EventMask.WRITE);
323     }
324 
325     /**
326      * Initializes a specific {@link ContentDecoder} implementation based on the
327      * properties of the given {@link HttpMessage} and generates an instance of
328      * {@link HttpEntity} matching the properties of the content decoder.
329      *
330      * @param message the HTTP message.
331      * @return HTTP entity.
332      * @throws HttpException in case of an HTTP protocol violation.
333      */
334     protected HttpEntity prepareDecoder(final HttpMessage message) throws HttpException {
335         final BasicHttpEntity entity = new BasicHttpEntity();
336         final long len = this.incomingContentStrategy.determineLength(message);
337         this.contentDecoder = createContentDecoder(
338                 len,
339                 this.session.channel(),
340                 this.inbuf,
341                 this.inTransportMetrics);
342         if (len == ContentLengthStrategy.CHUNKED) {
343             entity.setChunked(true);
344             entity.setContentLength(-1);
345         } else if (len == ContentLengthStrategy.IDENTITY) {
346             entity.setChunked(false);
347             entity.setContentLength(-1);
348         } else {
349             entity.setChunked(false);
350             entity.setContentLength(len);
351         }
352 
353         final Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
354         if (contentTypeHeader != null) {
355             entity.setContentType(contentTypeHeader);
356         }
357         final Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
358         if (contentEncodingHeader != null) {
359             entity.setContentEncoding(contentEncodingHeader);
360         }
361         return entity;
362     }
363 
364     /**
365      * Factory method for {@link ContentDecoder} instances.
366      *
367      * @param len content length, if known, {@link ContentLengthStrategy#CHUNKED} or
368      *   {@link ContentLengthStrategy#IDENTITY}, if unknown.
369      * @param channel the session channel.
370      * @param buffer the session buffer.
371      * @param metrics transport metrics.
372      *
373      * @return content decoder.
374      *
375      * @since 4.1
376      */
377     protected ContentDecoder createContentDecoder(
378             final long len,
379             final ReadableByteChannel channel,
380             final SessionInputBuffer buffer,
381             final HttpTransportMetricsImpl metrics) {
382         if (len == ContentLengthStrategy.CHUNKED) {
383             return new ChunkDecoder(channel, buffer, metrics);
384         } else if (len == ContentLengthStrategy.IDENTITY) {
385             return new IdentityDecoder(channel, buffer, metrics);
386         } else {
387             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
388         }
389     }
390 
391     /**
392      * Initializes a specific {@link ContentEncoder} implementation based on the
393      * properties of the given {@link HttpMessage}.
394      *
395      * @param message the HTTP message.
396      * @throws HttpException in case of an HTTP protocol violation.
397      */
398     protected void prepareEncoder(final HttpMessage message) throws HttpException {
399         final long len = this.outgoingContentStrategy.determineLength(message);
400         this.contentEncoder = createContentEncoder(
401                 len,
402                 this.session.channel(),
403                 this.outbuf,
404                 this.outTransportMetrics);
405     }
406 
407     /**
408      * Factory method for {@link ContentEncoder} instances.
409      *
410      * @param len content length, if known, {@link ContentLengthStrategy#CHUNKED} or
411      *   {@link ContentLengthStrategy#IDENTITY}, if unknown.
412      * @param channel the session channel.
413      * @param buffer the session buffer.
414      * @param metrics transport metrics.
415      *
416      * @return content encoder.
417      *
418      * @since 4.1
419      */
420     protected ContentEncoder createContentEncoder(
421             final long len,
422             final WritableByteChannel channel,
423             final SessionOutputBuffer buffer,
424             final HttpTransportMetricsImpl metrics) {
425         if (len == ContentLengthStrategy.CHUNKED) {
426             return new ChunkEncoder(channel, buffer, metrics, this.fragmentSizeHint);
427         } else if (len == ContentLengthStrategy.IDENTITY) {
428             return new IdentityEncoder(channel, buffer, metrics, this.fragmentSizeHint);
429         } else {
430             return new LengthDelimitedEncoder(channel, buffer, metrics, len, this.fragmentSizeHint);
431         }
432     }
433 
434     public boolean hasBufferedInput() {
435         return this.hasBufferedInput;
436     }
437 
438     public boolean hasBufferedOutput() {
439         return this.hasBufferedOutput;
440     }
441 
442     /**
443      * Assets if the connection is still open.
444      *
445      * @throws ConnectionClosedException in case the connection has already
446      *   been closed.
447      */
448     protected void assertNotClosed() throws ConnectionClosedException {
449         if (this.status != ACTIVE) {
450             throw new ConnectionClosedException("Connection is closed");
451         }
452     }
453 
454     public void close() throws IOException {
455         if (this.status != ACTIVE) {
456             return;
457         }
458         this.status = CLOSING;
459         if (this.outbuf.hasData()) {
460             this.session.setEvent(EventMask.WRITE);
461         } else {
462             this.session.close();
463             this.status = CLOSED;
464         }
465     }
466 
467     public boolean isOpen() {
468         return this.status == ACTIVE && !this.session.isClosed();
469     }
470 
471     public boolean isStale() {
472         return this.session.isClosed();
473     }
474 
475     public InetAddress getLocalAddress() {
476         final SocketAddress address = this.session.getLocalAddress();
477         if (address instanceof InetSocketAddress) {
478             return ((InetSocketAddress) address).getAddress();
479         } else {
480             return null;
481         }
482     }
483 
484     public int getLocalPort() {
485         final SocketAddress address = this.session.getLocalAddress();
486         if (address instanceof InetSocketAddress) {
487             return ((InetSocketAddress) address).getPort();
488         } else {
489             return -1;
490         }
491     }
492 
493     public InetAddress getRemoteAddress() {
494         final SocketAddress address = this.session.getRemoteAddress();
495         if (address instanceof InetSocketAddress) {
496             return ((InetSocketAddress) address).getAddress();
497         } else {
498             return null;
499         }
500     }
501 
502     public int getRemotePort() {
503         final SocketAddress address = this.session.getRemoteAddress();
504         if (address instanceof InetSocketAddress) {
505             return ((InetSocketAddress) address).getPort();
506         } else {
507             return -1;
508         }
509     }
510 
511     public void setSocketTimeout(final int timeout) {
512         this.session.setSocketTimeout(timeout);
513     }
514 
515     public int getSocketTimeout() {
516         return this.session.getSocketTimeout();
517     }
518 
519     public void shutdown() throws IOException {
520         this.status = CLOSED;
521         this.session.shutdown();
522     }
523 
524     public HttpConnectionMetrics getMetrics() {
525         return this.connMetrics;
526     }
527 
528     @Override
529     public String toString() {
530         final StringBuilder buffer = new StringBuilder();
531         final SocketAddress remoteAddress = this.session.getRemoteAddress();
532         final SocketAddress localAddress = this.session.getLocalAddress();
533         if (remoteAddress != null && localAddress != null) {
534             NetUtils.formatAddress(buffer, localAddress);
535             buffer.append("<->");
536             NetUtils.formatAddress(buffer, remoteAddress);
537         }
538         buffer.append("[");
539         switch (this.status) {
540         case ACTIVE:
541             buffer.append("ACTIVE");
542             break;
543         case CLOSING:
544             buffer.append("CLOSING");
545             break;
546         case CLOSED:
547             buffer.append("CLOSED");
548             break;
549         }
550         buffer.append("]");
551         return buffer.toString();
552     }
553 
554     public Socket getSocket() {
555         if (this.session instanceof SocketAccessor) {
556             return ((SocketAccessor) this.session).getSocket();
557         } else {
558             return null;
559         }
560     }
561 
562 }