Datenmigration mit dem SQL-Server: bidirektionaler Datenverkehr

post-thumb

In den letzten zwei Blogbeiträgen habe ich die Datenmigration vom SQL-Server thematisiert. Im ersten Teil wurden die Funktionalitäten rund um Change Tracking auf dem SQL-Server erläutert und im zweiten Teil habe ich gezeigt, wie mit Worker Service die Änderungen abgerufen und an den Azure Service Bus weitergeleitet werden.

Hier entlang zum Nachlesen:

In diesem Beitrag möchte ich den Weg zurück betrachten. Das bedeutet, dass ein bidirektionaler Austausch der Daten zwischen dem SQL-Server und dem Drittsystem erfolgt. Welche Herausforderungen auf uns warten und wie man diese am besten bewältigt, zeige ich erneut in einer beispielhaften Implementierung.

Datenverkehr in beide Richtungen: Two-Way-Sync

Das Senden und Empfangen von Daten wird auch als Two-Way-Synchronisation bezeichnet. Die Komplexität ist im Vergleich zu einem One-Way-Sync natürlich deutlich höher. So können durch die Anpassbarkeit der Datensätze in beiden Systemen Inkonsistenzen entstehen, welche die Sync-Logik im besten Fall selbstständig lösen kann. Für gewöhnlich agiert in einem solchen Szenario ein System als führend, wessen Änderungen an den Datensätzen dann entsprechend vorgezogen wird.

Zusätzlich zur bestehenden Sende-Logik aus der One-Way-Sync-Demo muss diese um eine Empfangslogik erweitert werden:

  • Empfangen der Domänen-Entität
  • Mappen auf Ziel-Entität
  • Speichern der Änderungen in Datenbank (Erstellung, Aktualisierung, Löschung)

Wir gehen in diesem Szenario davon aus, dass das führende System die Anwendung mit dem SQL-Server ist. Die Drittsysteme kennen somit die Primärschlüssel der SQL-Datenbank und senden diese stets mit, wodurch entsprechende Aktionen an der Datenbank vorgenommen werden können.

Zusätzlich muss die Stored Procedure für die Sende-Logik angepasst werden, damit einkommende Änderungen nicht von der Sende-Logik fälschlicherweise als solche erkannt und gesendet werden.

Erweiterung der Demo um einen Two-Way-Sync

Neben dem Erstellen eines komplett neuen Workers (ReceiveWorker) wird auch die Stored Procedure entsprechend angepasst.

Folgende Schritte sind für die Empfangslogik zu implementieren:

  1. Abruf der Änderungen vom Azure Service Bus
  2. Mappen der Domänen-Entität auf Ziel-Entität
  3. Speicherung der Änderung in Datenbank
    • Erstellen (Create)
    • Ändern (Update)
    • Löschen (Delete)

Anpassen der Stored Procedure

Die Stored Procedure wird um eine WHERE-Clause erweitert, in der geprüft wird, ob ein Änderungskontext (im Rahmen von Change Tracking) gesetzt wurde:

ALTER PROCEDURE [dbo].[GetEmplyeeChanges] @last_synchronization_version bigint
AS
SELECT
  CT.[Id],
  Employee.[Firstname],
  Employee.[Lastname],
  Employee.[Current Department],
  CT.SYS_CHANGE_OPERATION,
  CT.SYS_CHANGE_VERSION,
  CT.SYS_CHANGE_CREATION_VERSION,
  CT.SYS_CHANGE_CONTEXT,
FROM
	CHANGETABLE(CHANGES Employee, @last_synchronization_version) AS CT
LEFT OUTER JOIN [wf-sample].[dbo].[Employee] AS Employee
	ON CT.id = Employee.Id
WHERE
  COALESCE(DATALENGTH(CT.SYS_CHANGE_CONTEXT), 0) = 0

1. Abruf der Änderungen vom Azure Service Bus

Der erste Schritt der Empfangslogik bildet den Empfang von Änderungen über den Service Bus ab. Hierzu soll vom Service Bus auf dem jeweiligen Empfangs-Topic (in unserem Fall EmployeeThirdParty) eine Nachricht abgerufen werden.

private async Task<ServiceBusReceivedMessage?> RecieveMessageAsync(CancellationToken stoppingToken)
{
    var client = new ServiceBusClient(_serviceBusConnectionString);

    ServiceBusReceiver receiver = client.CreateReceiver(_serviceBusTopicName, "EmployeeThirdParty", new ServiceBusReceiverOptions
    {
        ReceiveMode = ServiceBusReceiveMode.PeekLock
    });

    return await receiver.ReceiveMessageAsync(cancellationToken: stoppingToken);
}

