In den letzten zwei Blogbeiträgen habe ich die Datenmigration vom SQL-Server thematisiert. Im ersten Teil wurden die Funktionalitäten rund um Change Tracking auf dem SQL-Server erläutert und im zweiten Teil habe ich gezeigt, wie mit Worker Service die Änderungen abgerufen und an den Azure Service Bus weitergeleitet werden.
Hier entlang zum Nachlesen:
In diesem Beitrag möchte ich den Weg zurück betrachten. Das bedeutet, dass ein bidirektionaler Austausch der Daten zwischen dem SQL-Server und dem Drittsystem erfolgt. Welche Herausforderungen auf uns warten und wie man diese am besten bewältigt, zeige ich erneut in einer beispielhaften Implementierung.
Datenverkehr in beide Richtungen: Two-Way-Sync
Das Senden und Empfangen von Daten wird auch als Two-Way-Synchronisation bezeichnet. Die Komplexität ist im Vergleich zu einem One-Way-Sync natürlich deutlich höher. So können durch die Anpassbarkeit der Datensätze in beiden Systemen Inkonsistenzen entstehen, welche die Sync-Logik im besten Fall selbstständig lösen kann. Für gewöhnlich agiert in einem solchen Szenario ein System als führend, wessen Änderungen an den Datensätzen dann entsprechend vorgezogen wird.
Zusätzlich zur bestehenden Sende-Logik aus der One-Way-Sync-Demo
muss diese um eine Empfangslogik erweitert werden:
- Empfangen der Domänen-Entität
- Mappen auf Ziel-Entität
- Speichern der Änderungen in Datenbank (Erstellung, Aktualisierung, Löschung)
Wir gehen in diesem Szenario davon aus, dass das führende System die Anwendung mit dem SQL-Server ist. Die Drittsysteme kennen somit die Primärschlüssel der SQL-Datenbank und senden diese stets mit, wodurch entsprechende Aktionen an der Datenbank vorgenommen werden können.
Zusätzlich muss die Stored Procedure für die Sende-Logik angepasst werden, damit einkommende Änderungen nicht von der Sende-Logik fälschlicherweise als solche erkannt und gesendet werden.
Erweiterung der Demo um einen Two-Way-Sync
Neben dem Erstellen eines komplett neuen Workers (ReceiveWorker
) wird auch die Stored Procedure entsprechend angepasst.
Folgende Schritte sind für die Empfangslogik zu implementieren:
- Abruf der Änderungen vom Azure Service Bus
- Mappen der Domänen-Entität auf Ziel-Entität
- Speicherung der Änderung in Datenbank
- Erstellen (Create)
- Ändern (Update)
- Löschen (Delete)
Anpassen der Stored Procedure
Die Stored Procedure wird um eine WHERE
-Clause erweitert, in der geprüft wird, ob ein Änderungskontext (im Rahmen von Change Tracking) gesetzt wurde:
ALTER PROCEDURE [dbo].[GetEmplyeeChanges] @last_synchronization_version bigint
AS
SELECT
CT.[Id],
Employee.[Firstname],
Employee.[Lastname],
Employee.[Current Department],
CT.SYS_CHANGE_OPERATION,
CT.SYS_CHANGE_VERSION,
CT.SYS_CHANGE_CREATION_VERSION,
CT.SYS_CHANGE_CONTEXT,
FROM
CHANGETABLE(CHANGES Employee, @last_synchronization_version) AS CT
LEFT OUTER JOIN [wf-sample].[dbo].[Employee] AS Employee
ON CT.id = Employee.Id
WHERE
COALESCE(DATALENGTH(CT.SYS_CHANGE_CONTEXT), 0) = 0
1. Abruf der Änderungen vom Azure Service Bus
Der erste Schritt der Empfangslogik bildet den Empfang von Änderungen über den Service Bus ab. Hierzu soll vom Service Bus auf dem jeweiligen Empfangs-Topic (in unserem Fall EmployeeThirdParty
) eine Nachricht abgerufen werden.
private async Task<ServiceBusReceivedMessage?> RecieveMessageAsync(CancellationToken stoppingToken)
{
var client = new ServiceBusClient(_serviceBusConnectionString);
ServiceBusReceiver receiver = client.CreateReceiver(_serviceBusTopicName, "EmployeeThirdParty", new ServiceBusReceiverOptions
{
ReceiveMode = ServiceBusReceiveMode.PeekLock
});
return await receiver.ReceiveMessageAsync(cancellationToken: stoppingToken);
}
Im Worker selbst muss später geprüft werden, ob die Nachricht einen Inhalt hat oder leer (null
) ist, bevor weitere Aktionen ausgeführt werden.
2. Mappen der Domänen-Entität auf Ziel-Entität
Nach dem Empfang der Nachricht vom Service Bus muss die Domänen-Entität auf die Zielentität gemappt werden. In unserem Fall umfasst das Ganze vier Attribute, die gemeinsam mit dem Deserialisieren der Nachricht umgesetzt wird:
private EmployeeSource EmployeeMap2Domaintype(ServiceBusReceivedMessage message)
{
var employeeDomain = JsonSerializer.Deserialize<Employee>(message.Body.ToString()) ?? throw new Exception("EmplyeeReceiver #### Read Changes from Service Bus #### Employee is NULL!");
return new EmployeeSource
{
Id = employeeDomain.Id,
Firstname = employeeDomain.Firstname,
Lastname = employeeDomain.Lastname,
CurrentDepartment = employeeDomain.CurrentDepartment
};
}
3. Speicherung Änderung in Datenbank
Im letzten Schritt müssen die Änderungen auf die Datenbank angewandt werden. Hierzu gehören Methoden, mit denen man alle Operationen (Create, Update, Delete) durchführen kann. Der Aufbau ist jeweils sehr ähnlich: Es werden SQL-Transaktionen mit entsprechenden Operationen durchgeführt, welche im Fehlerfall einen Rollback ausführen. Dabei wird auch jeweils ein Änderungskontext gesetzt, welcher von der Stored Procedure, die für das Laden der Änderungen verwendet wird, berücksichtigt wird. Somit werden die einkommenden Änderungen nicht wieder abgerufen.
Erstellen (Create)
private void InsertChangesToDB(EmployeeSource employee)
{
using (var sqlConnection = new SqlConnection(_databaseConnectionString))
{
sqlConnection.Open();
SqlTransaction transaction = sqlConnection.BeginTransaction();
try
{
using (SqlCommand insertCommand = sqlConnection.CreateCommand())
{
insertCommand.Transaction = transaction;
// Insert Employee
insertCommand.CommandText =
"DECLARE @varSyncContext VARBINARY(100) = convert(varbinary, 'Two-Way-Sync'); " +
"WITH CHANGE_TRACKING_CONTEXT (@varSyncContext) " +
"INSERT INTO [dbo].[Employee] ( " +
"Firstname, " +
"Lastname, " +
"Current Department " +
") " +
"VALUES ( " +
"@Firstname, " +
"@Lastname, " +
"@CurrentDepartment " +
")";
insertCommand.Parameters.AddWithValue("@Firstname", employee.Firstname);
insertCommand.Parameters.AddWithValue("@Lastname", employee.Lastname);
insertCommand.Parameters.AddWithValue("@CurrentDepartment", employee.CurrentDepartment.ToString());
// Execute the INSERT-Command
insertCommand.ExecuteNonQuery();
}
transaction.Commit();
}
catch (Exception ex)
{
// Transaction-Rollback on Error
transaction.Rollback();
_logger.LogError(ex, "Error #### Processing Database Operations ### Rollback initiaded");
throw;
}
finally
{
sqlConnection.Close();
}
}
}
Ändern (Update)
private void UpdateChangesToDB(EmployeeSource employee)
{
using (var sqlConnection = new SqlConnection(_databaseConnectionString))
{
sqlConnection.Open();
SqlTransaction transaction = sqlConnection.BeginTransaction();
try
{
using (SqlCommand updateCommand = sqlConnection.CreateCommand())
{
updateCommand.Transaction = transaction;
// Update Employee
updateCommand.CommandText =
"DECLARE @varSyncContext VARBINARY(100) = convert(varbinary, 'Two-Way-Sync'); " +
"WITH CHANGE_TRACKING_CONTEXT (@varSyncContext) " +
"UPDATE [dbo].[Employee] " +
"SET " +
"Firstname = @Firstname, " +
"Lastname = @Lastname, " +
"Current Department = @CurrentDepartment" +
"WHERE " +
"Id = @Id;";
updateCommand.Parameters.AddWithValue("@Id", employee.Id);
updateCommand.Parameters.AddWithValue("@Firstname", employee.Firstname);
updateCommand.Parameters.AddWithValue("@Lastname", employee.Lastname);
updateCommand.Parameters.AddWithValue("@CurrentDepartment", employee.CurrentDepartment.ToString());
// Execute the UPDATE-Command
updateCommand.ExecuteNonQuery();
}
transaction.Commit();
}
catch (Exception ex)
{
// Transaction-Rollback on Error
transaction.Rollback();
_logger.LogError(ex, "Error #### Processing Database Operations ### Rollback initiaded");
throw;
}
finally
{
sqlConnection.Close();
}
}
}
Löschen (Delete)
private void DeleteChangesToDB(EmployeeSource employee)
{
using (var sqlConnection = new SqlConnection(_databaseConnectionString))
{
sqlConnection.Open();
SqlTransaction transaction = sqlConnection.BeginTransaction();
try
{
using (SqlCommand deleteCommand = sqlConnection.CreateCommand())
{
deleteCommand.Transaction = transaction;
// Delete Employee
deleteCommand.CommandText =
"DECLARE @varSyncContext VARBINARY(100) = convert(varbinary, 'Two-Way-Sync'); " +
"WITH CHANGE_TRACKING_CONTEXT (@varSyncContext) " +
"DELETE FROM [dbo].[Employee] " +
"WHERE " +
"Id = @Id;";
deleteCommand.Parameters.AddWithValue("@Id", employee.Id);
// Execute the DELETE-Command
deleteCommand.ExecuteNonQuery();
}
transaction.Commit();
}
catch (Exception ex)
{
// Transaction-Rollback on Error
transaction.Rollback();
_logger.LogError(ex, "Error #### Processing Database Operations ### Rollback initiaded");
throw;
}
finally
{
sqlConnection.Close();
}
}
}
Anpassen Receive-Worker-Service für den Two-Way-Sync
Der Worker Service für den Empfang sieht dann so aus:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 1. Abruf der Änderungen vom Azure Service Bus
var message = await RecieveMessageAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
if (message == null)
{
continue;
}
// 2. Mappen der Domänen-Entität auf Ziel-Entität
var employee = EmployeeMap2Domaintype(message);
var operationType = (string)message.ApplicationProperties["OperationType"];
// 3. Änderung in Datenbank speichern
switch (operationType)
{
case "I": // Insert
InsertChangesToDB(employee);
break;
case "U": // Update
UpdateChangesToDB(employee);
break;
case "D": // Delete
DeleteChangesToDB(employee);
break;
default:
break;
}
_logger.LogInformation("EmplyeeReceiver #### Wait 5 Seconds");
// Wait 5 seconds
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
Der Worker Service lädt die Nachricht vom Service Bus und mappt den Inhalt auf die Ziel-Entität. Durch die Angabe des Änderungstyps aus der Service-Bus-Nachricht wir die entsprechende Methode im switch-case
aufgerufen und eine Datenbankooperation ausgeführt. Durch das Setzen des Änderungskontextes wird der aktualisierte Datensatz vom Change Tracking bzw. der dazugehörenden Stored Procedure beim Senden ignoriert. So entstehen keine Endlosschleifen.
Die Konflikterkennung wurde in diesem Beitrag nur erwähnt, aber nicht in der Implementierung berücksichtigt. Da die Konflikterkennung mitunter sehr komplex werden kann, möchte ich dieses Thema in einem zukünftigen Blogbeitrag behandeln.