Issue
I use
final Source<Double, NotUsed> sources = Source.combine(source1, source2, null, Concat::create);
to combine two sources. The documentation at https://doc.akka.io/docs/akka/current/stream/operators/Source/combine.html does not provide an example of merging more than two sources, so I'm unsure whether passing null is a correct way to combine just two.
When I run the code below, `100.0` is printed continually. Each source computes the average of a sliding window of values, where each window has size 3. The only difference between the sources is that source1 uses the value 100 and source2 uses the value 10. But source2 is never executed: sources.to(printSink).run(actorSystem); only outputs `100.0`, the result of the first source.
How do I correctly combine source1 and source2 so that each source is executed?
Source:

    import akka.Done;
    import akka.NotUsed;
    import akka.actor.typed.ActorSystem;
    import akka.actor.typed.javadsl.Behaviors;
    import akka.stream.javadsl.Concat;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    import java.time.Duration;
    import java.util.concurrent.CompletionStage;

    public class MultipleStreams {

        public static void main(String[] args) {
            ActorSystem<Void> actorSystem = ActorSystem.create(Behaviors.empty(), "as");

            Sink<Double, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);

            // Emits "100" three times per second and averages each window of 3 values.
            final String json1 = "100";
            final Source<Double, NotUsed> source1 = Source.repeat(json1).throttle(3, Duration.ofMillis(1000))
                    .sliding(3, 3)
                    .map(x -> x.stream()
                            .mapToDouble(Double::parseDouble)
                            .sum() / x.size());

            // Same pipeline as source1, but with the value "10".
            final String json2 = "10";
            final Source<Double, NotUsed> source2 = Source.repeat(json2).throttle(3, Duration.ofMillis(1000))
                    .sliding(3, 3)
                    .map(x -> x.stream()
                            .mapToDouble(Double::parseDouble)
                            .sum() / x.size());

            final Source<Double, NotUsed> sources = Source.combine(source1, source2, null, Concat::create);
            sources.to(printSink).run(actorSystem);
        }
    }
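The per-window averaging that each source performs can be checked in isolation. The following is a minimal plain-Java sketch, not Akka code: the class name WindowAverage and the method averages are hypothetical, and it assumes only full windows are averaged, which is sufficient for the never-ending sources used in the question.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for sliding(3, 3) followed by an averaging map; not Akka code.
final class WindowAverage {

    // Splits `values` into consecutive windows of `size` elements, advancing by `step`,
    // and returns the arithmetic mean of each full window (partial windows are skipped).
    static List<Double> averages(List<String> values, int size, int step) {
        List<Double> result = new ArrayList<>();
        for (int start = 0; start + size <= values.size(); start += step) {
            double sum = 0.0;
            for (String v : values.subList(start, start + size)) {
                sum += Double.parseDouble(v);
            }
            result.add(sum / size);
        }
        return result;
    }

    public static void main(String[] args) {
        // Six values of "100" form two full windows of three.
        System.out.println(averages(List.of("100", "100", "100", "100", "100", "100"), 3, 3)); // [100.0, 100.0]
        // Three values of "10" form one full window.
        System.out.println(averages(List.of("10", "10", "10"), 3, 3)); // [10.0]
    }
}
```

Because every element in a source is identical, each window average equals that element, which is why source1 alone can only ever produce 100.0.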
Solution
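Concat drains the first source completely before it pulls anything from the second. Because source1 is built on Source.repeat, it never completes, so source2 is never reached.
Changing Concat to Merge (for example, Merge::create in place of Concat::create, with akka.stream.javadsl.Merge imported) emits elements from both sources as they become available and gives the output: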
100.0
10.0
100.0
10.0
100.0
10.0
100.0
10.0
10.0
100.0
10.0
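A plain-Java analogy may help show why concatenation starves the second source. The sketch below uses java.util.stream and iterators rather than Akka; the class ConcatVsInterleave and its methods are hypothetical names, and the round-robin interleaving is only a deterministic stand-in for Merge, which in Akka emits from whichever input has an element ready (hence the occasional repeated 10.0 in the output above).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy built on java.util.stream and iterators; this is not Akka code.
final class ConcatVsInterleave {

    // Concatenation: the second stream is read only after the first is exhausted.
    // The first stream is infinite (like Source.repeat), so the second is never read.
    static List<Double> concatenated(int n) {
        Stream<Double> first = Stream.generate(() -> 100.0);
        Stream<Double> second = Stream.generate(() -> 10.0);
        return Stream.concat(first, second).limit(n).collect(Collectors.toList());
    }

    // Round-robin interleaving: alternates between the two inputs,
    // so both contribute elements even though neither ever ends.
    static List<Double> interleaved(int n) {
        Iterator<Double> first = Stream.generate(() -> 100.0).iterator();
        Iterator<Double> second = Stream.generate(() -> 10.0).iterator();
        List<Double> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(i % 2 == 0 ? first.next() : second.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(concatenated(4)); // [100.0, 100.0, 100.0, 100.0]
        System.out.println(interleaved(4));  // [100.0, 10.0, 100.0, 10.0]
    }
}
```

With concatenation, the value 10.0 would appear only if the first input completed, which an endlessly repeating source never does.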
Answered By - Turo