View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.net.SocketAddress;
31  import java.net.SocketException;
32  import java.nio.channels.ByteChannel;
33  import java.nio.channels.SelectionKey;
34  import java.nio.channels.SocketChannel;
35  import java.util.Deque;
36  import java.util.concurrent.ConcurrentLinkedDeque;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.atomic.AtomicLong;
39  import java.util.concurrent.locks.Lock;
40  import java.util.concurrent.locks.ReentrantLock;
41  
42  import org.apache.hc.core5.io.CloseMode;
43  import org.apache.hc.core5.io.Closer;
44  import org.apache.hc.core5.util.Args;
45  import org.apache.hc.core5.util.Timeout;
46  
47  class IOSessionImpl implements IOSession {
48  
49      private final static AtomicLong COUNT = new AtomicLong(0);
50  
51      private final SelectionKey key;
52      private final SocketChannel channel;
53      private final Deque<Command> commandQueue;
54      private final Lock lock;
55      private final String id;
56      private final AtomicInteger status;
57  
58      private volatile Timeout socketTimeout;
59      private volatile long lastReadTime;
60      private volatile long lastWriteTime;
61  
62      /**
63       * Creates new instance of IOSessionImpl.
64       *
65       * @param key the selection key.
66       * @param socketChannel the socket channel
67       */
68      public IOSessionImpl(final SelectionKey key, final SocketChannel socketChannel) {
69          super();
70          this.key = Args.notNull(key, "Selection key");
71          this.channel = Args.notNull(socketChannel, "Socket channel");
72          this.commandQueue = new ConcurrentLinkedDeque<>();
73          this.lock = new ReentrantLock();
74          this.socketTimeout = Timeout.DISABLED;
75          this.id = String.format("i/o-%08X", COUNT.getAndIncrement());
76          this.status = new AtomicInteger(ACTIVE);
77          final long currentTimeMillis = System.currentTimeMillis();
78          this.lastReadTime = currentTimeMillis;
79          this.lastWriteTime = currentTimeMillis;
80      }
81  
82      @Override
83      public String getId() {
84          return id;
85      }
86  
87      @Override
88      public Lock lock() {
89          return lock;
90      }
91  
92      @Override
93      public void enqueue(final Command command, final Command.Priority priority) {
94          if (priority == Command.Priority.IMMEDIATE) {
95              commandQueue.addFirst(command);
96          } else {
97              commandQueue.add(command);
98          }
99          setEvent(SelectionKey.OP_WRITE);
100     }
101 
102     @Override
103     public boolean hasCommands() {
104         return !commandQueue.isEmpty();
105     }
106 
107     @Override
108     public Command poll() {
109         return commandQueue.poll();
110     }
111 
112     @Override
113     public ByteChannel channel() {
114         return this.channel;
115     }
116 
117     @Override
118     public SocketAddress getLocalAddress() {
119         return this.channel.socket().getLocalSocketAddress();
120     }
121 
122     @Override
123     public SocketAddress getRemoteAddress() {
124         return this.channel.socket().getRemoteSocketAddress();
125     }
126 
127     @Override
128     public int getEventMask() {
129         return this.key.interestOps();
130     }
131 
132     @Override
133     public void setEventMask(final int newValue) {
134         if (isStatusClosed()) {
135             return;
136         }
137         this.key.interestOps(newValue);
138         this.key.selector().wakeup();
139     }
140 
141     @Override
142     public void setEvent(final int op) {
143         if (isStatusClosed()) {
144             return;
145         }
146         lock.lock();
147         try {
148             this.key.interestOps(this.key.interestOps() | op);
149         } finally {
150             lock.unlock();
151         }
152         this.key.selector().wakeup();
153     }
154 
155     @Override
156     public void clearEvent(final int op) {
157         if (isStatusClosed()) {
158             return;
159         }
160         lock.lock();
161         try {
162             this.key.interestOps(this.key.interestOps() & ~op);
163         } finally {
164             lock.unlock();
165         }
166         this.key.selector().wakeup();
167     }
168 
169     @Override
170     public Timeout getSocketTimeout() {
171         return this.socketTimeout;
172     }
173 
174     @Override
175     public void setSocketTimeout(final Timeout timeout) {
176         this.socketTimeout = Timeout.defaultsToDisabled(timeout);
177     }
178 
179     @Override
180     public void updateReadTime() {
181         lastReadTime = System.currentTimeMillis();
182     }
183 
184     @Override
185     public void updateWriteTime() {
186         lastWriteTime = System.currentTimeMillis();
187     }
188 
189     @Override
190     public long getLastReadTime() {
191         return lastReadTime;
192     }
193 
194     @Override
195     public long getLastWriteTime() {
196         return lastWriteTime;
197     }
198 
199     @Override
200     public void close() {
201         if (this.status.compareAndSet(ACTIVE, CLOSED)) {
202             this.key.cancel();
203             this.key.attach(null);
204             Closer.closeQuietly(this.key.channel());
205             if (this.key.selector().isOpen()) {
206                 this.key.selector().wakeup();
207             }
208         }
209     }
210 
211     @Override
212     public int getStatus() {
213         return this.status.get();
214     }
215 
216     @Override
217     public boolean isClosed() {
218         return isStatusClosed() || !this.channel.isOpen();
219     }
220 
221     private boolean isStatusClosed() {
222         return this.status.get() == CLOSED;
223     }
224 
225     @Override
226     public void close(final CloseMode closeMode) {
227         if (closeMode == CloseMode.IMMEDIATE) {
228             try {
229                 this.channel.socket().setSoLinger(true, 0);
230             } catch (final SocketException e) {
231                 // Quietly ignore
232             }
233         }
234         close();
235     }
236 
237     private static void formatOps(final StringBuilder buffer, final int ops) {
238         if ((ops & SelectionKey.OP_READ) > 0) {
239             buffer.append('r');
240         }
241         if ((ops & SelectionKey.OP_WRITE) > 0) {
242             buffer.append('w');
243         }
244         if ((ops & SelectionKey.OP_ACCEPT) > 0) {
245             buffer.append('a');
246         }
247         if ((ops & SelectionKey.OP_CONNECT) > 0) {
248             buffer.append('c');
249         }
250     }
251 
252     @Override
253     public String toString() {
254         final StringBuilder buffer = new StringBuilder();
255         buffer.append(id).append("[");
256         switch (this.status.get()) {
257         case ACTIVE:
258             buffer.append("ACTIVE");
259             break;
260         case CLOSING:
261             buffer.append("CLOSING");
262             break;
263         case CLOSED:
264             buffer.append("CLOSED");
265             break;
266         }
267         buffer.append("][");
268         if (this.key.isValid()) {
269             formatOps(buffer, this.key.interestOps());
270             buffer.append(":");
271             formatOps(buffer, this.key.readyOps());
272         }
273         buffer.append("]");
274         return buffer.toString();
275     }
276 
277 }