View Javadoc
1   /*
2    * Copyright 2023 Adrian Herscu
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  // source
17  package;
19  import java.sql.*;
20  import*;
22  import javax.sql.*;
24  import org.apache.commons.dbutils.*;
26  import*;
27  import lombok.*;
29  /**
30   * An extension of the Apache DbUtils QueryRunner that adds methods to produce
31   * Streams where each element of the stream is constructed from a row in a
32   * ResultSet. This takes advantage of database cursors (assuming the underlying
33   * JDBC ResultSet does) so the entire query result does not have to be read into
34   * memory.
35   */
36  public class StreamingQueryRunner extends QueryRunner {
37      public StreamingQueryRunner(DataSource dataSource) {
38          super(dataSource);
39      }
41      private static void closeUnchecked(AutoCloseable closeable) {
42          try {
43              closeable.close();
44          } catch (RuntimeException | Error e) {
45              throw e;
46          } catch (Exception e) {
47              throw new RuntimeException(e);
48          }
49      }
51      /**
52       * Executes a Query and returns a Stream in which each element represents a
53       * row in the result. (This method cannot be named "query" because it would
54       * hide the corresponding methods in the superclass.)
55       *
56       * @param sql
57       *            The SQL query to execute
58       * @param handler
59       *            The ResultSetHandler that converts the ResultSet to a Stream
60       * @param args
61       *            The arguments to pass to the query as prepared statement
62       *            parameters
63       */
64      public <T> Stream<T> queryStream(String sql,
65          StreamingResultSetHandler<T> handler, Object... args)
66          throws SQLException {
67          // We cannot use try-with-resources: if there is no exception the
68          // Connection, PreparedStatement,
69          // and ResultSet must remain open.
70          Connection connection = getDataSource().getConnection();
71          try {
72              return query(connection, true, sql, handler, args);
73          } catch (SQLException | RuntimeException | Error e) {
74              closeUnchecked(connection);
75              throw e;
76          }
77      }
79      public Stream<Object[]> queryStream(String sql, Object... args)
80          throws SQLException {
81          return queryStream(sql, new ArrayStreamingHandler(), args);
82      }
84      /**
85       * Executes a Query and returns a Stream in which each element represents a
86       * row in the result. (This method cannot be named "query" because it would
87       * hide the corresponding methods in the superclass.)
88       *
89       * @param connection
90       *            The database Connection to use
91       * @param sql
92       *            The SQL query to execute
93       * @param handler
94       *            The ResultSetHandler that converts the ResultSet to a Stream
95       * @param args
96       *            The arguments to pass to the query as prepared statement
97       *            parameters
98       */
99      public <T> Stream<T> queryStream(
100         Connection connection, String sql, StreamingResultSetHandler<T> handler,
101         Object... args)
102         throws SQLException {
103         return query(connection, false, sql, handler, args);
104     }
106     @Override
107     @SneakyThrows
108     public String toString() {
109         return getDataSource().getConnection().getMetaData().getURL();
110     }
112     /**
113      * Executes a Query and returns a Stream in which each element represents a
114      * row in the result.
115      *
116      * @param connection
117      *            The database Connection to use
118      * @param closeConnection
119      *            Whether or not the connection should be closed when the stream
120      *            is closed
121      * @param sql
122      *            The SQL query to execute
123      * @param handler
124      *            The ResultSetHandler that converts the ResultSet to a Stream
125      * @param args
126      *            The arguments to pass to the query as prepared statement
127      *            parameters
128      */
129     private <T> Stream<T> query(
130         Connection connection,
131         boolean closeConnection,
132         String sql,
133         StreamingResultSetHandler<T> handler,
134         Object... args)
135         throws SQLException {
136         // We cannot use try-with-resources: if there is no exception the
137         // PreparedStatement
138         // and ResultSet must remain open.
139         PreparedStatement statement = connection.prepareStatement(sql);
140         try {
141             fillStatement(statement, args);
142             ResultSet resultSet = statement.executeQuery();
143             Stream<T> stream = handler.handle(resultSet)
144                 .onClose(() -> closeUnchecked(resultSet))
145                 .onClose(() -> closeUnchecked(statement));
146             if (closeConnection) {
147                 return stream.onClose(() -> closeUnchecked(connection));
148             }
149             return stream;
150         } catch (SQLException | RuntimeException | Error e) {
151             closeUnchecked(statement);
152             throw e;
153         }
154     }
155 }