View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.nio.channels.ByteChannel;
35  import java.nio.channels.Channel;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.SocketChannel;
38  import java.util.Collections;
39  import java.util.HashMap;
40  import java.util.Map;
41  
42  import org.apache.http.annotation.ThreadSafe;
43  import org.apache.http.nio.reactor.IOSession;
44  import org.apache.http.nio.reactor.SessionBufferStatus;
45  import org.apache.http.nio.reactor.SocketAccessor;
46  
47  /**
48   * Default implementation of {@link IOSession}.
49   *
50   * @since 4.0
51   */
52  @ThreadSafe
53  public class IOSessionImpl implements IOSession, SocketAccessor {
54  
55      private final SelectionKey key;
56      private final ByteChannel channel;
57      private final Map<String, Object> attributes;
58      private final InterestOpsCallback interestOpsCallback;
59      private final SessionClosedCallback sessionClosedCallback;
60  
61      private volatile int status;
62      private volatile int currentEventMask;
63      private volatile SessionBufferStatus bufferStatus;
64      private volatile int socketTimeout;
65  
66      private final long startedTime;
67  
68      private long lastReadTime;
69      private long lastWriteTime;
70      private long lastAccessTime;
71  
72      /**
73       * Creates new instance of IOSessionImpl.
74       *
75       * @param key the selection key.
76       * @param interestOpsCallback interestOps callback.
77       * @param sessionClosedCallback session closed callback.
78       *
79       * @since 4.1
80       */
81      public IOSessionImpl(
82              final SelectionKey key,
83              final InterestOpsCallback interestOpsCallback,
84              final SessionClosedCallback sessionClosedCallback) {
85          super();
86          if (key == null) {
87              throw new IllegalArgumentException("Selection key may not be null");
88          }
89          this.key = key;
90          this.channel = (ByteChannel) this.key.channel();
91          this.interestOpsCallback = interestOpsCallback;
92          this.sessionClosedCallback = sessionClosedCallback;
93          this.attributes = Collections.synchronizedMap(new HashMap<String, Object>());
94          this.currentEventMask = key.interestOps();
95          this.socketTimeout = 0;
96          this.status = ACTIVE;
97          long now = System.currentTimeMillis();
98          this.startedTime = now;
99          this.lastReadTime = now;
100         this.lastWriteTime = now;
101         this.lastAccessTime = now;
102     }
103 
104     /**
105      * Creates new instance of IOSessionImpl.
106      *
107      * @param key the selection key.
108      * @param sessionClosedCallback session closed callback.
109      */
110     public IOSessionImpl(
111             final SelectionKey key,
112             final SessionClosedCallback sessionClosedCallback) {
113         this(key, null, sessionClosedCallback);
114     }
115 
116     public ByteChannel channel() {
117         return this.channel;
118     }
119 
120     public SocketAddress getLocalAddress() {
121         Channel channel = this.channel;
122         if (channel instanceof SocketChannel) {
123             return ((SocketChannel)channel).socket().getLocalSocketAddress();
124         } else {
125             return null;
126         }
127     }
128 
129     public SocketAddress getRemoteAddress() {
130         Channel channel = this.channel;
131         if (channel instanceof SocketChannel) {
132             return ((SocketChannel)channel).socket().getRemoteSocketAddress();
133         } else {
134             return null;
135         }
136     }
137 
138     public synchronized int getEventMask() {
139         return this.interestOpsCallback != null ? this.currentEventMask : this.key.interestOps();
140     }
141 
142     public synchronized void setEventMask(int ops) {
143         if (this.status == CLOSED) {
144             return;
145         }
146         if (this.interestOpsCallback != null) {
147             // update the current event mask
148             this.currentEventMask = ops;
149 
150             // local variable
151             InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
152 
153             // add this operation to the interestOps() queue
154             this.interestOpsCallback.addInterestOps(entry);
155         } else {
156             this.key.interestOps(ops);
157         }
158         this.key.selector().wakeup();
159     }
160 
161     public synchronized void setEvent(int op) {
162         if (this.status == CLOSED) {
163             return;
164         }
165         if (this.interestOpsCallback != null) {
166             // update the current event mask
167             this.currentEventMask |= op;
168 
169             // local variable
170             InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
171 
172             // add this operation to the interestOps() queue
173             this.interestOpsCallback.addInterestOps(entry);
174         } else {
175             int ops = this.key.interestOps();
176             this.key.interestOps(ops | op);
177         }
178         this.key.selector().wakeup();
179     }
180 
181     public synchronized void clearEvent(int op) {
182         if (this.status == CLOSED) {
183             return;
184         }
185         if (this.interestOpsCallback != null) {
186             // update the current event mask
187             this.currentEventMask &= ~op;
188 
189             // local variable
190             InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
191 
192             // add this operation to the interestOps() queue
193             this.interestOpsCallback.addInterestOps(entry);
194         } else {
195             int ops = this.key.interestOps();
196             this.key.interestOps(ops & ~op);
197         }
198         this.key.selector().wakeup();
199     }
200 
201     public int getSocketTimeout() {
202         return this.socketTimeout;
203     }
204 
205     public synchronized void setSocketTimeout(int timeout) {
206         this.socketTimeout = timeout;
207         this.lastAccessTime = System.currentTimeMillis();
208     }
209 
210     public synchronized void close() {
211         if (this.status == CLOSED) {
212             return;
213         }
214         this.status = CLOSED;
215         this.key.cancel();
216         try {
217             this.key.channel().close();
218         } catch (IOException ex) {
219             // Munching exceptions is not nice
220             // but in this case it is justified
221         }
222         if (this.sessionClosedCallback != null) {
223             this.sessionClosedCallback.sessionClosed(this);
224         }
225         if (this.key.selector().isOpen()) {
226             this.key.selector().wakeup();
227         }
228     }
229 
230     public int getStatus() {
231         return this.status;
232     }
233 
234     public boolean isClosed() {
235         return this.status == CLOSED;
236     }
237 
238     public void shutdown() {
239         // For this type of session, a close() does exactly
240         // what we need and nothing more.
241         close();
242     }
243 
244     public boolean hasBufferedInput() {
245         SessionBufferStatus bufferStatus = this.bufferStatus;
246         return bufferStatus != null && bufferStatus.hasBufferedInput();
247     }
248 
249     public boolean hasBufferedOutput() {
250         SessionBufferStatus bufferStatus = this.bufferStatus;
251         return bufferStatus != null && bufferStatus.hasBufferedOutput();
252     }
253 
254     public void setBufferStatus(final SessionBufferStatus bufferStatus) {
255         this.bufferStatus = bufferStatus;
256     }
257 
258     public Object getAttribute(final String name) {
259         return this.attributes.get(name);
260     }
261 
262     public Object removeAttribute(final String name) {
263         return this.attributes.remove(name);
264     }
265 
266     public void setAttribute(final String name, final Object obj) {
267         this.attributes.put(name, obj);
268     }
269 
270     public synchronized long getStartedTime() {
271         return this.startedTime;
272     }
273 
274     public synchronized long getLastReadTime() {
275         return this.lastReadTime;
276     }
277 
278     public synchronized long getLastWriteTime() {
279         return this.lastWriteTime;
280     }
281 
282     public synchronized long getLastAccessTime() {
283         return this.lastAccessTime;
284     }
285 
286     synchronized void resetLastRead() {
287         long now = System.currentTimeMillis();
288         this.lastReadTime = now;
289         this.lastAccessTime = now;
290     }
291 
292     synchronized void resetLastWrite() {
293         long now = System.currentTimeMillis();
294         this.lastWriteTime = now;
295         this.lastAccessTime = now;
296     }
297 
298     private static void formatOps(final StringBuilder buffer, int ops) {
299         if ((ops & SelectionKey.OP_READ) > 0) {
300             buffer.append('r');
301         }
302         if ((ops & SelectionKey.OP_WRITE) > 0) {
303             buffer.append('w');
304         }
305         if ((ops & SelectionKey.OP_ACCEPT) > 0) {
306             buffer.append('a');
307         }
308         if ((ops & SelectionKey.OP_CONNECT) > 0) {
309             buffer.append('c');
310         }
311     }
312 
313     private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
314         if (socketAddress instanceof InetSocketAddress) {
315             InetSocketAddress addr = ((InetSocketAddress) socketAddress);
316             buffer.append(addr.getAddress() != null ? addr.getAddress().getHostAddress() :
317                 addr.getAddress())
318             .append(':')
319             .append(addr.getPort());
320         } else {
321             buffer.append(socketAddress);
322         }
323     }
324 
325     @Override
326     public synchronized String toString() {
327         StringBuilder buffer = new StringBuilder();
328         SocketAddress remoteAddress = getRemoteAddress();
329         SocketAddress localAddress = getLocalAddress();
330         if (remoteAddress != null && localAddress != null) {
331             formatAddress(buffer, localAddress);
332             buffer.append("<->");
333             formatAddress(buffer, remoteAddress);
334         }
335         buffer.append("[");
336         switch (this.status) {
337         case ACTIVE:
338             buffer.append("ACTIVE");
339             break;
340         case CLOSING:
341             buffer.append("CLOSING");
342             break;
343         case CLOSED:
344             buffer.append("CLOSED");
345             break;
346         }
347         buffer.append("][");
348         if (this.key.isValid()) {
349             formatOps(buffer, this.interestOpsCallback != null ?
350                     this.currentEventMask : this.key.interestOps());
351             buffer.append(":");
352             formatOps(buffer, this.key.readyOps());
353         }
354         buffer.append("]");
355         return buffer.toString();
356     }
357     
358     public Socket getSocket() {
359         Channel channel = this.channel;
360         if (channel instanceof SocketChannel) {
361             return ((SocketChannel) channel).socket();
362         } else {
363             return null;
364         }
365     }
366 
367 }