Issue
I am learning the akka framework and i am trying to close a websocket connection from server.
fun main() {
val system = ActorSystem.create("system")
val materializer = Materializer.createMaterializer(system)
val http = Http.get(system)
val routeFlow = path("ws") {
get {
parameter("as") { sys ->
parameter("id") { id ->
parameter("v") { v ->
handleWebSocketMessages(mainFlow(sys, id, v))
}
}
}
}
}.flow(system, materializer)
http.newServerAt("0.0.0.0", 8080).withMaterializer(materializer).bindFlow(routeFlow).thenRun {
println("Web socket server is running at localhost:8080")
}
}
fun mainFlow(sys: String, id: String, v: String): Flow<Message, Message, NotUsed> {
val source = Source.actorRef<Message>(
{ Optional.empty() },
{ Optional.empty() },
100,
OverflowStrategy.fail())
.mapMaterializedValue {
it.tell(PoisonPill.getInstance(), ActorRef.noSender())
}
val sink = Flow.create<Message>()
.map {
}
.to(Sink.onComplete {
})
return Flow.fromSinkAndSource(sink, source)
}
I am doing it here:
.mapMaterializedValue {
it.tell(PoisonPill.getInstance(), ActorRef.noSender())
}
I checked this Akka websocket - how to close connection by server?. But it is not working. I am getting an error:
PoisonPill message sent to StageActor(akka://system/system/Materializers/StreamSupervisor-1/$$c-actorRefSource) will be ignored, since it is not a real Actor.Use a custom message type to communicate with it instead.
What am i doing wrong and how to close it?
Solution:
fun mainFlow(connectionsController: ActorRef, sys: String, id: String, v: String, system: ActorSystem): Flow<Message, Message, NotUsed> {
val source = Source.actorRef<Message>(
{
if (it is Done) {
Optional.of(CompletionStrategy.immediately())
} else {
Optional.empty()
}
},
{ Optional.empty() },
16,
OverflowStrategy.fail())
.mapMaterializedValue {
it.tell(Done.done(), ActorRef.noSender())
}
val sink = Flow.create<Message>()
.map {
}
.to(Sink.onComplete {
})
return Flow.fromSinkAndSource(sink, source)
}
Solution
The server will close the connection when its upstream completes.
The previous answer relied on an older version's behavior around PoisonPill
, which would trigger completion.
With a Source.actorRef
, the first function argument is the completion matcher: send a message for which that function returns a non-empty Optional
and the source will complete. The content of that Optional
is the completion strategy, which governs whether the source completes the stream immediately or whether it will drain its buffer.
I'm not that familiar with Kotlin, so I can't provide any code, but I think the mapMaterializedValue
is not where you want to close the connection, as that code will execute before the source has a chance to send any messages across the websocket.
Answered By - Levi Ramsey
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.