Im Worker selbst muss später geprüft werden, ob die Nachricht einen Inhalt hat oder leer (null) ist, bevor weitere Aktionen ausgeführt werden.

2. Mappen der Domänen-Entität auf Ziel-Entität

Nach dem Empfang der Nachricht vom Service Bus muss die Domänen-Entität auf die Zielentität gemappt werden. In unserem Fall umfasst das Ganze vier Attribute, die gemeinsam mit dem Deserialisieren der Nachricht umgesetzt wird:

private EmployeeSource EmployeeMap2Domaintype(ServiceBusReceivedMessage message)
{
    var employeeDomain = JsonSerializer.Deserialize<Employee>(message.Body.ToString()) ?? throw new Exception("EmplyeeReceiver #### Read Changes from Service Bus #### Employee is NULL!");

    return new EmployeeSource
    {
        Id = employeeDomain.Id,
        Firstname = employeeDomain.Firstname,
        Lastname = employeeDomain.Lastname,
        CurrentDepartment = employeeDomain.CurrentDepartment
    };
}

3. Speicherung Änderung in Datenbank

Im letzten Schritt müssen die Änderungen auf die Datenbank angewandt werden. Hierzu gehören Methoden, mit denen man alle Operationen (Create, Update, Delete) durchführen kann. Der Aufbau ist jeweils sehr ähnlich: Es werden SQL-Transaktionen mit entsprechenden Operationen durchgeführt, welche im Fehlerfall einen Rollback ausführen. Dabei wird auch jeweils ein Änderungskontext gesetzt, welcher von der Stored Procedure, die für das Laden der Änderungen verwendet wird, berücksichtigt wird. Somit werden die einkommenden Änderungen nicht wieder abgerufen.

Erstellen (Create)

private void InsertChangesToDB(EmployeeSource employee)
{
    using (var sqlConnection = new SqlConnection(_databaseConnectionString))
    {
        sqlConnection.Open();

        SqlTransaction transaction = sqlConnection.BeginTransaction();
        try
        {
            using (SqlCommand insertCommand = sqlConnection.CreateCommand())
            {
                insertCommand.Transaction = transaction;

                // Insert Employee
                insertCommand.CommandText =
                    "DECLARE @varSyncContext VARBINARY(100) = convert(varbinary, 'Two-Way-Sync'); " +
                    "WITH CHANGE_TRACKING_CONTEXT (@varSyncContext) " +
                    "INSERT INTO [dbo].[Employee] ( " +
                        "Firstname, " +
                        "Lastname, " +
                        "Current Department " +
                        ") " +
                    "VALUES ( " +
                        "@Firstname, " +
                        "@Lastname, " +
                        "@CurrentDepartment  " +
                        ")";
                
                insertCommand.Parameters.AddWithValue("@Firstname", employee.Firstname);
                insertCommand.Parameters.AddWithValue("@Lastname", employee.Lastname);
                insertCommand.Parameters.AddWithValue("@CurrentDepartment", employee.CurrentDepartment.ToString());

                // Execute the INSERT-Command
                insertCommand.ExecuteNonQuery();
            }

            transaction.Commit();

        }
        catch (Exception ex)
        {
            // Transaction-Rollback on Error
            transaction.Rollback();
            _logger.LogError(ex, "Error #### Processing Database Operations ### Rollback initiaded");
            throw;
        }
        finally
        {
            sqlConnection.Close();
        }
    }
}

Ändern (Update)

private void UpdateChangesToDB(EmployeeSource employee)
{
    using (var sqlConnection = new SqlConnection(_databaseConnectionString))
    {
        sqlConnection.Open();

        SqlTransaction transaction = sqlConnection.BeginTransaction();
        try
        {
            using (SqlCommand updateCommand = sqlConnection.CreateCommand())
            {
                updateCommand.Transaction = transaction;

                // Update Employee
                updateCommand.CommandText =
                    "DECLARE @varSyncContext VARBINARY(100) = convert(varbinary, 'Two-Way-Sync'); " +
                    "WITH CHANGE_TRACKING_CONTEXT (@varSyncContext) " +
                    "UPDATE [dbo].[Employee] " +
                        "SET " +
                        "Firstname = @Firstname, " +
                        "Lastname = @Lastname, " +
                        "Current Department = @CurrentDepartment" +
                    "WHERE " +
                        "Id = @Id;";

                updateCommand.Parameters.AddWithValue("@Id", employee.Id);
                updateCommand.Parameters.AddWithValue("@Firstname", employee.Firstname);
                updateCommand.Parameters.AddWithValue("@Lastname", employee.Lastname);
                updateCommand.Parameters.AddWithValue("@CurrentDepartment", employee.CurrentDepartment.ToString());

                // Execute the UPDATE-Command
                updateCommand.ExecuteNonQuery();
            }

            transaction.Commit();

        }
        catch (Exception ex)
        {
            // Transaction-Rollback on Error
            transaction.Rollback();
            _logger.LogError(ex, "Error #### Processing Database Operations ### Rollback initiaded");
            throw;
        }
        finally
        {
            sqlConnection.Close();
        }
    }
}

