View Javadoc
1   /*
2    * Copyright 2023 Adrian Herscu
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package dev.aherscu.qa.jgiven.rabbitmq.utils;
18  
19  import static com.google.common.base.Suppliers.*;
20  import static java.lang.Runtime.*;
21  import static java.util.Collections.*;
22  import static java.util.Objects.*;
23  
24  import java.io.*;
25  import java.util.*;
26  import java.util.concurrent.*;
27  import java.util.function.*;
28  import java.util.stream.*;
29  
30  import org.apache.commons.lang3.*;
31  import org.jooq.lambda.*;
32  
33  import com.rabbitmq.client.*;
34  import com.rabbitmq.client.impl.*;
35  
36  import dev.aherscu.qa.jgiven.rabbitmq.model.*;
37  import lombok.*;
38  import lombok.experimental.*;
39  import lombok.extern.slf4j.*;
40  
41  /**
42   * Retrieves RabbitMQ messages from specified queue via provided channel.
43   * Messages are converted by specified value function and indexed by specified
44   * key function.
45   *
46   * <p>
47   * Messages are consumed in background threads and made available for retrieval
48   * by their key according to a specified retry policy. If a retry policy is not
49   * specified, then an internal default will be used.
50   * </p>
51   *
52   * <p>
53   * Typical workflow:
54   * </p>
55   * <ol>
56   * <li>build it</li>
57   * <li>start consumption -- this will run in background until closing</li>
58   * <li>retrieve messages</li>
59   * </ol>
60   *
61   *
62   * @param <K>
63   *            type of message-key; it should have a proper hash function in
64   *            order to get O(1) access time, otherwise it may degrade to O(n)
65   * @param <V>
66   *            type of message-value
67   */
68  @SuperBuilder
69  @Slf4j
70  public class QueueHandler<K, V> implements AutoCloseable {
71      /**
72       * Utility for building a singleton QueueHandler.
73       *
74       * <p>
75       * Build it:
76       *
77       * <pre>
78       * private final Supplier<QueueHandler<YourKeyType, YourContentType>> queueHandlerSupplier =
79       *     QueueHandler.Singleton.<UUID, RetrieveTransactionResponse> builder()
80       *         .queue(queueName())
81       *         .connectionFactory(amqpConnectionFactory())
82       *         .indexingBy(message -> message.content.yourKey)
83       *         .consumingBy(
84       *             Unchecked.function(YourContentType::deserializeFromBytes))
85       *         .publishingBy(YourContentType::serializeToBytes)
86       *         .build()
87       *         .supplier();
88       * </pre>
89       *
90       * and, later:
91       *
92       * <pre>
93       * public QueueHandler<YourKeyType, YourContentType> queueHandler() {
94       *     return queueHandlerSupplier.get();
95       * }
96       * </pre>
97       *
98       * It is guaranteed that calling {@code queueHandler()} <strong>from any
99       * thread</strong>, will supply the same QueueHandler instance.
100      * </p>
101      *
102      * <p>
103      * The connection is internally managed and closed via a runtime shutdown
104      * hook.
105      * </p>
106      *
107      * @param <K>
108      *            type of message-key; it should have a proper hash function in
109      *            order to get O(1) access time, otherwise it may degrade to
110      *            O(n)
111      * @param <V>
112      *            type of message-value
113      */
114     @Builder
115     public static final class Singleton<K, V> {
116         public final ConnectionFactory       connectionFactory;
117         public final String                  queue;
118         public final Function<Message<V>, K> indexingBy;
119         public final Function<byte[], V>     consumingBy;
120         public final Function<V, byte[]>     publishingBy;
121 
122         /**
123          * @return a memoized supplier of QueueHandler
124          */
125         public Supplier<QueueHandler<K, V>> supplier() {
126             return memoize(() -> {
127                 final Connection connection;
128                 final QueueHandler<K, V> queueHandler;
129                 try {
130                     log.debug("creating connection");
131                     connection = connectionFactory.newConnection();
132                     log.debug("setting-up queue handler");
133                     queueHandler = QueueHandler.<K, V> builder()
134                         .channel(connection.createChannel())
135                         .queue(queue)
136                         .indexingBy(indexingBy)
137                         .consumingBy(consumingBy)
138                         .publishingBy(publishingBy)
139                         .build();
140                 } catch (final IOException | TimeoutException e) {
141                     throw new RuntimeException(e);
142                 }
143                 getRuntime().addShutdownHook(new Thread(() -> {
144                     // NOTE: apparently the only way to purge unacked messages
145                     // is
146                     // via management interface
147                     // see
148                     // https://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.purge
149                     try {
150                         log.debug("closing connection");
151                         connection.close();
152                     } catch (final IOException ioe) {
153                         log.warn("connection already closed {}",
154                             ioe.toString());
155                     }
156                 }));
157                 return queueHandler;
158             });
159         }
160     }
161 
162     public final Channel                       channel;
163     public final String                        queue;
164     public final Function<Message<V>, K>       indexingBy;
165     public final Function<byte[], V>           consumingBy;
166     public final Function<V, byte[]>           publishingBy;
167 
168     // TODO allow subclasses to provide their own data-structure, e.g. a cache
169     private final ConcurrentMap<K, Message<V>> recievedMessages =
170         new ConcurrentHashMap<>();
171     private String                             consumerTag;
172 
173     /**
174      * @param uri
175      *            an amqp or amqps uri; e.g.,
176      *            amqps://username:password@host/virtual-host
177      * @return a connection factory
178      * @throws RuntimeException
179      *             upon malformed uri
180      */
181     @SneakyThrows
182     public static ConnectionFactory connectionFactoryFrom(final String uri) {
183         val connectionFactory = new ConnectionFactory();
184         connectionFactory.setUri(uri);
185         connectionFactory.setExceptionHandler(new ForgivingExceptionHandler());
186         return connectionFactory;
187     }
188 
189     /**
190      * Cancels the consumption, previously started by {@link #consume()}. Must
191      * be called on same thread as last {@link #consume()} call.
192      *
193      * @throws IOException
194      *             upon connection failures, or if consumer was not started
195      * @return the consumer tag
196      */
197     public String cancel() throws IOException {
198         if (isNull(consumerTag)) {
199             throw new IOException("consumer not started");
200         }
201         log.debug("cancelling consumer by tag {}", consumerTag);
202         channel.basicCancel(consumerTag);
203         return consumerTag;
204     }
205 
206     @Override
207     public void close() throws IOException {
208         try {
209             cancel();
210         } catch (final Exception e) {
211             log.error("while closing got {}", e.getMessage());
212             throw e;
213         }
214     }
215 
216     /**
217      * Starts the message consumption process and returns immediately.
218      *
219      * @return consumer tag
220      * @throws RuntimeException
221      *             upon connection failures
222      * @see #cancel() for canceling the consumption
223      */
224     @SneakyThrows
225     public String consume() {
226         channel.basicQos(16);
227         consumerTag = channel.basicConsume(queue,
228             new DefaultConsumer(channel) {
229                 @Override
230                 public void handleDelivery(
231                     @SuppressWarnings("hiding") final String consumerTag,
232                     final Envelope envelope,
233                     final AMQP.BasicProperties properties, final byte[] body)
234                     throws IOException {
235                     final K key;
236                     final Message<V> message;
237                     try {
238                         message = Message.<V> builder()
239                             .content(consumingBy.apply(body))
240                             .properties(properties)
241                             .build();
242                         key = indexingBy.apply(message);
243                         log.trace("received {}", key);
244                     } catch (final Exception e) {
245                         log.warn("skipping unknown type {}", e.getMessage());
246                         channel.basicReject(envelope.getDeliveryTag(), true);
247                         return;
248                     }
249                     recievedMessages.put(key, message);
250                     channel.basicAck(envelope.getDeliveryTag(), false);
251                 }
252             });
253 
254         log.debug("set-up consumer with tag {}", consumerTag);
255         return consumerTag;
256     }
257 
258     public void publish(final Stream<Message<V>> messages) {
259         messages.parallel()
260             .peek(message -> log.trace("publishing {}", message))
261             .forEach(Unchecked.consumer(
262                 message -> channel.basicPublish(StringUtils.EMPTY, queue,
263                     message.properties, publishingBy.apply(message.content))));
264     }
265 
266     public void publishValues(final Stream<V> values) {
267         publish(values
268             .map(value -> Message.<V> builder().content(value).build()));
269     }
270 
271     /**
272      * Unmodifiable view of retrieved messages.
273      *
274      * @return the messages
275      */
276     public Map<K, Message<V>> recievedMessages() {
277         return unmodifiableMap(recievedMessages);
278     }
279 }