View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.channels.ByteChannel;
33  import java.nio.channels.SelectionKey;
34  import java.util.Queue;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicReference;
37  
38  import javax.net.ssl.SSLContext;
39  
40  import org.apache.hc.core5.function.Callback;
41  import org.apache.hc.core5.io.ShutdownType;
42  import org.apache.hc.core5.net.NamedEndpoint;
43  import org.apache.hc.core5.reactor.ssl.SSLBufferManagement;
44  import org.apache.hc.core5.reactor.ssl.SSLIOSession;
45  import org.apache.hc.core5.reactor.ssl.SSLMode;
46  import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
47  import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
48  import org.apache.hc.core5.reactor.ssl.TlsDetails;
49  import org.apache.hc.core5.util.Asserts;
50  
51  final class InternalDataChannel extends InternalChannel implements TlsCapableIOSession {
52  
53      private final IOSession ioSession;
54      private final NamedEndpoint namedEndpoint;
55      private final IOSessionListener sessionListener;
56      private final AtomicReference<SSLIOSession> tlsSessionRef;
57      private final Queue<InternalDataChannel> closedSessions;
58      private final AtomicBoolean connected;
59      private final AtomicBoolean closed;
60  
61      InternalDataChannel(
62              final IOSession ioSession,
63              final NamedEndpoint namedEndpoint,
64              final IOSessionListener sessionListener,
65              final Queue<InternalDataChannel> closedSessions) {
66          this.ioSession = ioSession;
67          this.namedEndpoint = namedEndpoint;
68          this.closedSessions = closedSessions;
69          this.sessionListener = sessionListener;
70          this.tlsSessionRef = new AtomicReference<>(null);
71          this.connected = new AtomicBoolean(false);
72          this.closed = new AtomicBoolean(false);
73      }
74  
75      @Override
76      public String getId() {
77          return ioSession.getId();
78      }
79  
80      private IOSession getSessionImpl() {
81          final SSLIOSession tlsSession = tlsSessionRef.get();
82          if (tlsSession != null) {
83              return tlsSession;
84          } else {
85              return ioSession;
86          }
87      }
88  
89      private IOEventHandler getEventHandler() {
90          final IOEventHandler handler = ioSession.getHandler();
91          Asserts.notNull(handler, "IO event handler");
92          return handler;
93      }
94  
95      @Override
96      void onIOEvent(final int readyOps) throws IOException {
97          final SSLIOSession tlsSession = tlsSessionRef.get();
98          if (tlsSession != null) {
99              if (!tlsSession.isInitialized()) {
100                 tlsSession.initialize();
101                 if (sessionListener != null) {
102                     sessionListener.tlsStarted(tlsSession);
103                 }
104             }
105             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
106                 ioSession.clearEvent(SelectionKey.OP_CONNECT);
107             }
108             if ((readyOps & SelectionKey.OP_READ) != 0) {
109                 ioSession.updateReadTime();
110                 do {
111                     tlsSession.resetReadCount();
112                     if (tlsSession.isAppInputReady()) {
113                         if (sessionListener != null) {
114                             sessionListener.inputReady(this);
115                         }
116                         final IOEventHandler handler = getEventHandler();
117                         handler.inputReady(this);
118                     }
119                     tlsSession.inboundTransport();
120                     if (sessionListener != null) {
121                         sessionListener.tlsInbound(tlsSession);
122                     }
123                 } while (tlsSession.getReadCount() > 0);
124             }
125             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
126                 ioSession.updateWriteTime();
127                 if (tlsSession.isAppOutputReady()) {
128                     if (sessionListener != null) {
129                         sessionListener.outputReady(this);
130                     }
131                     final IOEventHandler handler = getEventHandler();
132                     handler.outputReady(this);
133                 }
134                 tlsSession.outboundTransport();
135                 if (sessionListener != null) {
136                     sessionListener.tlsOutbound(tlsSession);
137                 }
138             }
139         } else {
140             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
141                 ioSession.clearEvent(SelectionKey.OP_CONNECT);
142                 if (connected.compareAndSet(false, true)) {
143                     if (sessionListener != null) {
144                         sessionListener.connected(this);
145                     }
146                     final IOEventHandler handler = getEventHandler();
147                     handler.connected(this);
148                 }
149             }
150             if ((readyOps & SelectionKey.OP_READ) != 0) {
151                 ioSession.updateReadTime();
152                 if (sessionListener != null) {
153                     sessionListener.inputReady(this);
154                 }
155                 final IOEventHandler handler = getEventHandler();
156                 handler.inputReady(this);
157             }
158             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
159                 ioSession.updateWriteTime();
160                 if (sessionListener != null) {
161                     sessionListener.outputReady(this);
162                 }
163                 final IOEventHandler handler = getEventHandler();
164                 handler.outputReady(this);
165             }
166         }
167     }
168 
169     @Override
170     int getTimeout() {
171         return ioSession.getSocketTimeout();
172     }
173 
174     @Override
175     void onTimeout() throws IOException {
176         final IOEventHandler handler = getEventHandler();
177         handler.timeout(this);
178         final SSLIOSession tlsSession = tlsSessionRef.get();
179         if (tlsSession != null) {
180             if (tlsSession.isOutboundDone() && !tlsSession.isInboundDone()) {
181                 // The session failed to terminate cleanly
182                 tlsSession.shutdown(ShutdownType.IMMEDIATE);
183             }
184         }
185     }
186 
187     @Override
188     void onException(final Exception cause) {
189         final IOEventHandler handler = getEventHandler();
190         if (sessionListener != null) {
191             sessionListener.exception(this, cause);
192         }
193         handler.exception(this, cause);
194     }
195 
196     void disconnected() {
197         if (sessionListener != null) {
198             sessionListener.disconnected(this);
199         }
200         final IOEventHandler handler = getEventHandler();
201         handler.disconnected(this);
202     }
203 
204     @Override
205     public void startTls(
206             final SSLContext sslContext,
207             final SSLBufferManagement sslBufferManagement,
208             final SSLSessionInitializer initializer,
209             final SSLSessionVerifier verifier) {
210         if (!tlsSessionRef.compareAndSet(null, new SSLIOSession(
211                 namedEndpoint,
212                 ioSession,
213                 namedEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
214                 sslContext,
215                 sslBufferManagement,
216                 initializer,
217                 verifier,
218                 new Callback<SSLIOSession>() {
219 
220                     @Override
221                     public void execute(final SSLIOSession sslSession) {
222                         if (connected.compareAndSet(false, true)) {
223                             final IOEventHandler handler = getEventHandler();
224                             try {
225                                 if (sessionListener != null) {
226                                     sessionListener.connected(InternalDataChannel.this);
227                                 }
228                                 handler.connected(InternalDataChannel.this);
229                             } catch (final Exception ex) {
230                                 if (sessionListener != null) {
231                                     sessionListener.exception(InternalDataChannel.this, ex);
232                                 }
233                                 handler.exception(InternalDataChannel.this, ex);
234                             }
235                         }
236                     }
237 
238                 }))) {
239             throw new IllegalStateException("TLS already activated");
240         }
241     }
242 
243     @Override
244     public TlsDetails getTlsDetails() {
245         final SSLIOSession sslIoSession = tlsSessionRef.get();
246         return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
247     }
248 
249     @Override
250     public void close() {
251         if (closed.compareAndSet(false, true)) {
252             try {
253                 getSessionImpl().close();
254             } finally {
255                 closedSessions.add(this);
256             }
257         }
258     }
259 
260     @Override
261     public void shutdown(final ShutdownType shutdownType) {
262         if (closed.compareAndSet(false, true)) {
263             try {
264                 getSessionImpl().shutdown(shutdownType);
265             } finally {
266                 closedSessions.add(this);
267             }
268         }
269     }
270 
271     @Override
272     public int getStatus() {
273         return getSessionImpl().getStatus();
274     }
275 
276     @Override
277     public boolean isClosed() {
278         return getSessionImpl().isClosed();
279     }
280 
281     @Override
282     public IOEventHandler getHandler() {
283         return ioSession.getHandler();
284     }
285 
286     @Override
287     public void setHandler(final IOEventHandler eventHandler) {
288         ioSession.setHandler(eventHandler);
289     }
290 
291     @Override
292     public void addLast(final Command command) {
293         getSessionImpl().addLast(command);
294     }
295 
296     @Override
297     public void addFirst(final Command command) {
298         getSessionImpl().addFirst(command);
299     }
300 
301     @Override
302     public Command getCommand() {
303         return getSessionImpl().getCommand();
304     }
305 
306     @Override
307     public ByteChannel channel() {
308         return getSessionImpl().channel();
309     }
310 
311     @Override
312     public SocketAddress getRemoteAddress() {
313         return ioSession.getRemoteAddress();
314     }
315 
316     @Override
317     public SocketAddress getLocalAddress() {
318         return ioSession.getLocalAddress();
319     }
320 
321     @Override
322     public int getEventMask() {
323         return getSessionImpl().getEventMask();
324     }
325 
326     @Override
327     public void setEventMask(final int ops) {
328         getSessionImpl().setEventMask(ops);
329     }
330 
331     @Override
332     public void setEvent(final int op) {
333         getSessionImpl().setEvent(op);
334     }
335 
336     @Override
337     public void clearEvent(final int op) {
338         getSessionImpl().clearEvent(op);
339     }
340 
341     @Override
342     public int getSocketTimeout() {
343         return ioSession.getSocketTimeout();
344     }
345 
346     @Override
347     public void setSocketTimeout(final int timeout) {
348         ioSession.setSocketTimeout(timeout);
349     }
350 
351     @Override
352     public void updateReadTime() {
353         ioSession.updateReadTime();
354     }
355 
356     @Override
357     public void updateWriteTime() {
358         ioSession.updateWriteTime();
359     }
360 
361     @Override
362     public long getLastReadTime() {
363         return ioSession.getLastReadTime();
364     }
365 
366     @Override
367     public long getLastWriteTime() {
368         return ioSession.getLastWriteTime();
369     }
370 
371     @Override
372     public String toString() {
373         return getSessionImpl().toString();
374     }
375 
376 }