View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.channels.SocketChannel;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.Deque;
36  import java.util.List;
37  import java.util.Set;
38  import java.util.concurrent.ConcurrentLinkedDeque;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.atomic.AtomicInteger;
42  
43  import org.apache.hc.core5.concurrent.DefaultThreadFactory;
44  import org.apache.hc.core5.concurrent.FutureCallback;
45  import org.apache.hc.core5.function.Callback;
46  import org.apache.hc.core5.function.Decorator;
47  import org.apache.hc.core5.io.ShutdownType;
48  import org.apache.hc.core5.net.NamedEndpoint;
49  import org.apache.hc.core5.util.Args;
50  import org.apache.hc.core5.util.TimeValue;
51  
52  /**
53   * Multi-core I/O reactor that can act as both {@link ConnectionInitiator}
54   * and {@link ConnectionAcceptor}. Internally this I/O reactor distributes newly created
55   * I/O session equally across multiple I/O worker threads for a more optimal resource
56   * utilization and a better I/O performance. Usually it is recommended to have
57   * one worker I/O reactor per physical CPU core.
58   *
59   * @since 4.0
60   */
61  public class DefaultListeningIOReactor implements IOReactorService, ConnectionInitiator, ConnectionAcceptor {
62  
63      private final static ThreadFactory DISPATCH_THREAD_FACTORY = new DefaultThreadFactory("I/O server dispatch", true);
64      private final static ThreadFactory LISTENER_THREAD_FACTORY = new DefaultThreadFactory("I/O listener", true);
65  
66      private final Deque<ExceptionEvent> auditLog;
67      private final int workerCount;
68      private final SingleCoreIOReactor[] dispatchers;
69      private final SingleCoreListeningIOReactor listener;
70      private final MultiCoreIOReactor ioReactor;
71      private final AtomicInteger currentWorker;
72  
73      /**
74       * Creates an instance of DefaultListeningIOReactor with the given configuration.
75       *
76       * @param eventHandlerFactory the factory to create I/O event handlers.
77       * @param ioReactorConfig I/O reactor configuration.
78       * @param listenerThreadFactory the factory to create listener thread.
79       *   Can be {@code null}.
80       *
81       * @since 5.0
82       */
83      public DefaultListeningIOReactor(
84              final IOEventHandlerFactory eventHandlerFactory,
85              final IOReactorConfig ioReactorConfig,
86              final ThreadFactory dispatchThreadFactory,
87              final ThreadFactory listenerThreadFactory,
88              final Decorator<IOSession> ioSessionDecorator,
89              final IOSessionListener sessionListener,
90              final Callback<IOSession> sessionShutdownCallback) {
91          Args.notNull(eventHandlerFactory, "Event handler factory");
92          this.auditLog = new ConcurrentLinkedDeque<>();
93          this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
94          this.dispatchers = new SingleCoreIOReactor[workerCount];
95          final Thread[] threads = new Thread[workerCount + 1];
96          for (int i = 0; i < this.dispatchers.length; i++) {
97              final SingleCoreIOReactor dispatcher = new SingleCoreIOReactor(
98                      auditLog,
99                      eventHandlerFactory,
100                     ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
101                     ioSessionDecorator,
102                     sessionListener,
103                     sessionShutdownCallback);
104             this.dispatchers[i] = dispatcher;
105             threads[i + 1] = (dispatchThreadFactory != null ? dispatchThreadFactory : DISPATCH_THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
106         }
107         final IOReactor[] ioReactors = new IOReactor[this.workerCount + 1];
108         System.arraycopy(this.dispatchers, 0, ioReactors, 1, this.workerCount);
109         this.listener = new SingleCoreListeningIOReactor(auditLog, ioReactorConfig, new Callback<SocketChannel>() {
110 
111             @Override
112             public void execute(final SocketChannel channel) {
113                 enqueueChannel(channel);
114             }
115 
116         });
117         ioReactors[0] = this.listener;
118         threads[0] = (listenerThreadFactory != null ? listenerThreadFactory : LISTENER_THREAD_FACTORY).newThread(new IOReactorWorker(listener));
119 
120         this.ioReactor = new MultiCoreIOReactor(ioReactors, threads);
121         this.currentWorker = new AtomicInteger(0);
122     }
123 
124     /**
125      * Creates an instance of DefaultListeningIOReactor with the given configuration.
126      *
127      * @param eventHandlerFactory the factory to create I/O event handlers.
128      * @param config I/O reactor configuration.
129      *   Can be {@code null}.
130      *
131      * @since 5.0
132      */
133     public DefaultListeningIOReactor(
134             final IOEventHandlerFactory eventHandlerFactory,
135             final IOReactorConfig config,
136             final Callback<IOSession> sessionShutdownCallback) {
137         this(eventHandlerFactory, config, null, null, null, null, sessionShutdownCallback);
138     }
139 
140     /**
141      * Creates an instance of DefaultListeningIOReactor with default configuration.
142      *
143      * @param eventHandlerFactory the factory to create I/O event handlers.
144      *
145      * @since 5.0
146      */
147     public DefaultListeningIOReactor(final IOEventHandlerFactory eventHandlerFactory) {
148         this(eventHandlerFactory, null, null);
149     }
150 
151     @Override
152     public void start() {
153         ioReactor.start();
154     }
155 
156     @Override
157     public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
158         return listener.listen(address, callback);
159     }
160 
161     public Future<ListenerEndpoint> listen(final SocketAddress address) {
162         return listen(address, null);
163     }
164 
165     @Override
166     public Set<ListenerEndpoint> getEndpoints() {
167         return listener.getEndpoints();
168     }
169 
170     @Override
171     public void pause() throws IOException {
172         listener.pause();
173     }
174 
175     @Override
176     public void resume() throws IOException {
177         listener.resume();
178     }
179 
180     @Override
181     public IOReactorStatus getStatus() {
182         return ioReactor.getStatus();
183     }
184 
185     @Override
186     public List<ExceptionEvent> getExceptionLog() {
187         return auditLog.isEmpty() ? Collections.<ExceptionEvent>emptyList() : new ArrayList<>(auditLog);
188     }
189 
190     private void enqueueChannel(final SocketChannel socketChannel) {
191         final int i = Math.abs(currentWorker.incrementAndGet() % workerCount);
192         try {
193             dispatchers[i].enqueueChannel(socketChannel);
194         } catch (final IOReactorShutdownException ex) {
195             initiateShutdown();
196         }
197     }
198 
199     @Override
200     public Future<IOSession> connect(
201             final NamedEndpoint remoteEndpoint,
202             final SocketAddress remoteAddress,
203             final SocketAddress localAddress,
204             final TimeValue timeout,
205             final Object attachment,
206             final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
207         Args.notNull(remoteEndpoint, "Remote endpoint");
208         if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
209             throw new IOReactorShutdownException("I/O reactor has been shut down");
210         }
211         final int i = Math.abs(currentWorker.incrementAndGet() % workerCount);
212         try {
213             return dispatchers[i].connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback);
214         } catch (final IOReactorShutdownException ex) {
215             initiateShutdown();
216             throw ex;
217         }
218     }
219 
220     @Override
221     public void initiateShutdown() {
222         ioReactor.initiateShutdown();
223     }
224 
225     @Override
226     public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
227         ioReactor.awaitShutdown(waitTime);
228     }
229 
230     @Override
231     public void shutdown(final ShutdownType shutdownType) {
232         ioReactor.shutdown(shutdownType);
233     }
234 
235     @Override
236     public void close() throws IOException {
237         ioReactor.close();
238     }
239 
240 }