1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.SelectionKey;
37 import java.nio.channels.SocketChannel;
38 import java.util.Queue;
39 import java.util.Set;
40 import java.util.concurrent.ConcurrentLinkedQueue;
41 import java.util.concurrent.ThreadFactory;
42
43 import org.apache.http.annotation.ThreadSafe;
44 import org.apache.http.nio.reactor.ConnectingIOReactor;
45 import org.apache.http.nio.reactor.IOReactorException;
46 import org.apache.http.nio.reactor.IOReactorStatus;
47 import org.apache.http.nio.reactor.SessionRequest;
48 import org.apache.http.nio.reactor.SessionRequestCallback;
49 import org.apache.http.params.HttpParams;
50 import org.apache.http.util.Asserts;
51
52
53
54
55
56
57
58
59 @SuppressWarnings("deprecation")
60 @ThreadSafe
61 public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
62 implements ConnectingIOReactor {
63
64 private final Queue<SessionRequestImpl> requestQueue;
65
66 private long lastTimeoutCheck;
67
68
69
70
71
72
73
74
75
76
77
78 public DefaultConnectingIOReactor(
79 final IOReactorConfig config,
80 final ThreadFactory threadFactory) throws IOReactorException {
81 super(config, threadFactory);
82 this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
83 this.lastTimeoutCheck = System.currentTimeMillis();
84 }
85
86
87
88
89
90
91
92
93
94
95 public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
96 this(config, null);
97 }
98
99
100
101
102
103
104
105
106 public DefaultConnectingIOReactor() throws IOReactorException {
107 this(null, null);
108 }
109
110
111
112
113 @Deprecated
114 public DefaultConnectingIOReactor(
115 final int workerCount,
116 final ThreadFactory threadFactory,
117 final HttpParams params) throws IOReactorException {
118 this(convert(workerCount, params), threadFactory);
119 }
120
121
122
123
124 @Deprecated
125 public DefaultConnectingIOReactor(
126 final int workerCount,
127 final HttpParams params) throws IOReactorException {
128 this(convert(workerCount, params), null);
129 }
130
131 @Override
132 protected void cancelRequests() throws IOReactorException {
133 SessionRequestImpl request;
134 while ((request = this.requestQueue.poll()) != null) {
135 request.cancel();
136 }
137 }
138
139 @Override
140 protected void processEvents(final int readyCount) throws IOReactorException {
141 processSessionRequests();
142
143 if (readyCount > 0) {
144 final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
145 for (final SelectionKey key : selectedKeys) {
146
147 processEvent(key);
148
149 }
150 selectedKeys.clear();
151 }
152
153 final long currentTime = System.currentTimeMillis();
154 if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
155 this.lastTimeoutCheck = currentTime;
156 final Set<SelectionKey> keys = this.selector.keys();
157 processTimeouts(keys);
158 }
159 }
160
161 private void processEvent(final SelectionKey key) {
162 try {
163
164 if (key.isConnectable()) {
165
166 final SocketChannel channel = (SocketChannel) key.channel();
167
168 final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
169 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
170
171
172 try {
173 channel.finishConnect();
174 } catch (final IOException ex) {
175 sessionRequest.failed(ex);
176 }
177 key.cancel();
178 if (channel.isConnected()) {
179 try {
180 try {
181 prepareSocket(channel.socket());
182 } catch (final IOException ex) {
183 if (this.exceptionHandler == null
184 || !this.exceptionHandler.handle(ex)) {
185 throw new IOReactorException(
186 "Failure initalizing socket", ex);
187 }
188 }
189 final ChannelEntry entry = new ChannelEntry(channel, sessionRequest);
190 addChannel(entry);
191 } catch (final IOException ex) {
192 sessionRequest.failed(ex);
193 }
194 }
195 }
196
197 } catch (final CancelledKeyException ex) {
198 key.attach(null);
199 }
200 }
201
202 private void processTimeouts(final Set<SelectionKey> keys) {
203 final long now = System.currentTimeMillis();
204 for (final SelectionKey key : keys) {
205 final Object attachment = key.attachment();
206
207 if (attachment instanceof SessionRequestHandle) {
208 final SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
209 final SessionRequestImpl sessionRequest = handle.getSessionRequest();
210 final int timeout = sessionRequest.getConnectTimeout();
211 if (timeout > 0) {
212 if (handle.getRequestTime() + timeout < now) {
213 sessionRequest.timeout();
214 }
215 }
216 }
217
218 }
219 }
220
221 public SessionRequest connect(
222 final SocketAddress remoteAddress,
223 final SocketAddress localAddress,
224 final Object attachment,
225 final SessionRequestCallback callback) {
226 Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
227 "I/O reactor has been shut down");
228 final SessionRequestImpl sessionRequest = new SessionRequestImpl(
229 remoteAddress, localAddress, attachment, callback);
230 sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
231
232 this.requestQueue.add(sessionRequest);
233 this.selector.wakeup();
234
235 return sessionRequest;
236 }
237
238 private void validateAddress(final SocketAddress address) throws UnknownHostException {
239 if (address == null) {
240 return;
241 }
242 if (address instanceof InetSocketAddress) {
243 final InetSocketAddress endpoint = (InetSocketAddress) address;
244 if (endpoint.isUnresolved()) {
245 throw new UnknownHostException(endpoint.getHostName());
246 }
247 }
248 }
249
250 private void processSessionRequests() throws IOReactorException {
251 SessionRequestImpl request;
252 while ((request = this.requestQueue.poll()) != null) {
253 if (request.isCompleted()) {
254 continue;
255 }
256 SocketChannel socketChannel;
257 try {
258 socketChannel = SocketChannel.open();
259 } catch (final IOException ex) {
260 throw new IOReactorException("Failure opening socket", ex);
261 }
262 try {
263 socketChannel.configureBlocking(false);
264 validateAddress(request.getLocalAddress());
265 validateAddress(request.getRemoteAddress());
266
267 if (request.getLocalAddress() != null) {
268 final Socket sock = socketChannel.socket();
269 sock.setReuseAddress(this.config.isSoReuseAddress());
270 sock.bind(request.getLocalAddress());
271 }
272 final boolean connected = socketChannel.connect(request.getRemoteAddress());
273 if (connected) {
274 prepareSocket(socketChannel.socket());
275 final ChannelEntry entry = new ChannelEntry(socketChannel, request);
276 addChannel(entry);
277 return;
278 }
279 } catch (final IOException ex) {
280 closeChannel(socketChannel);
281 request.failed(ex);
282 return;
283 }
284
285 final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
286 try {
287 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
288 requestHandle);
289 request.setKey(key);
290 } catch (final IOException ex) {
291 closeChannel(socketChannel);
292 throw new IOReactorException("Failure registering channel " +
293 "with the selector", ex);
294 }
295 }
296 }
297
298 }