1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.examples;
28
29 import java.net.URI;
30 import java.nio.ByteBuffer;
31 import java.nio.charset.StandardCharsets;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34
35 import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
36 import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient;
37 import org.apache.hc.core5.http.ContentType;
38 import org.apache.hc.core5.http.Header;
39 import org.apache.hc.core5.http.HttpResponse;
40 import org.apache.hc.core5.http.Message;
41 import org.apache.hc.core5.http.config.Http1Config;
42 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
43 import org.apache.hc.core5.http2.config.H2Config;
44 import org.apache.hc.core5.io.CloseMode;
45 import org.apache.hc.core5.reactive.ReactiveEntityProducer;
46 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
47 import org.apache.hc.core5.reactor.IOReactorConfig;
48 import org.reactivestreams.Publisher;
49
50 import io.reactivex.Flowable;
51 import io.reactivex.Observable;
52
53
54
55
56 public class ReactiveClientFullDuplexExchange {
57
58 public static void main(final String[] args) throws Exception {
59
60 final MinimalHttpAsyncClient client = HttpAsyncClients.createMinimal(
61 H2Config.DEFAULT,
62 Http1Config.DEFAULT,
63 IOReactorConfig.DEFAULT);
64
65 client.start();
66
67 final URI requestUri = new URI("http://httpbin.org/post");
68 final byte[] bs = "stuff".getBytes(StandardCharsets.UTF_8);
69 final ReactiveEntityProducer reactiveEntityProducer = new ReactiveEntityProducer(
70 Flowable.just(ByteBuffer.wrap(bs)), bs.length, ContentType.TEXT_PLAIN, null);
71 final BasicRequestProducer requestProducer = new BasicRequestProducer(
72 "POST", requestUri, reactiveEntityProducer);
73
74 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
75 final Future<Void> requestFuture = client.execute(requestProducer, consumer, null);
76 final Message<HttpResponse, Publisher<ByteBuffer>> streamingResponse = consumer.getResponseFuture().get();
77
78 System.out.println(streamingResponse.getHead());
79 for (final Header header : streamingResponse.getHead().getHeaders()) {
80 System.out.println(header);
81 }
82 System.out.println();
83
84 Observable.fromPublisher(streamingResponse.getBody())
85 .map(byteBuffer -> {
86 final byte[] string = new byte[byteBuffer.remaining()];
87 byteBuffer.get(string);
88 return new String(string);
89 })
90 .materialize()
91 .forEach(System.out::println);
92
93 requestFuture.get(1, TimeUnit.MINUTES);
94
95 System.out.println("Shutting down");
96 client.close(CloseMode.GRACEFUL);
97 }
98 }