1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.http.annotation.ThreadSafe;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.SessionBufferStatus;
import org.apache.http.nio.reactor.SocketAccessor;
46
47
48
49
50
51
52 @ThreadSafe
53 public class IOSessionImpl implements IOSession, SocketAccessor {
54
55 private final SelectionKey key;
56 private final ByteChannel channel;
57 private final Map<String, Object> attributes;
58 private final InterestOpsCallback interestOpsCallback;
59 private final SessionClosedCallback sessionClosedCallback;
60
61 private volatile int status;
62 private volatile int currentEventMask;
63 private volatile SessionBufferStatus bufferStatus;
64 private volatile int socketTimeout;
65
66 private final long startedTime;
67
68 private long lastReadTime;
69 private long lastWriteTime;
70 private long lastAccessTime;
71
72
73
74
75
76
77
78
79
80
81 public IOSessionImpl(
82 final SelectionKey key,
83 final InterestOpsCallback interestOpsCallback,
84 final SessionClosedCallback sessionClosedCallback) {
85 super();
86 if (key == null) {
87 throw new IllegalArgumentException("Selection key may not be null");
88 }
89 this.key = key;
90 this.channel = (ByteChannel) this.key.channel();
91 this.interestOpsCallback = interestOpsCallback;
92 this.sessionClosedCallback = sessionClosedCallback;
93 this.attributes = Collections.synchronizedMap(new HashMap<String, Object>());
94 this.currentEventMask = key.interestOps();
95 this.socketTimeout = 0;
96 this.status = ACTIVE;
97 long now = System.currentTimeMillis();
98 this.startedTime = now;
99 this.lastReadTime = now;
100 this.lastWriteTime = now;
101 this.lastAccessTime = now;
102 }
103
104
105
106
107
108
109
110 public IOSessionImpl(
111 final SelectionKey key,
112 final SessionClosedCallback sessionClosedCallback) {
113 this(key, null, sessionClosedCallback);
114 }
115
116 public ByteChannel channel() {
117 return this.channel;
118 }
119
120 public SocketAddress getLocalAddress() {
121 Channel channel = this.channel;
122 if (channel instanceof SocketChannel) {
123 return ((SocketChannel)channel).socket().getLocalSocketAddress();
124 } else {
125 return null;
126 }
127 }
128
129 public SocketAddress getRemoteAddress() {
130 Channel channel = this.channel;
131 if (channel instanceof SocketChannel) {
132 return ((SocketChannel)channel).socket().getRemoteSocketAddress();
133 } else {
134 return null;
135 }
136 }
137
138 public synchronized int getEventMask() {
139 return this.interestOpsCallback != null ? this.currentEventMask : this.key.interestOps();
140 }
141
142 public synchronized void setEventMask(int ops) {
143 if (this.status == CLOSED) {
144 return;
145 }
146 if (this.interestOpsCallback != null) {
147
148 this.currentEventMask = ops;
149
150
151 InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
152
153
154 this.interestOpsCallback.addInterestOps(entry);
155 } else {
156 this.key.interestOps(ops);
157 }
158 this.key.selector().wakeup();
159 }
160
161 public synchronized void setEvent(int op) {
162 if (this.status == CLOSED) {
163 return;
164 }
165 if (this.interestOpsCallback != null) {
166
167 this.currentEventMask |= op;
168
169
170 InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
171
172
173 this.interestOpsCallback.addInterestOps(entry);
174 } else {
175 int ops = this.key.interestOps();
176 this.key.interestOps(ops | op);
177 }
178 this.key.selector().wakeup();
179 }
180
181 public synchronized void clearEvent(int op) {
182 if (this.status == CLOSED) {
183 return;
184 }
185 if (this.interestOpsCallback != null) {
186
187 this.currentEventMask &= ~op;
188
189
190 InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
191
192
193 this.interestOpsCallback.addInterestOps(entry);
194 } else {
195 int ops = this.key.interestOps();
196 this.key.interestOps(ops & ~op);
197 }
198 this.key.selector().wakeup();
199 }
200
201 public int getSocketTimeout() {
202 return this.socketTimeout;
203 }
204
205 public synchronized void setSocketTimeout(int timeout) {
206 this.socketTimeout = timeout;
207 this.lastAccessTime = System.currentTimeMillis();
208 }
209
210 public synchronized void close() {
211 if (this.status == CLOSED) {
212 return;
213 }
214 this.status = CLOSED;
215 this.key.cancel();
216 try {
217 this.key.channel().close();
218 } catch (IOException ex) {
219
220
221 }
222 if (this.sessionClosedCallback != null) {
223 this.sessionClosedCallback.sessionClosed(this);
224 }
225 if (this.key.selector().isOpen()) {
226 this.key.selector().wakeup();
227 }
228 }
229
230 public int getStatus() {
231 return this.status;
232 }
233
234 public boolean isClosed() {
235 return this.status == CLOSED;
236 }
237
238 public void shutdown() {
239
240
241 close();
242 }
243
244 public boolean hasBufferedInput() {
245 SessionBufferStatus bufferStatus = this.bufferStatus;
246 return bufferStatus != null && bufferStatus.hasBufferedInput();
247 }
248
249 public boolean hasBufferedOutput() {
250 SessionBufferStatus bufferStatus = this.bufferStatus;
251 return bufferStatus != null && bufferStatus.hasBufferedOutput();
252 }
253
254 public void setBufferStatus(final SessionBufferStatus bufferStatus) {
255 this.bufferStatus = bufferStatus;
256 }
257
258 public Object getAttribute(final String name) {
259 return this.attributes.get(name);
260 }
261
262 public Object removeAttribute(final String name) {
263 return this.attributes.remove(name);
264 }
265
266 public void setAttribute(final String name, final Object obj) {
267 this.attributes.put(name, obj);
268 }
269
270 public synchronized long getStartedTime() {
271 return this.startedTime;
272 }
273
274 public synchronized long getLastReadTime() {
275 return this.lastReadTime;
276 }
277
278 public synchronized long getLastWriteTime() {
279 return this.lastWriteTime;
280 }
281
282 public synchronized long getLastAccessTime() {
283 return this.lastAccessTime;
284 }
285
286 synchronized void resetLastRead() {
287 long now = System.currentTimeMillis();
288 this.lastReadTime = now;
289 this.lastAccessTime = now;
290 }
291
292 synchronized void resetLastWrite() {
293 long now = System.currentTimeMillis();
294 this.lastWriteTime = now;
295 this.lastAccessTime = now;
296 }
297
298 private static void formatOps(final StringBuilder buffer, int ops) {
299 if ((ops & SelectionKey.OP_READ) > 0) {
300 buffer.append('r');
301 }
302 if ((ops & SelectionKey.OP_WRITE) > 0) {
303 buffer.append('w');
304 }
305 if ((ops & SelectionKey.OP_ACCEPT) > 0) {
306 buffer.append('a');
307 }
308 if ((ops & SelectionKey.OP_CONNECT) > 0) {
309 buffer.append('c');
310 }
311 }
312
313 private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
314 if (socketAddress instanceof InetSocketAddress) {
315 InetSocketAddress addr = ((InetSocketAddress) socketAddress);
316 buffer.append(addr.getAddress() != null ? addr.getAddress().getHostAddress() :
317 addr.getAddress())
318 .append(':')
319 .append(addr.getPort());
320 } else {
321 buffer.append(socketAddress);
322 }
323 }
324
325 @Override
326 public synchronized String toString() {
327 StringBuilder buffer = new StringBuilder();
328 SocketAddress remoteAddress = getRemoteAddress();
329 SocketAddress localAddress = getLocalAddress();
330 if (remoteAddress != null && localAddress != null) {
331 formatAddress(buffer, localAddress);
332 buffer.append("<->");
333 formatAddress(buffer, remoteAddress);
334 }
335 buffer.append("[");
336 switch (this.status) {
337 case ACTIVE:
338 buffer.append("ACTIVE");
339 break;
340 case CLOSING:
341 buffer.append("CLOSING");
342 break;
343 case CLOSED:
344 buffer.append("CLOSED");
345 break;
346 }
347 buffer.append("][");
348 if (this.key.isValid()) {
349 formatOps(buffer, this.interestOpsCallback != null ?
350 this.currentEventMask : this.key.interestOps());
351 buffer.append(":");
352 formatOps(buffer, this.key.readyOps());
353 }
354 buffer.append("]");
355 return buffer.toString();
356 }
357
358 public Socket getSocket() {
359 Channel channel = this.channel;
360 if (channel instanceof SocketChannel) {
361 return ((SocketChannel) channel).socket();
362 } else {
363 return null;
364 }
365 }
366
367 }