1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.SelectionKey;
37 import java.nio.channels.SocketChannel;
38 import java.util.Iterator;
39 import java.util.Queue;
40 import java.util.Set;
41 import java.util.concurrent.ConcurrentLinkedQueue;
42 import java.util.concurrent.ThreadFactory;
43
44 import org.apache.http.annotation.ThreadSafe;
45 import org.apache.http.nio.reactor.ConnectingIOReactor;
46 import org.apache.http.nio.reactor.IOReactorException;
47 import org.apache.http.nio.reactor.IOReactorStatus;
48 import org.apache.http.nio.reactor.SessionRequest;
49 import org.apache.http.nio.reactor.SessionRequestCallback;
50 import org.apache.http.params.HttpParams;
51
52
53
54
55
56
57
58
59 @ThreadSafe
60 public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
61 implements ConnectingIOReactor {
62
63 private final Queue<SessionRequestImpl> requestQueue;
64
65 private long lastTimeoutCheck;
66
67
68
69
70
71
72
73
74
75
76
77 public DefaultConnectingIOReactor(
78 final IOReactorConfig config,
79 final ThreadFactory threadFactory) throws IOReactorException {
80 super(config, threadFactory);
81 this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
82 this.lastTimeoutCheck = System.currentTimeMillis();
83 }
84
85
86
87
88
89
90
91
92
93
94 public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
95 this(config, null);
96 }
97
98
99
100
101
102
103
104
105 public DefaultConnectingIOReactor() throws IOReactorException {
106 this(null, null);
107 }
108
109
110
111
112 @Deprecated
113 public DefaultConnectingIOReactor(
114 int workerCount,
115 final ThreadFactory threadFactory,
116 final HttpParams params) throws IOReactorException {
117 this(convert(workerCount, params), threadFactory);
118 }
119
120
121
122
123 @Deprecated
124 public DefaultConnectingIOReactor(
125 int workerCount,
126 final HttpParams params) throws IOReactorException {
127 this(convert(workerCount, params), null);
128 }
129
130 @Override
131 protected void cancelRequests() throws IOReactorException {
132 SessionRequestImpl request;
133 while ((request = this.requestQueue.poll()) != null) {
134 request.cancel();
135 }
136 }
137
138 @Override
139 protected void processEvents(int readyCount) throws IOReactorException {
140 processSessionRequests();
141
142 if (readyCount > 0) {
143 Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
144 for (Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext(); ) {
145
146 SelectionKey key = it.next();
147 processEvent(key);
148
149 }
150 selectedKeys.clear();
151 }
152
153 long currentTime = System.currentTimeMillis();
154 if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
155 this.lastTimeoutCheck = currentTime;
156 Set<SelectionKey> keys = this.selector.keys();
157 processTimeouts(keys);
158 }
159 }
160
161 private void processEvent(final SelectionKey key) {
162 try {
163
164 if (key.isConnectable()) {
165
166 SocketChannel channel = (SocketChannel) key.channel();
167
168 SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
169 SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
170
171
172 try {
173 channel.finishConnect();
174 } catch (IOException ex) {
175 sessionRequest.failed(ex);
176 }
177 key.cancel();
178 if (channel.isConnected()) {
179 try {
180 try {
181 prepareSocket(channel.socket());
182 } catch (IOException ex) {
183 if (this.exceptionHandler == null
184 || !this.exceptionHandler.handle(ex)) {
185 throw new IOReactorException(
186 "Failure initalizing socket", ex);
187 }
188 }
189 ChannelEntry entry = new ChannelEntry(channel, sessionRequest);
190 addChannel(entry);
191 } catch (IOException ex) {
192 sessionRequest.failed(ex);
193 }
194 }
195 }
196
197 } catch (CancelledKeyException ex) {
198 key.attach(null);
199 }
200 }
201
202 private void processTimeouts(final Set<SelectionKey> keys) {
203 long now = System.currentTimeMillis();
204 for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext();) {
205 SelectionKey key = it.next();
206 Object attachment = key.attachment();
207
208 if (attachment instanceof SessionRequestHandle) {
209 SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
210 SessionRequestImpl sessionRequest = handle.getSessionRequest();
211 int timeout = sessionRequest.getConnectTimeout();
212 if (timeout > 0) {
213 if (handle.getRequestTime() + timeout < now) {
214 sessionRequest.timeout();
215 }
216 }
217 }
218
219 }
220 }
221
222 public SessionRequest connect(
223 final SocketAddress remoteAddress,
224 final SocketAddress localAddress,
225 final Object attachment,
226 final SessionRequestCallback callback) {
227
228 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
229 throw new IllegalStateException("I/O reactor has been shut down");
230 }
231 SessionRequestImpl sessionRequest = new SessionRequestImpl(
232 remoteAddress, localAddress, attachment, callback);
233 sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
234
235 this.requestQueue.add(sessionRequest);
236 this.selector.wakeup();
237
238 return sessionRequest;
239 }
240
241 private void validateAddress(final SocketAddress address) throws UnknownHostException {
242 if (address == null) {
243 return;
244 }
245 if (address instanceof InetSocketAddress) {
246 InetSocketAddress endpoint = (InetSocketAddress) address;
247 if (endpoint.isUnresolved()) {
248 throw new UnknownHostException(endpoint.getHostName());
249 }
250 }
251 }
252
253 private void processSessionRequests() throws IOReactorException {
254 SessionRequestImpl request;
255 while ((request = this.requestQueue.poll()) != null) {
256 if (request.isCompleted()) {
257 continue;
258 }
259 SocketChannel socketChannel;
260 try {
261 socketChannel = SocketChannel.open();
262 } catch (IOException ex) {
263 throw new IOReactorException("Failure opening socket", ex);
264 }
265 try {
266 socketChannel.configureBlocking(false);
267 validateAddress(request.getLocalAddress());
268 validateAddress(request.getRemoteAddress());
269
270 if (request.getLocalAddress() != null) {
271 Socket sock = socketChannel.socket();
272 sock.setReuseAddress(this.config.isSoReuseAddress());
273 sock.bind(request.getLocalAddress());
274 }
275 boolean connected = socketChannel.connect(request.getRemoteAddress());
276 if (connected) {
277 prepareSocket(socketChannel.socket());
278 ChannelEntry entry = new ChannelEntry(socketChannel, request);
279 addChannel(entry);
280 return;
281 }
282 } catch (IOException ex) {
283 closeChannel(socketChannel);
284 request.failed(ex);
285 return;
286 }
287
288 SessionRequestHandle requestHandle = new SessionRequestHandle(request);
289 try {
290 SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
291 requestHandle);
292 request.setKey(key);
293 } catch (IOException ex) {
294 closeChannel(socketChannel);
295 throw new IOReactorException("Failure registering channel " +
296 "with the selector", ex);
297 }
298 }
299 }
300
301 }