Issue
My app has a reactive endpoint that returns a Flux from the database; it never ends unless the front end stops communicating. I want to load the first element as soon as possible and then throttle the following elements by adding a delay. So far I have only managed to delay all elements, so the first one arrives after the same delay as every subsequent one.
var connection = Mono.fromFuture(
        () -> dataCollector.getDataFromCassandra((CassandraExposureRetrieveRequest) command.getBody()))
    .delayElement(Duration.ofMillis(delayExposure))
    .repeat()
    .share();
Does anyone know a reasonably simple way to delay only the second, third, etc. elements of the Flux?
Kind regards, Bartek
Solution
Since your elements come from a Mono that is repeated multiple times, you can achieve this by prepending your Flux with a single non-delayed Mono that makes the database call:
var singleCall = Mono.fromFuture(
        () -> dataCollector.getDataFromCassandra((CassandraExposureRetrieveRequest) command.getBody()));
var connection = singleCall
    .concatWith(singleCall.delayElement(Duration.ofMillis(delayExposure))
        .repeat())
    .share();
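To try the pattern above without a database, here is a minimal self-contained sketch. The class name FirstImmediateDemo, the method firstImmediateThenDelayed, and the counter-based supplier are hypothetical stand-ins for dataCollector.getDataFromCassandra(...), and .share() is left out to keep the demo to a single subscriber.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FirstImmediateDemo {

    // Emits the result of one call right away, then keeps repeating the call,
    // delaying each later result by `delay`.
    static <T> Flux<T> firstImmediateThenDelayed(Supplier<CompletableFuture<T>> call, Duration delay) {
        // Mono.fromFuture(Supplier) is lazy: the supplier runs once per subscription.
        Mono<T> singleCall = Mono.fromFuture(call);
        return singleCall.concatWith(singleCall.delayElement(delay).repeat());
    }

    public static void main(String[] args) {
        // Hypothetical data source: every call returns the next integer.
        AtomicInteger counter = new AtomicInteger();
        long start = System.nanoTime();

        List<Integer> values = FirstImmediateDemo.<Integer>firstImmediateThenDelayed(
                        () -> CompletableFuture.completedFuture(counter.incrementAndGet()),
                        Duration.ofMillis(200))
                .take(3) // the repeated Flux is infinite, so stop after three elements
                .doOnNext(v -> System.out.println(
                        v + " received after " + (System.nanoTime() - start) / 1_000_000 + " ms"))
                .collectList()
                .block();

        System.out.println(values);
    }
}
```

Because singleCall is a cold Mono, each subscription (the initial one and every repeat) triggers a fresh call to the supplier, which is what makes reusing the same Mono in both positions work.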
Edit: By request, here is a solution if the data source is a Flux instead of a repeated Mono:
Flux solution
To split the first element from all other elements, I use Flux.switchOnFirst(BiFunction) to modify the Flux when the first signal is received. The BiFunction receives the first signal and the original Flux and maps them to the modified Flux. In this case, I remove the first element from the original Flux, delay all remaining elements, and finally re-add the first element at the start, taking it from the signal.
var connection = originalFlux.switchOnFirst((signal, flux) -> {
    if (signal.hasValue()) {
        return flux.skip(1)
            .delaySequence(Duration.ofMillis(delayExposure))
            .startWith(signal.get());
    }
    return flux;
});
Note that the BiFunction receives the first signal, which can also be an onError or an onComplete. To handle these, the Flux is only modified if the Signal has a value, else the Flux is returned unmodified.
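This solution works for any operation that has to modify all the elements except the first one: replace the .delaySequence line with your desired operation(s). Keep in mind that delaySequence shifts the remaining elements forward in time by a fixed amount and preserves their original spacing; if you want a pause before each individual element instead, use delayElements.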
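As a sketch of that substitution, the variant below swaps .delaySequence for .delayElements, on the assumption that a pause before every later element is what is wanted. The class name SwitchOnFirstDemo, the method firstImmediateRestDelayed, and the Flux.range source are hypothetical and only serve the demo.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class SwitchOnFirstDemo {

    // Passes the first element through untouched and delays every later element.
    static <T> Flux<T> firstImmediateRestDelayed(Flux<T> source, Duration delay) {
        return source.switchOnFirst((signal, flux) -> {
            if (signal.hasValue()) {
                // `flux` still contains the first element, so drop it,
                // delay the rest, and put the first element back in front.
                return flux.skip(1)
                        .delayElements(delay)
                        .startWith(signal.get());
            }
            // First signal was onComplete or onError: leave the Flux unchanged.
            return flux;
        });
    }

    public static void main(String[] args) {
        long start = System.nanoTime();

        List<Integer> values = firstImmediateRestDelayed(Flux.range(1, 4), Duration.ofMillis(200))
                .doOnNext(v -> System.out.println(
                        v + " received after " + (System.nanoTime() - start) / 1_000_000 + " ms"))
                .collectList()
                .block();

        System.out.println(values);
    }
}
```

With a source that emits everything at once, such as Flux.range, the difference between the two operators is easy to observe: swapping delayElements back to delaySequence should make the later elements arrive together after a single delay.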
Answered By - Patrick Hooijer