View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.SocketChannel;
38  import java.util.Queue;
39  import java.util.Set;
40  import java.util.concurrent.ConcurrentLinkedQueue;
41  import java.util.concurrent.ThreadFactory;
42  
43  import org.apache.http.nio.reactor.ConnectingIOReactor;
44  import org.apache.http.nio.reactor.IOReactorException;
45  import org.apache.http.nio.reactor.IOReactorStatus;
46  import org.apache.http.nio.reactor.SessionRequest;
47  import org.apache.http.nio.reactor.SessionRequestCallback;
48  import org.apache.http.params.HttpParams;
49  import org.apache.http.util.Asserts;
50  
51  /**
52   * Default implementation of {@link ConnectingIOReactor}. This class extends
53   * {@link AbstractMultiworkerIOReactor} with capability to connect to remote
54   * hosts.
55   *
56   * @since 4.0
57   */
58  @SuppressWarnings("deprecation")
59  public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
60          implements ConnectingIOReactor {
61  
62      private final Queue<SessionRequestImpl> requestQueue;
63  
64      private long lastTimeoutCheck;
65  
66      /**
67       * Creates an instance of DefaultConnectingIOReactor with the given configuration.
68       *
69       * @param config I/O reactor configuration.
70       * @param threadFactory the factory to create threads.
71       *   Can be {@code null}.
72       * @throws IOReactorException in case if a non-recoverable I/O error.
73       *
74       * @since 4.2
75       */
76      public DefaultConnectingIOReactor(
77              final IOReactorConfig config,
78              final ThreadFactory threadFactory) throws IOReactorException {
79          super(config, threadFactory);
80          this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
81          this.lastTimeoutCheck = System.currentTimeMillis();
82      }
83  
84      /**
85       * Creates an instance of DefaultConnectingIOReactor with the given configuration.
86       *
87       * @param config I/O reactor configuration.
88       *   Can be {@code null}.
89       * @throws IOReactorException in case if a non-recoverable I/O error.
90       *
91       * @since 4.2
92       */
93      public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
94          this(config, null);
95      }
96  
97      /**
98       * Creates an instance of DefaultConnectingIOReactor with default configuration.
99       *
100      * @throws IOReactorException in case if a non-recoverable I/O error.
101      *
102      * @since 4.2
103      */
104     public DefaultConnectingIOReactor() throws IOReactorException {
105         this(null, null);
106     }
107 
108     /**
109      * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig, ThreadFactory)}
110      */
111     @Deprecated
112     public DefaultConnectingIOReactor(
113             final int workerCount,
114             final ThreadFactory threadFactory,
115             final HttpParams params) throws IOReactorException {
116         this(convert(workerCount, params), threadFactory);
117     }
118 
119     /**
120      * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig)}
121      */
122     @Deprecated
123     public DefaultConnectingIOReactor(
124             final int workerCount,
125             final HttpParams params) throws IOReactorException {
126         this(convert(workerCount, params), null);
127     }
128 
129     @Override
130     protected void cancelRequests() throws IOReactorException {
131         SessionRequestImpl request;
132         while ((request = this.requestQueue.poll()) != null) {
133             request.cancel();
134         }
135     }
136 
137     @Override
138     protected void processEvents(final int readyCount) throws IOReactorException {
139         processSessionRequests();
140 
141         if (readyCount > 0) {
142             final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
143             for (final SelectionKey key : selectedKeys) {
144 
145                 processEvent(key);
146 
147             }
148             selectedKeys.clear();
149         }
150 
151         final long currentTime = System.currentTimeMillis();
152         if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
153             this.lastTimeoutCheck = currentTime;
154             final Set<SelectionKey> keys = this.selector.keys();
155             processTimeouts(keys);
156         }
157     }
158 
159     private void processEvent(final SelectionKey key) {
160         try {
161 
162             if (key.isConnectable()) {
163 
164                 final SocketChannel channel = (SocketChannel) key.channel();
165                 // Get request handle
166                 final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
167                 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
168 
169                 // Finish connection process
170                 try {
171                     channel.finishConnect();
172                 } catch (final IOException ex) {
173                     sessionRequest.failed(ex);
174                 }
175                 key.cancel();
176                 key.attach(null);
177                 if (!sessionRequest.isCompleted()) {
178                     addChannel(new ChannelEntry(channel, sessionRequest));
179                 } else {
180                     try {
181                         channel.close();
182                     } catch (IOException ignore) {
183                     }
184                 }
185             }
186 
187         } catch (final CancelledKeyException ex) {
188             final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
189             key.attach(null);
190             if (requestHandle != null) {
191                 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
192                 if (sessionRequest != null) {
193                     sessionRequest.cancel();
194                 }
195             }
196         }
197     }
198 
199     private void processTimeouts(final Set<SelectionKey> keys) {
200         final long now = System.currentTimeMillis();
201         for (final SelectionKey key : keys) {
202             final Object attachment = key.attachment();
203 
204             if (attachment instanceof SessionRequestHandle) {
205                 final SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
206                 final SessionRequestImpl sessionRequest = handle.getSessionRequest();
207                 final int timeout = sessionRequest.getConnectTimeout();
208                 if (timeout > 0) {
209                     if (handle.getRequestTime() + timeout < now) {
210                         sessionRequest.timeout();
211                     }
212                 }
213             }
214 
215         }
216     }
217 
218     @Override
219     public SessionRequest connect(
220             final SocketAddress remoteAddress,
221             final SocketAddress localAddress,
222             final Object attachment,
223             final SessionRequestCallback callback) {
224         Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
225             "I/O reactor has been shut down");
226         final SessionRequestImpl sessionRequest = new SessionRequestImpl(
227                 remoteAddress, localAddress, attachment, callback);
228         sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
229 
230         this.requestQueue.add(sessionRequest);
231         this.selector.wakeup();
232 
233         return sessionRequest;
234     }
235 
236     private void validateAddress(final SocketAddress address) throws UnknownHostException {
237         if (address == null) {
238             return;
239         }
240         if (address instanceof InetSocketAddress) {
241             final InetSocketAddress endpoint = (InetSocketAddress) address;
242             if (endpoint.isUnresolved()) {
243                 throw new UnknownHostException(endpoint.getHostName());
244             }
245         }
246     }
247 
248     private void processSessionRequests() throws IOReactorException {
249         SessionRequestImpl request;
250         while ((request = this.requestQueue.poll()) != null) {
251             if (request.isCompleted()) {
252                 continue;
253             }
254             final SocketChannel socketChannel;
255             try {
256                 socketChannel = SocketChannel.open();
257             } catch (final IOException ex) {
258                 request.failed(ex);
259                 return;
260             }
261             try {
262                 validateAddress(request.getLocalAddress());
263                 validateAddress(request.getRemoteAddress());
264 
265                 socketChannel.configureBlocking(false);
266                 prepareSocket(socketChannel.socket());
267 
268                 if (request.getLocalAddress() != null) {
269                     final Socket sock = socketChannel.socket();
270                     sock.setReuseAddress(this.config.isSoReuseAddress());
271                     sock.bind(request.getLocalAddress());
272                 }
273                 final boolean connected = socketChannel.connect(request.getRemoteAddress());
274                 if (connected) {
275                     final ChannelEntry entry = new ChannelEntry(socketChannel, request);
276                     addChannel(entry);
277                     continue;
278                 }
279             } catch (final IOException ex) {
280                 closeChannel(socketChannel);
281                 request.failed(ex);
282                 return;
283             }
284 
285             final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
286             try {
287                 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
288                         requestHandle);
289                 request.setKey(key);
290             } catch (final IOException ex) {
291                 closeChannel(socketChannel);
292                 throw new IOReactorException("Failure registering channel " +
293                         "with the selector", ex);
294             }
295         }
296     }
297 
298 }