In meinem letzten Beitrag habe ich gezeigt, wie man bei einer Datenmigration vom SQL-Server mithilfe von den hauseigenen Boardmitteln in Form von Change Tracking Funktionalitäten erhält, die einen bei diesem Unterfangen unterstützen. Falls man hierzu nochmal nachlesen, dann hier entlang
.
In diesem Beitrag möchte ich mich auf den nächsten Migrationsschritt fokussieren. Hierzu gehört das Abrufen der Daten in Bezug auf die festgestellten Änderungen vom SQL-Server und dem Weiterleiten an Drittsysteme.
Datenabruf vom SQL-Server: One-Way-Sync
Der Abruf in eine Richtung wird auch als One-Way-Synchronisation bezeichnet. Zu den Aufgaben gehören u. a. folgende:
- Abruf der geänderten Datensätze vom SQL-Server
- Mappen auf eine Domänen-Entität
- Senden der Daten an eine Middleware
Wahl der Technologie: Worker Service
Nun stellt sich die Frage, wie die Daten abgerufen werden sollen. Genauer gesagt, welche Technologie man dazu nutzt. Üblicherweise würde die Wahl hier entweder auf ein Konsolenprogramm oder auf eine Azure Function fallen. Da wir im gewählten Szenario aber von einem OnPrem-SQL-Server ausgehen können, eigenen sich Technologien, die sowohl im OnPrem- als auch im Cloud-Umfeld (z. B. Azure) funktionieren. Diese Anforderungen erfüllt der Worker Service voll und ganz.
Der Worker Service eignet sich besonderes für die Erstellung von langwierigen Aufgaben, wie u.a. das Ausführen von zeitbasierten Vorgängen nach einem Zeitplan. Der größte Vorteil hierbei ist die Unabhängigkeit zu Windows als Host-System. Man kann somit plattformübergreifende Hintergrunddienste entwickeln und benötigt lediglich .NET 6 oder höher, um direkt zu starten.
Demo mit dem Worker Service
Bevor wir mit der eigentlichen Implementierung einsteigen, möchte ich den groben Ablauf skizzieren:
- Abruf der letzten Sync-Version (
SYS_CHANGE_VERSION
)
- Abruf der Änderungen von der Datenbank
- Mappen der Daten auf ein gemeinsames Domänen-Objekt
- Senden der gemappten Daten an eine Middleware
- Speichern der neuen Sync-Version
Gemeinsames Domänen-Objekt
Das gemeinsame Domänen-Objekt wird verwendet, um Daten aus dem Quell- und Zielsystem auf eine gemeinsame Datenbasis zu heben. In unserem Beispiel sähe das so aus:
namespace Blog_Sync.Domain
{
public class Employee
{
public int? Id { get; set; }
public string Firstname { get; set; }
public string Lastname { get; set; }
public Department CurrentDepartment { get; set; }
}
public enum Department
{
None,
IT,
Marketing,
Finanzen,
Facility
}
}
Stored Procedure
Als Basis für unseren Datenabruf verwende ich eine Stored Procedure, die sowohl Änderungsinformationen aus dem Change Tracking enthält, als auch den Payload für die Entität (Employee
), die wir abrufen möchten.
CREATE PROCEDURE [dbo].[GetEmplyeeChanges] @last_synchronization_version bigint
AS
SELECT
CT.[Id],
Employee.[Firstname],
Employee.[Lastname],
Employee.[Current Department],
CT.SYS_CHANGE_OPERATION,
CT.SYS_CHANGE_VERSION,
CT.SYS_CHANGE_CREATION_VERSION
FROM
CHANGETABLE(CHANGES Employee, @last_synchronization_version) AS CT
LEFT OUTER JOIN [wf-sample].[dbo].[Employee] AS Employee
ON CT.id = Employee.Id
1. Abruf der letzten Sync-Version
Die Sync-Version wird dazu verwendet, um immer die aktuellsten Änderungen der Datenbank abzurufen. Damit nicht immer alles, sondern nur die noch nicht abgerufenen Änderungen abgerufen wird, speichern wir die letzte bekannte Versionsnummer in eine JSON
-Datei.
Die entsprechende Datei (EmployeeVersion.json
) sieht dann so aus:
Das Auslesen kann auf folgende Art und Weise abgebildet werden:
private long LoadLastSyncVersion()
{
// load and read Json-File
var filePath = Path.Combine(Directory.GetCurrentDirectory(), "SyncVersions", "EmployeeVersion.json");
var jsonContent = File.ReadAllText(filePath);
var jsonObject = JsonNode.Parse(jsonContent).AsObject();
var lastSyncVersion = (long)jsonObject["EmployeeVersion"];
_logger.LogInformation("EmplyeeSender #### Read #### Change Tracking #### Version: {0}", lastSyncVersion);
return lastSyncVersion;
}
2. Abruf der Änderungen von der Datenbank
Mit der ermittelten Version kann nun auf die Stored Procedure zugegriffen werden, um die Änderungen aus der Datenbank abzurufen:
private List<EmployeeSource> LoadChangesForEmplyoeeFromDatabase(long lastSyncVersion)
{
var employeeEntityList = new List<EmployeeSource>();
try
{
using (var sqlConnection = new SqlConnection(_databaseConnectionString))
{
sqlConnection.Open();
using (var command = new SqlCommand(_databaseStoredProcedureName, sqlConnection))
{
command.CommandType = CommandType.StoredProcedure;
command.Parameters.AddWithValue("@last_synchronization_version", lastSyncVersion);
using (var reader = command.ExecuteReader())
{
if (reader.HasRows)
{
while (reader.Read())
{
employeeEntityList.Add(new EmployeeSource
{
Id = reader.IsDBNull(0) ? null : reader.GetInt32(0),
Firstname = reader.IsDBNull(1) ? null : reader.GetString(1),
Lastname = reader.IsDBNull(2) ? null : reader.GetString(2),
CurrentDepartment = reader.IsDBNull(3) ? null : (Department)Enum.Parse(typeof(Department), reader.GetString(3)),
SYS_CHANGE_OPERATION = reader.IsDBNull(4) ? null : reader.GetString(4),
SYS_CHANGE_VERSION = reader.IsDBNull(5) ? null : reader.GetInt64(5),
SYS_CHANGE_CREATION_VERSION = reader.IsDBNull(6) ? null : reader.GetInt64(6),
});
}
}
}
}
sqlConnection.Close();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error #### Processing Database Operations");
}
return employeeEntityList;
}
Die Daten werden übrigens in ein Zwischenobjekt (EmployeeSource
) gespeichert:
public class EmployeeSource
{
public int? Id { get; set; }
public string? Firstname { get; set; }
public string? Lastname { get; set; }
public Department? CurrentDepartment { get; set; }
public string? SYS_CHANGE_OPERATION { get; set; }
public long? SYS_CHANGE_VERSION { get; set; }
public long? SYS_CHANGE_CREATION_VERSION { get; set; }
}
3. Mappen der Daten auf ein gemeinsames Domänen-Objekt
Nach dem Laden der Quelldaten müssen diese zunächst in ein einheitliches Format gebracht werden. Hierzu können neben gängigen Bibliotheken wie Automapper auch eigene Mapping-Funktionalität verwendet werden.
In diesem Fall greife ich auf eine eigene Implementierung zurück:
private Employee MapDatabaseToDomainEntity(EmployeeSource employeeDatabase)
{
return new Employee
{
Id = (int)employeeDatabase.Id,
Firstname = (string)employeeDatabase.Firstname,
Lastname = (string)employeeDatabase.Lastname,
CurrentDepartment = (Department)employeeDatabase.CurrentDepartment,
};
}
4. Senden der gemappten Daten an eine Middleware
Nun können die Daten in Domänenrepräsentation an eine Middleware weitergeleitet werden. Das hat den Vorteil, dass der Sende- und Empfangsvorgang hierdurch entkoppelt wird. In Azure eignet sich hierfür z. B. der Service Bus. Durch das Pub-Sub-Prinzip können Nachrichten an ein Thema (engl. Topic) (in diesem Fall emplyeeDatabase
) gesendet werden. Drittsysteme, die diese Daten empfangen möchten, müssen sich lediglich für dieses Thema abonnieren (engl. subscribe). Zusätzlich zum Nachrichteninhalt (Payload der Employee-Entität) wird der Nachricht noch eine Meta-Information hinzugefügt, die den Änderungstyp (Erstellung, Aktualisierung, Löschung) angibt, damit die Drittsysteme eine entsprechende Anpassung ihrer Daten durchführen können.
Das Senden auf den Service Bus führt auch die zuvor implementierte Mapping-Logik aus:
private async Task SendEmployeesToServiceBus(List<EmployeeSource> employeeSourceList)
{
var client = new ServiceBusClient(_serviceBusConnectionString);
var sender = client.CreateSender("emplyeeDatabase");
try
{
foreach (var employeeSource in employeeSourceList)
{
var employeeJson = JsonSerializer.Serialize(MapDatabaseToDomainEntity(employeeSource));
var serviceBusMessage = new ServiceBusMessage(employeeJson);
serviceBusMessage.ApplicationProperties.Add("OperationType", employeeSource.SYS_CHANGE_OPERATION);
await sender.SendMessageAsync(serviceBusMessage);
_logger.LogInformation($"EmplyeeSender #### Emplyoee sent #### SB topic '{_serviceBusTopicName} #### ID: '{employeeSource.Id}'");
}
}
finally
{
await sender.DisposeAsync();
await client.DisposeAsync();
}
}
5. Speichern der neuen Sync-Version
Zu guter Letzt wird die neue Sync-Version in der JSON
-Datei abgespeichert:
private void SaveLastSyncVersion(long lastSyncVersion)
{
// load and read Json-File
var filePath = Path.Combine(Directory.GetCurrentDirectory(), "SyncVersions", "EmployeeVersion.json");
var jsonContent = File.ReadAllText(filePath);
var jsonObject = JsonNode.Parse(jsonContent).AsObject();
jsonObject["EmployeeVersion"] = lastSyncVersion;
var updatedJson = jsonObject.ToJsonString();
File.WriteAllText(filePath, updatedJson);
_logger.LogInformation("EmplyeeSender #### Read #### Change Tracking #### Version: {0}", lastSyncVersion);
}
Das Ermitteln der neuen Sync-Version erfolgt im Worker selbst.
Abruf der Methoden im Worker Service
Die zuvor erstellten Methoden werden nun im Worker Service aufgerufen. Der Ablauf ist im Folgenden dargestellt:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// 1. Abruf der letzten Sync-Version
var lastSyncVersion = LoadLastSyncVersion();
// 2. Abruf der Änderungen von der Datenbank
var employeeChangesList = LoadChangesForEmplyoeeFromDatabase(lastSyncVersion);
if (employeeChangesList.Count != 0)
{
_logger.LogInformation($"EmplyeeSender #### Changes Detected #### Count: {employeeChangesList.Count}");
// 3. Mappen der Daten auf ein gemeinsames Domänen-Objekt
// 4. Senden der gemappten Daten an eine Middleware
await SendEmployeesToServiceBus(employeeChangesList);
// Determine new SyncVersion
var newSyncVersion = (long)employeeChangesList.Max(x => x.SYS_CHANGE_VERSION);
// 5. Speichern der neuen Sync-Version
SaveLastSyncVersion(newSyncVersion);
}
_logger.LogInformation("EmplyeeSender #### Wait 5 Seconds");
// Wait 5 seconds
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
Der Worker Service lädt zunächst die Change-Tracking-Versionsnummer aus der JSON
-Datei und ruft anschließend die Änderungen aus der Datenbank ab. Die ermittelten Datensätze werden dann auf die allgemeingültige Domänenrepräsentation gemappt und an eine Middleware (in unserem Fall Azure Service Bus) gesendet. Die neue Change-Tracking-Versionsnummer wird zum Schluss in der JSON
-Datei für den nächsten Durchlauf aktualisiert und der Worker Service wartet 5 Sekunden bis zum nächsten Durchlauf. Je nach Datenmenge bzw. Änderungen, welche sich in der SQL-Datenbank ergeben, kann dieser Wert hoch oder heruntergesetzt werden.