Datenmigration mit dem SQL-Server: Daten in der Einbahnstraße

post-thumb

In meinem letzten Beitrag habe ich gezeigt, wie man bei einer Datenmigration vom SQL-Server mithilfe von den hauseigenen Boardmitteln in Form von Change Tracking Funktionalitäten erhält, die einen bei diesem Unterfangen unterstützen. Falls man hierzu nochmal nachlesen, dann hier entlang .

In diesem Beitrag möchte ich mich auf den nächsten Migrationsschritt fokussieren. Hierzu gehört das Abrufen der Daten in Bezug auf die festgestellten Änderungen vom SQL-Server und dem Weiterleiten an Drittsysteme.

Datenabruf vom SQL-Server: One-Way-Sync

Der Abruf in eine Richtung wird auch als One-Way-Synchronisation bezeichnet. Zu den Aufgaben gehören u. a. folgende:

  • Abruf der geänderten Datensätze vom SQL-Server
  • Mappen auf eine Domänen-Entität
  • Senden der Daten an eine Middleware

Wahl der Technologie: Worker Service

Nun stellt sich die Frage, wie die Daten abgerufen werden sollen. Genauer gesagt, welche Technologie man dazu nutzt. Üblicherweise würde die Wahl hier entweder auf ein Konsolenprogramm oder auf eine Azure Function fallen. Da wir im gewählten Szenario aber von einem OnPrem-SQL-Server ausgehen können, eigenen sich Technologien, die sowohl im OnPrem- als auch im Cloud-Umfeld (z. B. Azure) funktionieren. Diese Anforderungen erfüllt der Worker Service voll und ganz.

Der Worker Service eignet sich besonderes für die Erstellung von langwierigen Aufgaben, wie u.a. das Ausführen von zeitbasierten Vorgängen nach einem Zeitplan. Der größte Vorteil hierbei ist die Unabhängigkeit zu Windows als Host-System. Man kann somit plattformübergreifende Hintergrunddienste entwickeln und benötigt lediglich .NET 6 oder höher, um direkt zu starten.

Demo mit dem Worker Service

Bevor wir mit der eigentlichen Implementierung einsteigen, möchte ich den groben Ablauf skizzieren:

  1. Abruf der letzten Sync-Version (SYS_CHANGE_VERSION)
  2. Abruf der Änderungen von der Datenbank
  3. Mappen der Daten auf ein gemeinsames Domänen-Objekt
  4. Senden der gemappten Daten an eine Middleware
  5. Speichern der neuen Sync-Version

Gemeinsames Domänen-Objekt

Das gemeinsame Domänen-Objekt wird verwendet, um Daten aus dem Quell- und Zielsystem auf eine gemeinsame Datenbasis zu heben. In unserem Beispiel sähe das so aus:

namespace Blog_Sync.Domain
{
    public class Employee
    {
        public int? Id { get; set; }

        public string Firstname { get; set; }

        public string Lastname { get; set; }

        public Department CurrentDepartment { get; set; }
    }

    public enum Department
    {
        None,
        IT,
        Marketing,
        Finanzen,
        Facility
    }
}

Stored Procedure

Als Basis für unseren Datenabruf verwende ich eine Stored Procedure, die sowohl Änderungsinformationen aus dem Change Tracking enthält, als auch den Payload für die Entität (Employee), die wir abrufen möchten.

CREATE PROCEDURE [dbo].[GetEmplyeeChanges] @last_synchronization_version bigint
AS
SELECT
	CT.[Id],
    Employee.[Firstname],
    Employee.[Lastname],
    Employee.[Current Department],
	CT.SYS_CHANGE_OPERATION,
	CT.SYS_CHANGE_VERSION,
	CT.SYS_CHANGE_CREATION_VERSION
FROM
	CHANGETABLE(CHANGES Employee, @last_synchronization_version) AS CT
LEFT OUTER JOIN [wf-sample].[dbo].[Employee] AS Employee
	ON CT.id = Employee.Id

1. Abruf der letzten Sync-Version

Die Sync-Version wird dazu verwendet, um immer die aktuellsten Änderungen der Datenbank abzurufen. Damit nicht immer alles, sondern nur die noch nicht abgerufenen Änderungen abgerufen wird, speichern wir die letzte bekannte Versionsnummer in eine JSON-Datei.

Die entsprechende Datei (EmployeeVersion.json) sieht dann so aus:

{
  "EmployeeVersion": 0
}

