View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.nio.channels.ByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.SocketChannel;
37  import java.util.Collections;
38  import java.util.HashMap;
39  import java.util.Map;
40  
41  import org.apache.http.annotation.ThreadSafe;
42  import org.apache.http.nio.reactor.IOSession;
43  import org.apache.http.nio.reactor.SessionBufferStatus;
44  import org.apache.http.nio.reactor.SocketAccessor;
45  import org.apache.http.util.Args;
46  
47  /**
48   * Default implementation of {@link IOSession}.
49   *
50   * @since 4.0
51   */
52  @ThreadSafe
53  public class IOSessionImpl implements IOSession, SocketAccessor {
54  
55      private final SelectionKey key;
56      private final ByteChannel channel;
57      private final Map<String, Object> attributes;
58      private final InterestOpsCallback interestOpsCallback;
59      private final SessionClosedCallback sessionClosedCallback;
60  
61      private volatile int status;
62      private volatile int currentEventMask;
63      private volatile SessionBufferStatus bufferStatus;
64      private volatile int socketTimeout;
65  
66      private final long startedTime;
67  
68      private long lastReadTime;
69      private long lastWriteTime;
70      private long lastAccessTime;
71  
72      /**
73       * Creates new instance of IOSessionImpl.
74       *
75       * @param key the selection key.
76       * @param interestOpsCallback interestOps callback.
77       * @param sessionClosedCallback session closed callback.
78       *
79       * @since 4.1
80       */
81      public IOSessionImpl(
82              final SelectionKey key,
83              final InterestOpsCallback interestOpsCallback,
84              final SessionClosedCallback sessionClosedCallback) {
85          super();
86          Args.notNull(key, "Selection key");
87          this.key = key;
88          this.channel = (ByteChannel) this.key.channel();
89          this.interestOpsCallback = interestOpsCallback;
90          this.sessionClosedCallback = sessionClosedCallback;
91          this.attributes = Collections.synchronizedMap(new HashMap<String, Object>());
92          this.currentEventMask = key.interestOps();
93          this.socketTimeout = 0;
94          this.status = ACTIVE;
95          final long now = System.currentTimeMillis();
96          this.startedTime = now;
97          this.lastReadTime = now;
98          this.lastWriteTime = now;
99          this.lastAccessTime = now;
100     }
101 
102     /**
103      * Creates new instance of IOSessionImpl.
104      *
105      * @param key the selection key.
106      * @param sessionClosedCallback session closed callback.
107      */
108     public IOSessionImpl(
109             final SelectionKey key,
110             final SessionClosedCallback sessionClosedCallback) {
111         this(key, null, sessionClosedCallback);
112     }
113 
114     public ByteChannel channel() {
115         return this.channel;
116     }
117 
118     public SocketAddress getLocalAddress() {
119         if (this.channel instanceof SocketChannel) {
120             return ((SocketChannel)this.channel).socket().getLocalSocketAddress();
121         } else {
122             return null;
123         }
124     }
125 
126     public SocketAddress getRemoteAddress() {
127         if (this.channel instanceof SocketChannel) {
128             return ((SocketChannel)this.channel).socket().getRemoteSocketAddress();
129         } else {
130             return null;
131         }
132     }
133 
134     public synchronized int getEventMask() {
135         return this.interestOpsCallback != null ? this.currentEventMask : this.key.interestOps();
136     }
137 
138     public synchronized void setEventMask(final int ops) {
139         if (this.status == CLOSED) {
140             return;
141         }
142         if (this.interestOpsCallback != null) {
143             // update the current event mask
144             this.currentEventMask = ops;
145 
146             // local variable
147             final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
148 
149             // add this operation to the interestOps() queue
150             this.interestOpsCallback.addInterestOps(entry);
151         } else {
152             this.key.interestOps(ops);
153         }
154         this.key.selector().wakeup();
155     }
156 
157     public synchronized void setEvent(final int op) {
158         if (this.status == CLOSED) {
159             return;
160         }
161         if (this.interestOpsCallback != null) {
162             // update the current event mask
163             this.currentEventMask |= op;
164 
165             // local variable
166             final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
167 
168             // add this operation to the interestOps() queue
169             this.interestOpsCallback.addInterestOps(entry);
170         } else {
171             final int ops = this.key.interestOps();
172             this.key.interestOps(ops | op);
173         }
174         this.key.selector().wakeup();
175     }
176 
177     public synchronized void clearEvent(final int op) {
178         if (this.status == CLOSED) {
179             return;
180         }
181         if (this.interestOpsCallback != null) {
182             // update the current event mask
183             this.currentEventMask &= ~op;
184 
185             // local variable
186             final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
187 
188             // add this operation to the interestOps() queue
189             this.interestOpsCallback.addInterestOps(entry);
190         } else {
191             final int ops = this.key.interestOps();
192             this.key.interestOps(ops & ~op);
193         }
194         this.key.selector().wakeup();
195     }
196 
197     public int getSocketTimeout() {
198         return this.socketTimeout;
199     }
200 
201     public synchronized void setSocketTimeout(final int timeout) {
202         this.socketTimeout = timeout;
203         this.lastAccessTime = System.currentTimeMillis();
204     }
205 
206     public synchronized void close() {
207         if (this.status == CLOSED) {
208             return;
209         }
210         this.status = CLOSED;
211         this.key.cancel();
212         try {
213             this.key.channel().close();
214         } catch (final IOException ex) {
215             // Munching exceptions is not nice
216             // but in this case it is justified
217         }
218         if (this.sessionClosedCallback != null) {
219             this.sessionClosedCallback.sessionClosed(this);
220         }
221         if (this.key.selector().isOpen()) {
222             this.key.selector().wakeup();
223         }
224     }
225 
226     public int getStatus() {
227         return this.status;
228     }
229 
230     public boolean isClosed() {
231         return this.status == CLOSED;
232     }
233 
234     public void shutdown() {
235         // For this type of session, a close() does exactly
236         // what we need and nothing more.
237         close();
238     }
239 
240     public boolean hasBufferedInput() {
241         final SessionBufferStatus buffStatus = this.bufferStatus;
242         return buffStatus != null && buffStatus.hasBufferedInput();
243     }
244 
245     public boolean hasBufferedOutput() {
246         final SessionBufferStatus buffStatus = this.bufferStatus;
247         return buffStatus != null && buffStatus.hasBufferedOutput();
248     }
249 
250     public void setBufferStatus(final SessionBufferStatus bufferStatus) {
251         this.bufferStatus = bufferStatus;
252     }
253 
254     public Object getAttribute(final String name) {
255         return this.attributes.get(name);
256     }
257 
258     public Object removeAttribute(final String name) {
259         return this.attributes.remove(name);
260     }
261 
262     public void setAttribute(final String name, final Object obj) {
263         this.attributes.put(name, obj);
264     }
265 
266     public synchronized long getStartedTime() {
267         return this.startedTime;
268     }
269 
270     public synchronized long getLastReadTime() {
271         return this.lastReadTime;
272     }
273 
274     public synchronized long getLastWriteTime() {
275         return this.lastWriteTime;
276     }
277 
278     public synchronized long getLastAccessTime() {
279         return this.lastAccessTime;
280     }
281 
282     synchronized void resetLastRead() {
283         final long now = System.currentTimeMillis();
284         this.lastReadTime = now;
285         this.lastAccessTime = now;
286     }
287 
288     synchronized void resetLastWrite() {
289         final long now = System.currentTimeMillis();
290         this.lastWriteTime = now;
291         this.lastAccessTime = now;
292     }
293 
294     private static void formatOps(final StringBuilder buffer, final int ops) {
295         if ((ops & SelectionKey.OP_READ) > 0) {
296             buffer.append('r');
297         }
298         if ((ops & SelectionKey.OP_WRITE) > 0) {
299             buffer.append('w');
300         }
301         if ((ops & SelectionKey.OP_ACCEPT) > 0) {
302             buffer.append('a');
303         }
304         if ((ops & SelectionKey.OP_CONNECT) > 0) {
305             buffer.append('c');
306         }
307     }
308 
309     private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
310         if (socketAddress instanceof InetSocketAddress) {
311             final InetSocketAddress addr = ((InetSocketAddress) socketAddress);
312             buffer.append(addr.getAddress() != null ? addr.getAddress().getHostAddress() :
313                 addr.getAddress())
314             .append(':')
315             .append(addr.getPort());
316         } else {
317             buffer.append(socketAddress);
318         }
319     }
320 
321     @Override
322     public synchronized String toString() {
323         final StringBuilder buffer = new StringBuilder();
324         final SocketAddress remoteAddress = getRemoteAddress();
325         final SocketAddress localAddress = getLocalAddress();
326         if (remoteAddress != null && localAddress != null) {
327             formatAddress(buffer, localAddress);
328             buffer.append("<->");
329             formatAddress(buffer, remoteAddress);
330         }
331         buffer.append("[");
332         switch (this.status) {
333         case ACTIVE:
334             buffer.append("ACTIVE");
335             break;
336         case CLOSING:
337             buffer.append("CLOSING");
338             break;
339         case CLOSED:
340             buffer.append("CLOSED");
341             break;
342         }
343         buffer.append("][");
344         if (this.key.isValid()) {
345             formatOps(buffer, this.interestOpsCallback != null ?
346                     this.currentEventMask : this.key.interestOps());
347             buffer.append(":");
348             formatOps(buffer, this.key.readyOps());
349         }
350         buffer.append("]");
351         return buffer.toString();
352     }
353 
354     public Socket getSocket() {
355         if (this.channel instanceof SocketChannel) {
356             return ((SocketChannel) this.channel).socket();
357         } else {
358             return null;
359         }
360     }
361 
362 }