Lesson Learned #449: Unleashing Concurrent Threads for Robust Database Health Checks in C#

Jose_Manuel_Jurado

Introduction: In the realm of database health checks, performance and concurrency are paramount. As databases grow and applications scale, the need to efficiently test the health of a database connection becomes crucial. This article takes you through a step-by-step journey of implementing a high-performance, concurrent database health check in C#.



Step 1: Setting Up the Environment Before we dive into the code, let's set up the environment. For this example, we're going to use a Microsoft Azure SQL Database. However, the principles can be applied to any database.



Step 2: Asynchronous Programming in C# Asynchronous programming lets an operation run without blocking the thread that started it. In C#, the async and await keywords define and consume asynchronous tasks. This matters for database work because opening connections and running queries are I/O-bound: while the application waits for the server, the thread is free to do other work instead of sitting idle.
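As a minimal sketch of this idea, the example below starts several simulated I/O-bound operations and awaits them together. The names AsyncDemo, SimulatedQueryAsync and RunAllAsync are hypothetical, and Task.Delay stands in for a real database call.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncDemo
{
    // Hypothetical stand-in for an I/O-bound database call.
    public static async Task<int> SimulatedQueryAsync(int id, int delayMs)
    {
        // While the delay is pending, no thread is blocked waiting for it.
        await Task.Delay(delayMs);
        return id;
    }

    // Starts all operations first, then awaits them together.
    // Task.WhenAll returns the results in the same order as the input tasks.
    public static async Task<int[]> RunAllAsync(int count, int delayMs)
    {
        Task<int>[] pending = Enumerable.Range(0, count)
            .Select(i => SimulatedQueryAsync(i, delayMs))
            .ToArray();
        return await Task.WhenAll(pending);
    }
}
```

Calling await AsyncDemo.RunAllAsync(3, 50) from an async method yields the ids 0, 1 and 2. Because the delays overlap, the total wait should be close to one delay rather than the sum of all three.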



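Step 3: Implementing the Concurrent Threads The following code snippet starts numberOfThreads health checks with Task.Run. Each check is a task scheduled on the thread pool rather than a dedicated thread, and a SemaphoreSlim limits how many of them execute the query at the same time.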



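// numberOfThreads, semaphore and connectionString are defined in the full script below.
var tasks = new Task[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++)
{
    tasks[i] = Task.Run(async () =>
    {
        // Wait for a free slot so that only a limited number of checks run at once.
        await semaphore.WaitAsync();
        try
        {
            await ExecuteQueryAsync(connectionString);
        }
        finally
        {
            // Always release the slot, even if the check throws.
            semaphore.Release();
        }
    });
}

// Wait until every health check has finished.
await Task.WhenAll(tasks);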
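To see the throttling in isolation, here is a small self-contained sketch that runs the same pattern against a simulated workload and reports the highest number of checks that were in flight at once. ThrottleDemo and RunAsync are hypothetical names, and Task.Delay replaces the real query.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs totalTasks simulated checks with at most maxConcurrent in flight
    // and returns the highest concurrency level that was actually observed.
    public static async Task<int> RunAsync(int totalTasks, int maxConcurrent)
    {
        var gate = new object();
        int current = 0;
        int peak = 0;
        var tasks = new Task[totalTasks];

        using (var semaphore = new SemaphoreSlim(maxConcurrent))
        {
            for (int i = 0; i < totalTasks; i++)
            {
                tasks[i] = Task.Run(async () =>
                {
                    await semaphore.WaitAsync();
                    try
                    {
                        // Count this check as running and record a new peak if needed.
                        lock (gate)
                        {
                            current++;
                            if (current > peak) peak = current;
                        }
                        await Task.Delay(10); // stand-in for the health-check query
                    }
                    finally
                    {
                        lock (gate) { current--; }
                        semaphore.Release();
                    }
                });
            }

            await Task.WhenAll(tasks);
        }

        return peak;
    }
}
```

With RunAsync(40, 4), the returned peak never exceeds 4, no matter how many tasks are queued. That is the role maxDegreeOfParallelism plays in the article's script.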




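Step 4: Connecting to the Database with Retries A database connection can fail for transient reasons such as network issues or an overloaded server. A retry policy makes the application try to open the connection several times before giving up. The code below uses the Polly library: after a SqlException it retries up to five times, waiting slightly longer than the connection timeout (ConnectionTimeout * 1.05 seconds) between attempts.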



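static async Task<SqlConnection> ConnectWithRetriesAsync(string connectionString)
{
    SqlConnection connection = new SqlConnection(connectionString);

    // Retry up to 5 times on SqlException, waiting slightly longer than the
    // connection timeout between attempts.
    var policy = Policy
        .Handle<SqlException>()
        .WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(connection.ConnectionTimeout * 1.05),
            (exception, timespan, retryCount, context) =>
            {
                Console.WriteLine($"Retry {retryCount} due to {exception.Message}. Will retry in {timespan.TotalSeconds} seconds.");
            });

    try
    {
        await policy.ExecuteAsync(() => connection.OpenAsync());
    }
    catch
    {
        // All retries failed: release the connection before rethrowing.
        connection.Dispose();
        throw;
    }

    // The caller owns the open connection and is responsible for disposing it.
    return connection;
}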
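To make the retry semantics concrete without the Polly dependency, here is a minimal hand-rolled sketch of the same idea: run an operation and, if it throws, wait a fixed delay and try again, up to a maximum number of retries. RetryHelper and RetryAsync are hypothetical names. This is an illustration of the technique, not Polly's implementation.

```csharp
using System;
using System.Threading.Tasks;

public static class RetryHelper
{
    // Runs operation; on an exception, waits delay and tries again,
    // up to maxRetries retries (so at most maxRetries + 1 attempts in total).
    // onRetry, if supplied, receives the exception and the retry number.
    public static async Task<T> RetryAsync<T>(
        Func<Task<T>> operation,
        int maxRetries,
        TimeSpan delay,
        Action<Exception, int> onRetry = null)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception ex) when (attempt < maxRetries)
            {
                // Retries remain: report, wait, and loop again.
                // Once attempt == maxRetries the filter is false and the
                // exception propagates to the caller.
                onRetry?.Invoke(ex, attempt + 1);
                await Task.Delay(delay);
            }
        }
    }
}
```

As with WaitAndRetryAsync(5, ...), a retry count of 5 means up to six attempts: the initial call plus five retries. If the last attempt also fails, the exception reaches the caller.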




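Step 5: Executing the Command with Retries The SQL command needs a retry policy as well. It follows the same pattern as the connection policy, but the wait between attempts is based on the command timeout (CommandTimeout * 1.05 seconds), and the method returns the scalar result of the query.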



static async Task<object> ExecuteCommandWithRetriesAsync(SqlCommand command)
{
    // Retry up to 5 times on SqlException, waiting slightly longer than the
    // command timeout between attempts.
    var policy = Policy
        .Handle<SqlException>()
        .WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(command.CommandTimeout * 1.05),
            (exception, timespan, retryCount, context) =>
            {
                Console.WriteLine($"Retry {retryCount} due to {exception.Message}. Will retry in {timespan.TotalSeconds} seconds.");
            });

    // ExecuteAsync passes the scalar result of the successful attempt through.
    return await policy.ExecuteAsync(() => command.ExecuteScalarAsync());
}
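Retrying on every SqlException also retries errors that will never succeed, such as a failed login. One possible refinement, sketched below, is to retry only on error numbers that indicate a transient condition. TransientErrors and IsTransient are hypothetical names, and the list of numbers is an illustrative, non-exhaustive assumption that should be checked against the current Azure SQL Database documentation.

```csharp
using System;
using System.Collections.Generic;

public static class TransientErrors
{
    // Assumption: an illustrative subset of Azure SQL Database error numbers
    // that are commonly treated as transient. Not an exhaustive list.
    private static readonly HashSet<int> Numbers = new HashSet<int>
    {
        4060,   // cannot open database
        40197,  // service error while processing the request
        40501,  // service is busy
        40613,  // database not currently available
        49918, 49919, 49920 // not enough resources / too many operations in progress
    };

    // Returns true when the given SqlException.Number is in the transient set.
    public static bool IsTransient(int sqlErrorNumber)
    {
        return Numbers.Contains(sqlErrorNumber);
    }
}
```

With Polly, such a predicate could be plugged into the policy as Policy.Handle<SqlException>(ex => TransientErrors.IsTransient(ex.Number)), so that non-transient errors fail immediately instead of waiting through five retries.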




Conclusion: In this article, we walked through the steps to implement a concurrent database health check in C#. Asynchronous programming keeps threads free while connections and queries are in flight, a semaphore caps how many checks run at the same time, and retry policies absorb transient failures. Together, these techniques make it practical to run thousands of health checks from a single process.



Script



using System;
using System.Data.SqlClient;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Polly;

namespace HealthCheck
{
    class Program
    {
        const string LogFilePath = "c:\\temp\\log.txt";

        static async Task Main(string[] args)
        {
            int numberOfThreads = 15000;       // Total number of health checks (tasks) to run
            int maxDegreeOfParallelism = 850;  // Maximum number of checks running concurrently
            string connectionString = "data source=tcp:servername.database.windows.net,1433;initial catalog=dname;User ID=username;Password=password;ConnectRetryCount=3;ConnectRetryInterval=10;Connection Timeout=30;Max Pool Size=1200;MultipleActiveResultSets=false;Min Pool Size=1;Application Name=Testing by JMJD - SQL;Pooling=true";

            var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);

            var tasks = new Task[numberOfThreads];
            for (int i = 0; i < numberOfThreads; i++)
            {
                tasks[i] = Task.Run(async () =>
                {
                    // Wait for a free slot before starting the check.
                    await semaphore.WaitAsync();
                    try
                    {
                        await ExecuteQueryAsync(connectionString);
                    }
                    finally
                    {
                        semaphore.Release();
                    }
                });
            }

            await Task.WhenAll(tasks);
        }

        static async Task ExecuteQueryAsync(string connectionString)
        {
            // Captured once as a label for the log lines; after an await the
            // method may resume on a different thread-pool thread.
            int threadId = Thread.CurrentThread.ManagedThreadId;
            TimeSpan ts;
            string elapsedTime;
            try
            {
                Stopwatch stopWatch = Stopwatch.StartNew();

                Log($"Thread {threadId}: Started");
                Log($"Thread {threadId}: Opening the connection");

                // The using blocks dispose the connection and the command even if the query throws.
                using (SqlConnection connection = await ConnectWithRetriesAsync(connectionString))
                {
                    ts = stopWatch.Elapsed;
                    elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}",
                        ts.Hours, ts.Minutes, ts.Seconds,
                        ts.Milliseconds / 10);
                    Log($"Thread {threadId}: Connected - {elapsedTime} {connection.ClientConnectionId}");

                    Log($"Thread {threadId}: Executing the command");

                    using (SqlCommand command = new SqlCommand("SELECT 1", connection))
                    {
                        command.CommandTimeout = 5;

                        // Time only the command execution.
                        stopWatch.Restart();
                        object result = await ExecuteCommandWithRetriesAsync(command);
                        stopWatch.Stop();

                        ts = stopWatch.Elapsed;
                        elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}",
                            ts.Hours, ts.Minutes, ts.Seconds,
                            ts.Milliseconds / 10);
                        Log($"Thread {threadId}: Executed the command - {elapsedTime} - Result: {result}");
                    }

                    Log($"Thread {threadId}: Closing the connection");
                }
            }
            catch (OperationCanceledException canc)
            {
                Log($"Thread {threadId}: Error (Cancellation): {canc.Message}");
            }
            catch (Exception ex)
            {
                Log($"Thread {threadId}: Error (Exception): {ex.Message}");
            }
        }

        static async Task<SqlConnection> ConnectWithRetriesAsync(string connectionString)
        {
            SqlConnection connection = new SqlConnection(connectionString);

            // Retry up to 5 times on any exception, waiting slightly longer than
            // the connection timeout between attempts.
            var policy = Policy
                .Handle<Exception>()
                .WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(connection.ConnectionTimeout * 1.05),
                    (exception, timespan, retryCount, context) =>
                    {
                        Log($"Retry {retryCount} due to {exception.Message}. Will retry in {timespan.TotalSeconds} seconds.");
                    });

            try
            {
                await policy.ExecuteAsync(() => connection.OpenAsync());
            }
            catch
            {
                // All retries failed: release the connection before rethrowing.
                connection.Dispose();
                throw;
            }

            // The caller owns the open connection and is responsible for disposing it.
            return connection;
        }

        static async Task<object> ExecuteCommandWithRetriesAsync(SqlCommand command)
        {
            // Retry up to 5 times on any exception, waiting slightly longer than
            // the command timeout between attempts.
            var policy = Policy
                .Handle<Exception>()
                .WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(command.CommandTimeout * 1.05),
                    (exception, timespan, retryCount, context) =>
                    {
                        Log($"Retry {retryCount} due to {exception.Message}. Will retry in {timespan.TotalSeconds} seconds.");
                    });

            // ExecuteAsync passes the scalar result of the successful attempt through.
            return await policy.ExecuteAsync(() => command.ExecuteScalarAsync());
        }

        // Serializes file writes so that concurrent tasks do not collide on the log file.
        static readonly object LogLock = new object();

        static void Log(string message)
        {
            string logMessage = $"{DateTime.Now}: {message}";
            Console.WriteLine(logMessage);
            try
            {
                lock (LogLock)
                {
                    using (FileStream stream = new FileStream(LogFilePath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite))
                    using (StreamWriter writer = new StreamWriter(stream))
                    {
                        writer.WriteLine(logMessage);
                    }
                }
            }
            catch (IOException ex)
            {
                Console.WriteLine($"Error writing in the log file: {ex.Message}");
            }
        }

        // Helper to remove the log file from a previous run. Main does not call it.
        static void DeleteLogFile()
        {
            try
            {
                if (File.Exists(LogFilePath))
                {
                    File.Delete(LogFilePath);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error deleting log file: {ex.Message}");
            }
        }
    }
}
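
The script pairs maxDegreeOfParallelism = 850 with Max Pool Size=1200 in the connection string. If the concurrency limit were higher than the pool size, the extra checks would queue for a pooled connection and could time out while waiting. The sketch below reads Max Pool Size from a connection string and compares it with the intended concurrency. PoolCheck and its methods are hypothetical names. It uses the provider-independent DbConnectionStringBuilder and assumes the SqlClient default pool size of 100 when the keyword is absent.

```csharp
using System;
using System.Data.Common;

public static class PoolCheck
{
    // Returns the "Max Pool Size" value from a connection string,
    // or fallback when the keyword is absent or not a number.
    public static int GetMaxPoolSize(string connectionString, int fallback)
    {
        var builder = new DbConnectionStringBuilder { ConnectionString = connectionString };
        if (builder.TryGetValue("Max Pool Size", out object raw) &&
            int.TryParse(Convert.ToString(raw), out int size))
        {
            return size;
        }
        return fallback;
    }

    // True when the pool can hold one connection per concurrent check.
    public static bool PoolCoversConcurrency(string connectionString, int maxDegreeOfParallelism)
    {
        // Assumption: 100 is the SqlClient default for Max Pool Size.
        return GetMaxPoolSize(connectionString, 100) >= maxDegreeOfParallelism;
    }
}
```

For the connection string used in the script, PoolCoversConcurrency(connectionString, 850) returns true because 1200 is at least 850. A check like this could run at the start of Main to flag a mismatch before any connection is opened.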
