1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package dev.aherscu.qa.jgiven.rabbitmq.utils;
18
19 import static com.google.common.base.Suppliers.*;
20 import static java.lang.Runtime.*;
21 import static java.util.Collections.*;
22 import static java.util.Objects.*;
23
24 import java.io.*;
25 import java.util.*;
26 import java.util.concurrent.*;
27 import java.util.function.*;
28 import java.util.stream.*;
29
30 import org.apache.commons.lang3.*;
31 import org.jooq.lambda.*;
32
33 import com.rabbitmq.client.*;
34 import com.rabbitmq.client.impl.*;
35
36 import dev.aherscu.qa.jgiven.rabbitmq.model.*;
37 import lombok.*;
38 import lombok.experimental.*;
39 import lombok.extern.slf4j.*;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 @SuperBuilder
69 @Slf4j
70 public class QueueHandler<K, V> implements AutoCloseable {
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 @Builder
115 public static final class Singleton<K, V> {
116 public final ConnectionFactory connectionFactory;
117 public final String queue;
118 public final Function<Message<V>, K> indexingBy;
119 public final Function<byte[], V> consumingBy;
120 public final Function<V, byte[]> publishingBy;
121
122
123
124
125 public Supplier<QueueHandler<K, V>> supplier() {
126 return memoize(() -> {
127 final Connection connection;
128 final QueueHandler<K, V> queueHandler;
129 try {
130 log.debug("creating connection");
131 connection = connectionFactory.newConnection();
132 log.debug("setting-up queue handler");
133 queueHandler = QueueHandler.<K, V> builder()
134 .channel(connection.createChannel())
135 .queue(queue)
136 .indexingBy(indexingBy)
137 .consumingBy(consumingBy)
138 .publishingBy(publishingBy)
139 .build();
140 } catch (final IOException | TimeoutException e) {
141 throw new RuntimeException(e);
142 }
143 getRuntime().addShutdownHook(new Thread(() -> {
144
145
146
147
148
149 try {
150 log.debug("closing connection");
151 connection.close();
152 } catch (final IOException ioe) {
153 log.warn("connection already closed {}",
154 ioe.toString());
155 }
156 }));
157 return queueHandler;
158 });
159 }
160 }
161
162 public final Channel channel;
163 public final String queue;
164 public final Function<Message<V>, K> indexingBy;
165 public final Function<byte[], V> consumingBy;
166 public final Function<V, byte[]> publishingBy;
167
168
169 private final ConcurrentMap<K, Message<V>> recievedMessages =
170 new ConcurrentHashMap<>();
171 private String consumerTag;
172
173
174
175
176
177
178
179
180
181 @SneakyThrows
182 public static ConnectionFactory connectionFactoryFrom(final String uri) {
183 val connectionFactory = new ConnectionFactory();
184 connectionFactory.setUri(uri);
185 connectionFactory.setExceptionHandler(new ForgivingExceptionHandler());
186 return connectionFactory;
187 }
188
189
190
191
192
193
194
195
196
197 public String cancel() throws IOException {
198 if (isNull(consumerTag)) {
199 throw new IOException("consumer not started");
200 }
201 log.debug("cancelling consumer by tag {}", consumerTag);
202 channel.basicCancel(consumerTag);
203 return consumerTag;
204 }
205
206 @Override
207 public void close() throws IOException {
208 try {
209 cancel();
210 } catch (final Exception e) {
211 log.error("while closing got {}", e.getMessage());
212 throw e;
213 }
214 }
215
216
217
218
219
220
221
222
223
224 @SneakyThrows
225 public String consume() {
226 channel.basicQos(16);
227 consumerTag = channel.basicConsume(queue,
228 new DefaultConsumer(channel) {
229 @Override
230 public void handleDelivery(
231 @SuppressWarnings("hiding") final String consumerTag,
232 final Envelope envelope,
233 final AMQP.BasicProperties properties, final byte[] body)
234 throws IOException {
235 final K key;
236 final Message<V> message;
237 try {
238 message = Message.<V> builder()
239 .content(consumingBy.apply(body))
240 .properties(properties)
241 .build();
242 key = indexingBy.apply(message);
243 log.trace("received {}", key);
244 } catch (final Exception e) {
245 log.warn("skipping unknown type {}", e.getMessage());
246 channel.basicReject(envelope.getDeliveryTag(), true);
247 return;
248 }
249 recievedMessages.put(key, message);
250 channel.basicAck(envelope.getDeliveryTag(), false);
251 }
252 });
253
254 log.debug("set-up consumer with tag {}", consumerTag);
255 return consumerTag;
256 }
257
258 public void publish(final Stream<Message<V>> messages) {
259 messages.parallel()
260 .peek(message -> log.trace("publishing {}", message))
261 .forEach(Unchecked.consumer(
262 message -> channel.basicPublish(StringUtils.EMPTY, queue,
263 message.properties, publishingBy.apply(message.content))));
264 }
265
266 public void publishValues(final Stream<V> values) {
267 publish(values
268 .map(value -> Message.<V> builder().content(value).build()));
269 }
270
271
272
273
274
275
276 public Map<K, Message<V>> recievedMessages() {
277 return unmodifiableMap(recievedMessages);
278 }
279 }