View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.net.SocketAddress;
31  import java.nio.channels.ByteChannel;
32  import java.nio.channels.SelectionKey;
33  import java.nio.channels.SocketChannel;
34  import java.util.Deque;
35  import java.util.concurrent.ConcurrentLinkedDeque;
36  import java.util.concurrent.atomic.AtomicInteger;
37  import java.util.concurrent.atomic.AtomicLong;
38  import java.util.concurrent.locks.Lock;
39  import java.util.concurrent.locks.ReentrantLock;
40  
41  import org.apache.hc.core5.io.CloseMode;
42  import org.apache.hc.core5.util.Args;
43  import org.apache.hc.core5.io.Closer;
44  
45  class IOSessionImpl implements IOSession {
46  
47      private final static AtomicLong COUNT = new AtomicLong(0);
48  
49      private final SelectionKey key;
50      private final SocketChannel channel;
51      private final Deque<Command> commandQueue;
52      private final Lock lock;
53      private final String id;
54      private final AtomicInteger status;
55  
56      private volatile IOEventHandler eventHandler;
57      private volatile int socketTimeout;
58      private volatile long lastReadTime;
59      private volatile long lastWriteTime;
60  
61      /**
62       * Creates new instance of IOSessionImpl.
63       *
64       * @param key the selection key.
65       * @param socketChannel the socket channel
66       */
67      public IOSessionImpl(final SelectionKey key, final SocketChannel socketChannel) {
68          super();
69          this.key = Args.notNull(key, "Selection key");
70          this.channel = Args.notNull(socketChannel, "Socket channel");
71          this.commandQueue = new ConcurrentLinkedDeque<>();
72          this.lock = new ReentrantLock();
73          this.socketTimeout = 0;
74          this.id = String.format("i/o-%08X", COUNT.getAndIncrement());
75          this.status = new AtomicInteger(ACTIVE);
76          this.lastReadTime = System.currentTimeMillis();
77          this.lastWriteTime = System.currentTimeMillis();
78      }
79  
80      @Override
81      public String getId() {
82          return id;
83      }
84  
85      @Override
86      public Lock lock() {
87          return lock;
88      }
89  
90      @Override
91      public void enqueue(final Command command, final Command.Priority priority) {
92          if (priority == Command.Priority.IMMEDIATE) {
93              commandQueue.addFirst(command);
94          } else {
95              commandQueue.add(command);
96          }
97          setEvent(SelectionKey.OP_WRITE);
98      }
99  
100     @Override
101     public boolean hasCommands() {
102         return !commandQueue.isEmpty();
103     }
104 
105     @Override
106     public Command poll() {
107         return commandQueue.poll();
108     }
109 
110     @Override
111     public ByteChannel channel() {
112         return this.channel;
113     }
114 
115     @Override
116     public SocketAddress getLocalAddress() {
117         return this.channel.socket().getLocalSocketAddress();
118     }
119 
120     @Override
121     public SocketAddress getRemoteAddress() {
122         return this.channel.socket().getRemoteSocketAddress();
123     }
124 
125     @Override
126     public int getEventMask() {
127         return this.key.interestOps();
128     }
129 
130     @Override
131     public void setEventMask(final int newValue) {
132         if (this.status.get() == CLOSED) {
133             return;
134         }
135         this.key.interestOps(newValue);
136         this.key.selector().wakeup();
137     }
138 
139     @Override
140     public void setEvent(final int op) {
141         if (this.status.get() == CLOSED) {
142             return;
143         }
144         lock.lock();
145         try {
146             this.key.interestOps(this.key.interestOps() | op);
147         } finally {
148             lock.unlock();
149         }
150         this.key.selector().wakeup();
151     }
152 
153     @Override
154     public void clearEvent(final int op) {
155         if (this.status.get() == CLOSED) {
156             return;
157         }
158         lock.lock();
159         try {
160             this.key.interestOps(this.key.interestOps() & ~op);
161         } finally {
162             lock.unlock();
163         }
164         this.key.selector().wakeup();
165     }
166 
167     @Override
168     public int getSocketTimeoutMillis() {
169         return this.socketTimeout;
170     }
171 
172     @Override
173     public void setSocketTimeoutMillis(final int timeout) {
174         this.socketTimeout = timeout;
175     }
176 
177     @Override
178     public void updateReadTime() {
179         lastReadTime = System.currentTimeMillis();
180     }
181 
182     @Override
183     public void updateWriteTime() {
184         lastWriteTime = System.currentTimeMillis();
185     }
186 
187     @Override
188     public long getLastReadTimeMillis() {
189         return lastReadTime;
190     }
191 
192     @Override
193     public long getLastWriteTime() {
194         return lastWriteTime;
195     }
196 
197     @Override
198     public void close() {
199         if (this.status.compareAndSet(ACTIVE, CLOSED)) {
200             this.key.cancel();
201             this.key.attach(null);
202             Closer.closeQuietly(this.key.channel());
203             if (this.key.selector().isOpen()) {
204                 this.key.selector().wakeup();
205             }
206         }
207     }
208 
209     @Override
210     public int getStatus() {
211         return this.status.get();
212     }
213 
214     @Override
215     public boolean isClosed() {
216         return this.status.get() == CLOSED || !this.channel.isOpen();
217     }
218 
219     @Override
220     public void close(final CloseMode closeMode) {
221         // For this type of session, a close() does exactly
222         // what we need and nothing more.
223         close();
224     }
225 
226     private static void formatOps(final StringBuilder buffer, final int ops) {
227         if ((ops & SelectionKey.OP_READ) > 0) {
228             buffer.append('r');
229         }
230         if ((ops & SelectionKey.OP_WRITE) > 0) {
231             buffer.append('w');
232         }
233         if ((ops & SelectionKey.OP_ACCEPT) > 0) {
234             buffer.append('a');
235         }
236         if ((ops & SelectionKey.OP_CONNECT) > 0) {
237             buffer.append('c');
238         }
239     }
240 
241     @Override
242     public String toString() {
243         final StringBuilder buffer = new StringBuilder();
244         buffer.append(id).append("[");
245         switch (this.status.get()) {
246         case ACTIVE:
247             buffer.append("ACTIVE");
248             break;
249         case CLOSING:
250             buffer.append("CLOSING");
251             break;
252         case CLOSED:
253             buffer.append("CLOSED");
254             break;
255         }
256         buffer.append("][");
257         if (this.key.isValid()) {
258             formatOps(buffer, this.key.interestOps());
259             buffer.append(":");
260             formatOps(buffer, this.key.readyOps());
261         }
262         buffer.append("]");
263         return buffer.toString();
264     }
265 
266 }