Observable concatMap() vs concatAll()

Ich wollte sicherstellen, dass mit Cordova File Plugin keine Einträge verlorengehen, da die Write-Operationen asynchron sind und vielleicht beim Schreiben mit Append auch Race-Conditions entstehen. Also habe ich einige Test-Cases geschrieben und festgestellt, dass concatMap() und concatAll() zusammen mit rxjs-Observables nicht ganz identisch sind.

Eigentlich ist der Test-Case einfach:

„write-append to file“ soll nun also keine Einträge verlieren und sich nicht selber überschreiben, z.B. wenn der vierte Schreibvorgang schneller wäre, als der dritte.

Ein vereinfachter Test-Case mit Observables und console.log

subscribe()

Um den asynchronen write-append Vorgang zu simulieren, verwende ich hier eine Promise, welche durch zufallsmässig verlangsamt wird, damit nicht die Zahlen 0-9 sortiert vorkommen.

Wie erwartet gibt dies eine unsortierte Liste:

concatAll().subscribe()

Eigentlich will man nun ja eine sortierte Liste schreiben und somit war mein erster Ansatz concatAll() zu verwenden. Also alle inner-observables so auszuführen, dass das nächste Observable erst ausgeführt wird, wenn das vorhergehende beendet ist.

Das ergibt dann folgende Reihenfolge

Hmmm. Damit habe ich nicht gerechnet. Die Subscriptions kommen schon sortiert zurück, aber die inneren Observables werden gemäss dem Delay ausgeführt. Ja, klar. Ist ja asynchron!

Also Doku lesen:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/concatall.md

Concatenates a sequence of observable sequences or promises into a single observable sequence.

Noch mehr Hmmmmmmm. 

Also noch mehr Doku lesen.

http://reactivex.io/documentation/operators/concat.html

concatAll is an instance method that operates on an Observable of Observables, concatinating each of these Observables in the order they are emitted.

Aha. „in the order they are emitted“. Der Random-Delay von

 führt ja genau dazu, dass die ursprüngliche Reihenfolge nicht eingehalten soll und wird. 

Also brauchte ich etwas, dass nicht „order emitted„, sondern „order created and finished“ macht.

Da concatAll() und concatMap() häufig gleichgestellt wird, schaute ich mir concatMap genauer an.

Zuerst Doku lesen:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/concatmap.md

Projects each element of the source observable sequence to the other observable sequence or Promise or array/iterable and merges the resulting observable sequences into one observable sequence.

Hmmm. Hmmm. Aber das Beispiel lässt erahnen, dass die äussere Observable Sequenz massgebend ist. 

concatMap().subscribe()

Also schnell testen:

Und das ergibt dann folgendes Resultat:

Genau was ich brauche.

concatMap() und concatAll() sind nicht gleich

Meistens werden concatMap() und concatAll() als Alternative beschrieben.

Der Punkt ist aber , dass concatAll() nach dem map() auf dem Source-Observable ausgeführt wird. concatMap() hingegen ersetzt map().

Nach meinem Verständnis garantiert concatAll() die Sequenz der inneren Observables und concatMap() auch die Sequenz des äusseren Observables.

Das heisst aber auch, dass ein RIESEN Performance Unterschied bestehen muss. concapMap() ist rein sequenziell. Testen wir das, in dem wir den Delay auf 1000*next ms setzen. Damit man den Unterschied der Ausführung des inneren Observables sieht, drehen wir die Zeit noch um.

 

mit concatMap() ist die Runtime somit gleich der Summe der Delays (plus etwas Overhead), also rund 10+9+8..+1 Sekunden. Von 13:47:14 bis 13:48:10 und geschrieben wird 0..9

 

mit concatAll() ist die Runtime somit gleich dem längsten Delay (plus etwas Overhead), also rund 10 Sekunden. Von 13:49:18 bis 13:49:28 und geschrieben wird 9..0

Fazit

Die sortierte Liste, sprich das sequenzielle Ausführen geht nur mit concatMap(). Das kostet aber Faktoren an Performance, wenn die inneren Observables längere Ausführungszeiten haben!

Urs Verfasst von: