1 /* 2 * Copyright 2023 Adrian Herscu 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 // source https://github.com/rhuffman/dbstream 17 package dev.aherscu.qa.jgiven.jdbc.utils.dbutils; 18 19 import java.sql.*; 20 import java.util.stream.*; 21 22 import javax.sql.*; 23 24 import org.apache.commons.dbutils.*; 25 26 import jakarta.ws.rs.core.*; 27 import lombok.*; 28 29 /** 30 * An extension of the Apache DbUtils QueryRunner that adds methods to produce 31 * Streams where each element of the stream is constructed from a row in a 32 * ResultSet. This takes advantage of database cursors (assuming the underlying 33 * JDBC ResultSet does) so the entire query result does not have to be read into 34 * memory. 35 */ 36 public class StreamingQueryRunner extends QueryRunner { 37 public StreamingQueryRunner(DataSource dataSource) { 38 super(dataSource); 39 } 40 41 private static void closeUnchecked(AutoCloseable closeable) { 42 try { 43 closeable.close(); 44 } catch (RuntimeException | Error e) { 45 throw e; 46 } catch (Exception e) { 47 throw new RuntimeException(e); 48 } 49 } 50 51 /** 52 * Executes a Query and returns a Stream in which each element represents a 53 * row in the result. (This method cannot be named "query" because it would 54 * hide the corresponding methods in the superclass.) 55 * 56 * @param sql 57 * The SQL query to execute 58 * @param handler 59 * The ResultSetHandler that converts the ResultSet to a Stream 60 * @param args 61 * The arguments to pass to the query as prepared statement 62 * parameters 63 */ 64 public <T> Stream<T> queryStream(String sql, 65 StreamingResultSetHandler<T> handler, Object... args) 66 throws SQLException { 67 // We cannot use try-with-resources: if there is no exception the 68 // Connection, PreparedStatement, 69 // and ResultSet must remain open. 70 Connection connection = getDataSource().getConnection(); 71 try { 72 return query(connection, true, sql, handler, args); 73 } catch (SQLException | RuntimeException | Error e) { 74 closeUnchecked(connection); 75 throw e; 76 } 77 } 78 79 public Stream<Object[]> queryStream(String sql, Object... args) 80 throws SQLException { 81 return queryStream(sql, new ArrayStreamingHandler(), args); 82 } 83 84 /** 85 * Executes a Query and returns a Stream in which each element represents a 86 * row in the result. (This method cannot be named "query" because it would 87 * hide the corresponding methods in the superclass.) 88 * 89 * @param connection 90 * The database Connection to use 91 * @param sql 92 * The SQL query to execute 93 * @param handler 94 * The ResultSetHandler that converts the ResultSet to a Stream 95 * @param args 96 * The arguments to pass to the query as prepared statement 97 * parameters 98 */ 99 public <T> Stream<T> queryStream( 100 Connection connection, String sql, StreamingResultSetHandler<T> handler, 101 Object... args) 102 throws SQLException { 103 return query(connection, false, sql, handler, args); 104 } 105 106 @Override 107 @SneakyThrows 108 public String toString() { 109 return getDataSource().getConnection().getMetaData().getURL(); 110 } 111 112 /** 113 * Executes a Query and returns a Stream in which each element represents a 114 * row in the result. 115 * 116 * @param connection 117 * The database Connection to use 118 * @param closeConnection 119 * Whether or not the connection should be closed when the stream 120 * is closed 121 * @param sql 122 * The SQL query to execute 123 * @param handler 124 * The ResultSetHandler that converts the ResultSet to a Stream 125 * @param args 126 * The arguments to pass to the query as prepared statement 127 * parameters 128 */ 129 private <T> Stream<T> query( 130 Connection connection, 131 boolean closeConnection, 132 String sql, 133 StreamingResultSetHandler<T> handler, 134 Object... args) 135 throws SQLException { 136 // We cannot use try-with-resources: if there is no exception the 137 // PreparedStatement 138 // and ResultSet must remain open. 139 PreparedStatement statement = connection.prepareStatement(sql); 140 try { 141 fillStatement(statement, args); 142 ResultSet resultSet = statement.executeQuery(); 143 Stream<T> stream = handler.handle(resultSet) 144 .onClose(() -> closeUnchecked(resultSet)) 145 .onClose(() -> closeUnchecked(statement)); 146 if (closeConnection) { 147 return stream.onClose(() -> closeUnchecked(connection)); 148 } 149 return stream; 150 } catch (SQLException | RuntimeException | Error e) { 151 closeUnchecked(statement); 152 throw e; 153 } 154 } 155 }