View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.net.SocketException;
33  import java.nio.ByteBuffer;
34  import java.nio.channels.ByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.SocketChannel;
37  import java.util.Deque;
38  import java.util.concurrent.ConcurrentLinkedDeque;
39  import java.util.concurrent.atomic.AtomicLong;
40  import java.util.concurrent.atomic.AtomicReference;
41  import java.util.concurrent.locks.Lock;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  import org.apache.hc.core5.io.CloseMode;
45  import org.apache.hc.core5.io.Closer;
46  import org.apache.hc.core5.util.Args;
47  import org.apache.hc.core5.util.Timeout;
48  
49  class IOSessionImpl implements IOSession {
50  
51      /** Counts instances created. */
52      private final static AtomicLong COUNT = new AtomicLong(0);
53  
54      private final SelectionKey key;
55      private final SocketChannel channel;
56      private final Deque<Command> commandQueue;
57      private final Lock lock;
58      private final String id;
59      private final AtomicReference<IOEventHandler> handlerRef;
60      private final AtomicReference<IOSession.Status> status;
61  
62      private volatile Timeout socketTimeout;
63      private volatile long lastReadTime;
64      private volatile long lastWriteTime;
65      private volatile long lastEventTime;
66  
67      public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel) {
68          super();
69          this.key = Args.notNull(key, "Selection key");
70          this.channel = Args.notNull(socketChannel, "Socket channel");
71          this.commandQueue = new ConcurrentLinkedDeque<>();
72          this.lock = new ReentrantLock();
73          this.socketTimeout = Timeout.DISABLED;
74          this.id = String.format(type + "-%010d", COUNT.getAndIncrement());
75          this.handlerRef = new AtomicReference<>();
76          this.status = new AtomicReference<>(Status.ACTIVE);
77          final long currentTimeMillis = System.currentTimeMillis();
78          this.lastReadTime = currentTimeMillis;
79          this.lastWriteTime = currentTimeMillis;
80          this.lastEventTime = currentTimeMillis;
81      }
82  
83      @Override
84      public String getId() {
85          return id;
86      }
87  
88      @Override
89      public IOEventHandler getHandler() {
90          return handlerRef.get();
91      }
92  
93      @Override
94      public void upgrade(final IOEventHandler handler) {
95          handlerRef.set(handler);
96      }
97  
98      @Override
99      public Lock getLock() {
100         return lock;
101     }
102 
103     @Override
104     public void enqueue(final Command command, final Command.Priority priority) {
105         if (priority == Command.Priority.IMMEDIATE) {
106             commandQueue.addFirst(command);
107         } else {
108             commandQueue.add(command);
109         }
110         setEvent(SelectionKey.OP_WRITE);
111 
112         if (isStatusClosed()) {
113             command.cancel();
114         }
115     }
116 
117     @Override
118     public boolean hasCommands() {
119         return !commandQueue.isEmpty();
120     }
121 
122     @Override
123     public Command poll() {
124         return commandQueue.poll();
125     }
126 
127     @Override
128     public ByteChannel channel() {
129         return this.channel;
130     }
131 
132     @Override
133     public SocketAddress getLocalAddress() {
134         return this.channel.socket().getLocalSocketAddress();
135     }
136 
137     @Override
138     public SocketAddress getRemoteAddress() {
139         return this.channel.socket().getRemoteSocketAddress();
140     }
141 
142     @Override
143     public int getEventMask() {
144         return this.key.interestOps();
145     }
146 
147     @Override
148     public void setEventMask(final int newValue) {
149         lock.lock();
150         try {
151             if (isStatusClosed()) {
152                 return;
153             }
154             this.key.interestOps(newValue);
155         } finally {
156             lock.unlock();
157         }
158         this.key.selector().wakeup();
159     }
160 
161     @Override
162     public void setEvent(final int op) {
163         lock.lock();
164         try {
165             if (isStatusClosed()) {
166                 return;
167             }
168             this.key.interestOps(this.key.interestOps() | op);
169         } finally {
170             lock.unlock();
171         }
172         this.key.selector().wakeup();
173     }
174 
175     @Override
176     public void clearEvent(final int op) {
177         lock.lock();
178         try {
179             if (isStatusClosed()) {
180                 return;
181             }
182             this.key.interestOps(this.key.interestOps() & ~op);
183         } finally {
184             lock.unlock();
185         }
186         this.key.selector().wakeup();
187     }
188 
189     @Override
190     public Timeout getSocketTimeout() {
191         return this.socketTimeout;
192     }
193 
194     @Override
195     public void setSocketTimeout(final Timeout timeout) {
196         this.socketTimeout = Timeout.defaultsToDisabled(timeout);
197         this.lastEventTime = System.currentTimeMillis();
198     }
199 
200     @Override
201     public int read(final ByteBuffer dst) throws IOException {
202         return this.channel.read(dst);
203     }
204 
205     @Override
206     public int write(final ByteBuffer src) throws IOException {
207         return this.channel.write(src);
208     }
209 
210     @Override
211     public void updateReadTime() {
212         lastReadTime = System.currentTimeMillis();
213         lastEventTime = lastReadTime;
214     }
215 
216     @Override
217     public void updateWriteTime() {
218         lastWriteTime = System.currentTimeMillis();
219         lastEventTime = lastWriteTime;
220     }
221 
222     @Override
223     public long getLastReadTime() {
224         return lastReadTime;
225     }
226 
227     @Override
228     public long getLastWriteTime() {
229         return lastWriteTime;
230     }
231 
232     @Override
233     public long getLastEventTime() {
234         return lastEventTime;
235     }
236 
237     @Override
238     public Status getStatus() {
239         return this.status.get();
240     }
241 
242     private boolean isStatusClosed() {
243         return this.status.get() == Status.CLOSED;
244     }
245 
246     @Override
247     public boolean isOpen() {
248         return this.status.get() == Status.ACTIVE && this.channel.isOpen();
249     }
250 
251     @Override
252     public void close() {
253         close(CloseMode.GRACEFUL);
254     }
255 
256     @Override
257     public void close(final CloseMode closeMode) {
258         if (this.status.compareAndSet(Status.ACTIVE, Status.CLOSED)) {
259             if (closeMode == CloseMode.IMMEDIATE) {
260                 try {
261                     this.channel.socket().setSoLinger(true, 0);
262                 } catch (final SocketException e) {
263                     // Quietly ignore
264                 }
265             }
266             this.key.cancel();
267             this.key.attach(null);
268             Closer.closeQuietly(this.key.channel());
269             if (this.key.selector().isOpen()) {
270                 this.key.selector().wakeup();
271             }
272         }
273     }
274 
275     private static void formatOps(final StringBuilder buffer, final int ops) {
276         if ((ops & SelectionKey.OP_READ) > 0) {
277             buffer.append('r');
278         }
279         if ((ops & SelectionKey.OP_WRITE) > 0) {
280             buffer.append('w');
281         }
282         if ((ops & SelectionKey.OP_ACCEPT) > 0) {
283             buffer.append('a');
284         }
285         if ((ops & SelectionKey.OP_CONNECT) > 0) {
286             buffer.append('c');
287         }
288     }
289 
290     @Override
291     public String toString() {
292         final StringBuilder buffer = new StringBuilder();
293         buffer.append(id).append("[");
294         buffer.append(this.status);
295         buffer.append("][");
296         if (this.key.isValid()) {
297             formatOps(buffer, this.key.interestOps());
298             buffer.append(":");
299             formatOps(buffer, this.key.readyOps());
300         }
301         buffer.append("]");
302         return buffer.toString();
303     }
304 
305 }