Issue
I have a simple Spring controller which have a Spring service as dependency. In the service class I have a static volatile field of type int called flag. When I invoke createFlux() method through my controller, the flag is set to 5 and then a new Flux is created, which checks the flag every second, and prints a message based on the flag value. Because of the delayElements method semantics the code will be executed in parallel. And after that if I invoke the changeFlag() method, which changes the value of the flag, and because the flag variable is volatile I expect the message which is printed to change but that is not happening.
Here is the code:
@RestController
public class MyController {
@Autowired private MyService myService;
@GetMapping("createFlux")
public void createFlux() {
myService.createFlux();
}
@GetMapping("changeFlag")
public void changeFlag() {
myService.changeFlag();
}
}
@Service
public class MyService {
private static volatile int flag = 3;
public void changeFlag() {
flag = 3;
System.out.println("############# Flag = " + flag);
}
public void createFlux() {
flag = 5;
System.out.println("Flag = " + flag);
Flux.generate(sink -> {
if (flag == 3) {
sink.next("Stop");
} else {
sink.next("Start");
}
}).delayElements(Duration.ofSeconds(1)).subscribe(s -> System.out.println(Thread.currentThread().getName() + " : " + s));
}
}
And this is the output in the console:
Flag = 5
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
############# Flag = 3
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Stop
parallel-2 : Stop
parallel-3 : Stop
parallel-4 : Stop
parallel-5 : Stop
parallel-6 : Stop
From the output it can be seen that it keeps printing the message Start even when the value of flag is changed to 3. And after a while the printed message is changed. I suppose that there is some caching or something like that, but the volatile variables are not cached.
The question is - is this a bug or I'm missing something?
Solution
The generate consumer is executed instantly for a bunch of elements. You can easily confirm that by adding a log right after the generate
:
Flux.generate(...).doOnNext(e -> log.info("executed: {}", e))
Prints:
2022-01-20 13:31:50,346 INFO parallel-1 - Flag = 5
2022-01-20 13:31:50,349 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
generate
method emits elements based on the demand from the downstream. It generates 32 elements first and buffers them. As and when downstream starts processing elements and when the buffer size drops the below threshold, it emits a few more elements.
Answered By - lkatiforis
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.