View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.nio.channels.ByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.SocketChannel;
37  import java.util.Collections;
38  import java.util.HashMap;
39  import java.util.Map;
40  
41  import org.apache.http.annotation.ThreadSafe;
42  import org.apache.http.nio.reactor.IOSession;
43  import org.apache.http.nio.reactor.SessionBufferStatus;
44  import org.apache.http.nio.reactor.SocketAccessor;
45  import org.apache.http.util.Args;
46  
47  /**
48   * Default implementation of {@link IOSession}.
49   *
50   * @since 4.0
51   */
52  @ThreadSafe
53  public class IOSessionImpl implements IOSession, SocketAccessor {
54  
55      private final SelectionKey key;
56      private final ByteChannel channel;
57      private final Map<String, Object> attributes;
58      private final InterestOpsCallback interestOpsCallback;
59      private final SessionClosedCallback sessionClosedCallback;
60  
61      private volatile int status;
62      private volatile int currentEventMask;
63      private volatile SessionBufferStatus bufferStatus;
64      private volatile int socketTimeout;
65  
66      private final long startedTime;
67  
68      private volatile long lastReadTime;
69      private volatile long lastWriteTime;
70      private volatile long lastAccessTime;
71  
72      /**
73       * Creates new instance of IOSessionImpl.
74       *
75       * @param key the selection key.
76       * @param interestOpsCallback interestOps callback.
77       * @param sessionClosedCallback session closed callback.
78       *
79       * @since 4.1
80       */
81      public IOSessionImpl(
82              final SelectionKey key,
83              final InterestOpsCallback interestOpsCallback,
84              final SessionClosedCallback sessionClosedCallback) {
85          super();
86          Args.notNull(key, "Selection key");
87          this.key = key;
88          this.channel = (ByteChannel) this.key.channel();
89          this.interestOpsCallback = interestOpsCallback;
90          this.sessionClosedCallback = sessionClosedCallback;
91          this.attributes = Collections.synchronizedMap(new HashMap<String, Object>());
92          this.currentEventMask = key.interestOps();
93          this.socketTimeout = 0;
94          this.status = ACTIVE;
95          final long now = System.currentTimeMillis();
96          this.startedTime = now;
97          this.lastReadTime = now;
98          this.lastWriteTime = now;
99          this.lastAccessTime = now;
100     }
101 
102     /**
103      * Creates new instance of IOSessionImpl.
104      *
105      * @param key the selection key.
106      * @param sessionClosedCallback session closed callback.
107      */
108     public IOSessionImpl(
109             final SelectionKey key,
110             final SessionClosedCallback sessionClosedCallback) {
111         this(key, null, sessionClosedCallback);
112     }
113 
114     @Override
115     public ByteChannel channel() {
116         return this.channel;
117     }
118 
119     @Override
120     public SocketAddress getLocalAddress() {
121         if (this.channel instanceof SocketChannel) {
122             return ((SocketChannel)this.channel).socket().getLocalSocketAddress();
123         } else {
124             return null;
125         }
126     }
127 
128     @Override
129     public SocketAddress getRemoteAddress() {
130         if (this.channel instanceof SocketChannel) {
131             return ((SocketChannel)this.channel).socket().getRemoteSocketAddress();
132         } else {
133             return null;
134         }
135     }
136 
137     @Override
138     public int getEventMask() {
139         return this.interestOpsCallback != null ? this.currentEventMask : this.key.interestOps();
140     }
141 
142     @Override
143     public synchronized void setEventMask(final int ops) {
144         if (this.status == CLOSED) {
145             return;
146         }
147         if (this.interestOpsCallback != null) {
148             // update the current event mask
149             this.currentEventMask = ops;
150 
151             // local variable
152             final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
153 
154             // add this operation to the interestOps() queue
155             this.interestOpsCallback.addInterestOps(entry);
156         } else {
157             this.key.interestOps(ops);
158         }
159         this.key.selector().wakeup();
160     }
161 
162     @Override
163     public synchronized void setEvent(final int op) {
164         if (this.status == CLOSED) {
165             return;
166         }
167         if (this.interestOpsCallback != null) {
168             // update the current event mask
169             this.currentEventMask |= op;
170 
171             // local variable
172             final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
173 
174             // add this operation to the interestOps() queue
175             this.interestOpsCallback.addInterestOps(entry);
176         } else {
177             final int ops = this.key.interestOps();
178             this.key.interestOps(ops | op);
179         }
180         this.key.selector().wakeup();
181     }
182 
183     @Override
184     public synchronized void clearEvent(final int op) {
185         if (this.status == CLOSED) {
186             return;
187         }
188         if (this.interestOpsCallback != null) {
189             // update the current event mask
190             this.currentEventMask &= ~op;
191 
192             // local variable
193             final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
194 
195             // add this operation to the interestOps() queue
196             this.interestOpsCallback.addInterestOps(entry);
197         } else {
198             final int ops = this.key.interestOps();
199             this.key.interestOps(ops & ~op);
200         }
201         this.key.selector().wakeup();
202     }
203 
204     @Override
205     public int getSocketTimeout() {
206         return this.socketTimeout;
207     }
208 
209     @Override
210     public void setSocketTimeout(final int timeout) {
211         this.socketTimeout = timeout;
212         this.lastAccessTime = System.currentTimeMillis();
213     }
214 
215     @Override
216     public void close() {
217         synchronized (this) {
218             if (this.status == CLOSED) {
219                 return;
220             }
221             this.status = CLOSED;
222         }
223         synchronized (this.key) {
224             this.key.cancel();
225             try {
226                 this.key.channel().close();
227             } catch (final IOException ex) {
228                 // Munching exceptions is not nice
229                 // but in this case it is justified
230             }
231             if (this.sessionClosedCallback != null) {
232                 this.sessionClosedCallback.sessionClosed(this);
233             }
234             if (this.key.selector().isOpen()) {
235                 this.key.selector().wakeup();
236             }
237         }
238     }
239 
240     @Override
241     public int getStatus() {
242         return this.status;
243     }
244 
245     @Override
246     public boolean isClosed() {
247         return this.status == CLOSED;
248     }
249 
250     @Override
251     public void shutdown() {
252         // For this type of session, a close() does exactly
253         // what we need and nothing more.
254         close();
255     }
256 
257     @Override
258     public boolean hasBufferedInput() {
259         final SessionBufferStatus buffStatus = this.bufferStatus;
260         return buffStatus != null && buffStatus.hasBufferedInput();
261     }
262 
263     @Override
264     public boolean hasBufferedOutput() {
265         final SessionBufferStatus buffStatus = this.bufferStatus;
266         return buffStatus != null && buffStatus.hasBufferedOutput();
267     }
268 
269     @Override
270     public void setBufferStatus(final SessionBufferStatus bufferStatus) {
271         this.bufferStatus = bufferStatus;
272     }
273 
274     @Override
275     public Object getAttribute(final String name) {
276         return this.attributes.get(name);
277     }
278 
279     @Override
280     public Object removeAttribute(final String name) {
281         return this.attributes.remove(name);
282     }
283 
284     @Override
285     public void setAttribute(final String name, final Object obj) {
286         this.attributes.put(name, obj);
287     }
288 
289     public long getStartedTime() {
290         return this.startedTime;
291     }
292 
293     public long getLastReadTime() {
294         return this.lastReadTime;
295     }
296 
297     public long getLastWriteTime() {
298         return this.lastWriteTime;
299     }
300 
301     public long getLastAccessTime() {
302         return this.lastAccessTime;
303     }
304 
305     void resetLastRead() {
306         final long now = System.currentTimeMillis();
307         this.lastReadTime = now;
308         this.lastAccessTime = now;
309     }
310 
311     void resetLastWrite() {
312         final long now = System.currentTimeMillis();
313         this.lastWriteTime = now;
314         this.lastAccessTime = now;
315     }
316 
317     private static void formatOps(final StringBuilder buffer, final int ops) {
318         if ((ops & SelectionKey.OP_READ) > 0) {
319             buffer.append('r');
320         }
321         if ((ops & SelectionKey.OP_WRITE) > 0) {
322             buffer.append('w');
323         }
324         if ((ops & SelectionKey.OP_ACCEPT) > 0) {
325             buffer.append('a');
326         }
327         if ((ops & SelectionKey.OP_CONNECT) > 0) {
328             buffer.append('c');
329         }
330     }
331 
332     private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
333         if (socketAddress instanceof InetSocketAddress) {
334             final InetSocketAddress addr = ((InetSocketAddress) socketAddress);
335             buffer.append(addr.getAddress() != null ? addr.getAddress().getHostAddress() :
336                 addr.getAddress())
337             .append(':')
338             .append(addr.getPort());
339         } else {
340             buffer.append(socketAddress);
341         }
342     }
343 
344     @Override
345     public String toString() {
346         final StringBuilder buffer = new StringBuilder();
347         synchronized (this.key) {
348             final SocketAddress remoteAddress = getRemoteAddress();
349             final SocketAddress localAddress = getLocalAddress();
350             if (remoteAddress != null && localAddress != null) {
351                 formatAddress(buffer, localAddress);
352                 buffer.append("<->");
353                 formatAddress(buffer, remoteAddress);
354             }
355             buffer.append('[');
356             switch (this.status) {
357             case ACTIVE:
358                 buffer.append("ACTIVE");
359                 break;
360             case CLOSING:
361                 buffer.append("CLOSING");
362                 break;
363             case CLOSED:
364                 buffer.append("CLOSED");
365                 break;
366             }
367             buffer.append("][");
368             if (this.key.isValid()) {
369                 formatOps(buffer, this.interestOpsCallback != null ?
370                         this.currentEventMask : this.key.interestOps());
371                 buffer.append(':');
372                 formatOps(buffer, this.key.readyOps());
373             }
374         }
375         buffer.append(']');
376         return new String(buffer);
377     }
378 
379     @Override
380     public Socket getSocket() {
381         if (this.channel instanceof SocketChannel) {
382             return ((SocketChannel) this.channel).socket();
383         } else {
384             return null;
385         }
386     }
387 
388 }