View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ByteChannel;
34  import java.nio.channels.SelectionKey;
35  import java.util.Objects;
36  import java.util.Queue;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.ConcurrentMap;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicReference;
41  import java.util.concurrent.locks.Lock;
42  
43  import javax.net.ssl.SSLContext;
44  import javax.net.ssl.SSLSession;
45  
46  import org.apache.hc.core5.concurrent.CallbackContribution;
47  import org.apache.hc.core5.concurrent.FutureCallback;
48  import org.apache.hc.core5.function.Decorator;
49  import org.apache.hc.core5.io.CloseMode;
50  import org.apache.hc.core5.net.NamedEndpoint;
51  import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
52  import org.apache.hc.core5.reactor.ssl.SSLIOSession;
53  import org.apache.hc.core5.reactor.ssl.SSLMode;
54  import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
55  import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
56  import org.apache.hc.core5.reactor.ssl.TlsDetails;
57  import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
58  import org.apache.hc.core5.util.Args;
59  import org.apache.hc.core5.util.Asserts;
60  import org.apache.hc.core5.util.TextUtils;
61  import org.apache.hc.core5.util.Timeout;
62  
63  final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
64  
65      private final IOSession ioSession;
66      private final NamedEndpoint initialEndpoint;
67      private final Decorator<IOSession> ioSessionDecorator;
68      private final IOSessionListener sessionListener;
69      private final Queue<InternalDataChannel> closedSessions;
70      private final AtomicReference<SSLIOSession> tlsSessionRef;
71      private final AtomicReference<IOSession> currentSessionRef;
72      private final AtomicReference<IOEventHandler> eventHandlerRef;
73      private final ConcurrentMap<String, ProtocolUpgradeHandler> protocolUpgradeHandlerMap;
74      private final AtomicBoolean closed;
75  
76      InternalDataChannel(
77              final IOSession ioSession,
78              final NamedEndpoint initialEndpoint,
79              final Decorator<IOSession> ioSessionDecorator,
80              final IOSessionListener sessionListener,
81              final Queue<InternalDataChannel> closedSessions) {
82          this.ioSession = ioSession;
83          this.initialEndpoint = initialEndpoint;
84          this.closedSessions = closedSessions;
85          this.ioSessionDecorator = ioSessionDecorator;
86          this.sessionListener = sessionListener;
87          this.tlsSessionRef = new AtomicReference<>();
88          this.currentSessionRef = new AtomicReference<>(
89                  ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
90          this.eventHandlerRef = new AtomicReference<>();
91          this.protocolUpgradeHandlerMap = new ConcurrentHashMap<>();
92          this.closed = new AtomicBoolean();
93      }
94  
95      @Override
96      public String getId() {
97          return ioSession.getId();
98      }
99  
100     @Override
101     public NamedEndpoint getInitialEndpoint() {
102         return initialEndpoint;
103     }
104 
105     @Override
106     public IOEventHandler getHandler() {
107         return eventHandlerRef.get();
108     }
109 
110     @Override
111     public void upgrade(final IOEventHandler handler) {
112         final IOSession currentSession = currentSessionRef.get();
113         currentSession.upgrade(handler);
114         eventHandlerRef.set(handler);
115     }
116 
117     private IOEventHandler ensureHandler(final IOSession session) {
118         final IOEventHandler handler = session.getHandler();
119         Asserts.notNull(handler, "IO event handler");
120         return handler;
121     }
122 
123     @Override
124     void onIOEvent(final int readyOps) throws IOException {
125         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
126             final IOSession currentSession = currentSessionRef.get();
127             currentSession.clearEvent(SelectionKey.OP_CONNECT);
128             if (tlsSessionRef.get() == null) {
129                 if (sessionListener != null) {
130                     sessionListener.connected(currentSession);
131                 }
132                 final IOEventHandler handler = ensureHandler(currentSession);
133                 handler.connected(currentSession);
134             }
135         }
136         if ((readyOps & SelectionKey.OP_READ) != 0) {
137             final IOSession currentSession = currentSessionRef.get();
138             currentSession.updateReadTime();
139             if (sessionListener != null) {
140                 sessionListener.inputReady(currentSession);
141             }
142             final IOEventHandler handler = ensureHandler(currentSession);
143             handler.inputReady(currentSession, null);
144         }
145         if ((readyOps & SelectionKey.OP_WRITE) != 0
146                 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
147             final IOSession currentSession = currentSessionRef.get();
148             currentSession.updateWriteTime();
149             if (sessionListener != null) {
150                 sessionListener.outputReady(currentSession);
151             }
152             final IOEventHandler handler = ensureHandler(currentSession);
153             handler.outputReady(currentSession);
154         }
155     }
156 
157     @Override
158     Timeout getTimeout() {
159         final IOSession currentSession = currentSessionRef.get();
160         return currentSession.getSocketTimeout();
161     }
162 
163     @Override
164     void onTimeout(final Timeout timeout) throws IOException {
165         final IOSession currentSession = currentSessionRef.get();
166         if (sessionListener != null) {
167             sessionListener.timeout(currentSession);
168         }
169         final IOEventHandler handler = ensureHandler(currentSession);
170         handler.timeout(currentSession, timeout);
171     }
172 
173     @Override
174     void onException(final Exception cause) {
175         final IOSession currentSession = currentSessionRef.get();
176         if (sessionListener != null) {
177             sessionListener.exception(currentSession, cause);
178         }
179         final IOEventHandler handler = currentSession.getHandler();
180         if (handler != null) {
181             handler.exception(currentSession, cause);
182         }
183     }
184 
185     void onTLSSessionStart(final SSLIOSession sslSession) {
186         final IOSession currentSession = currentSessionRef.get();
187         if (sessionListener != null) {
188             sessionListener.connected(currentSession);
189         }
190     }
191 
192     void onTLSSessionEnd(final SSLIOSession sslSession) {
193         if (closed.compareAndSet(false, true)) {
194             closedSessions.add(this);
195         }
196     }
197 
198     void disconnected() {
199         final IOSession currentSession = currentSessionRef.get();
200         if (sessionListener != null) {
201             sessionListener.disconnected(currentSession);
202         }
203         final IOEventHandler handler = currentSession.getHandler();
204         if (handler != null) {
205             handler.disconnected(currentSession);
206         }
207     }
208 
209     @Override
210     public void startTls(
211             final SSLContext sslContext,
212             final NamedEndpoint endpoint,
213             final SSLBufferMode sslBufferMode,
214             final SSLSessionInitializer initializer,
215             final SSLSessionVerifier verifier,
216             final Timeout handshakeTimeout) {
217         startTls(sslContext, endpoint, sslBufferMode, initializer, verifier, handshakeTimeout, null);
218     }
219 
220     @Override
221     public void startTls(
222             final SSLContext sslContext,
223             final NamedEndpoint endpoint,
224             final SSLBufferMode sslBufferMode,
225             final SSLSessionInitializer initializer,
226             final SSLSessionVerifier verifier,
227             final Timeout handshakeTimeout,
228             final FutureCallback<TransportSecurityLayer> callback) {
229         final SSLIOSession sslioSession = new SSLIOSession(
230                 endpoint != null ? endpoint : initialEndpoint,
231                 ioSession,
232                 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
233                 sslContext,
234                 sslBufferMode,
235                 initializer,
236                 verifier,
237                 handshakeTimeout,
238                 this::onTLSSessionStart,
239                 this::onTLSSessionEnd,
240                 new CallbackContribution<SSLSession>(callback) {
241 
242                     @Override
243                     public void completed(final SSLSession sslSession) {
244                         if (callback != null) {
245                             callback.completed(InternalDataChannel.this);
246                         }
247                     }
248 
249                 });
250         if (tlsSessionRef.compareAndSet(null, sslioSession)) {
251             currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
252         } else {
253             throw new IllegalStateException("TLS already activated");
254         }
255         try {
256             if (sessionListener != null) {
257                 sessionListener.startTls(sslioSession);
258             }
259             sslioSession.beginHandshake(this);
260         } catch (final Exception ex) {
261             onException(ex);
262         }
263     }
264 
265     @SuppressWarnings("resource")
266     @Override
267     public TlsDetails getTlsDetails() {
268         final SSLIOSession sslIoSession = tlsSessionRef.get();
269         return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
270     }
271 
272     @Override
273     public Lock getLock() {
274         return ioSession.getLock();
275     }
276 
277     @Override
278     public void close() {
279         close(CloseMode.GRACEFUL);
280     }
281 
282     @Override
283     public void close(final CloseMode closeMode) {
284         final IOSession currentSession = currentSessionRef.get();
285         if (closeMode == CloseMode.IMMEDIATE) {
286             closed.set(true);
287             currentSession.close(closeMode);
288         } else {
289             if (closed.compareAndSet(false, true)) {
290                 try {
291                     currentSession.close(closeMode);
292                 } finally {
293                     closedSessions.add(this);
294                 }
295             }
296         }
297     }
298 
299     @Override
300     public IOSession.Status getStatus() {
301         final IOSession currentSession = currentSessionRef.get();
302         return currentSession.getStatus();
303     }
304 
305     @Override
306     public boolean isOpen() {
307         final IOSession currentSession = currentSessionRef.get();
308         return currentSession.isOpen();
309     }
310 
311     @Override
312     public void enqueue(final Command command, final Command.Priority priority) {
313         final IOSession currentSession = currentSessionRef.get();
314         currentSession.enqueue(command, priority);
315     }
316 
317     @Override
318     public boolean hasCommands() {
319         final IOSession currentSession = currentSessionRef.get();
320         return currentSession.hasCommands();
321     }
322 
323     @Override
324     public Command poll() {
325         final IOSession currentSession = currentSessionRef.get();
326         return currentSession.poll();
327     }
328 
329     @Override
330     public ByteChannel channel() {
331         final IOSession currentSession = currentSessionRef.get();
332         return currentSession.channel();
333     }
334 
335     @Override
336     public SocketAddress getRemoteAddress() {
337         return ioSession.getRemoteAddress();
338     }
339 
340     @Override
341     public SocketAddress getLocalAddress() {
342         return ioSession.getLocalAddress();
343     }
344 
345     @Override
346     public int getEventMask() {
347         final IOSession currentSession = currentSessionRef.get();
348         return currentSession.getEventMask();
349     }
350 
351     @Override
352     public void setEventMask(final int ops) {
353         final IOSession currentSession = currentSessionRef.get();
354         currentSession.setEventMask(ops);
355     }
356 
357     @Override
358     public void setEvent(final int op) {
359         final IOSession currentSession = currentSessionRef.get();
360         currentSession.setEvent(op);
361     }
362 
363     @Override
364     public void clearEvent(final int op) {
365         final IOSession currentSession = currentSessionRef.get();
366         currentSession.clearEvent(op);
367     }
368 
369     @Override
370     public Timeout getSocketTimeout() {
371         return ioSession.getSocketTimeout();
372     }
373 
374     @Override
375     public void setSocketTimeout(final Timeout timeout) {
376         ioSession.setSocketTimeout(timeout);
377     }
378 
379     @Override
380     public int read(final ByteBuffer dst) throws IOException {
381         final IOSession currentSession = currentSessionRef.get();
382         return currentSession.read(dst);
383     }
384 
385     @Override
386     public int write(final ByteBuffer src) throws IOException {
387         final IOSession currentSession = currentSessionRef.get();
388         return currentSession.write(src);
389     }
390 
391     @Override
392     public void updateReadTime() {
393         ioSession.updateReadTime();
394     }
395 
396     @Override
397     public void updateWriteTime() {
398         ioSession.updateWriteTime();
399     }
400 
401     @Override
402     public long getLastReadTime() {
403         return ioSession.getLastReadTime();
404     }
405 
406     @Override
407     public long getLastWriteTime() {
408         return ioSession.getLastWriteTime();
409     }
410 
411     @Override
412     public long getLastEventTime() {
413         return ioSession.getLastEventTime();
414     }
415 
416     @Override
417     public void switchProtocol(final String protocolId, final FutureCallback<ProtocolIOSession> callback) {
418         Args.notEmpty(protocolId, "Application protocol ID");
419         final ProtocolUpgradeHandler upgradeHandler = protocolUpgradeHandlerMap.get(TextUtils.toLowerCase(protocolId));
420         if (upgradeHandler != null) {
421             upgradeHandler.upgrade(this, callback);
422         } else {
423             throw new IllegalStateException("Unsupported protocol: " + protocolId);
424         }
425     }
426 
427     @Override
428     public void registerProtocol(final String protocolId, final ProtocolUpgradeHandler upgradeHandler) {
429         Args.notEmpty(protocolId, "Application protocol ID");
430         Args.notNull(upgradeHandler, "Protocol upgrade handler");
431         protocolUpgradeHandlerMap.put(TextUtils.toLowerCase(protocolId), upgradeHandler);
432     }
433 
434     @Override
435     public String toString() {
436         final IOSession currentSession = currentSessionRef.get();
437         return Objects.toString(currentSession != null ? currentSession: ioSession, null);
438     }
439 
440 }