1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.InetSocketAddress;
33 import java.net.Socket;
34 import java.net.SocketAddress;
35 import java.nio.channels.ReadableByteChannel;
36 import java.nio.channels.WritableByteChannel;
37
38 import org.apache.http.ConnectionClosedException;
39 import org.apache.http.Header;
40 import org.apache.http.HttpConnectionMetrics;
41 import org.apache.http.HttpEntity;
42 import org.apache.http.HttpException;
43 import org.apache.http.HttpInetConnection;
44 import org.apache.http.HttpMessage;
45 import org.apache.http.HttpRequest;
46 import org.apache.http.HttpResponse;
47 import org.apache.http.annotation.NotThreadSafe;
48 import org.apache.http.entity.BasicHttpEntity;
49 import org.apache.http.entity.ContentLengthStrategy;
50 import org.apache.http.impl.HttpConnectionMetricsImpl;
51 import org.apache.http.impl.entity.LaxContentLengthStrategy;
52 import org.apache.http.impl.entity.StrictContentLengthStrategy;
53 import org.apache.http.impl.io.HttpTransportMetricsImpl;
54 import org.apache.http.nio.ContentDecoder;
55 import org.apache.http.nio.ContentEncoder;
56 import org.apache.http.nio.NHttpConnection;
57 import org.apache.http.impl.nio.codecs.ChunkDecoder;
58 import org.apache.http.impl.nio.codecs.ChunkEncoder;
59 import org.apache.http.impl.nio.codecs.IdentityDecoder;
60 import org.apache.http.impl.nio.codecs.IdentityEncoder;
61 import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
62 import org.apache.http.impl.nio.codecs.LengthDelimitedEncoder;
63 import org.apache.http.impl.nio.reactor.SessionInputBufferImpl;
64 import org.apache.http.impl.nio.reactor.SessionOutputBufferImpl;
65 import org.apache.http.io.HttpTransportMetrics;
66 import org.apache.http.nio.reactor.EventMask;
67 import org.apache.http.nio.reactor.IOSession;
68 import org.apache.http.nio.reactor.SessionBufferStatus;
69 import org.apache.http.nio.reactor.SessionInputBuffer;
70 import org.apache.http.nio.reactor.SessionOutputBuffer;
71 import org.apache.http.nio.reactor.SocketAccessor;
72 import org.apache.http.nio.util.ByteBufferAllocator;
73 import org.apache.http.params.HttpConnectionParams;
74 import org.apache.http.params.HttpParams;
75 import org.apache.http.protocol.HTTP;
76 import org.apache.http.protocol.HttpContext;
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 @NotThreadSafe
93 public class NHttpConnectionBase
94 implements NHttpConnection, HttpInetConnection, SessionBufferStatus, SocketAccessor {
95
96 protected final ContentLengthStrategy incomingContentStrategy;
97 protected final ContentLengthStrategy outgoingContentStrategy;
98
99 protected final SessionInputBufferImpl inbuf;
100 protected final SessionOutputBufferImpl outbuf;
101
102 protected final HttpTransportMetricsImpl inTransportMetrics;
103 protected final HttpTransportMetricsImpl outTransportMetrics;
104 protected final HttpConnectionMetricsImpl connMetrics;
105
106 protected HttpContext context;
107 protected IOSession session;
108 protected SocketAddress remote;
109 protected volatile ContentDecoder contentDecoder;
110 protected volatile boolean hasBufferedInput;
111 protected volatile ContentEncoder contentEncoder;
112 protected volatile boolean hasBufferedOutput;
113 protected volatile HttpRequest request;
114 protected volatile HttpResponse response;
115
116 protected volatile int status;
117
118
119
120
121
122
123
124
125 public NHttpConnectionBase(
126 final IOSession session,
127 final ByteBufferAllocator allocator,
128 final HttpParams params) {
129 super();
130 if (session == null) {
131 throw new IllegalArgumentException("I/O session may not be null");
132 }
133 if (params == null) {
134 throw new IllegalArgumentException("HTTP params may not be null");
135 }
136
137 int buffersize = HttpConnectionParams.getSocketBufferSize(params);
138 if (buffersize <= 0) {
139 buffersize = 4096;
140 }
141 int linebuffersize = buffersize;
142 if (linebuffersize > 512) {
143 linebuffersize = 512;
144 }
145
146 this.inbuf = new SessionInputBufferImpl(buffersize, linebuffersize, allocator, params);
147 this.outbuf = new SessionOutputBufferImpl(buffersize, linebuffersize, allocator, params);
148
149 this.incomingContentStrategy = createIncomingContentStrategy();
150 this.outgoingContentStrategy = createOutgoingContentStrategy();
151
152 this.inTransportMetrics = createTransportMetrics();
153 this.outTransportMetrics = createTransportMetrics();
154 this.connMetrics = createConnectionMetrics(
155 this.inTransportMetrics,
156 this.outTransportMetrics);
157
158 setSession(session);
159 this.status = ACTIVE;
160 }
161
162 private void setSession(final IOSession session) {
163 this.session = session;
164 this.context = new SessionHttpContext(this.session);
165 this.session.setBufferStatus(this);
166 this.remote = this.session.getRemoteAddress();
167 }
168
169
170
171
172
173
174
175 protected void bind(final IOSession session) {
176 if (session == null) {
177 throw new IllegalArgumentException("I/O session may not be null");
178 }
179 this.session.setBufferStatus(null);
180 setSession(session);
181 }
182
183
184
185
186 protected ContentLengthStrategy createIncomingContentStrategy() {
187 return new LaxContentLengthStrategy();
188 }
189
190
191
192
193 protected ContentLengthStrategy createOutgoingContentStrategy() {
194 return new StrictContentLengthStrategy();
195 }
196
197
198
199
200 protected HttpTransportMetricsImpl createTransportMetrics() {
201 return new HttpTransportMetricsImpl();
202 }
203
204
205
206
207 protected HttpConnectionMetricsImpl createConnectionMetrics(
208 final HttpTransportMetrics inTransportMetric,
209 final HttpTransportMetrics outTransportMetric) {
210 return new HttpConnectionMetricsImpl(inTransportMetric, outTransportMetric);
211 }
212
213 public int getStatus() {
214 return this.status;
215 }
216
217 public HttpContext getContext() {
218 return this.context;
219 }
220
221 public HttpRequest getHttpRequest() {
222 return this.request;
223 }
224
225 public HttpResponse getHttpResponse() {
226 return this.response;
227 }
228
229 public void requestInput() {
230 this.session.setEvent(EventMask.READ);
231 }
232
233 public void requestOutput() {
234 this.session.setEvent(EventMask.WRITE);
235 }
236
237 public void suspendInput() {
238 this.session.clearEvent(EventMask.READ);
239 }
240
241 public void suspendOutput() {
242 this.session.clearEvent(EventMask.WRITE);
243 }
244
245
246
247
248
249
250
251
252
253
254 protected HttpEntity prepareDecoder(final HttpMessage message) throws HttpException {
255 BasicHttpEntity entity = new BasicHttpEntity();
256 long len = this.incomingContentStrategy.determineLength(message);
257 this.contentDecoder = createContentDecoder(
258 len,
259 this.session.channel(),
260 this.inbuf,
261 this.inTransportMetrics);
262 if (len == ContentLengthStrategy.CHUNKED) {
263 entity.setChunked(true);
264 entity.setContentLength(-1);
265 } else if (len == ContentLengthStrategy.IDENTITY) {
266 entity.setChunked(false);
267 entity.setContentLength(-1);
268 } else {
269 entity.setChunked(false);
270 entity.setContentLength(len);
271 }
272
273 Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
274 if (contentTypeHeader != null) {
275 entity.setContentType(contentTypeHeader);
276 }
277 Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
278 if (contentEncodingHeader != null) {
279 entity.setContentEncoding(contentEncodingHeader);
280 }
281 return entity;
282 }
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297 protected ContentDecoder createContentDecoder(
298 final long len,
299 final ReadableByteChannel channel,
300 final SessionInputBuffer buffer,
301 final HttpTransportMetricsImpl metrics) {
302 if (len == ContentLengthStrategy.CHUNKED) {
303 return new ChunkDecoder(channel, buffer, metrics);
304 } else if (len == ContentLengthStrategy.IDENTITY) {
305 return new IdentityDecoder(channel, buffer, metrics);
306 } else {
307 return new LengthDelimitedDecoder(channel, buffer, metrics, len);
308 }
309 }
310
311
312
313
314
315
316
317
318 protected void prepareEncoder(final HttpMessage message) throws HttpException {
319 long len = this.outgoingContentStrategy.determineLength(message);
320 this.contentEncoder = createContentEncoder(
321 len,
322 this.session.channel(),
323 this.outbuf,
324 this.outTransportMetrics);
325 }
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340 protected ContentEncoder createContentEncoder(
341 final long len,
342 final WritableByteChannel channel,
343 final SessionOutputBuffer buffer,
344 final HttpTransportMetricsImpl metrics) {
345 if (len == ContentLengthStrategy.CHUNKED) {
346 return new ChunkEncoder(channel, buffer, metrics);
347 } else if (len == ContentLengthStrategy.IDENTITY) {
348 return new IdentityEncoder(channel, buffer, metrics);
349 } else {
350 return new LengthDelimitedEncoder(channel, buffer, metrics, len);
351 }
352 }
353
354 public boolean hasBufferedInput() {
355 return this.hasBufferedInput;
356 }
357
358 public boolean hasBufferedOutput() {
359 return this.hasBufferedOutput;
360 }
361
362
363
364
365
366
367
368 protected void assertNotClosed() throws ConnectionClosedException {
369 if (this.status != ACTIVE) {
370 throw new ConnectionClosedException("Connection is closed");
371 }
372 }
373
374 public void close() throws IOException {
375 if (this.status != ACTIVE) {
376 return;
377 }
378 this.status = CLOSING;
379 if (this.outbuf.hasData()) {
380 this.session.setEvent(EventMask.WRITE);
381 } else {
382 this.session.close();
383 this.status = CLOSED;
384 }
385 }
386
387 public boolean isOpen() {
388 return this.status == ACTIVE && !this.session.isClosed();
389 }
390
391 public boolean isStale() {
392 return this.session.isClosed();
393 }
394
395 public InetAddress getLocalAddress() {
396 SocketAddress address = this.session.getLocalAddress();
397 if (address instanceof InetSocketAddress) {
398 return ((InetSocketAddress) address).getAddress();
399 } else {
400 return null;
401 }
402 }
403
404 public int getLocalPort() {
405 SocketAddress address = this.session.getLocalAddress();
406 if (address instanceof InetSocketAddress) {
407 return ((InetSocketAddress) address).getPort();
408 } else {
409 return -1;
410 }
411 }
412
413 public InetAddress getRemoteAddress() {
414 SocketAddress address = this.session.getRemoteAddress();
415 if (address instanceof InetSocketAddress) {
416 return ((InetSocketAddress) address).getAddress();
417 } else {
418 return null;
419 }
420 }
421
422 public int getRemotePort() {
423 SocketAddress address = this.session.getRemoteAddress();
424 if (address instanceof InetSocketAddress) {
425 return ((InetSocketAddress) address).getPort();
426 } else {
427 return -1;
428 }
429 }
430
431 public void setSocketTimeout(int timeout) {
432 this.session.setSocketTimeout(timeout);
433 }
434
435 public int getSocketTimeout() {
436 return this.session.getSocketTimeout();
437 }
438
439 public void shutdown() throws IOException {
440 this.status = CLOSED;
441 this.session.shutdown();
442 }
443
444 public HttpConnectionMetrics getMetrics() {
445 return this.connMetrics;
446 }
447
448 private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
449 if (socketAddress instanceof InetSocketAddress) {
450 InetSocketAddress addr = ((InetSocketAddress) socketAddress);
451 buffer.append(addr.getAddress() != null ? addr.getAddress().getHostAddress() :
452 addr.getAddress())
453 .append(':')
454 .append(addr.getPort());
455 } else {
456 buffer.append(socketAddress);
457 }
458 }
459
460 @Override
461 public String toString() {
462 StringBuilder buffer = new StringBuilder();
463 SocketAddress remoteAddress = this.session.getRemoteAddress();
464 SocketAddress localAddress = this.session.getLocalAddress();
465 if (remoteAddress != null && localAddress != null) {
466 formatAddress(buffer, localAddress);
467 buffer.append("<->");
468 formatAddress(buffer, remoteAddress);
469 }
470 buffer.append("[");
471 switch (this.status) {
472 case ACTIVE:
473 buffer.append("ACTIVE");
474 break;
475 case CLOSING:
476 buffer.append("CLOSING");
477 break;
478 case CLOSED:
479 buffer.append("CLOSED");
480 break;
481 }
482 buffer.append("]");
483 return buffer.toString();
484 }
485
486 public Socket getSocket() {
487 if (this.session instanceof SocketAccessor) {
488 return ((SocketAccessor) this.session).getSocket();
489 } else {
490 return null;
491 }
492 }
493
494 }