View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.InetAddress;
32  import java.net.InetSocketAddress;
33  import java.net.Socket;
34  import java.net.SocketAddress;
35  import java.nio.channels.ReadableByteChannel;
36  import java.nio.channels.WritableByteChannel;
37  import java.nio.charset.Charset;
38  import java.nio.charset.CharsetDecoder;
39  import java.nio.charset.CharsetEncoder;
40  import java.nio.charset.CodingErrorAction;
41  
42  import org.apache.http.ConnectionClosedException;
43  import org.apache.http.Consts;
44  import org.apache.http.Header;
45  import org.apache.http.HttpConnectionMetrics;
46  import org.apache.http.HttpEntity;
47  import org.apache.http.HttpException;
48  import org.apache.http.HttpInetConnection;
49  import org.apache.http.HttpMessage;
50  import org.apache.http.HttpRequest;
51  import org.apache.http.HttpResponse;
52  import org.apache.http.annotation.NotThreadSafe;
53  import org.apache.http.entity.BasicHttpEntity;
54  import org.apache.http.entity.ContentLengthStrategy;
55  import org.apache.http.impl.HttpConnectionMetricsImpl;
56  import org.apache.http.impl.entity.LaxContentLengthStrategy;
57  import org.apache.http.impl.entity.StrictContentLengthStrategy;
58  import org.apache.http.impl.io.HttpTransportMetricsImpl;
59  import org.apache.http.impl.nio.codecs.ChunkDecoder;
60  import org.apache.http.impl.nio.codecs.ChunkEncoder;
61  import org.apache.http.impl.nio.codecs.IdentityDecoder;
62  import org.apache.http.impl.nio.codecs.IdentityEncoder;
63  import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
64  import org.apache.http.impl.nio.codecs.LengthDelimitedEncoder;
65  import org.apache.http.impl.nio.reactor.SessionInputBufferImpl;
66  import org.apache.http.impl.nio.reactor.SessionOutputBufferImpl;
67  import org.apache.http.io.HttpTransportMetrics;
68  import org.apache.http.nio.ContentDecoder;
69  import org.apache.http.nio.ContentEncoder;
70  import org.apache.http.nio.NHttpConnection;
71  import org.apache.http.nio.reactor.EventMask;
72  import org.apache.http.nio.reactor.IOSession;
73  import org.apache.http.nio.reactor.SessionBufferStatus;
74  import org.apache.http.nio.reactor.SessionInputBuffer;
75  import org.apache.http.nio.reactor.SessionOutputBuffer;
76  import org.apache.http.nio.reactor.SocketAccessor;
77  import org.apache.http.nio.util.ByteBufferAllocator;
78  import org.apache.http.params.CoreConnectionPNames;
79  import org.apache.http.params.CoreProtocolPNames;
80  import org.apache.http.params.HttpParams;
81  import org.apache.http.protocol.HTTP;
82  import org.apache.http.protocol.HttpContext;
83  import org.apache.http.util.Args;
84  import org.apache.http.util.CharsetUtils;
85  import org.apache.http.util.NetUtils;
86  
87  /**
88   * This class serves as a base for all {@link NHttpConnection} implementations and provides
89   * functionality common to both client and server HTTP connections.
90   *
91   * @since 4.0
92   */
93  @SuppressWarnings("deprecation")
94  @NotThreadSafe
95  public class NHttpConnectionBase
96          implements NHttpConnection, HttpInetConnection, SessionBufferStatus, SocketAccessor {
97  
98      protected final ContentLengthStrategy incomingContentStrategy;
99      protected final ContentLengthStrategy outgoingContentStrategy;
100 
101     protected final SessionInputBufferImpl inbuf;
102     protected final SessionOutputBufferImpl outbuf;
103     private final int fragmentSizeHint;
104 
105     protected final HttpTransportMetricsImpl inTransportMetrics;
106     protected final HttpTransportMetricsImpl outTransportMetrics;
107     protected final HttpConnectionMetricsImpl connMetrics;
108 
109     protected HttpContext context;
110     protected IOSession session;
111     protected SocketAddress remote;
112     protected volatile ContentDecoder contentDecoder;
113     protected volatile boolean hasBufferedInput;
114     protected volatile ContentEncoder contentEncoder;
115     protected volatile boolean hasBufferedOutput;
116     protected volatile HttpRequest request;
117     protected volatile HttpResponse response;
118 
119     protected volatile int status;
120 
/**
 * Creates a new instance of this class given the underlying I/O session.
 * <p>
 * The session buffer size is read from {@link CoreConnectionPNames#SOCKET_BUFFER_SIZE}
 * and falls back to 4096 when the parameter is absent or not positive; the line buffer
 * size is the buffer size capped at 512. If {@link CoreProtocolPNames#HTTP_ELEMENT_CHARSET}
 * resolves to a charset, a decoder / encoder pair of that charset is handed to the
 * session buffers; otherwise no decoder / encoder is passed to them.
 *
 * @param session the underlying I/O session.
 * @param allocator byte buffer allocator.
 * @param params HTTP parameters.
 *
 * @deprecated (4.3) use
 *   {@link NHttpConnectionBase#NHttpConnectionBase(IOSession, int, int, ByteBufferAllocator,
 *   CharsetDecoder, CharsetEncoder, ContentLengthStrategy, ContentLengthStrategy)}
 */
@Deprecated
public NHttpConnectionBase(
        final IOSession session,
        final ByteBufferAllocator allocator,
        final HttpParams params) {
    super();
    Args.notNull(session, "I/O session");
    Args.notNull(params, "HTTP params");

    int buffersize = params.getIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, -1);
    if (buffersize <= 0) {
        buffersize = 4096;
    }
    int linebuffersize = buffersize;
    if (linebuffersize > 512) {
        linebuffersize = 512;
    }

    CharsetDecoder decoder = null;
    CharsetEncoder encoder = null;
    // Honour the configured element charset. It used to be overwritten with
    // US-ASCII right after the lookup, which silently ignored the parameter.
    final Charset charset = CharsetUtils.lookup(
            (String) params.getParameter(CoreProtocolPNames.HTTP_ELEMENT_CHARSET));
    if (charset != null) {
        decoder = charset.newDecoder();
        encoder = charset.newEncoder();
        final CodingErrorAction malformedCharAction = (CodingErrorAction) params.getParameter(
                CoreProtocolPNames.HTTP_MALFORMED_INPUT_ACTION);
        final CodingErrorAction unmappableCharAction = (CodingErrorAction) params.getParameter(
                CoreProtocolPNames.HTTP_UNMAPPABLE_INPUT_ACTION);
        // The JDK coders reject a null action with IllegalArgumentException, so an
        // action is applied only when the parameter is actually set; otherwise the
        // coder keeps its own default action.
        if (malformedCharAction != null) {
            decoder.onMalformedInput(malformedCharAction);
            encoder.onMalformedInput(malformedCharAction);
        }
        if (unmappableCharAction != null) {
            decoder.onUnmappableCharacter(unmappableCharAction);
            encoder.onUnmappableCharacter(unmappableCharAction);
        }
    }
    this.inbuf = new SessionInputBufferImpl(buffersize, linebuffersize, decoder, allocator);
    this.outbuf = new SessionOutputBufferImpl(buffersize, linebuffersize, encoder, allocator);
    this.fragmentSizeHint = buffersize;

    // The deprecated factory methods below are overridable and are invoked from the
    // constructor for backward compatibility; overrides run before the subclass
    // constructor body has executed.
    this.incomingContentStrategy = createIncomingContentStrategy();
    this.outgoingContentStrategy = createOutgoingContentStrategy();

    this.inTransportMetrics = createTransportMetrics();
    this.outTransportMetrics = createTransportMetrics();
    this.connMetrics = createConnectionMetrics(
            this.inTransportMetrics,
            this.outTransportMetrics);

    setSession(session);
    this.status = ACTIVE;
}
181 
182     /**
183      * Creates new instance NHttpConnectionBase given the underlying I/O session.
184      *
185      * @param session the underlying I/O session.
186      * @param buffersize buffer size. Must be a positive number.
187      * @param fragmentSizeHint fragment size hint.
188      * @param allocator memory allocator.
189      *   If <code>null</code> {@link org.apache.http.nio.util.HeapByteBufferAllocator#INSTANCE}
190      *   will be used.
191      * @param chardecoder decoder to be used for decoding HTTP protocol elements.
192      *   If <code>null</code> simple type cast will be used for byte to char conversion.
193      * @param charencoder encoder to be used for encoding HTTP protocol elements.
194      *   If <code>null</code> simple type cast will be used for char to byte conversion.
195      * @param incomingContentStrategy incoming content length strategy. If <code>null</code>
196      *   {@link LaxContentLengthStrategy#INSTANCE} will be used.
197      * @param outgoingContentStrategy outgoing content length strategy. If <code>null</code>
198      *   {@link StrictContentLengthStrategy#INSTANCE} will be used.
199      *
200      * @since 4.3
201      */
202     protected NHttpConnectionBase(
203             final IOSession session,
204             final int buffersize,
205             final int fragmentSizeHint,
206             final ByteBufferAllocator allocator,
207             final CharsetDecoder chardecoder,
208             final CharsetEncoder charencoder,
209             final ContentLengthStrategy incomingContentStrategy,
210             final ContentLengthStrategy outgoingContentStrategy) {
211         Args.notNull(session, "I/O session");
212         Args.positive(buffersize, "Buffer size");
213         int linebuffersize = buffersize;
214         if (linebuffersize > 512) {
215             linebuffersize = 512;
216         }
217         this.inbuf = new SessionInputBufferImpl(buffersize, linebuffersize, chardecoder, allocator);
218         this.outbuf = new SessionOutputBufferImpl(buffersize, linebuffersize, charencoder, allocator);
219         this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : buffersize;
220 
221         this.inTransportMetrics = new HttpTransportMetricsImpl();
222         this.outTransportMetrics = new HttpTransportMetricsImpl();
223         this.connMetrics = new HttpConnectionMetricsImpl(this.inTransportMetrics, this.outTransportMetrics);
224         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
225             LaxContentLengthStrategy.INSTANCE;
226         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
227             StrictContentLengthStrategy.INSTANCE;
228 
229         setSession(session);
230         this.status = ACTIVE;
231     }
232 
233     private void setSession(final IOSession session) {
234         this.session = session;
235         this.context = new SessionHttpContext(this.session);
236         this.session.setBufferStatus(this);
237         this.remote = this.session.getRemoteAddress();
238     }
239 
240     /**
241      * Binds the connection to a different {@link IOSession}. This may be necessary
242      * when the underlying I/O session gets upgraded with SSL/TLS encryption.
243      *
244      * @since 4.2
245      */
246     protected void bind(final IOSession session) {
247         Args.notNull(session, "I/O session");
248         this.session.setBufferStatus(null);
249         setSession(session);
250     }
251 
252     /**
253      * @since 4.2
254      *
255      * @deprecated (4.3) use constructor.
256      */
257     @Deprecated
258     protected ContentLengthStrategy createIncomingContentStrategy() {
259         return new LaxContentLengthStrategy();
260     }
261 
262     /**
263      * @since 4.2
264      *
265      * @deprecated (4.3) use constructor.
266      */
267     @Deprecated
268     protected ContentLengthStrategy createOutgoingContentStrategy() {
269         return new StrictContentLengthStrategy();
270     }
271 
272     /**
273      * @since 4.1
274      *
275      * @deprecated (4.3) no longer used.
276      */
277     @Deprecated
278     protected HttpTransportMetricsImpl createTransportMetrics() {
279         return new HttpTransportMetricsImpl();
280     }
281 
282     /**
283      * @since 4.1
284      *
285      * @deprecated (4.3) use decorator to add additional metrics.
286      */
287     @Deprecated
288     protected HttpConnectionMetricsImpl createConnectionMetrics(
289             final HttpTransportMetrics inTransportMetric,
290             final HttpTransportMetrics outTransportMetric) {
291         return new HttpConnectionMetricsImpl(inTransportMetric, outTransportMetric);
292     }
293 
294     public int getStatus() {
295         return this.status;
296     }
297 
298     public HttpContext getContext() {
299         return this.context;
300     }
301 
302     public HttpRequest getHttpRequest() {
303         return this.request;
304     }
305 
306     public HttpResponse getHttpResponse() {
307         return this.response;
308     }
309 
310     public void requestInput() {
311         this.session.setEvent(EventMask.READ);
312     }
313 
314     public void requestOutput() {
315         this.session.setEvent(EventMask.WRITE);
316     }
317 
318     public void suspendInput() {
319         this.session.clearEvent(EventMask.READ);
320     }
321 
322     public void suspendOutput() {
323         this.session.clearEvent(EventMask.WRITE);
324     }
325 
326     /**
327      * Initializes a specific {@link ContentDecoder} implementation based on the
328      * properties of the given {@link HttpMessage} and generates an instance of
329      * {@link HttpEntity} matching the properties of the content decoder.
330      *
331      * @param message the HTTP message.
332      * @return HTTP entity.
333      * @throws HttpException in case of an HTTP protocol violation.
334      */
335     protected HttpEntity prepareDecoder(final HttpMessage message) throws HttpException {
336         final BasicHttpEntity entity = new BasicHttpEntity();
337         final long len = this.incomingContentStrategy.determineLength(message);
338         this.contentDecoder = createContentDecoder(
339                 len,
340                 this.session.channel(),
341                 this.inbuf,
342                 this.inTransportMetrics);
343         if (len == ContentLengthStrategy.CHUNKED) {
344             entity.setChunked(true);
345             entity.setContentLength(-1);
346         } else if (len == ContentLengthStrategy.IDENTITY) {
347             entity.setChunked(false);
348             entity.setContentLength(-1);
349         } else {
350             entity.setChunked(false);
351             entity.setContentLength(len);
352         }
353 
354         final Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
355         if (contentTypeHeader != null) {
356             entity.setContentType(contentTypeHeader);
357         }
358         final Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
359         if (contentEncodingHeader != null) {
360             entity.setContentEncoding(contentEncodingHeader);
361         }
362         return entity;
363     }
364 
365     /**
366      * Factory method for {@link ContentDecoder} instances.
367      *
368      * @param len content length, if known, {@link ContentLengthStrategy#CHUNKED} or
369      *   {@link ContentLengthStrategy#IDENTITY}, if unknown.
370      * @param channel the session channel.
371      * @param buffer the session buffer.
372      * @param metrics transport metrics.
373      *
374      * @return content decoder.
375      *
376      * @since 4.1
377      */
378     protected ContentDecoder createContentDecoder(
379             final long len,
380             final ReadableByteChannel channel,
381             final SessionInputBuffer buffer,
382             final HttpTransportMetricsImpl metrics) {
383         if (len == ContentLengthStrategy.CHUNKED) {
384             return new ChunkDecoder(channel, buffer, metrics);
385         } else if (len == ContentLengthStrategy.IDENTITY) {
386             return new IdentityDecoder(channel, buffer, metrics);
387         } else {
388             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
389         }
390     }
391 
392     /**
393      * Initializes a specific {@link ContentEncoder} implementation based on the
394      * properties of the given {@link HttpMessage}.
395      *
396      * @param message the HTTP message.
397      * @throws HttpException in case of an HTTP protocol violation.
398      */
399     protected void prepareEncoder(final HttpMessage message) throws HttpException {
400         final long len = this.outgoingContentStrategy.determineLength(message);
401         this.contentEncoder = createContentEncoder(
402                 len,
403                 this.session.channel(),
404                 this.outbuf,
405                 this.outTransportMetrics);
406     }
407 
408     /**
409      * Factory method for {@link ContentEncoder} instances.
410      *
411      * @param len content length, if known, {@link ContentLengthStrategy#CHUNKED} or
412      *   {@link ContentLengthStrategy#IDENTITY}, if unknown.
413      * @param channel the session channel.
414      * @param buffer the session buffer.
415      * @param metrics transport metrics.
416      *
417      * @return content encoder.
418      *
419      * @since 4.1
420      */
421     protected ContentEncoder createContentEncoder(
422             final long len,
423             final WritableByteChannel channel,
424             final SessionOutputBuffer buffer,
425             final HttpTransportMetricsImpl metrics) {
426         if (len == ContentLengthStrategy.CHUNKED) {
427             return new ChunkEncoder(channel, buffer, metrics, this.fragmentSizeHint);
428         } else if (len == ContentLengthStrategy.IDENTITY) {
429             return new IdentityEncoder(channel, buffer, metrics, this.fragmentSizeHint);
430         } else {
431             return new LengthDelimitedEncoder(channel, buffer, metrics, len, this.fragmentSizeHint);
432         }
433     }
434 
435     public boolean hasBufferedInput() {
436         return this.hasBufferedInput;
437     }
438 
439     public boolean hasBufferedOutput() {
440         return this.hasBufferedOutput;
441     }
442 
443     /**
444      * Assets if the connection is still open.
445      *
446      * @throws ConnectionClosedException in case the connection has already
447      *   been closed.
448      */
449     protected void assertNotClosed() throws ConnectionClosedException {
450         if (this.status != ACTIVE) {
451             throw new ConnectionClosedException("Connection is closed");
452         }
453     }
454 
455     public void close() throws IOException {
456         if (this.status != ACTIVE) {
457             return;
458         }
459         this.status = CLOSING;
460         if (this.outbuf.hasData()) {
461             this.session.setEvent(EventMask.WRITE);
462         } else {
463             this.session.close();
464             this.status = CLOSED;
465         }
466     }
467 
468     public boolean isOpen() {
469         return this.status == ACTIVE && !this.session.isClosed();
470     }
471 
472     public boolean isStale() {
473         return this.session.isClosed();
474     }
475 
476     public InetAddress getLocalAddress() {
477         final SocketAddress address = this.session.getLocalAddress();
478         if (address instanceof InetSocketAddress) {
479             return ((InetSocketAddress) address).getAddress();
480         } else {
481             return null;
482         }
483     }
484 
485     public int getLocalPort() {
486         final SocketAddress address = this.session.getLocalAddress();
487         if (address instanceof InetSocketAddress) {
488             return ((InetSocketAddress) address).getPort();
489         } else {
490             return -1;
491         }
492     }
493 
494     public InetAddress getRemoteAddress() {
495         final SocketAddress address = this.session.getRemoteAddress();
496         if (address instanceof InetSocketAddress) {
497             return ((InetSocketAddress) address).getAddress();
498         } else {
499             return null;
500         }
501     }
502 
503     public int getRemotePort() {
504         final SocketAddress address = this.session.getRemoteAddress();
505         if (address instanceof InetSocketAddress) {
506             return ((InetSocketAddress) address).getPort();
507         } else {
508             return -1;
509         }
510     }
511 
512     public void setSocketTimeout(final int timeout) {
513         this.session.setSocketTimeout(timeout);
514     }
515 
516     public int getSocketTimeout() {
517         return this.session.getSocketTimeout();
518     }
519 
520     public void shutdown() throws IOException {
521         this.status = CLOSED;
522         this.session.shutdown();
523     }
524 
525     public HttpConnectionMetrics getMetrics() {
526         return this.connMetrics;
527     }
528 
529     @Override
530     public String toString() {
531         final StringBuilder buffer = new StringBuilder();
532         final SocketAddress remoteAddress = this.session.getRemoteAddress();
533         final SocketAddress localAddress = this.session.getLocalAddress();
534         if (remoteAddress != null && localAddress != null) {
535             NetUtils.formatAddress(buffer, localAddress);
536             buffer.append("<->");
537             NetUtils.formatAddress(buffer, remoteAddress);
538         }
539         buffer.append("[");
540         switch (this.status) {
541         case ACTIVE:
542             buffer.append("ACTIVE");
543             break;
544         case CLOSING:
545             buffer.append("CLOSING");
546             break;
547         case CLOSED:
548             buffer.append("CLOSED");
549             break;
550         }
551         buffer.append("]");
552         return buffer.toString();
553     }
554 
555     public Socket getSocket() {
556         if (this.session instanceof SocketAccessor) {
557             return ((SocketAccessor) this.session).getSocket();
558         } else {
559             return null;
560         }
561     }
562 
563 }