Funktionsverkettung mit Azure Durable Functions

post-thumb

Mit der Einführung von Azure Durable Function können neue Anwendungsfälle mithilfe von Azure Functions umgesetzt werden. Die Dokumentation von Microsoft bietet hierzu bereits einen guten Einblick und Einstieg in die Entwicklung. Zu den dargestellten Anwendungsmuster gehört z.B. die Funktionsverkettung (Function Chaining), also die Abfolge von Funktionen in einer Reihenfolge oder das Auffächern (fan out/) also der parallelen Ausführung von Funktionen.

Szenario

Wenn eine API verwendet wird, um die Daten abzurufen (GET) und diese wiederum nur den Abruf von Batches, also einer bestimmten Anzahl von Datensätzen pro Aufruf, erlaubt. Die API wiederum liefert neben dem eigentlichen Payload auch eine Information mit, ob noch weitere Daten abgerufen werden können. Es wird also ein Mechanismus benötigt, der die entsprechende API so oft aufruft, bis alle Daten abgerufen worden sind. Hierbei besteht die Herausforderung darin, dass der Umfang der Datenmenge unbekannt ist. Der Einsatz der klassischen Azure Function ist hier leider nur bedingt zu empfehlen. So kann eine solche Logik realisiert werden, allerdings funktioniert das nur so lange, bis der 10-Minuten-Timeout erreicht wird, da der Aufrufer so lange auf eine Antwort warten muss. Somit wäre das nur eine temporäre, aber nicht nachhaltige Lösung.

Die Funktionsverkettung der Azure Durable Functions bietet hier eine Lösungsmöglichkeit für das Problem.

Funktionsverkettung mit unbekannter Größe

Für die komplette Umsetzung werden drei Bereiche betrachtet. Zum einen muss der eigentliche Service angepasst werden, um ein Batchweises Abrufen von Daten zu ermöglichen und um den Startpunkt (also die erste ID der Entität) zu definieren. Zum anderen muss die aufrufende Activity prüfen, ob weitere Daten abgerufen werden können und zuletzt muss eine entsprechende Schleife im Orchestrator implementiert werden.

HTTP-Service

Unser angepasster Service nimmt nun als Parameter eine batchSize für die Anzahl der abzurufenden Datensätze entgegen, welcher sich idealerweise an der oberen Grenze der Drittanbieter-API orientiert. Zusätzlich wird als weiterer Parameter die afterId angegeben. Mit dieser kann angegeben werden, ab welcher ID der jeweiligen Entität erneut ein weiterer Batch abgerufen werden soll. Der erste Aufruf wird mit afterId und dem Wert 0 durchgeführt, somit wird der erste Batch abgerufen.

In diesem Code-Sample stammt die Info der afterId aus dem Header. Je nachdem wie die Drittanbieter API diese Information zurückliefert, muss diese auch entsprechend abgerufen werden.

Als Antwort gibt der Service ein Tupel aus einem Long (afterId) und dem tatsächlichen Result-Set der Entität zurück.

public async Task<Tuple<long, List<MyEntity>> GetAllEntities(int batchSize, long afterId)
{
    var after = string.Empty;
    if(afterId >= 0)
    {
        after = "&after=" + afterId;
    }

    // Abruf aller Datensätze
    var response = await this.httpClient.GetAsync(new Uri(this.requestUri) "&limit=" + batchSize + after);

    // Deserialize
    var responseResultJson = response.Content.ReadAsStringAsync()?.Result;
    var responseResultList = JsonConvert.DeserializeObject<List<MyEntity>>(responseResultJson);

    HttpHeaders headers = response.Headers;
    IEnumerable<long> headerValues;
    long afterIdResponse = 0;

    if (request.Headers.TryGetValues("X-AfterID", out headerValues))
    {
        afterIdResponse = headerValues.FirstOrDefault();
    }           

    return Tuple.Create(responseResultList, afterIdResponse);
}

Durable Activity

Nun kann die Activity soweit angepasst werden, als dass diese nun die afterId und den Boolean-Wert hasPaging an den Service weitergibt. Die Activity ermittelt nun, ob noch weitere Datensätze abgerufen werden können und gibt diese Information in Form des Boolean-Werts hasPaging an den Orchestrator zurück. Außerdem wird die afterId ebenfalls zurückgegeben. Beide Werte werden in einer neuen Datenstruktur (DurableChain) gespeichert.

 public class DurableChain
{
    public long AfterId { get; set; }

    public bool HasPaging { get; set; }
}

Die Activity ermittelt, ob weitere Datensätze abgerufen werden können und speichert diese in der DurableChain-Property HasPaging und gibt sowohl das DurableChain-Objekt, als auch die eigentliche Result-Liste der Entitäten zurück.

[FunctionName("GetMyEntities_Activity")]
public async Task<Tuple<DurableChain, List<CrmContact>>> GetAllContacts([ActivityTrigger] IDurableActivityContext context)
{
    var durableChain = context.GetInput<DurableChain>();

    var result = await this.contactService.GetAllDurableAsync(100, durableChain.AfterId);

    durableChain.AfterId = result.Item2;

    var myEntitiesList = result.Item1.ToList();

    durableChain.HasPaging = myEntitiesList.Count() == 100; 

    return Tuple.Create(durableChain, myEntitiesList);
}

Durable Orchestrator

Am Ende muss die Activity in der Orchestrator-Function noch korrekt aufgerufen werden. Die DurableChain wird für den ersten Aufruf vorbereitet: AfterId wird auf 0 und HasPaging auf false gesetzt.

Die Property HasPaging ist die Basis für die verwendete Do-While-Schleife. Solange die Eigenschaft auf true gesetzt ist, wird die Schleife ausgeführt und ruft im Zusammenspiel mit der Activitiy alle Datensätze auf.

[FunctionName("GetMyEntities_Activity_DurableFunction")]
public async Task<IList<MyEntity>> RunOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var output = new List<MyEntity>();

    // Settings for first batch
    var durableChain = new DurableChain
    {
        AfterId = 0,
        HasPaging = false,
    };

    do
    {
        var activity = await context.CallActivityAsync<Tuple<DurableChain, List<MyEntity>>>("GetMyEntities_Activity", DurableChain);

        DurableChain = activity.Item1;

        var resultList = activity.Item2;

        output.AddRange(resultList);
    }
    while(resultList.HasPaging); // HasPaging will be false, once all Batches will be retrieved

    return output;
}

Durch das await-Schlüsselwort wird zunächst jeder Aufruf auf die Activity Awaited und die Rückgabewerte (insbesondere AfterId) als Basis für den nächsten Aufruf verwendet. Hierdurch kann eine Funktionsverkettung mit unbekannter Länge bzw. unbekannter Anzahl an Aufrufen umgesetzt werden.

Lernen Sie uns kennen

Das sind wir

Wir sind ein Software-Unternehmen mit Hauptsitz in Karlsruhe und auf die Umsetzung von Digitalstrategien durch vernetzte Cloud-Anwendungen spezialisiert. Wir sind Microsoft-Partner und erweitern Standard-Anwendungen bei Bedarf – egal ob Modernisierung, Integration, Implementierung von CRM- oder ERP-Systemen, Cloud Security oder Identity- und Access-Management: Wir unterstützen Sie!

Mehr über uns

Der Objektkultur-Newsletter

Mit unserem Newsletter informieren wir Sie stets über die neuesten Blogbeiträge,
Webcasts und weiteren spannenden Themen rund um die Digitalisierung.

Newsletter abonnieren