1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.net.ServerSocket;
32 import java.net.SocketAddress;
33 import java.nio.channels.CancelledKeyException;
34 import java.nio.channels.SelectionKey;
35 import java.nio.channels.ServerSocketChannel;
36 import java.nio.channels.SocketChannel;
37 import java.util.Collections;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.Queue;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentLinkedQueue;
43 import java.util.concurrent.ThreadFactory;
44
45 import org.apache.http.annotation.ThreadSafe;
46 import org.apache.http.nio.reactor.IOReactorException;
47 import org.apache.http.nio.reactor.IOReactorStatus;
48 import org.apache.http.nio.reactor.ListenerEndpoint;
49 import org.apache.http.nio.reactor.ListeningIOReactor;
50 import org.apache.http.params.HttpParams;
51 import org.apache.http.util.Asserts;
52
53
54
55
56
57
58
59
60 @SuppressWarnings("deprecation")
61 @ThreadSafe
62 public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
63 implements ListeningIOReactor {
64
65 private final Queue<ListenerEndpointImpl> requestQueue;
66 private final Set<ListenerEndpointImpl> endpoints;
67 private final Set<SocketAddress> pausedEndpoints;
68
69 private volatile boolean paused;
70
71
72
73
74
75
76
77
78
79
80
81 public DefaultListeningIOReactor(
82 final IOReactorConfig config,
83 final ThreadFactory threadFactory) throws IOReactorException {
84 super(config, threadFactory);
85 this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
86 this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
87 this.pausedEndpoints = new HashSet<SocketAddress>();
88 }
89
90
91
92
93
94
95
96
97
98
99 public DefaultListeningIOReactor(final IOReactorConfig config) throws IOReactorException {
100 this(config, null);
101 }
102
103
104
105
106
107
108
109
110 public DefaultListeningIOReactor() throws IOReactorException {
111 this(null, null);
112 }
113
114
115
116
117 @Deprecated
118 public DefaultListeningIOReactor(
119 final int workerCount,
120 final ThreadFactory threadFactory,
121 final HttpParams params) throws IOReactorException {
122 this(convert(workerCount, params), threadFactory);
123 }
124
125
126
127
128 @Deprecated
129 public DefaultListeningIOReactor(
130 final int workerCount,
131 final HttpParams params) throws IOReactorException {
132 this(convert(workerCount, params), null);
133 }
134
135 @Override
136 protected void cancelRequests() throws IOReactorException {
137 ListenerEndpointImpl request;
138 while ((request = this.requestQueue.poll()) != null) {
139 request.cancel();
140 }
141 }
142
143 @Override
144 protected void processEvents(final int readyCount) throws IOReactorException {
145 if (!this.paused) {
146 processSessionRequests();
147 }
148
149 if (readyCount > 0) {
150 final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
151 for (final SelectionKey key : selectedKeys) {
152
153 processEvent(key);
154
155 }
156 selectedKeys.clear();
157 }
158 }
159
160 private void processEvent(final SelectionKey key)
161 throws IOReactorException {
162 try {
163
164 if (key.isAcceptable()) {
165
166 final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
167 for (;;) {
168 SocketChannel socketChannel = null;
169 try {
170 socketChannel = serverChannel.accept();
171 } catch (final IOException ex) {
172 if (this.exceptionHandler == null ||
173 !this.exceptionHandler.handle(ex)) {
174 throw new IOReactorException(
175 "Failure accepting connection", ex);
176 }
177 }
178 if (socketChannel == null) {
179 break;
180 }
181 try {
182 prepareSocket(socketChannel.socket());
183 } catch (final IOException ex) {
184 if (this.exceptionHandler == null ||
185 !this.exceptionHandler.handle(ex)) {
186 throw new IOReactorException(
187 "Failure initalizing socket", ex);
188 }
189 }
190 final ChannelEntry entry = new ChannelEntry(socketChannel);
191 addChannel(entry);
192 }
193 }
194
195 } catch (final CancelledKeyException ex) {
196 final ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
197 this.endpoints.remove(endpoint);
198 key.attach(null);
199 }
200 }
201
202 private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
203 final ListenerEndpointImpl endpoint = new ListenerEndpointImpl(
204 address,
205 new ListenerEndpointClosedCallback() {
206
207 public void endpointClosed(final ListenerEndpoint endpoint) {
208 endpoints.remove(endpoint);
209 }
210
211 });
212 return endpoint;
213 }
214
215 public ListenerEndpoint listen(final SocketAddress address) {
216 Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
217 "I/O reactor has been shut down");
218 final ListenerEndpointImpl request = createEndpoint(address);
219 this.requestQueue.add(request);
220 this.selector.wakeup();
221 return request;
222 }
223
224 private void processSessionRequests() throws IOReactorException {
225 ListenerEndpointImpl request;
226 while ((request = this.requestQueue.poll()) != null) {
227 final SocketAddress address = request.getAddress();
228 ServerSocketChannel serverChannel;
229 try {
230 serverChannel = ServerSocketChannel.open();
231 } catch (final IOException ex) {
232 throw new IOReactorException("Failure opening server socket", ex);
233 }
234 try {
235 final ServerSocket socket = serverChannel.socket();
236 socket.setReuseAddress(this.config.isSoReuseAddress());
237 serverChannel.configureBlocking(false);
238 socket.bind(address);
239 } catch (final IOException ex) {
240 closeChannel(serverChannel);
241 request.failed(ex);
242 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
243 throw new IOReactorException("Failure binding socket to address "
244 + address, ex);
245 } else {
246 return;
247 }
248 }
249 try {
250 final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
251 key.attach(request);
252 request.setKey(key);
253 } catch (final IOException ex) {
254 closeChannel(serverChannel);
255 throw new IOReactorException("Failure registering channel " +
256 "with the selector", ex);
257 }
258
259 this.endpoints.add(request);
260 request.completed(serverChannel.socket().getLocalSocketAddress());
261 }
262 }
263
264 public Set<ListenerEndpoint> getEndpoints() {
265 final Set<ListenerEndpoint> set = new HashSet<ListenerEndpoint>();
266 synchronized (this.endpoints) {
267 final Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
268 while (it.hasNext()) {
269 final ListenerEndpoint endpoint = it.next();
270 if (!endpoint.isClosed()) {
271 set.add(endpoint);
272 } else {
273 it.remove();
274 }
275 }
276 }
277 return set;
278 }
279
280 public void pause() throws IOException {
281 if (this.paused) {
282 return;
283 }
284 this.paused = true;
285 synchronized (this.endpoints) {
286 final Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
287 while (it.hasNext()) {
288 final ListenerEndpoint endpoint = it.next();
289 if (!endpoint.isClosed()) {
290 endpoint.close();
291 this.pausedEndpoints.add(endpoint.getAddress());
292 }
293 }
294 this.endpoints.clear();
295 }
296 }
297
298 public void resume() throws IOException {
299 if (!this.paused) {
300 return;
301 }
302 this.paused = false;
303 for (final SocketAddress address: this.pausedEndpoints) {
304 final ListenerEndpointImpl request = createEndpoint(address);
305 this.requestQueue.add(request);
306 }
307 this.pausedEndpoints.clear();
308 this.selector.wakeup();
309 }
310
311 }