View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.SocketChannel;
38  import java.util.Queue;
39  import java.util.Set;
40  import java.util.concurrent.ConcurrentLinkedQueue;
41  import java.util.concurrent.ThreadFactory;
42  
43  import org.apache.http.annotation.ThreadSafe;
44  import org.apache.http.nio.reactor.ConnectingIOReactor;
45  import org.apache.http.nio.reactor.IOReactorException;
46  import org.apache.http.nio.reactor.IOReactorStatus;
47  import org.apache.http.nio.reactor.SessionRequest;
48  import org.apache.http.nio.reactor.SessionRequestCallback;
49  import org.apache.http.params.HttpParams;
50  import org.apache.http.util.Asserts;
51  
52  /**
53   * Default implementation of {@link ConnectingIOReactor}. This class extends
54   * {@link AbstractMultiworkerIOReactor} with capability to connect to remote
55   * hosts.
56   *
57   * @since 4.0
58   */
59  @SuppressWarnings("deprecation")
60  @ThreadSafe // public methods only
61  public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
62          implements ConnectingIOReactor {
63  
64      private final Queue<SessionRequestImpl> requestQueue;
65  
66      private long lastTimeoutCheck;
67  
68      /**
69       * Creates an instance of DefaultConnectingIOReactor with the given configuration.
70       *
71       * @param config I/O reactor configuration.
72       * @param threadFactory the factory to create threads.
73       *   Can be {@code null}.
74       * @throws IOReactorException in case if a non-recoverable I/O error.
75       *
76       * @since 4.2
77       */
78      public DefaultConnectingIOReactor(
79              final IOReactorConfig config,
80              final ThreadFactory threadFactory) throws IOReactorException {
81          super(config, threadFactory);
82          this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
83          this.lastTimeoutCheck = System.currentTimeMillis();
84      }
85  
86      /**
87       * Creates an instance of DefaultConnectingIOReactor with the given configuration.
88       *
89       * @param config I/O reactor configuration.
90       *   Can be {@code null}.
91       * @throws IOReactorException in case if a non-recoverable I/O error.
92       *
93       * @since 4.2
94       */
95      public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
96          this(config, null);
97      }
98  
99      /**
100      * Creates an instance of DefaultConnectingIOReactor with default configuration.
101      *
102      * @throws IOReactorException in case if a non-recoverable I/O error.
103      *
104      * @since 4.2
105      */
106     public DefaultConnectingIOReactor() throws IOReactorException {
107         this(null, null);
108     }
109 
110     /**
111      * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig, ThreadFactory)}
112      */
113     @Deprecated
114     public DefaultConnectingIOReactor(
115             final int workerCount,
116             final ThreadFactory threadFactory,
117             final HttpParams params) throws IOReactorException {
118         this(convert(workerCount, params), threadFactory);
119     }
120 
121     /**
122      * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig)}
123      */
124     @Deprecated
125     public DefaultConnectingIOReactor(
126             final int workerCount,
127             final HttpParams params) throws IOReactorException {
128         this(convert(workerCount, params), null);
129     }
130 
131     @Override
132     protected void cancelRequests() throws IOReactorException {
133         SessionRequestImpl request;
134         while ((request = this.requestQueue.poll()) != null) {
135             request.cancel();
136         }
137     }
138 
139     @Override
140     protected void processEvents(final int readyCount) throws IOReactorException {
141         processSessionRequests();
142 
143         if (readyCount > 0) {
144             final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
145             for (final SelectionKey key : selectedKeys) {
146 
147                 processEvent(key);
148 
149             }
150             selectedKeys.clear();
151         }
152 
153         final long currentTime = System.currentTimeMillis();
154         if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
155             this.lastTimeoutCheck = currentTime;
156             final Set<SelectionKey> keys = this.selector.keys();
157             processTimeouts(keys);
158         }
159     }
160 
161     private void processEvent(final SelectionKey key) {
162         try {
163 
164             if (key.isConnectable()) {
165 
166                 final SocketChannel channel = (SocketChannel) key.channel();
167                 // Get request handle
168                 final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
169                 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
170 
171                 // Finish connection process
172                 try {
173                     channel.finishConnect();
174                 } catch (final IOException ex) {
175                     sessionRequest.failed(ex);
176                 }
177                 key.cancel();
178                 key.attach(null);
179                 if (!sessionRequest.isCompleted()) {
180                     addChannel(new ChannelEntry(channel, sessionRequest));
181                 } else {
182                     try {
183                         channel.close();
184                     } catch (IOException ignore) {
185                     }
186                 }
187             }
188 
189         } catch (final CancelledKeyException ex) {
190             final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
191             key.attach(null);
192             if (requestHandle != null) {
193                 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
194                 if (sessionRequest != null) {
195                     sessionRequest.cancel();
196                 }
197             }
198         }
199     }
200 
201     private void processTimeouts(final Set<SelectionKey> keys) {
202         final long now = System.currentTimeMillis();
203         for (final SelectionKey key : keys) {
204             final Object attachment = key.attachment();
205 
206             if (attachment instanceof SessionRequestHandle) {
207                 final SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
208                 final SessionRequestImpl sessionRequest = handle.getSessionRequest();
209                 final int timeout = sessionRequest.getConnectTimeout();
210                 if (timeout > 0) {
211                     if (handle.getRequestTime() + timeout < now) {
212                         sessionRequest.timeout();
213                     }
214                 }
215             }
216 
217         }
218     }
219 
220     @Override
221     public SessionRequest connect(
222             final SocketAddress remoteAddress,
223             final SocketAddress localAddress,
224             final Object attachment,
225             final SessionRequestCallback callback) {
226         Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
227             "I/O reactor has been shut down");
228         final SessionRequestImpl sessionRequest = new SessionRequestImpl(
229                 remoteAddress, localAddress, attachment, callback);
230         sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
231 
232         this.requestQueue.add(sessionRequest);
233         this.selector.wakeup();
234 
235         return sessionRequest;
236     }
237 
238     private void validateAddress(final SocketAddress address) throws UnknownHostException {
239         if (address == null) {
240             return;
241         }
242         if (address instanceof InetSocketAddress) {
243             final InetSocketAddress endpoint = (InetSocketAddress) address;
244             if (endpoint.isUnresolved()) {
245                 throw new UnknownHostException(endpoint.getHostName());
246             }
247         }
248     }
249 
250     private void processSessionRequests() throws IOReactorException {
251         SessionRequestImpl request;
252         while ((request = this.requestQueue.poll()) != null) {
253             if (request.isCompleted()) {
254                 continue;
255             }
256             final SocketChannel socketChannel;
257             try {
258                 socketChannel = SocketChannel.open();
259             } catch (final IOException ex) {
260                 throw new IOReactorException("Failure opening socket", ex);
261             }
262             try {
263                 validateAddress(request.getLocalAddress());
264                 validateAddress(request.getRemoteAddress());
265 
266                 socketChannel.configureBlocking(false);
267                 prepareSocket(socketChannel.socket());
268 
269                 if (request.getLocalAddress() != null) {
270                     final Socket sock = socketChannel.socket();
271                     sock.setReuseAddress(this.config.isSoReuseAddress());
272                     sock.bind(request.getLocalAddress());
273                 }
274                 final boolean connected = socketChannel.connect(request.getRemoteAddress());
275                 if (connected) {
276                     final ChannelEntry entry = new ChannelEntry(socketChannel, request);
277                     addChannel(entry);
278                     continue;
279                 }
280             } catch (final IOException ex) {
281                 closeChannel(socketChannel);
282                 request.failed(ex);
283                 return;
284             }
285 
286             final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
287             try {
288                 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
289                         requestHandle);
290                 request.setKey(key);
291             } catch (final IOException ex) {
292                 closeChannel(socketChannel);
293                 throw new IOReactorException("Failure registering channel " +
294                         "with the selector", ex);
295             }
296         }
297     }
298 
299 }