View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.channels.ByteChannel;
33  import java.nio.channels.SelectionKey;
34  import java.nio.channels.SocketChannel;
35  import java.util.Deque;
36  import java.util.concurrent.ConcurrentLinkedDeque;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.atomic.AtomicLong;
39  
40  import org.apache.hc.core5.io.ShutdownType;
41  import org.apache.hc.core5.util.Args;
42  
43  class IOSessionImpl implements IOSession {
44  
45      private final static AtomicLong COUNT = new AtomicLong(0);
46  
47      private final SelectionKey key;
48      private final SocketChannel channel;
49      private final String id;
50      private final AtomicInteger status;
51      private final Deque<Command> commandQueue;
52  
53      private volatile IOEventHandler eventHandler;
54      private volatile int socketTimeout;
55      private volatile long lastReadTime;
56      private volatile long lastWriteTime;
57  
58      /**
59       * Creates new instance of IOSessionImpl.
60       *
61       * @param key the selection key.
62       * @param socketChannel the socket channel
63       */
64      public IOSessionImpl(final SelectionKey key, final SocketChannel socketChannel) {
65          super();
66          this.key = Args.notNull(key, "Selection key");
67          this.channel = Args.notNull(socketChannel, "Socket channel");
68          this.commandQueue = new ConcurrentLinkedDeque<>();
69          this.socketTimeout = 0;
70          this.id = String.format("i/o-%08X", COUNT.getAndIncrement());
71          this.status = new AtomicInteger(ACTIVE);
72          this.lastReadTime = System.currentTimeMillis();
73          this.lastWriteTime = System.currentTimeMillis();
74      }
75  
76      @Override
77      public String getId() {
78          return id;
79      }
80  
81      @Override
82      public IOEventHandler getHandler() {
83          return this.eventHandler;
84      }
85  
86      @Override
87      public void setHandler(final IOEventHandler handler) {
88          this.eventHandler = handler;
89      }
90  
91      @Override
92      public void addLast(final Command command) {
93          commandQueue.addLast(command);
94          setEvent(SelectionKey.OP_WRITE);
95      }
96  
97      @Override
98      public void addFirst(final Command command) {
99          commandQueue.addFirst(command);
100         setEvent(SelectionKey.OP_WRITE);
101     }
102 
103     @Override
104     public Command getCommand() {
105         return commandQueue.poll();
106     }
107 
108     @Override
109     public ByteChannel channel() {
110         return this.channel;
111     }
112 
113     @Override
114     public SocketAddress getLocalAddress() {
115         return this.channel.socket().getLocalSocketAddress();
116     }
117 
118     @Override
119     public SocketAddress getRemoteAddress() {
120         return this.channel.socket().getRemoteSocketAddress();
121     }
122 
123     @Override
124     public int getEventMask() {
125         return this.key.interestOps();
126     }
127 
128     @Override
129     public void setEventMask(final int newValue) {
130         if (this.status.get() == CLOSED) {
131             return;
132         }
133         synchronized (this.key) {
134             this.key.interestOps(newValue);
135             this.key.selector().wakeup();
136         }
137     }
138 
139     @Override
140     public void setEvent(final int op) {
141         if (this.status.get() == CLOSED) {
142             return;
143         }
144         synchronized (this.key) {
145             this.key.interestOps(this.key.interestOps() | op);
146             this.key.selector().wakeup();
147         }
148     }
149 
150     @Override
151     public void clearEvent(final int op) {
152         if (this.status.get() == CLOSED) {
153             return;
154         }
155         synchronized (this.key) {
156             this.key.interestOps(this.key.interestOps() & ~op);
157             this.key.selector().wakeup();
158         }
159     }
160 
161     @Override
162     public int getSocketTimeout() {
163         return this.socketTimeout;
164     }
165 
166     @Override
167     public void setSocketTimeout(final int timeout) {
168         this.socketTimeout = timeout;
169     }
170 
171     @Override
172     public void updateReadTime() {
173         lastReadTime = System.currentTimeMillis();
174     }
175 
176     @Override
177     public void updateWriteTime() {
178         lastWriteTime = System.currentTimeMillis();
179     }
180 
181     @Override
182     public long getLastReadTime() {
183         return lastReadTime;
184     }
185 
186     @Override
187     public long getLastWriteTime() {
188         return lastWriteTime;
189     }
190 
191     @Override
192     public void close() {
193         if (this.status.compareAndSet(ACTIVE, CLOSED)) {
194             this.key.cancel();
195             this.key.attach(null);
196             try {
197                 this.key.channel().close();
198             } catch (final IOException ignore) {
199             }
200             if (this.key.selector().isOpen()) {
201                 this.key.selector().wakeup();
202             }
203         }
204     }
205 
206     @Override
207     public int getStatus() {
208         return this.status.get();
209     }
210 
211     @Override
212     public boolean isClosed() {
213         return this.status.get() == CLOSED || !this.channel.isOpen();
214     }
215 
216     @Override
217     public void shutdown(final ShutdownType shutdownType) {
218         // For this type of session, a close() does exactly
219         // what we need and nothing more.
220         close();
221     }
222 
223     private static void formatOps(final StringBuilder buffer, final int ops) {
224         if ((ops & SelectionKey.OP_READ) > 0) {
225             buffer.append('r');
226         }
227         if ((ops & SelectionKey.OP_WRITE) > 0) {
228             buffer.append('w');
229         }
230         if ((ops & SelectionKey.OP_ACCEPT) > 0) {
231             buffer.append('a');
232         }
233         if ((ops & SelectionKey.OP_CONNECT) > 0) {
234             buffer.append('c');
235         }
236     }
237 
238     @Override
239     public String toString() {
240         final StringBuilder buffer = new StringBuilder();
241         buffer.append(id).append("[");
242         switch (this.status.get()) {
243         case ACTIVE:
244             buffer.append("ACTIVE");
245             break;
246         case CLOSING:
247             buffer.append("CLOSING");
248             break;
249         case CLOSED:
250             buffer.append("CLOSED");
251             break;
252         }
253         buffer.append("][");
254         if (this.key.isValid()) {
255             formatOps(buffer, this.key.interestOps());
256             buffer.append(":");
257             formatOps(buffer, this.key.readyOps());
258         }
259         buffer.append("]");
260         return buffer.toString();
261     }
262 
263 }