1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.net.SocketAddress;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.hc.core5.concurrent.FutureCallback;
38 import org.apache.hc.core5.function.Supplier;
39 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
40 import org.apache.hc.core5.io.CloseMode;
41 import org.apache.hc.core5.net.NamedEndpoint;
42 import org.apache.hc.core5.reactor.ConnectionInitiator;
43 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
44 import org.apache.hc.core5.reactor.IOReactorStatus;
45 import org.apache.hc.core5.reactor.IOSession;
46 import org.apache.hc.core5.util.TimeValue;
47 import org.apache.hc.core5.util.Timeout;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient implements ConnectionInitiator {
52
53 enum Status { READY, RUNNING, TERMINATED }
54
55 private static final Logger LOG = LoggerFactory.getLogger(AbstractHttpAsyncClientBase.class);
56
57 private final AsyncPushConsumerRegistry pushConsumerRegistry;
58 private final DefaultConnectingIOReactor ioReactor;
59 private final ExecutorService executorService;
60 private final AtomicReference<Status> status;
61
62 AbstractHttpAsyncClientBase(
63 final DefaultConnectingIOReactor ioReactor,
64 final AsyncPushConsumerRegistry pushConsumerRegistry,
65 final ThreadFactory threadFactory) {
66 super();
67 this.ioReactor = ioReactor;
68 this.pushConsumerRegistry = pushConsumerRegistry;
69 this.executorService = Executors.newSingleThreadExecutor(threadFactory);
70 this.status = new AtomicReference<>(Status.READY);
71 }
72
73 @Override
74 public final void start() {
75 if (status.compareAndSet(Status.READY, Status.RUNNING)) {
76 executorService.execute(ioReactor::start);
77 }
78 }
79
80
81
82
83
84 @Deprecated
85 @Override
86 public void register(final String hostname, final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
87 pushConsumerRegistry.register(hostname, uriPattern, supplier);
88 }
89
90 boolean isRunning() {
91 return status.get() == Status.RUNNING;
92 }
93
94 @Override
95 public Future<IOSession> connect(
96 final NamedEndpoint remoteEndpoint,
97 final SocketAddress remoteAddress,
98 final SocketAddress localAddress,
99 final Timeout timeout,
100 final Object attachment,
101 final FutureCallback<IOSession> callback) {
102 return ioReactor.connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback);
103 }
104
105 ConnectionInitiator getConnectionInitiator() {
106 return ioReactor;
107 }
108
109 @Override
110 public final IOReactorStatus getStatus() {
111 return ioReactor.getStatus();
112 }
113
114 @Override
115 public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
116 ioReactor.awaitShutdown(waitTime);
117 }
118
119 @Override
120 public final void initiateShutdown() {
121 if (LOG.isDebugEnabled()) {
122 LOG.debug("Initiating shutdown");
123 }
124 ioReactor.initiateShutdown();
125 }
126
127 void internalClose(final CloseMode closeMode) {
128 }
129
130 @Override
131 public final void close(final CloseMode closeMode) {
132 if (LOG.isDebugEnabled()) {
133 LOG.debug("Shutdown {}", closeMode);
134 }
135 ioReactor.initiateShutdown();
136 ioReactor.close(closeMode);
137 if (closeMode == CloseMode.GRACEFUL) {
138 executorService.shutdown();
139 try {
140 if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
141 executorService.shutdownNow();
142 }
143 } catch (final InterruptedException ignore) {
144 Thread.currentThread().interrupt();
145 }
146 } else {
147 executorService.shutdownNow();
148 }
149 internalClose(closeMode);
150 }
151
152 @Override
153 public void close() {
154 close(CloseMode.GRACEFUL);
155 }
156
157 }