View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.util.ArrayList;
33  import java.util.Collections;
34  import java.util.Deque;
35  import java.util.List;
36  import java.util.concurrent.ConcurrentLinkedDeque;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.ThreadFactory;
39  import java.util.concurrent.atomic.AtomicInteger;
40  
41  import org.apache.hc.core5.concurrent.DefaultThreadFactory;
42  import org.apache.hc.core5.concurrent.FutureCallback;
43  import org.apache.hc.core5.function.Callback;
44  import org.apache.hc.core5.function.Decorator;
45  import org.apache.hc.core5.io.ShutdownType;
46  import org.apache.hc.core5.net.NamedEndpoint;
47  import org.apache.hc.core5.util.Args;
48  import org.apache.hc.core5.util.TimeValue;
49  
50  /**
51   * Multi-core I/O reactor that can act as {@link ConnectionInitiator} Internally
52   * this I/O reactor distributes newly created I/O session equally across multiple
53   * I/O worker threads for a more optimal resource utilization and a better
54   * I/O performance. Usually it is recommended to have one worker I/O reactor
55   * per physical CPU core.
56   *
57   * @since 4.0
58   */
59  public class DefaultConnectingIOReactor implements IOReactorService, ConnectionInitiator {
60  
61      private final Deque<ExceptionEvent> auditLog;
62      private final int workerCount;
63      private final SingleCoreIOReactor[] dispatchers;
64      private final MultiCoreIOReactor ioReactor;
65      private final AtomicInteger currentWorker;
66  
67      private final static ThreadFactory THREAD_FACTORY = new DefaultThreadFactory("I/O client dispatch", true);
68  
69      public DefaultConnectingIOReactor(
70              final IOEventHandlerFactory eventHandlerFactory,
71              final IOReactorConfig ioReactorConfig,
72              final ThreadFactory threadFactory,
73              final Decorator<IOSession> ioSessionDecorator,
74              final IOSessionListener sessionListener,
75              final Callback<IOSession> sessionShutdownCallback) {
76          Args.notNull(eventHandlerFactory, "Event handler factory");
77          this.auditLog = new ConcurrentLinkedDeque<>();
78          this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
79          this.dispatchers = new SingleCoreIOReactor[workerCount];
80          final Thread[] threads = new Thread[workerCount];
81          for (int i = 0; i < this.dispatchers.length; i++) {
82              final SingleCoreIOReactor dispatcher = new SingleCoreIOReactor(
83                      auditLog,
84                      eventHandlerFactory,
85                      ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
86                      ioSessionDecorator,
87                      sessionListener,
88                      sessionShutdownCallback);
89              this.dispatchers[i] = dispatcher;
90              threads[i] = (threadFactory != null ? threadFactory : THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
91          }
92          this.ioReactor = new MultiCoreIOReactor(this.dispatchers, threads);
93          this.currentWorker = new AtomicInteger(0);
94      }
95  
96      public DefaultConnectingIOReactor(
97              final IOEventHandlerFactory eventHandlerFactory,
98              final IOReactorConfig config,
99              final Callback<IOSession> sessionShutdownCallback) {
100         this(eventHandlerFactory, config, null, null, null, sessionShutdownCallback);
101     }
102 
103     /**
104      * Creates an instance of DefaultConnectingIOReactor with default configuration.
105      *
106      * @since 5.0
107      */
108     public DefaultConnectingIOReactor(final IOEventHandlerFactory eventHandlerFactory) {
109         this(eventHandlerFactory, null, null);
110     }
111 
112     @Override
113     public void start() {
114         ioReactor.start();
115     }
116 
117     @Override
118     public IOReactorStatus getStatus() {
119         return ioReactor.getStatus();
120     }
121 
122     @Override
123     public List<ExceptionEvent> getExceptionLog() {
124         return auditLog.isEmpty() ? Collections.<ExceptionEvent>emptyList() : new ArrayList<>(auditLog);
125     }
126 
127     @Override
128     public Future<IOSession> connect(
129             final NamedEndpoint remoteEndpoint,
130             final SocketAddress remoteAddress,
131             final SocketAddress localAddress,
132             final TimeValue timeout,
133             final Object attachment,
134             final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
135         Args.notNull(remoteEndpoint, "Remote endpoint");
136         if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
137             throw new IOReactorShutdownException("I/O reactor has been shut down");
138         }
139         final int i = Math.abs(currentWorker.incrementAndGet() % workerCount);
140         try {
141             return dispatchers[i].connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback);
142         } catch (final IOReactorShutdownException ex) {
143             initiateShutdown();
144             throw ex;
145         }
146     }
147 
148     @Override
149     public void initiateShutdown() {
150         ioReactor.initiateShutdown();
151     }
152 
153     @Override
154     public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
155         ioReactor.awaitShutdown(waitTime);
156     }
157 
158     @Override
159     public void shutdown(final ShutdownType shutdownType) {
160         ioReactor.shutdown(shutdownType);
161     }
162 
163     @Override
164     public void close() throws IOException {
165         ioReactor.close();
166     }
167 
168 }