Mit der Einführung von Azure Durable Function können neue Anwendungsfälle mithilfe von Azure Functions umgesetzt werden. Die Dokumentation
von Microsoft bietet hierzu bereits einen guten Einblick und Einstieg in die Entwicklung. Zu den dargestellten Anwendungsmuster gehört z.B. die Funktionsverkettung (Function Chaining), also die Abfolge von Funktionen in einer Reihenfolge oder das Auffächern (fan out/) also der parallelen Ausführung von Funktionen.
Szenario
Wenn eine API verwendet wird, um die Daten abzurufen (GET) und diese wiederum nur den Abruf von Batches, also einer bestimmten Anzahl von Datensätzen pro Aufruf, erlaubt. Die API wiederum liefert neben dem eigentlichen Payload auch eine Information mit, ob noch weitere Daten abgerufen werden können. Es wird also ein Mechanismus benötigt, der die entsprechende API so oft aufruft, bis alle Daten abgerufen worden sind. Hierbei besteht die Herausforderung darin, dass der Umfang der Datenmenge unbekannt ist. Der Einsatz der klassischen Azure Function ist hier leider nur bedingt zu empfehlen. So kann eine solche Logik realisiert werden, allerdings funktioniert das nur so lange, bis der 10-Minuten-Timeout erreicht wird, da der Aufrufer so lange auf eine Antwort warten muss. Somit wäre das nur eine temporäre, aber nicht nachhaltige Lösung.
Die Funktionsverkettung der Azure Durable Functions bietet hier eine Lösungsmöglichkeit für das Problem.
Funktionsverkettung mit unbekannter Größe
Für die komplette Umsetzung werden drei Bereiche betrachtet. Zum einen muss der eigentliche Service angepasst werden, um ein Batchweises Abrufen von Daten zu ermöglichen und um den Startpunkt (also die erste ID der Entität) zu definieren. Zum anderen muss die aufrufende Activity prüfen, ob weitere Daten abgerufen werden können und zuletzt muss eine entsprechende Schleife im Orchestrator implementiert werden.
HTTP-Service
Unser angepasster Service nimmt nun als Parameter eine batchSize
für die Anzahl der abzurufenden Datensätze entgegen, welcher sich idealerweise an der oberen Grenze der Drittanbieter-API orientiert. Zusätzlich wird als weiterer Parameter die afterId
angegeben. Mit dieser kann angegeben werden, ab welcher ID der jeweiligen Entität erneut ein weiterer Batch abgerufen werden soll. Der erste Aufruf wird mit afterId
und dem Wert 0 durchgeführt, somit wird der erste Batch abgerufen.
In diesem Code-Sample stammt die Info der afterId
aus dem Header. Je nachdem wie die Drittanbieter API diese Information zurückliefert, muss diese auch entsprechend abgerufen werden.
Als Antwort gibt der Service ein Tupel aus einem Long (afterId
) und dem tatsächlichen Result-Set der Entität zurück.
public async Task<Tuple<long, List<MyEntity>> GetAllEntities(int batchSize, long afterId)
{
var after = string.Empty;
if(afterId >= 0)
{
after = "&after=" + afterId;
}
// Abruf aller Datensätze
var response = await this.httpClient.GetAsync(new Uri(this.requestUri) "&limit=" + batchSize + after);
// Deserialize
var responseResultJson = response.Content.ReadAsStringAsync()?.Result;
var responseResultList = JsonConvert.DeserializeObject<List<MyEntity>>(responseResultJson);
HttpHeaders headers = response.Headers;
IEnumerable<long> headerValues;
long afterIdResponse = 0;
if (request.Headers.TryGetValues("X-AfterID", out headerValues))
{
afterIdResponse = headerValues.FirstOrDefault();
}
return Tuple.Create(responseResultList, afterIdResponse);
}
Durable Activity
Nun kann die Activity soweit angepasst werden, als dass diese nun die afterId
und den Boolean-Wert hasPaging
an den Service weitergibt. Die Activity ermittelt nun, ob noch weitere Datensätze abgerufen werden können und gibt diese Information in Form des Boolean-Werts hasPaging
an den Orchestrator zurück. Außerdem wird die afterId
ebenfalls zurückgegeben. Beide Werte werden in einer neuen Datenstruktur (DurableChain
) gespeichert.
public class DurableChain
{
public long AfterId { get; set; }
public bool HasPaging { get; set; }
}
Die Activity ermittelt, ob weitere Datensätze abgerufen werden können und speichert diese in der DurableChain
-Property HasPaging
und gibt sowohl das DurableChain
-Objekt, als auch die eigentliche Result-Liste der Entitäten zurück.
[FunctionName("GetMyEntities_Activity")]
public async Task<Tuple<DurableChain, List<CrmContact>>> GetAllContacts([ActivityTrigger] IDurableActivityContext context)
{
var durableChain = context.GetInput<DurableChain>();
var result = await this.contactService.GetAllDurableAsync(100, durableChain.AfterId);
durableChain.AfterId = result.Item2;
var myEntitiesList = result.Item1.ToList();
durableChain.HasPaging = myEntitiesList.Count() == 100;
return Tuple.Create(durableChain, myEntitiesList);
}
Durable Orchestrator
Am Ende muss die Activity in der Orchestrator-Function noch korrekt aufgerufen werden. Die DurableChain
wird für den ersten Aufruf vorbereitet: AfterId
wird auf 0 und HasPaging
auf false
gesetzt.
Die Property HasPaging
ist die Basis für die verwendete Do-While-Schleife. Solange die Eigenschaft auf true
gesetzt ist, wird die Schleife ausgeführt und ruft im Zusammenspiel mit der Activitiy alle Datensätze auf.
[FunctionName("GetMyEntities_Activity_DurableFunction")]
public async Task<IList<MyEntity>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var output = new List<MyEntity>();
// Settings for first batch
var durableChain = new DurableChain
{
AfterId = 0,
HasPaging = false,
};
do
{
var activity = await context.CallActivityAsync<Tuple<DurableChain, List<MyEntity>>>("GetMyEntities_Activity", DurableChain);
DurableChain = activity.Item1;
var resultList = activity.Item2;
output.AddRange(resultList);
}
while(resultList.HasPaging); // HasPaging will be false, once all Batches will be retrieved
return output;
}
Durch das await
-Schlüsselwort wird zunächst jeder Aufruf auf die Activity Awaited und die Rückgabewerte (insbesondere AfterId
) als Basis für den nächsten Aufruf verwendet. Hierdurch kann eine Funktionsverkettung mit unbekannter Länge bzw. unbekannter Anzahl an Aufrufen umgesetzt werden.