View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ByteChannel;
34  import java.nio.channels.SelectionKey;
35  import java.util.Queue;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  import java.util.concurrent.atomic.AtomicReference;
38  import java.util.concurrent.locks.Lock;
39  
40  import javax.net.ssl.SSLContext;
41  
42  import org.apache.hc.core5.function.Callback;
43  import org.apache.hc.core5.function.Decorator;
44  import org.apache.hc.core5.io.CloseMode;
45  import org.apache.hc.core5.net.NamedEndpoint;
46  import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
47  import org.apache.hc.core5.reactor.ssl.SSLIOSession;
48  import org.apache.hc.core5.reactor.ssl.SSLMode;
49  import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
50  import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
51  import org.apache.hc.core5.reactor.ssl.TlsDetails;
52  import org.apache.hc.core5.util.Asserts;
53  import org.apache.hc.core5.util.Timeout;
54  
55  final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
56  
57      private final IOSession ioSession;
58      private final NamedEndpoint initialEndpoint;
59      private final Decorator<IOSession> ioSessionDecorator;
60      private final IOSessionListener sessionListener;
61      private final Queue<InternalDataChannel> closedSessions;
62      private final AtomicReference<SSLIOSession> tlsSessionRef;
63      private final AtomicReference<IOSession> currentSessionRef;
64      private final AtomicReference<IOEventHandler> eventHandlerRef;
65      private final AtomicBoolean closed;
66  
67      InternalDataChannel(
68              final IOSession ioSession,
69              final NamedEndpoint initialEndpoint,
70              final Decorator<IOSession> ioSessionDecorator,
71              final IOSessionListener sessionListener,
72              final Queue<InternalDataChannel> closedSessions) {
73          this.ioSession = ioSession;
74          this.initialEndpoint = initialEndpoint;
75          this.closedSessions = closedSessions;
76          this.ioSessionDecorator = ioSessionDecorator;
77          this.sessionListener = sessionListener;
78          this.tlsSessionRef = new AtomicReference<>();
79          this.currentSessionRef = new AtomicReference<>(
80                  ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
81          this.eventHandlerRef = new AtomicReference<>();
82          this.closed = new AtomicBoolean(false);
83      }
84  
85      @Override
86      public String getId() {
87          return ioSession.getId();
88      }
89  
90      @Override
91      public NamedEndpoint getInitialEndpoint() {
92          return initialEndpoint;
93      }
94  
95      @Override
96      public IOEventHandler getHandler() {
97          return eventHandlerRef.get();
98      }
99  
100     @Override
101     public void upgrade(final IOEventHandler handler) {
102         final IOSession currentSession = currentSessionRef.get();
103         currentSession.upgrade(handler);
104         eventHandlerRef.set(handler);
105     }
106 
107     private IOEventHandler ensureHandler(final IOSession session) {
108         final IOEventHandler handler = session.getHandler();
109         Asserts.notNull(handler, "IO event handler");
110         return handler;
111     }
112 
113     @Override
114     void onIOEvent(final int readyOps) throws IOException {
115         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
116             final IOSession currentSession = currentSessionRef.get();
117             currentSession.clearEvent(SelectionKey.OP_CONNECT);
118             if (tlsSessionRef.get() == null) {
119                 if (sessionListener != null) {
120                     sessionListener.connected(currentSession);
121                 }
122                 final IOEventHandler handler = ensureHandler(currentSession);
123                 handler.connected(currentSession);
124             }
125         }
126         if ((readyOps & SelectionKey.OP_READ) != 0) {
127             final IOSession currentSession = currentSessionRef.get();
128             currentSession.updateReadTime();
129             if (sessionListener != null) {
130                 sessionListener.inputReady(currentSession);
131             }
132             final IOEventHandler handler = ensureHandler(currentSession);
133             handler.inputReady(currentSession, null);
134         }
135         if ((readyOps & SelectionKey.OP_WRITE) != 0
136                 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
137             final IOSession currentSession = currentSessionRef.get();
138             currentSession.updateWriteTime();
139             if (sessionListener != null) {
140                 sessionListener.outputReady(currentSession);
141             }
142             final IOEventHandler handler = ensureHandler(currentSession);
143             handler.outputReady(currentSession);
144         }
145     }
146 
147     @Override
148     Timeout getTimeout() {
149         final IOSession currentSession = currentSessionRef.get();
150         return currentSession.getSocketTimeout();
151     }
152 
153     @Override
154     void onTimeout(final Timeout timeout) throws IOException {
155         final IOSession currentSession = currentSessionRef.get();
156         if (sessionListener != null) {
157             sessionListener.timeout(currentSession);
158         }
159         final IOEventHandler handler = ensureHandler(currentSession);
160         handler.timeout(currentSession, timeout);
161     }
162 
163     @Override
164     void onException(final Exception cause) {
165         final IOSession currentSession = currentSessionRef.get();
166         if (sessionListener != null) {
167             sessionListener.exception(currentSession, cause);
168         }
169         final IOEventHandler handler = currentSession.getHandler();
170         if (handler != null) {
171             handler.exception(currentSession, cause);
172         }
173     }
174 
175     void onTLSSessionStart(final SSLIOSession sslSession) {
176         final IOSession currentSession = currentSessionRef.get();
177         if (sessionListener != null) {
178             sessionListener.connected(currentSession);
179         }
180     }
181 
182     void onTLSSessionEnd() {
183         if (closed.compareAndSet(false, true)) {
184             closedSessions.add(this);
185         }
186     }
187 
188     void disconnected() {
189         final IOSession currentSession = currentSessionRef.get();
190         if (sessionListener != null) {
191             sessionListener.disconnected(currentSession);
192         }
193         final IOEventHandler handler = currentSession.getHandler();
194         if (handler != null) {
195             handler.disconnected(currentSession);
196         }
197     }
198 
199     @Override
200     public void startTls(
201             final SSLContext sslContext,
202             final NamedEndpoint endpoint,
203             final SSLBufferMode sslBufferMode,
204             final SSLSessionInitializer initializer,
205             final SSLSessionVerifier verifier,
206             final Timeout handshakeTimeout) {
207         final SSLIOSessionSession.html#SSLIOSession">SSLIOSession sslioSession = new SSLIOSession(
208                 endpoint != null ? endpoint : initialEndpoint,
209                 ioSession,
210                 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
211                 sslContext,
212                 sslBufferMode,
213                 initializer,
214                 verifier,
215                 new Callback<SSLIOSession>() {
216 
217                     @Override
218                     public void execute(final SSLIOSession sslSession) {
219                         onTLSSessionStart(sslSession);
220                     }
221 
222                 },
223                 new Callback<SSLIOSession>() {
224 
225                     @Override
226                     public void execute(final SSLIOSession sslSession) {
227                         onTLSSessionEnd();
228                     }
229 
230                 },
231                 handshakeTimeout);
232         if (tlsSessionRef.compareAndSet(null, sslioSession)) {
233             currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
234             if (sessionListener != null) {
235                 sessionListener.startTls(sslioSession);
236             }
237         } else {
238             throw new IllegalStateException("TLS already activated");
239         }
240     }
241 
242     @SuppressWarnings("resource")
243     @Override
244     public TlsDetails getTlsDetails() {
245         final SSLIOSession sslIoSession = tlsSessionRef.get();
246         return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
247     }
248 
249     @Override
250     public Lock getLock() {
251         return ioSession.getLock();
252     }
253 
254     @Override
255     public void close() {
256         close(CloseMode.GRACEFUL);
257     }
258 
259     @Override
260     public void close(final CloseMode closeMode) {
261         final IOSession currentSession = currentSessionRef.get();
262         if (closeMode == CloseMode.IMMEDIATE) {
263             closed.set(true);
264             currentSession.close(closeMode);
265         } else {
266             if (closed.compareAndSet(false, true)) {
267                 try {
268                     currentSession.close(closeMode);
269                 } finally {
270                     closedSessions.add(this);
271                 }
272             }
273         }
274     }
275 
276     @Override
277     public IOSession.Status getStatus() {
278         final IOSession currentSession = currentSessionRef.get();
279         return currentSession.getStatus();
280     }
281 
282     @Override
283     public boolean isOpen() {
284         final IOSession currentSession = currentSessionRef.get();
285         return currentSession.isOpen();
286     }
287 
288     @Override
289     public void enqueue(final Command command, final Command.Priority priority) {
290         final IOSession currentSession = currentSessionRef.get();
291         currentSession.enqueue(command, priority);
292     }
293 
294     @Override
295     public boolean hasCommands() {
296         final IOSession currentSession = currentSessionRef.get();
297         return currentSession.hasCommands();
298     }
299 
300     @Override
301     public Command poll() {
302         final IOSession currentSession = currentSessionRef.get();
303         return currentSession.poll();
304     }
305 
306     @Override
307     public ByteChannel channel() {
308         final IOSession currentSession = currentSessionRef.get();
309         return currentSession.channel();
310     }
311 
312     @Override
313     public SocketAddress getRemoteAddress() {
314         return ioSession.getRemoteAddress();
315     }
316 
317     @Override
318     public SocketAddress getLocalAddress() {
319         return ioSession.getLocalAddress();
320     }
321 
322     @Override
323     public int getEventMask() {
324         final IOSession currentSession = currentSessionRef.get();
325         return currentSession.getEventMask();
326     }
327 
328     @Override
329     public void setEventMask(final int ops) {
330         final IOSession currentSession = currentSessionRef.get();
331         currentSession.setEventMask(ops);
332     }
333 
334     @Override
335     public void setEvent(final int op) {
336         final IOSession currentSession = currentSessionRef.get();
337         currentSession.setEvent(op);
338     }
339 
340     @Override
341     public void clearEvent(final int op) {
342         final IOSession currentSession = currentSessionRef.get();
343         currentSession.clearEvent(op);
344     }
345 
346     @Override
347     public Timeout getSocketTimeout() {
348         return ioSession.getSocketTimeout();
349     }
350 
351     @Override
352     public void setSocketTimeout(final Timeout timeout) {
353         ioSession.setSocketTimeout(timeout);
354     }
355 
356     @Override
357     public int read(final ByteBuffer dst) throws IOException {
358         final IOSession currentSession = currentSessionRef.get();
359         return currentSession.read(dst);
360     }
361 
362     @Override
363     public int write(final ByteBuffer src) throws IOException {
364         final IOSession currentSession = currentSessionRef.get();
365         return currentSession.write(src);
366     }
367 
368     @Override
369     public void updateReadTime() {
370         ioSession.updateReadTime();
371     }
372 
373     @Override
374     public void updateWriteTime() {
375         ioSession.updateWriteTime();
376     }
377 
378     @Override
379     public long getLastReadTime() {
380         return ioSession.getLastReadTime();
381     }
382 
383     @Override
384     public long getLastWriteTime() {
385         return ioSession.getLastWriteTime();
386     }
387 
388     @Override
389     public long getLastEventTime() {
390         return ioSession.getLastEventTime();
391     }
392 
393     @Override
394     public String toString() {
395         final IOSession currentSession = currentSessionRef.get();
396         if (currentSession != null) {
397             return currentSession.toString();
398         } else {
399             return ioSession.toString();
400         }
401     }
402 
403 }