Das Auslesen kann auf folgende Art und Weise abgebildet werden:

 private long LoadLastSyncVersion()
 {
     // load and read Json-File
     var filePath = Path.Combine(Directory.GetCurrentDirectory(), "SyncVersions", "EmployeeVersion.json");
     var jsonContent = File.ReadAllText(filePath);
     var jsonObject = JsonNode.Parse(jsonContent).AsObject();

     var lastSyncVersion = (long)jsonObject["EmployeeVersion"];

     _logger.LogInformation("EmplyeeSender #### Read #### Change Tracking #### Version: {0}", lastSyncVersion);
     
     return lastSyncVersion;
 }

2. Abruf der Änderungen von der Datenbank

Mit der ermittelten Version kann nun auf die Stored Procedure zugegriffen werden, um die Änderungen aus der Datenbank abzurufen:

 private List<EmployeeSource> LoadChangesForEmplyoeeFromDatabase(long lastSyncVersion)
 {
     var employeeEntityList = new List<EmployeeSource>();

     try
     {
         using (var sqlConnection = new SqlConnection(_databaseConnectionString))
         {
             sqlConnection.Open();

             using (var command = new SqlCommand(_databaseStoredProcedureName, sqlConnection))
             {
                 command.CommandType = CommandType.StoredProcedure;

                 command.Parameters.AddWithValue("@last_synchronization_version", lastSyncVersion);

                 using (var reader = command.ExecuteReader())
                 {
                     if (reader.HasRows)
                     {
                         while (reader.Read())
                         {
                             employeeEntityList.Add(new EmployeeSource
                             {
                                 Id = reader.IsDBNull(0) ? null : reader.GetInt32(0),
                                 Firstname = reader.IsDBNull(1) ? null : reader.GetString(1),
                                 Lastname = reader.IsDBNull(2) ? null : reader.GetString(2),
                                 CurrentDepartment = reader.IsDBNull(3) ? null : (Department)Enum.Parse(typeof(Department), reader.GetString(3)),
                                 SYS_CHANGE_OPERATION = reader.IsDBNull(4) ? null : reader.GetString(4),
                                 SYS_CHANGE_VERSION = reader.IsDBNull(5) ? null : reader.GetInt64(5),
                                 SYS_CHANGE_CREATION_VERSION = reader.IsDBNull(6) ? null : reader.GetInt64(6),
                             });
                         }
                     }
                 }
             }

             sqlConnection.Close();
         }
     }
     catch (Exception ex)
     {
         _logger.LogError(ex, "Error #### Processing Database Operations");
     }

     return employeeEntityList;
 }

Die Daten werden übrigens in ein Zwischenobjekt (EmployeeSource) gespeichert:

public class EmployeeSource
{
    public int? Id { get; set; }

    public string? Firstname { get; set; }

    public string? Lastname { get; set; }

    public Department? CurrentDepartment { get; set; }

    public string? SYS_CHANGE_OPERATION { get; set; }

    public long? SYS_CHANGE_VERSION { get; set; }
    
    public long? SYS_CHANGE_CREATION_VERSION { get; set; }
}

3. Mappen der Daten auf ein gemeinsames Domänen-Objekt

Nach dem Laden der Quelldaten müssen diese zunächst in ein einheitliches Format gebracht werden. Hierzu können neben gängigen Bibliotheken wie Automapper auch eigene Mapping-Funktionalität verwendet werden.

In diesem Fall greife ich auf eine eigene Implementierung zurück:

private Employee MapDatabaseToDomainEntity(EmployeeSource employeeDatabase)
{
    return new Employee
    {
        Id = (int)employeeDatabase.Id,
        Firstname = (string)employeeDatabase.Firstname,
        Lastname = (string)employeeDatabase.Lastname,
        CurrentDepartment = (Department)employeeDatabase.CurrentDepartment,
    };
}

4. Senden der gemappten Daten an eine Middleware

Nun können die Daten in Domänenrepräsentation an eine Middleware weitergeleitet werden. Das hat den Vorteil, dass der Sende- und Empfangsvorgang hierdurch entkoppelt wird. In Azure eignet sich hierfür z. B. der Service Bus. Durch das Pub-Sub-Prinzip können Nachrichten an ein Thema (engl. Topic) (in diesem Fall emplyeeDatabase) gesendet werden. Drittsysteme, die diese Daten empfangen möchten, müssen sich lediglich für dieses Thema abonnieren (engl. subscribe). Zusätzlich zum Nachrichteninhalt (Payload der Employee-Entität) wird der Nachricht noch eine Meta-Information hinzugefügt, die den Änderungstyp (Erstellung, Aktualisierung, Löschung) angibt, damit die Drittsysteme eine entsprechende Anpassung ihrer Daten durchführen können.

