Implementing the Saga pattern on Azure - Part 4

In this concluding post, we implement the Saga pattern on Azure using C#.

Azure Functions is a natural way to build serverless microservices. We will take advantage of it with a very simple example that mirrors the scenario described in the previous post.

Setting up the project

  • In Visual Studio, create an Azure Functions project and give it a name (for example, EOCS.SagaPattern).

  • In this project, add a new class named OrderService with the following code.

public class OrderService
{
    [FunctionName(nameof(PlaceOrder))]
    public async Task<IActionResult> PlaceOrder([HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req)
    {
        // ...
    }
}

This function is the entry point for placing an order (for example, from a React application) and plays the role of the executor, in the terminology we used earlier.

Implementing the repositories

In our example, we are dealing with the Order database (assumed to be a SQL Server database) and the Delivery database (assumed to be a Cosmos DB database). As a result, the function needs a repository that gives it access to each of them.

  • Add a new folder named Repositories and a subfolder named Interfaces in it.

  • Add a new interface named IOrderRepository in the Interfaces folder with the following code.

public interface IOrderRepository
{
    Task<Order> AddOrder(AddOrderRequest request);

    Task DeleteOrder(string id);
}

  • Add a new interface named IDeliveryRepository in the Interfaces folder with the following code.

public interface IDeliveryRepository
{
    Task<Delivery> AddDelivery(AddDeliveryRequest request);

    Task DeleteDelivery(string id);
}

The Order and Delivery classes represent the order and delivery entities, respectively.
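
These entity and request classes are not listed in the post, so here is a minimal sketch inferred from how the rest of the code uses them. The property names come from that code; the property types (for example, decimal for Amount) are assumptions, and the real classes may well carry more fields.

```csharp
// Hypothetical shapes, inferred from how the repositories and handlers use them.
public class Order
{
    public string OrderId { get; set; }
    public decimal Amount { get; set; }   // assumed type
    public string Currency { get; set; }
}

public class Delivery
{
    // Used as the Cosmos DB partition key value in the delivery repository.
    public string DeliveryId { get; set; }
}

public class AddOrderRequest
{
    public Order Order { get; set; }
}

public class AddDeliveryRequest
{
    public Delivery Delivery { get; set; }
}

// Body of the HTTP request received by the PlaceOrder function.
public class PlaceOrderRequest
{
    public AddOrderRequest OrderRequest { get; set; }
    public AddDeliveryRequest DeliveryRequest { get; set; }
}
```

Note that Cosmos DB requires every stored item to have an id property. How the original Delivery class maps it is not shown, so this sketch leaves that detail out.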

Now we need concrete implementations of these interfaces.

  • Add a class named SqlServerOrderRepository in the Repositories folder.

public class SqlServerOrderRepository : IOrderRepository
{
    private readonly IServiceConfiguration _configuration;

    public SqlServerOrderRepository(IServiceConfiguration configuration)
    {
        _configuration = configuration;
    }

    public async Task<Order> AddOrder(AddOrderRequest request)
    {
        var order = request.Order;
        using (var connection = new SqlConnection(_configuration.GetOrderConnectionString()))
        {
            await connection.OpenAsync().ConfigureAwait(false);
            using (var command = connection.CreateCommand())
            {
                // Use parameters rather than string interpolation to avoid SQL injection.
                command.CommandText = "INSERT INTO Orders VALUES(@orderId, @amount, @currency)";
                command.Parameters.AddWithValue("@orderId", order.OrderId);
                command.Parameters.AddWithValue("@amount", order.Amount);
                command.Parameters.AddWithValue("@currency", order.Currency);
                await command.ExecuteNonQueryAsync().ConfigureAwait(false);
            }
        } // Disposing the connection also closes it.

        return order;
    }

    public async Task DeleteOrder(string id)
    {
        using (var connection = new SqlConnection(_configuration.GetOrderConnectionString()))
        {
            await connection.OpenAsync().ConfigureAwait(false);
            using (var command = connection.CreateCommand())
            {
                command.CommandText = "DELETE FROM Orders WHERE OrderId = @orderId";
                command.Parameters.AddWithValue("@orderId", id);
                await command.ExecuteNonQueryAsync().ConfigureAwait(false);
            }
        }
    }
}

  • Add a class named CosmosDbDeliveryRepository in the Repositories folder.

public class CosmosDbDeliveryRepository : IDeliveryRepository
{
    private readonly IServiceConfiguration _configuration;
    private readonly Container _container;

    public CosmosDbDeliveryRepository(IServiceConfiguration configuration)
    {
        _configuration = configuration;
        var client = GetCosmosDbClient();

        // Constructors cannot be async, so these two calls block until the database and container exist.
        var database = client.CreateDatabaseIfNotExistsAsync("Deliveries").Result.Database;
        _container = database.CreateContainerIfNotExistsAsync("Deliveries", "/DeliveryId").Result.Container;
    }

    public async Task<Delivery> AddDelivery(AddDeliveryRequest request)
    {
        var delivery = request.Delivery;
        var response = await _container.UpsertItemAsync<Delivery>(delivery, new PartitionKey(delivery.DeliveryId)).ConfigureAwait(false);

        // Resource holds the item as stored by Cosmos DB.
        return response.Resource;
    }

    public async Task DeleteDelivery(string id)
    {
        // The delivery identifier is used both as the item id and as the partition key value.
        await _container.DeleteItemAsync<Delivery>(id, new PartitionKey(id)).ConfigureAwait(false);
    }

    #region Private Methods

    private CosmosClient GetCosmosDbClient()
    {
        var options = new CosmosClientOptions() { ConnectionMode = ConnectionMode.Direct };
        return new CosmosClient(_configuration.GetDeliveryConnectionString(), options);
    }

    #endregion
}
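
Both repositories depend on an IServiceConfiguration that the post does not list. A minimal sketch of what it might look like follows. The two method names are taken from the repositories above, while the setting names OrderConnectionString and DeliveryConnectionString are assumptions made for illustration. It relies on the fact that Azure Functions exposes application settings as environment variables.

```csharp
using System;

public interface IServiceConfiguration
{
    string GetOrderConnectionString();
    string GetDeliveryConnectionString();
}

public class ServiceConfiguration : IServiceConfiguration
{
    // Hypothetical setting names; use whatever names your application settings define.
    public string GetOrderConnectionString() => GetRequired("OrderConnectionString");

    public string GetDeliveryConnectionString() => GetRequired("DeliveryConnectionString");

    // Reads a setting from the environment and fails fast when it is missing or empty.
    private static string GetRequired(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrEmpty(value))
        {
            throw new InvalidOperationException($"Missing application setting '{name}'.");
        }

        return value;
    }
}
```

Failing fast on a missing setting surfaces configuration mistakes when the repository is first built rather than on the first database call.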

Implementing the transaction logic

  • Add a new folder named Handlers and a subfolder named Interfaces in it.

  • Add an interface named ITransactionHandler in the Interfaces folder with the following code.

public interface ITransactionHandler
{
    Task<bool> Commit();
    Task Rollback();
}

Each handler encapsulates one operation on one database (Commit) together with the compensation logic that undoes it (Rollback).

Compensation logic for orders

  • Add a class named PlaceOrderTransactionHandler in the Handlers folder with the following code.

public class PlaceOrderTransactionHandler : ITransactionHandler
{
    private readonly IOrderRepository _repository;
    private readonly AddOrderRequest _request;

    public PlaceOrderTransactionHandler(IOrderRepository orderRepository, AddOrderRequest request)
    {
        _repository = orderRepository;
        _request = request;
    }

    public async Task<bool> Commit()
    {
        try
        {
            await _repository.AddOrder(_request).ConfigureAwait(false);
            return true;
        }
        catch
        {
            return false;
        }
    }

    public async Task Rollback()
    {
        try
        {
            await _repository.DeleteOrder(_request.Order.OrderId).ConfigureAwait(false);
        }
        catch
        {
            // There is a serious problem.
            // Log the issue and try to resolve manually
        }
    }
}

The code is straightforward: when an order needs to be inserted, we simply insert a new row in SQL Server. If the executor needs to compensate the transaction, we delete the previously inserted row.

Compensation logic for delivery

  • Add a class named StoreDeliveryTransactionHandler in the Handlers folder with the following code.

public class StoreDeliveryTransactionHandler : ITransactionHandler
{
    private readonly IDeliveryRepository _repository;
    private readonly AddDeliveryRequest _request;

    public StoreDeliveryTransactionHandler(IDeliveryRepository deliveryRepository, AddDeliveryRequest request)
    {
        _repository = deliveryRepository;
        // Keep the request: Commit and Rollback both need it.
        _request = request;
    }

    public async Task<bool> Commit()
    {
        try
        {
            await _repository.AddDelivery(_request).ConfigureAwait(false);
            return true;
        }
        catch
        {
            return false;
        }
    }

    public async Task Rollback()
    {
        try
        {
            await _repository.DeleteDelivery(_request.Delivery.DeliveryId).ConfigureAwait(false);
        }
        catch
        {
            // There is a serious problem.
            // Log the issue and try to resolve manually
        }
    }
}

Implementing the Saga pattern

With all these implementations in place, the Saga pattern becomes quite straightforward: we maintain a queue of ITransactionHandler instances to execute and a stack of ITransactionHandler instances that have been completed.

  • Add a new folder named Sagas.

  • Add a class named MakeAPurchaseSaga in it with the following code.

public class MakeAPurchaseSaga
{
    private readonly Queue<ITransactionHandler> _handlerQueue;
    private readonly Stack<ITransactionHandler> _completedHandlers;

    public MakeAPurchaseSaga(PlaceOrderTransactionHandler placeOrderHandler, StoreDeliveryTransactionHandler storeDeliveryHandler)
    {
        _handlerQueue = new Queue<ITransactionHandler>();
        _completedHandlers = new Stack<ITransactionHandler>();

        _handlerQueue.Enqueue(placeOrderHandler);
        _handlerQueue.Enqueue(storeDeliveryHandler);
    }

    public async Task<bool> Execute()
    {
        while (_handlerQueue.Count > 0)
        {
            var handler = _handlerQueue.Dequeue();

            if (await handler.Commit())
            {
                _completedHandlers.Push(handler);
            }
            else
            {
                await Compensate();
                return false;
            }
        }

        return true;
    }

    #region Private Methods

    private async Task Compensate()
    {
        while (_completedHandlers.Count > 0)
        {
            var handler = _completedHandlers.Pop();
            await handler.Rollback();
        }
    }

    #endregion
}

Important

The saga is assigned a descriptive name (MakeAPurchaseSaga) that mirrors a business operation to be executed, even if its internals are composed of various technical components.
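
To see the queue and stack at work without any database, here is a self-contained sketch. RecordingHandler, DemoSaga and SagaDemo are hypothetical names introduced only for this illustration, as is the third "invoice" step. DemoSaga follows the same logic as MakeAPurchaseSaga but accepts any handlers, and each handler simply records what it was asked to do.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public interface ITransactionHandler
{
    Task<bool> Commit();
    Task Rollback();
}

// Hypothetical in-memory handler: it logs calls instead of touching a database.
public class RecordingHandler : ITransactionHandler
{
    private readonly string _name;
    private readonly bool _succeeds;
    private readonly List<string> _log;

    public RecordingHandler(string name, bool succeeds, List<string> log)
    {
        _name = name;
        _succeeds = succeeds;
        _log = log;
    }

    public Task<bool> Commit()
    {
        _log.Add($"commit {_name}: {(_succeeds ? "ok" : "failed")}");
        return Task.FromResult(_succeeds);
    }

    public Task Rollback()
    {
        _log.Add($"rollback {_name}");
        return Task.CompletedTask;
    }
}

// Same queue/stack logic as MakeAPurchaseSaga, but for an arbitrary list of handlers.
public class DemoSaga
{
    private readonly Queue<ITransactionHandler> _handlerQueue;
    private readonly Stack<ITransactionHandler> _completedHandlers = new Stack<ITransactionHandler>();

    public DemoSaga(params ITransactionHandler[] handlers)
    {
        _handlerQueue = new Queue<ITransactionHandler>(handlers);
    }

    public async Task<bool> Execute()
    {
        while (_handlerQueue.Count > 0)
        {
            var handler = _handlerQueue.Dequeue();
            if (await handler.Commit().ConfigureAwait(false))
            {
                _completedHandlers.Push(handler);
                continue;
            }

            // A step failed: undo the completed steps, most recent first.
            while (_completedHandlers.Count > 0)
            {
                await _completedHandlers.Pop().Rollback().ConfigureAwait(false);
            }

            return false;
        }

        return true;
    }
}

public static class SagaDemo
{
    // Runs a saga whose third step fails and returns the recorded calls.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var saga = new DemoSaga(
            new RecordingHandler("order", true, log),
            new RecordingHandler("delivery", true, log),
            new RecordingHandler("invoice", false, log));

        await saga.Execute().ConfigureAwait(false);
        return log;
    }
}
```

After RunAsync completes, the log contains the three commits in order, followed by "rollback delivery" and then "rollback order". The stack is what guarantees that compensation runs in the reverse order of execution, and the failed step itself is never rolled back because it was never pushed.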

Now, we need to properly initialize all these classes. Azure Functions supports dependency injection, and we will use this feature to inject the order and delivery repositories into the function class.

  • Add a StartUp class at the root of the project with the following code.

// The attribute sits outside the namespace, so the type name is fully qualified.
[assembly: FunctionsStartup(typeof(EOCS.SagaPattern.StartUp))]
namespace EOCS.SagaPattern
{
    public class StartUp : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            ConfigureServices(builder.Services, environment: null);
        }

        private static void ConfigureServices(IServiceCollection services, string? environment = null)
        {
            services.AddLogging();

            services.AddSingleton<IServiceConfiguration>(provider => new ServiceConfiguration());

            services.AddSingleton<IOrderRepository, SqlServerOrderRepository>();
            services.AddSingleton<IDeliveryRepository, CosmosDbDeliveryRepository>();
        }
    }
}

  • Update the OrderService class as follows.

public class OrderService
{
    private readonly IOrderRepository _orderRepository;
    private readonly IDeliveryRepository _deliveryRepository;

    public OrderService(IOrderRepository orderRepository, IDeliveryRepository deliveryRepository)
    {
        _orderRepository = orderRepository;
        _deliveryRepository = deliveryRepository;
    }

    [FunctionName(nameof(PlaceOrder))]
    public async Task<IActionResult> PlaceOrder([HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req)
    {
        string data;
        using (var reader = new StreamReader(req.Body))
        {
            data = await reader.ReadToEndAsync().ConfigureAwait(false);
        }

        var request = JsonConvert.DeserializeObject<PlaceOrderRequest>(data);

        // An empty body deserializes to null: reject it before building the saga.
        if (request == null) return new BadRequestResult();

        var placeOrderHandler = new PlaceOrderTransactionHandler(_orderRepository, request.OrderRequest);
        var storeDeliveryHandler = new StoreDeliveryTransactionHandler(_deliveryRepository, request.DeliveryRequest);
        var purchaseSaga = new MakeAPurchaseSaga(placeOrderHandler, storeDeliveryHandler);

        var result = await purchaseSaga.Execute().ConfigureAwait(false);

        if (result) return new OkResult();
        return new BadRequestResult();
    }
}

Run the program

This Azure Function can be tested using tools like Fiddler or Postman. If we intentionally trigger an exception in the Delivery repository, we can observe that the initial transaction (executed in SQL Server) is rolled back.

public async Task<Delivery> AddDelivery(AddDeliveryRequest request)
{
    //var delivery = request.Delivery;
    //return await _container.UpsertItemAsync<Delivery>(delivery, new PartitionKey(delivery.DeliveryId)).ConfigureAwait(false);

    throw new NotImplementedException();
}
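
If you would rather drive the test from code than from Fiddler or Postman, a small client along these lines could work. It is only a sketch: PlaceOrderClient is a hypothetical helper, the JSON shape assumes that PlaceOrderRequest exposes OrderRequest.Order and DeliveryRequest.Delivery with the properties used in the repositories, and the URL assumes the function host is running locally on its default port (7071) with the default api route prefix.

```csharp
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical test client for the PlaceOrder function.
public static class PlaceOrderClient
{
    // Assumed local URL: default Functions host port and route prefix.
    private const string Url = "http://localhost:7071/api/PlaceOrder";

    // Builds a JSON body matching the assumed shape of PlaceOrderRequest.
    public static string BuildPayload(string orderId, decimal amount, string currency, string deliveryId)
    {
        var payload = new
        {
            OrderRequest = new { Order = new { OrderId = orderId, Amount = amount, Currency = currency } },
            DeliveryRequest = new { Delivery = new { DeliveryId = deliveryId } }
        };

        return JsonSerializer.Serialize(payload);
    }

    // Posts the order and returns the HTTP status code sent back by the function.
    public static async Task<int> PlaceOrderAsync(HttpClient client, string json)
    {
        using (var content = new StringContent(json, Encoding.UTF8, "application/json"))
        using (var response = await client.PostAsync(Url, content).ConfigureAwait(false))
        {
            return (int)response.StatusCode;
        }
    }
}
```

With the exception in place, the function as written above returns 400 (BadRequestResult), and the Orders table should not contain the new row once the call returns. Without it, the function returns 200.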