Löschen (Delete)

private void DeleteChangesToDB(EmployeeSource employee)
{
    using (var sqlConnection = new SqlConnection(_databaseConnectionString))
    {
        sqlConnection.Open();

        SqlTransaction transaction = sqlConnection.BeginTransaction();
        try
        {
            using (SqlCommand deleteCommand = sqlConnection.CreateCommand())
            {
                deleteCommand.Transaction = transaction;

                // Delete Employee
                deleteCommand.CommandText =
                    "DECLARE @varSyncContext VARBINARY(100) = convert(varbinary, 'Two-Way-Sync'); " +
                    "WITH CHANGE_TRACKING_CONTEXT (@varSyncContext) " +
                    "DELETE FROM [dbo].[Employee] " +
                    "WHERE " +
                        "Id = @Id;";

                deleteCommand.Parameters.AddWithValue("@Id", employee.Id);

                // Execute the DELETE-Command
                deleteCommand.ExecuteNonQuery();
            }

            transaction.Commit();

        }
        catch (Exception ex)
        {
            // Transaction-Rollback on Error
            transaction.Rollback();
            _logger.LogError(ex, "Error #### Processing Database Operations ### Rollback initiaded");
            throw;
        }
        finally
        {
            sqlConnection.Close();
        }
    }
}

Anpassen Receive-Worker-Service für den Two-Way-Sync

Der Worker Service für den Empfang sieht dann so aus:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // 1. Abruf der Änderungen vom Azure Service Bus
    var message = await RecieveMessageAsync(stoppingToken);

    while (!stoppingToken.IsCancellationRequested)
    {
        if (message == null)
        {
            continue;
        }

        // 2. Mappen der Domänen-Entität auf Ziel-Entität
        var employee = EmployeeMap2Domaintype(message);
        var operationType = (string)message.ApplicationProperties["OperationType"];

        // 3. Änderung in Datenbank speichern
        switch (operationType)
        {
            case "I": // Insert
                
                InsertChangesToDB(employee);
                break;

            case "U": // Update

                UpdateChangesToDB(employee);
                break;

            case "D": // Delete

                DeleteChangesToDB(employee);
                break;

            default:
                break;
        }

        _logger.LogInformation("EmplyeeReceiver #### Wait 5 Seconds");

        // Wait 5 seconds
        await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);

    }
}

Der Worker Service lädt die Nachricht vom Service Bus und mappt den Inhalt auf die Ziel-Entität. Durch die Angabe des Änderungstyps aus der Service-Bus-Nachricht wir die entsprechende Methode im switch-case aufgerufen und eine Datenbankooperation ausgeführt. Durch das Setzen des Änderungskontextes wird der aktualisierte Datensatz vom Change Tracking bzw. der dazugehörenden Stored Procedure beim Senden ignoriert. So entstehen keine Endlosschleifen.

Die Konflikterkennung wurde in diesem Beitrag nur erwähnt, aber nicht in der Implementierung berücksichtigt. Da die Konflikterkennung mitunter sehr komplex werden kann, möchte ich dieses Thema in einem zukünftigen Blogbeitrag behandeln.

Lernen Sie uns kennen

Das sind wir

Wir sind ein Software-Unternehmen mit Hauptsitz in Karlsruhe und auf die Umsetzung von Digitalstrategien durch vernetzte Cloud-Anwendungen spezialisiert. Wir sind Microsoft-Partner und erweitern Standard-Anwendungen bei Bedarf – egal ob Modernisierung, Integration, Implementierung von CRM- oder ERP-Systemen, Cloud Security oder Identity- und Access-Management: Wir unterstützen Sie!

Mehr über uns

Der Objektkultur-Newsletter

Mit unserem Newsletter informieren wir Sie stets über die neuesten Blogbeiträge,
Webcasts und weiteren spannenden Themen rund um die Digitalisierung.

Newsletter abonnieren