1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.net.ServerSocket;
32 import java.net.SocketAddress;
33 import java.nio.channels.CancelledKeyException;
34 import java.nio.channels.SelectionKey;
35 import java.nio.channels.ServerSocketChannel;
36 import java.nio.channels.SocketChannel;
37 import java.util.Collections;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.Queue;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentLinkedQueue;
43 import java.util.concurrent.ThreadFactory;
44
45 import org.apache.http.annotation.ThreadSafe;
46 import org.apache.http.nio.reactor.IOReactorException;
47 import org.apache.http.nio.reactor.IOReactorStatus;
48 import org.apache.http.nio.reactor.ListenerEndpoint;
49 import org.apache.http.nio.reactor.ListeningIOReactor;
50 import org.apache.http.params.HttpParams;
51
52
53
54
55
56
57
58
59 @ThreadSafe
60 public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
61 implements ListeningIOReactor {
62
63 private final Queue<ListenerEndpointImpl> requestQueue;
64 private final Set<ListenerEndpointImpl> endpoints;
65 private final Set<SocketAddress> pausedEndpoints;
66
67 private volatile boolean paused;
68
69
70
71
72
73
74
75
76
77
78
79 public DefaultListeningIOReactor(
80 final IOReactorConfig config,
81 final ThreadFactory threadFactory) throws IOReactorException {
82 super(config, threadFactory);
83 this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
84 this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
85 this.pausedEndpoints = new HashSet<SocketAddress>();
86 }
87
88
89
90
91
92
93
94
95
96
97 public DefaultListeningIOReactor(final IOReactorConfig config) throws IOReactorException {
98 this(config, null);
99 }
100
101
102
103
104
105
106
107
108 public DefaultListeningIOReactor() throws IOReactorException {
109 this(null, null);
110 }
111
112
113
114
115 @Deprecated
116 public DefaultListeningIOReactor(
117 int workerCount,
118 final ThreadFactory threadFactory,
119 final HttpParams params) throws IOReactorException {
120 this(convert(workerCount, params), threadFactory);
121 }
122
123
124
125
126 @Deprecated
127 public DefaultListeningIOReactor(
128 int workerCount,
129 final HttpParams params) throws IOReactorException {
130 this(convert(workerCount, params), null);
131 }
132
133 @Override
134 protected void cancelRequests() throws IOReactorException {
135 ListenerEndpointImpl request;
136 while ((request = this.requestQueue.poll()) != null) {
137 request.cancel();
138 }
139 }
140
141 @Override
142 protected void processEvents(int readyCount) throws IOReactorException {
143 if (!this.paused) {
144 processSessionRequests();
145 }
146
147 if (readyCount > 0) {
148 Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
149 for (Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext(); ) {
150
151 SelectionKey key = it.next();
152 processEvent(key);
153
154 }
155 selectedKeys.clear();
156 }
157 }
158
159 private void processEvent(final SelectionKey key)
160 throws IOReactorException {
161 try {
162
163 if (key.isAcceptable()) {
164
165 ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
166 for (;;) {
167 SocketChannel socketChannel = null;
168 try {
169 socketChannel = serverChannel.accept();
170 } catch (IOException ex) {
171 if (this.exceptionHandler == null ||
172 !this.exceptionHandler.handle(ex)) {
173 throw new IOReactorException(
174 "Failure accepting connection", ex);
175 }
176 }
177 if (socketChannel == null) {
178 break;
179 }
180 try {
181 prepareSocket(socketChannel.socket());
182 } catch (IOException ex) {
183 if (this.exceptionHandler == null ||
184 !this.exceptionHandler.handle(ex)) {
185 throw new IOReactorException(
186 "Failure initalizing socket", ex);
187 }
188 }
189 ChannelEntry entry = new ChannelEntry(socketChannel);
190 addChannel(entry);
191 }
192 }
193
194 } catch (CancelledKeyException ex) {
195 ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
196 this.endpoints.remove(endpoint);
197 key.attach(null);
198 }
199 }
200
201 private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
202 ListenerEndpointImpl endpoint = new ListenerEndpointImpl(
203 address,
204 new ListenerEndpointClosedCallback() {
205
206 public void endpointClosed(final ListenerEndpoint endpoint) {
207 endpoints.remove(endpoint);
208 }
209
210 });
211 return endpoint;
212 }
213
214 public ListenerEndpoint listen(final SocketAddress address) {
215 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
216 throw new IllegalStateException("I/O reactor has been shut down");
217 }
218 ListenerEndpointImpl request = createEndpoint(address);
219 this.requestQueue.add(request);
220 this.selector.wakeup();
221 return request;
222 }
223
224 private void processSessionRequests() throws IOReactorException {
225 ListenerEndpointImpl request;
226 while ((request = this.requestQueue.poll()) != null) {
227 SocketAddress address = request.getAddress();
228 ServerSocketChannel serverChannel;
229 try {
230 serverChannel = ServerSocketChannel.open();
231 } catch (IOException ex) {
232 throw new IOReactorException("Failure opening server socket", ex);
233 }
234 try {
235 ServerSocket socket = serverChannel.socket();
236 socket.setReuseAddress(this.config.isSoReuseAddress());
237 serverChannel.configureBlocking(false);
238 socket.bind(address);
239 } catch (IOException ex) {
240 closeChannel(serverChannel);
241 request.failed(ex);
242 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
243 throw new IOReactorException("Failure binding socket to address "
244 + address, ex);
245 } else {
246 return;
247 }
248 }
249 try {
250 SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
251 key.attach(request);
252 request.setKey(key);
253 } catch (IOException ex) {
254 closeChannel(serverChannel);
255 throw new IOReactorException("Failure registering channel " +
256 "with the selector", ex);
257 }
258
259 this.endpoints.add(request);
260 request.completed(serverChannel.socket().getLocalSocketAddress());
261 }
262 }
263
264 public Set<ListenerEndpoint> getEndpoints() {
265 Set<ListenerEndpoint> set = new HashSet<ListenerEndpoint>();
266 synchronized (this.endpoints) {
267 Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
268 while (it.hasNext()) {
269 ListenerEndpoint endpoint = it.next();
270 if (!endpoint.isClosed()) {
271 set.add(endpoint);
272 } else {
273 it.remove();
274 }
275 }
276 }
277 return set;
278 }
279
280 public void pause() throws IOException {
281 if (this.paused) {
282 return;
283 }
284 this.paused = true;
285 synchronized (this.endpoints) {
286 Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
287 while (it.hasNext()) {
288 ListenerEndpoint endpoint = it.next();
289 if (!endpoint.isClosed()) {
290 endpoint.close();
291 this.pausedEndpoints.add(endpoint.getAddress());
292 }
293 }
294 this.endpoints.clear();
295 }
296 }
297
298 public void resume() throws IOException {
299 if (!this.paused) {
300 return;
301 }
302 this.paused = false;
303 for (SocketAddress address: this.pausedEndpoints) {
304 ListenerEndpointImpl request = createEndpoint(address);
305 this.requestQueue.add(request);
306 }
307 this.pausedEndpoints.clear();
308 this.selector.wakeup();
309 }
310
311 }