Joshua Steward
Concurrent Flows

Concurrent Flows

Basic Dapper Resiliency Using Polly

Basic Dapper Resiliency Using Polly

Joshua Steward's photo
Joshua Steward
·Jun 6, 2021·

10 min read

Background

Dapper is a lightweight ORM for use in high performance database transactions that uses hand written SQL scripts and transforms the results into objects. This approach makes interacting with a database both performant and easy, as you can fine tune the SQL to your taste. Of course there are other high performant ORM's available, EF Core 6 has made performance a key focus area of improvement but we'll focus on Dapper here. We want our solution to to extend all the built-in functionality of Dapper and add layer of resiliency that manages transient database errors and other error types. We'll be handling exceptions with the use of a few different Polly policies.

Challenge

We want to gracefully handle transient errors, meaning we want to retry our request with a minimal amount of boiler plate. Polly makes this trivial. Next we want to gracefully handle networking errors that could corrupt our connection pool by both retrying our request and clearing the connection pool. We also want to set a high level timeout for all our operations so that our end-user is not waiting forever for a response. Finally if we encounter a fatal exception we want to bypass our retry logic and prevent additional retries with the poison request.

Solution

Error Numbers We'll Handle

First let's go through what errors we're going to handle. We're going to focus on the SqlException here with the property Number. These numbers indicate the type of error that caused the exception. If more than one error occurred it's important to know that only the first error number is reported. This information is gathered from mutliple sources including Offcial docs and the master.dbo.sysmessages table