Das Senden auf den Service Bus führt auch die zuvor implementierte Mapping-Logik aus:

private async Task SendEmployeesToServiceBus(List<EmployeeSource> employeeSourceList)
{
    var client = new ServiceBusClient(_serviceBusConnectionString);
    var sender = client.CreateSender("emplyeeDatabase");

    try
    {
        foreach (var employeeSource in employeeSourceList)
        {
            var employeeJson = JsonSerializer.Serialize(MapDatabaseToDomainEntity(employeeSource));

            var serviceBusMessage = new ServiceBusMessage(employeeJson);
            serviceBusMessage.ApplicationProperties.Add("OperationType", employeeSource.SYS_CHANGE_OPERATION);

            await sender.SendMessageAsync(serviceBusMessage);

            _logger.LogInformation($"EmplyeeSender #### Emplyoee sent #### SB topic '{_serviceBusTopicName} #### ID: '{employeeSource.Id}'");
        }
    }
    finally
    {
        await sender.DisposeAsync();
        await client.DisposeAsync();
    }
}

5. Speichern der neuen Sync-Version

Zu guter Letzt wird die neue Sync-Version in der JSON-Datei abgespeichert:

private void SaveLastSyncVersion(long lastSyncVersion)
{
    // load and read Json-File
    var filePath = Path.Combine(Directory.GetCurrentDirectory(), "SyncVersions", "EmployeeVersion.json");
    var jsonContent = File.ReadAllText(filePath);
    var jsonObject = JsonNode.Parse(jsonContent).AsObject();

    jsonObject["EmployeeVersion"] = lastSyncVersion;

    var updatedJson = jsonObject.ToJsonString();

    File.WriteAllText(filePath, updatedJson);

    _logger.LogInformation("EmplyeeSender #### Read #### Change Tracking #### Version: {0}", lastSyncVersion);
}

Das Ermitteln der neuen Sync-Version erfolgt im Worker selbst.

Abruf der Methoden im Worker Service

Die zuvor erstellten Methoden werden nun im Worker Service aufgerufen. Der Ablauf ist im Folgenden dargestellt:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        // 1. Abruf der letzten Sync-Version
        var lastSyncVersion = LoadLastSyncVersion();

        // 2. Abruf der Änderungen von der Datenbank
        var employeeChangesList = LoadChangesForEmplyoeeFromDatabase(lastSyncVersion);

        if (employeeChangesList.Count != 0)
        {
            _logger.LogInformation($"EmplyeeSender #### Changes Detected #### Count: {employeeChangesList.Count}");

            // 3. Mappen der Daten auf ein gemeinsames Domänen-Objekt
            // 4. Senden der gemappten Daten an eine Middleware
            await SendEmployeesToServiceBus(employeeChangesList);

            // Determine new SyncVersion
            var newSyncVersion = (long)employeeChangesList.Max(x => x.SYS_CHANGE_VERSION);

            // 5. Speichern der neuen Sync-Version
            SaveLastSyncVersion(newSyncVersion);
        }

        _logger.LogInformation("EmplyeeSender #### Wait 5 Seconds");

        // Wait 5 seconds
        await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
    }
}

Der Worker Service lädt zunächst die Change-Tracking-Versionsnummer aus der JSON-Datei und ruft anschließend die Änderungen aus der Datenbank ab. Die ermittelten Datensätze werden dann auf die allgemeingültige Domänenrepräsentation gemappt und an eine Middleware (in unserem Fall Azure Service Bus) gesendet. Die neue Change-Tracking-Versionsnummer wird zum Schluss in der JSON-Datei für den nächsten Durchlauf aktualisiert und der Worker Service wartet 5 Sekunden bis zum nächsten Durchlauf. Je nach Datenmenge bzw. Änderungen, welche sich in der SQL-Datenbank ergeben, kann dieser Wert hoch oder heruntergesetzt werden.

Lernen Sie uns kennen

Das sind wir

Wir sind ein Software-Unternehmen mit Hauptsitz in Karlsruhe und auf die Umsetzung von Digitalstrategien durch vernetzte Cloud-Anwendungen spezialisiert. Wir sind Microsoft-Partner und erweitern Standard-Anwendungen bei Bedarf – egal ob Modernisierung, Integration, Implementierung von CRM- oder ERP-Systemen, Cloud Security oder Identity- und Access-Management: Wir unterstützen Sie!

Mehr über uns

Der Objektkultur-Newsletter

Mit unserem Newsletter informieren wir Sie stets über die neuesten Blogbeiträge,
Webcasts und weiteren spannenden Themen rund um die Digitalisierung.

Newsletter abonnieren