Lesson Learned #443:Improve Application Resilience:Connection,Execution,and ResultSet Retry Policies

  • Thread starter Thread starter Jose_Manuel_Jurado
  • Start date Start date
J

Jose_Manuel_Jurado

We worked on a scenario where a customer is connected to a database with billions of records, processing data in batches. After reading and processing each batch, the connection is left idle for some time before moving on to the next group of rows. However, this approach sometimes resulted in random connection closures due to various unforeseen circumstances. To address these issues, we will demonstrate different methods to prevent such incidents and improve data processing.



Section 1: Connection Retry Policy

Maintaining a reliable database connection can be challenging. To address this issue, a Connection Retry Policy is indispensable. This policy ensures that your application makes repeated attempts to establish a stable database connection.



Implementation in Java:



Below is an example of implementing a Connection Retry Policy in Java.





private static Connection connectToDatabase(String url, String username, String password) {
Connection connection = null;
int retryCount = 3;

while (retryCount > 0) {
try {
System.out.println("-- Connecting to " + url);
connection = DriverManager.getConnection(url, username, password);
break;
} catch (SQLException e) {
System.err.println("Failed to connect to the database. Retrying...");
e.printStackTrace();
retryCount--;

if (retryCount == 0) {
System.err.println("Unable to connect to the database after multiple attempts.");
return null;
}
}
try {
Thread.sleep(5000); // Wait for 5 seconds before retrying
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return connection;
}






This code snippet demonstrates a practical approach to implementing a Connection Retry Policy in Java. By allowing multiple connection attempts, your application becomes more resilient in unstable network conditions.



Section 2: Execution Retry Policy

Query executions may fail due to various temporary issues. To ensure execution reliability, it's crucial to implement an Execution Retry Policy. This policy enables your application to retry query executions until success is achieved.



Implementation in Java:



Below is an example of implementing an Execution Retry Policy in Java.





private static ResultSet executeQueryWithRetries(Statement st, String query, int initialQueryTimeout) {
int retryCount = 3;
int queryTimeout = initialQueryTimeout;
ResultSet rs = null;

while (retryCount > 0) {
try {
System.out.println("Executing the query " + query + " with timeout " + queryTimeout + " Attempt " + retryCount);
st.setQueryTimeout(queryTimeout);
rs = st.executeQuery(query);
System.out.println("Executed the query " + query + " with timeout " + queryTimeout + " Attempt " + retryCount);
break;
} catch (SQLException e) {
System.err.println("Failed to execute query. Retrying...");
e.printStackTrace();
retryCount--;
queryTimeout += 30;

if (retryCount == 0) {
System.err.println("Unable to execute query after multiple attempts.");
return null;
}
}
}
return rs;
}






This code showcases the implementation of an Execution Retry Policy. By retrying query executions, your application can gracefully handle temporary issues and ensure reliable data retrieval.



Section 3: ResultSet Retry Policy and OFFSET usage



Processing ResultSets efficiently is vital, especially for Very Large Databases (VLDBs). To handle ResultSet processing effectively, implement a ResultSet Retry Policy. Additionally, consider using OFFSET to process data in manageable chunks without keeping a connection in an idle state.



Implementation in Java: Below is an example of implementing a ResultSet Retry Policy with OFFSET in Java.





/**
* Processes data in pages with retry support.
*
* connection The database connection.
* initialQueryTimeout The initial query timeout in seconds.
* iFetchSize The fetch size for the result set.
* sSQL The SQL query to execute.
* iPageSize The page size for data processing.
* iHowManySeconds The delay in seconds between pages.
* sSchema The schema of the table.
* sTable The name of the table.
*/
private static void processPages(Connection connection, int initialQueryTimeout, int iFetchSize, String sSQL, int iPageSize, int iHowManySeconds, String sSchema, String sTable) {
int rowCount = getRowCount(connection, sSchema, sTable, initialQueryTimeout);
if (rowCount > 0) {
int pageSize = rowCount / iPageSize;

for (int page = 0; page < iPageSize; page++) {
int offset = page * pageSize;
boolean success = processResultSetGroup(connection, initialQueryTimeout, iFetchSize, sSQL, offset, pageSize, iHowManySeconds);

if (!success) {
System.err.println("Failed to execute query for page " + page + " after multiple attempts.");
System.err.println("Reconnecting and retrying...");
connection = connectToDatabase(URL, USERNAME, PASSWORD);
if (connection == null) {
System.err.println("Unable to reconnect to the database. Exiting.");
break;
}
// Reset the counter
page--;
}
}
} else {
System.err.println("No rows found in the query.");
}
System.out.println("Finished");
}











/**
* Processes a result set in chunks with a delay between chunks.
*
* rs The result set to process.
* iChunk The chunk size for processing.
* iHowManySeconds The delay in seconds between chunks.
* @throws SQLException If a database error occurs.
*/
private static boolean processResultSet(ResultSet rs,int iChunk, int iHowManySeconds) throws SQLException {
int count = 0;
ResultSetMetaData rsmd = rs.getMetaData();
int columnCount = rsmd.getColumnCount();

try {
while (rs != null && rs.next()) {
count++;
for (int i = 1; i <= columnCount; i++) {
System.out.print(rs.getString(i) + " ");
}
System.out.println(" ");

if (count % iChunk == 0) {
System.out.println("Processed batch of " + iChunk + " records " + count);
ShowDateNow(iHowManySeconds, "Next execution: ");
Thread.sleep(iHowManySeconds * 1000);
}
}
return true; // Indicar que se completó con éxito
} catch (InterruptedException e) {
e.printStackTrace();
return false; // Indicar que hubo un error
}
}





his code demonstrates how to efficiently process ResultSets with a ResultSet Retry Policy. Utilizing OFFSET ensures that large databases are processed in manageable chunks, preventing idle connections and maintaining optimal performance.



Section 4: Conclusion



In conclusion, implementing robust retry policies for database connections, query executions, and ResultSet processing is essential for building resilient database applications. These policies enhance application stability in the face of connectivity and execution challenges. By following these principles and practices, developers can create more reliable and resilient database applications.



Below, you have the complete code for this process for reference and as an example for customization by the developer. It also incorporates improvements discussed in various articles, such as the timeout in the connection string and specifying the application name during connection to provide more details, etc







package testconnectionms;

import java.sql.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class SQLTestECB
{
static {
try {
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
} catch (Exception ex) {
System.err.println("Unable to load JDBC driver");
ex.printStackTrace();
System.exit(1);
}
}

private static final String USERNAME = "UserName";
private static final String PASSWORD = "Password!";
private static final String APPLICATION_NAME = "RobustTest";
private static final String URL = "jdbc:sqlserver://servername.database.windows.net;database=dotnetexample;sslProtocol=TLSv1.2;loginTimeout=30;PacketSize=8096;connectionTimeout=30;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;applicationName="+APPLICATION_NAME;

private static final String EXECUTION_TYPE = "BLOCK";

public static void main(String[] args)
throws SQLException
{

System.out.print("\033[H\033[2J");
System.out.flush();

Connection connection = connectToDatabase(URL, USERNAME, PASSWORD);
if (connection == null) {
System.exit(1);
}

if( EXECUTION_TYPE == "BLOCK")
{
processPages(connection, 20, 100, "select * from PerformanceVarcharNVarchar",1000,60*2,"dbo", "PerformanceVarcharNVarchar");
}
else
{
performQuery(connection, 20, 1000000, "select * from PerformanceVarcharNVarchar",100,60*2);

}

}
/**
* Connects to the database with retry support.
*
* url The database connection URL.
* username The username for authentication.
* password The password for authentication.
* @return A database connection if successful, otherwise null.
*/

private static Connection connectToDatabase(String url, String username, String password) {
Connection connection = null;
int retryCount = 3;

while (retryCount > 0) {
try {
System.out.println("-- Connecting to " + url);
connection = DriverManager.getConnection(url, username, password);
break;
} catch (SQLException e) {
System.err.println("Failed to connect to database. Retrying...");
e.printStackTrace();
retryCount--;
if (retryCount == 0) {
System.err.println("Unable to connect to database after multiple attempts.");
return null;
}
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return connection;
}
/**
* Executes a SQL query with retries.
*
* st The statement used to execute the query.
* query The SQL query to execute.
* initialQueryTimeout The initial query timeout in seconds.
* @return A ResultSet containing the query results if successful, otherwise null.
*/
private static ResultSet executeQueryWithRetries(Statement st, String query, int initialQueryTimeout) {
int retryCount = 3;
int queryTimeout = initialQueryTimeout;
ResultSet rs = null;

while (retryCount > 0) {
try {
System.out.println("Executing the query " + query + " with timeout " + queryTimeout + " Attempt " + retryCount);
st.setQueryTimeout(queryTimeout);
rs = st.executeQuery(query);
System.out.println("Executed the query " + query + " with timeout " + queryTimeout + " Attempt " + retryCount);
break;
} catch (SQLException e) {
System.err.println("Failed to execute query. Retrying...");
e.printStackTrace();
retryCount--;
queryTimeout += 30;

if (retryCount == 0) {
System.err.println("Unable to execute query after multiple attempts.");
return null;
}
}
}

return rs;
}
/**
* Performs a query and processes the result set in chunks.
*
* connection The database connection.
* initialQueryTimeout The initial query timeout in seconds.
* iFetchSize The fetch size for the result set.
* sSQL The SQL query to execute.
* iChunk The chunk size for processing.
* iHowManySeconds The delay in seconds between chunks.
*/
private static void performQuery(Connection connection, int initialQueryTimeout, int iFetchSize, String sSQL, int iChunk, int iHowManySeconds) {
try (Statement st = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY)) {
st.setFetchSize(iFetchSize);
ResultSet rs = executeQueryWithRetries(st, sSQL, initialQueryTimeout);
if (rs != null) {
processResultSet(rs, iChunk, iHowManySeconds);
rs.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("Finished");
}
/**
* Processes a result set in chunks with a delay between chunks.
*
* rs The result set to process.
* iChunk The chunk size for processing.
* iHowManySeconds The delay in seconds between chunks.
* @throws SQLException If a database error occurs.
*/
private static boolean processResultSet(ResultSet rs,int iChunk, int iHowManySeconds) throws SQLException {
int count = 0;
ResultSetMetaData rsmd = rs.getMetaData();
int columnCount = rsmd.getColumnCount();

try {
while (rs != null && rs.next()) {
count++;
for (int i = 1; i <= columnCount; i++) {
System.out.print(rs.getString(i) + " ");
}
System.out.println(" ");

if (count % iChunk == 0) {
System.out.println("Processed batch of " + iChunk + " records " + count);
ShowDateNow(iHowManySeconds, "Next execution: ");
Thread.sleep(iHowManySeconds * 1000);
}
}
return true; // Indicar que se completó con éxito
} catch (InterruptedException e) {
e.printStackTrace();
return false; // Indicar que hubo un error
}
}

/**
* Processes data in pages with retry support.
*
* connection The database connection.
* initialQueryTimeout The initial query timeout in seconds.
* iFetchSize The fetch size for the result set.
* sSQL The SQL query to execute.
* iPageSize The page size for data processing.
* iHowManySeconds The delay in seconds between pages.
* sSchema The schema of the table.
* sTable The name of the table.
*/
private static void processPages(Connection connection, int initialQueryTimeout, int iFetchSize, String sSQL, int iPageSize, int iHowManySeconds, String sSchema, String sTable) {
int rowCount = getRowCount(connection, sSchema, sTable, initialQueryTimeout);
if (rowCount > 0) {
int pageSize = rowCount / iPageSize;

for (int page = 0; page < iPageSize; page++) {
int offset = page * pageSize;
boolean success = processResultSetGroup(connection, initialQueryTimeout, iFetchSize, sSQL, offset, pageSize, iHowManySeconds);

if (!success) {
System.err.println("Failed to execute query for page " + page + " after multiple attempts.");
System.err.println("Reconnecting and retrying...");
connection = connectToDatabase(URL, USERNAME, PASSWORD);
if (connection == null) {
System.err.println("Unable to reconnect to the database. Exiting.");
break;
}
// Reset the counter
page--;
}
}
} else {
System.err.println("No rows found in the query.");
}
System.out.println("Finished");
}

/**
* Processes a result set in chunks with retry support.
*
* connection The database connection.
* initialQueryTimeout The initial query timeout in seconds.
* iFetchSize The fetch size for the result set.
* sSQL The SQL query to execute.
* offset The offset for paging.
* pageSize The page size for processing.
* iHowManySeconds The delay in seconds between chunks.
* @return True if the page was processed successfully, otherwise false.
*/
private static boolean processResultSetGroup(Connection connection, int initialQueryTimeout, int iFetchSize, String sSQL, int offset, int pageSize, int iHowManySeconds) {
int retries = 3;

while (retries > 0) {
try (Statement st = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY)) {
if( iFetchSize > pageSize)
{ iFetchSize=pageSize;}
st.setFetchSize(iFetchSize);
String paginatedSQL = sSQL + " order by ID OFFSET " + offset + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY";
ResultSet rs = executeQueryWithRetries(st, paginatedSQL, initialQueryTimeout);

if (rs != null) {
boolean success = processResultSet(rs, iFetchSize, iHowManySeconds);
rs.close();
return success;
}
} catch (SQLException e) {
e.printStackTrace();
System.err.println("Error executing query. Retrying...");
retries--;
initialQueryTimeout += 10000; // Aumenta el tiempo de espera en 10 segundos en cada reintento
}
}

return false;
}

/**
* Gets the row count for a table in a specific schema.
*
* connection The database connection.
* sSchema The schema of the table.
* sTable The name of the table.
* initialQueryTimeout The initial query timeout in seconds.
* @return The row count of the table.
*/

private static int getRowCount(Connection connection, String sSchema, String sTable, int initialQueryTimeout) {
int rowCount = 0;
String sSQL = "SELECT max(p.rows) AS RowCounts "+
"FROM sys.tables t "+
"INNER JOIN sys.indexes i ON t.OBJECT_ID = i.object_id "+
"INNER JOIN sys.partitions p ON i.object_id = p.OBJECT_ID AND i.index_id = p.index_id "+
"LEFT OUTER JOIN sys.schemas s ON t.schema_id = s.schema_id "+
"WHERE s.Name = '" + sSchema + "' and t.NAME = '" + sTable + "'";

try (Statement st = connection.createStatement())
{
ResultSet rs = executeQueryWithRetries(st, sSQL, initialQueryTimeout);
if (rs != null) {
rs.next();
rowCount = rs.getInt(1);
rs.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("Number of rows found in " + sSchema + "." + sTable + " are " + rowCount);
return rowCount;
}

/**
* Displays the current date and time with an optional message.
*
* numberSecondsToAdd The number of seconds to add to the current date and time.
* sMsg An optional message to display.
*/
private static void ShowDateNow(long numberSecondsToAdd,String sMsg) {
LocalDateTime fechaHoraActual = LocalDateTime.now().plusSeconds(numberSecondsToAdd);
DateTimeFormatter formato = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");
String fechaHoraFormateada = fechaHoraActual.format(formato);
System.out.println( sMsg + " " + fechaHoraFormateada);
}
}

Continue reading...
 
Back
Top