Transient Errors

  • 40613 - Database is not currently available.
  • 40197 - The service has encountered an error processing your request. Please try again.
  • 40501 - The service is currently busy.
  • 49918 - Cannot process request. Not enough resources to process request.
  • 40549 - Session is terminated because you have a long running transaction.
  • 40550 - The session has been terminated because it has acquired too many locks.
  • 1205 - Transaction was deadlocked resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

    Networking Errors

  • 258 - Cannot call methods on server.
  • 26 - Error Locating server specified.
  • 40 - Could not open a connection to the server.
  • 10053 - A transport-level error has occurred when receiving results from the server.

    Constraint Violation Errors

  • 2627 - Cannot insert duplicate key.
  • 547 - The statement conflicted with the constraint.
  • 2601 - Cannot insert duplicate key row in object.

    Our Policies

    We'll want to handle these errors differently and execute different paths depending on the type of error received. One policy won't do. Let's take a look at what we want our policies to behave like: Resiliency.jpeg

    First In Line, Timeout

    We want to set up a timeout for all our actions to take place; Dapper has the ability to set timeouts per command but we want a global timeout on our entire request. Polly makes this easy. For this, we setup a simple Timeout Async policy

    var timeoutPolicy = Policy.TimeoutAsync(maxTimeout ?? TimeSpan.FromMinutes(2));
    

    Next, We Handle Transient Errors

    For our transient errors we want to retry and to log a warning with the associated sql and parameters that were associated with the transient error. We do this by utilizing the execution Context that we'll describe more later and set up a Wait And Retry Async policy.

    var transientPolicy = Policy.Handle<SqlException>(ex => transientNumbers.Contains(ex.Number))
      .WaitAndRetryAsync(
      transientRetries,
      attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
      (ex, _, ctx) => ctx.GetLogger()?.LogWarning(ex, "{@Operation} Encountered Transient SqlException. Params:{@Param} Sql:{@Sql}", ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]));
    

    Next, The Networking Errors

    For networking errors we want another Wait And Retry Async policy and not only log, as we did with transient error, but we also want to clear the connection pool in case it's become corrupted. We do this again by accessing available data within our Context.

    var networkPolicy = Policy.Handle<SqlException>(ex => networkingNumbers.Contains(ex.Number))
      .WaitAndRetryAsync(
      networkRetries,
      attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
      (ex, _, ctx) =>
      {
          ctx.GetLogger()?.LogWarning(ex, "{@Operation} Encountered a Network Error. Params:{@Param} Sql:{@Sql}", ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]);
          if (ctx.TryGetConnection(out var connection))
              SqlConnection.ClearPool(connection);
      });
    

    Next, Constraint Violations

    If we violate a constraint with a combination of query and parameters a retry is not going to solve the problem. In this case we create a Circuit Breaker that opens and never closes again, lggoing and returning the error straight back to the consumer.

    var constraintPolicy = Policy.Handle<SqlException>(ex => constraintViolationNumbers.Contains(ex.Number))
      .CircuitBreakerAsync(
      1,
      TimeSpan.MaxValue,
      (ex, _, ctx) => ctx.GetLogger()?.LogError(ex, "{@Operation} Encountered a Constraint Violation. Params:{@Param} Sql:{@Sql}", ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]),
      ctx => { }
      );
    

    Putting The Policies Together

    Finally we wrap our policies in our desired execution order and return our fully formed resiliency policy.

    var resiliencyPolicy = timeoutPolicy
      .WrapAsync(transientPolicy)
      .WrapAsync(networkPolicy)
      .WrapAsync(constraintPolicy);
    return resiliencyPolicy;
    

    All Together Now

    And here's what all this together looks like.

    public static class SqlResiliencyPolicy
    {        
      private static readonly ISet<int> transientNumbers = new HashSet<int>(new[] { 40613, 40197, 40501, 49918, 40549, 40550, 1205 });
      private static readonly ISet<int> networkingNumbers = new HashSet<int>(new[] { 258, -2, 10060, 0, 64, 26, 40, 10053 });
      private static readonly ISet<int> constraintViolationNumbers = new HashSet<int>(new[] { 2627, 547, 2601 });
    
      public static IAsyncPolicy GetSqlResiliencyPolicy(TimeSpan? maxTimeout = null, int transientRetries = 3, int networkRetries = 3)
      {
          var timeoutPolicy = Policy.TimeoutAsync(maxTimeout ?? TimeSpan.FromMinutes(2));
    
          var transientPolicy = Policy.Handle<SqlException>(ex => transientNumbers.Contains(ex.Number))
              .WaitAndRetryAsync(
              transientRetries,
              attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
              (ex, _, ctx) => ctx.GetLogger()?.LogWarning(ex, "{@Operation} Encountered Transient SqlException. Params:{@Param} Sql:{@Sql}", ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]));
    
          var networkPolicy = Policy.Handle<SqlException>(ex => networkingNumbers.Contains(ex.Number))
              .WaitAndRetryAsync(
              networkRetries,
              attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
              (ex, _, ctx) =>
              {
                  ctx.GetLogger()?.LogWarning(ex, "{@Operation} Encountered a Network Error. Params:{@Param} Sql:{@Sql}", ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]);
                  if (ctx.TryGetConnection(out var connection))
                      SqlConnection.ClearPool(connection);
              });
    
          var constraintPolicy = Policy.Handle<SqlException>(ex => constraintViolationNumbers.Contains(ex.Number))
              .CircuitBreakerAsync(
              1,
              TimeSpan.MaxValue,
              (ex, _, ctx) => ctx.GetLogger()?.LogError(ex, "{@Operation} Encountered a Constraint Violation. Params:{@Param} Sql:{@Sql}", ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]),
              ctx => { }
              );
    
          var resiliencyPolicy = timeoutPolicy
              .WrapAsync(transientPolicy)
              .WrapAsync(networkPolicy)
              .WrapAsync(constraintPolicy);
    
          return resiliencyPolicy;
      }
    }
    

    The Context

    As you can see we make a lot of use of the Context in our policies. The Context object is scoped to a particular execution of a policy and provides a way to pass contextual information about the delegate that's being executed. Learn more with Using Execution Context in Polly We use it to store the requested sql script, the parameters, our logger and our connection. So to make this simpler we have a Context helper static class.

    public static class ContextHelper
    {
      public static readonly string LoggerContextKey = nameof(LoggerContextKey);
      public static readonly string SqlContextKey = nameof(SqlContextKey);
      public static readonly string ParamContextKey = nameof(ParamContextKey);
      public static readonly string ConnectionContextKey = nameof(ConnectionContextKey);
    
      public static Context NewContext(
          SqlConnection connection, 
          ILogger logger, 
          string sql,
          object param, 
          string operationKey)
      {
          return new Context(operationKey, new Dictionary<string, object>()
          {
              { ConnectionContextKey, connection },
              { LoggerContextKey, logger },
              { SqlContextKey, sql },
              { ParamContextKey, param }
          });
      }
    
      public static ILogger GetLogger(this Context ctx)
          => ctx[LoggerContextKey] as ILogger;
    
      public static bool TryGetConnection(this Context ctx, out SqlConnection connection)
          => (connection = ctx[ConnectionContextKey] as SqlConnection) is not null ? true : false;
    }
    

    Registration With DI

    We need to register three items with our DI: A factory for our SqlConnection, our resiliency policy, and our yet to be revealed SqlDapperClient.

    public delegate SqlConnection SqlConnectionFactory();
    public static void AddSqlDapperClient(this IServiceCollection services, string connectionString)
    {
      services.AddSingleton<SqlConnectionFactory>(() => new SqlConnection(connectionString));
      services.AddScoped(_ => SqlResiliencyPolicy.GetSqlResiliencyPolicy());
      services.AddScoped<ISqlDapperClient, SqlDapperClient>();
    }
    

    Notice that our Resiliency Policy and Dapper Client are scoped. This means we'll get a new instance for each request and we're not sharing the policy across requests. We could have used a more elaborate setup if we needed our client to be singleton and used mutable Context to affect the state of our resiliency policy in the case of, for instance, the Constraint Violation exception, but we're just keeping it scoped here for simplicities sake.

    The Dapper Wrapper, err... Client

    Dapper is primarily a set of extension methods on IDbConnection. Now, while it is highly recommended to integration test your data access layer, I find it's also useful to unit test the layer above that, the layer that interacts with data access should be tested to ensure it passes the correct sql script and expected arguments. To facilitate this we need a way to mock the behavior of Dapper. Mocking the complete set of interactions from IDbConnection to a SqlCommand is laborious and we can simply create a client whose only purpose is to expose the extension methods of Dapper as a mockable interface and whose design is one that necessitate integration testing with a testing database. We expose an interface with the methods we'd like to mock, this is just a sampling of what Dapper offers but can easily be expanded.

    public interface ISqlDapperClient
    {
      Task<int> ExecuteAsync(string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null);
      Task<T> ExecuteScalarAsync<T>(string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null);
      Task<IEnumerable<T>> QueryAsync<T>(string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null);
      Task<T> QueryFirstOrDefaultAsync<T>(string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null);
      Task<IEnumerable<TReturn>> QueryAsync<TFirst, TSecond, TReturn>(string sql, Func<TFirst, TSecond, TReturn> map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null);
      Task<IEnumerable<TReturn>> QueryAsync<TFirst, TSecond, TThird, TReturn>(string sql, Func<TFirst, TSecond, TThird, TReturn> map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null);
    }
    

    Then in the implementation we inject our Logger, Resiliency Policy and our SqlConnection Factory and proxy the Dapper calls through our resiliency policy.

    public class SqlDapperClient : ISqlDapperClient
    {
      private readonly ILogger<SqlDapperClient> logger;
      private readonly SqlConnectionFactory connectionFactory;
      private readonly IAsyncPolicy resiliencyPolicy;
    
      public SqlDapperClient(
          ILogger<SqlDapperClient> logger, 
          SqlConnectionFactory connectionFactory,
          IAsyncPolicy resiliencyPolicy)
      {
          this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
          this.connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
          this.resiliencyPolicy = resiliencyPolicy ?? throw new ArgumentNullException(nameof(resiliencyPolicy));
      }
    
      public Task<int> ExecuteAsync(string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null) 
          => ExecuteWithResiliency((s, p, c) => c.ExecuteAsync(s, p, transaction, commandTimeout, commandType), sql, param);
    
      public Task<T> ExecuteScalarAsync<T>(string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null) 
          => ExecuteWithResiliency((s, p, c) => c.ExecuteScalarAsync<T>(s, p, transaction, commandTimeout, commandType), sql, param);
    
      public Task<T> QueryFirstOrDefaultAsync<T>(string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null) 
          => ExecuteWithResiliency((s, p, c) => c.QueryFirstOrDefaultAsync<T>(s, p, transaction, commandTimeout, commandType), sql, param);
    
      public Task<IEnumerable<T>> QueryAsync<T>(string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null) 
          => ExecuteWithResiliency((s, p, c) => c.QueryAsync<T>(s, p, transaction, commandTimeout, commandType), sql, param);
    
      public Task<IEnumerable<TReturn>> QueryAsync<TFirst, TSecond, TReturn>(string sql, Func<TFirst, TSecond, TReturn> map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null) 
          => ExecuteWithResiliency((s, p, c) => c.QueryAsync(s, map, p, transaction, buffered, splitOn, commandTimeout, commandType), sql, param);
    
      public Task<IEnumerable<TReturn>> QueryAsync<TFirst, TSecond, TThird, TReturn>(string sql, Func<TFirst, TSecond, TThird, TReturn> map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null) 
          => ExecuteWithResiliency((s, p, c) => c.QueryAsync(s, map, p, transaction, buffered, splitOn, commandTimeout, commandType), sql, param);
    
      private async Task<T> ExecuteWithResiliency<T>(Func<string, object, SqlConnection, Task<T>> connectionFunc, string sql, object param = null, [CallerMemberName] string operation = "")
      {
          using var connection = connectionFactory();
          return await resiliencyPolicy.ExecuteAsync(
              ctx => connectionFunc(sql, param, connection), 
              NewContext(connection, logger, sql, param, operation));
      }
    }
    

    Notice we bring everything down to the ExecuteWithResiliency method that captures the extension method we want to execute, the sql we want to run and the parameters object that we pass to Dapper. In this method we create the connection and the instance of the execution Context and run our captured extension method with the provided arguments.

    Summary

    We have seen how to make a SqlDapperClient client resilient to transient errors with a global timeout on operations and reject constraint violations. And we've also seen how to make great use of Polly's execution Context to pass execution specific details to our policies. Finally we introduced a mockable wrapper for Dapper that will let us assert the against the passed arguments to the underlying extension methods. That's it for this post, hope you enjoyed it.

 
Share this