Issue
I'm currently working on a gRPC server that receives streaming gRPC calls from a first server and forwards them to a second server, then streams the second server's responses back to the first one.
I have 2 proto files.
First file:
syntax = "proto3";
package first.proto.pack;
service FirstProtoService {
    rpc StreamingCall(stream RequestToFirstServer) returns (stream ResponseForFirstServer){}
}
message RequestToFirstServer {
    oneof firstStreamingRequest {
        int32 x = 1;
        int32 y = 2;
    }
}
message ResponseForFirstServer {
    string someprocessedinformation = 1;
}
Second file:
syntax = "proto3";
package second.proto.pack;
service SecondProtoService {
    rpc StreamingCall(stream RequestToSecondServer) returns (stream ResponseFromSecondServer){}
}
message RequestToSecondServer {
    oneof secondStreamingRequest {
        int32 processedX = 1;
        int32 procesdedY = 2;
    }
}
message ResponseFromSecondServer {
    string computedInformation = 1;
}
The first server knows about the first proto file but not about the second.
The second server knows about the second proto file but not about the first.
The middle server knows about both proto files.
I need to write a server that relays requests from one server to the other.
I started writing it in Java, but ran into the problem of sending too many requests to the second server.
This is how my middle service implementation looks in Java:
package middle.server.pack;
import first.proto.pack.First;
import first.proto.pack.FirstProtoServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import second.proto.pack.Second;
import second.proto.pack.SecondProtoServiceGrpc;
import java.util.logging.LogManager;
import java.util.logging.Logger;
public class MiddleService extends FirstProtoServiceGrpc.FirstProtoServiceImplBase {
    private final ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:8080").build();
    private final Logger logger = LogManager.getLogManager().getLogger(MiddleService.class.getName());
    @Override
    public StreamObserver<First.RequestToFirstServer> streamingCall(StreamObserver<First.ResponseForFirstServer> responseObserver) {
        return new StreamObserver<First.RequestToFirstServer>() {
            @Override
            public void onNext(First.RequestToFirstServer value) {
                SecondProtoServiceGrpc.SecondProtoServiceStub stub = SecondProtoServiceGrpc.newStub(channel);
                StreamObserver<Second.RequestToSecondServer> requestObserver = stub.streamingCall(
                    new StreamObserver<Second.ResponseFromSecondServer>() {
                        @Override
                        public void onNext(Second.ResponseFromSecondServer value) {
                            doProcessOnResponse(value);
                            First.ResponseForFirstServer responseForFirstServer =
                                mapToFirstResponse(value);
                            responseObserver.onNext(responseForFirstServer);
                        }
                        @Override
                        public void onError(Throwable t) {
                            logger.info(t.getMessage());
                        }
                        @Override
                        public void onCompleted() {
                            logger.info("sucess");
                        }
                    }
                );
                Second.RequestToSecondServer requestToSecondServer = mapToSecondRequest(value);
                requestObserver.onNext(requestToSecondServer);
                requestObserver.onCompleted();
            }
            @Override
            public void onError(Throwable t) {
                logger.info(t.getMessage());
            }
            @Override
            public void onCompleted() {
                logger.info("Everything okay");
            }
        };
    }
}
After a request from the first client on the side of the middle server, I get the following errors:
CANCELLED: Failed to read message.
CANCELLED: io.grpc.Context was cancelled without error
I know that I am doing it wrong. So the question is: how do I do it right, or, if it can't be done in Java, could I do it in another language?
Solution
There are several problems here:
- As pointed out by @SergiiTkachenko, you create a new RPC to the second server for each message from the first. To solve this, move the call to the second server (stub.streamingCall(...)) 3 lines up, to the beginning of the outer method.
- The call to requestObserver.onCompleted() should be moved to onCompleted() of StreamObserver<First.RequestToFirstServer> a few lines below (next to logger.info("Everything okay");).
- You never call responseObserver.onCompleted(). You should do so in onCompleted() of StreamObserver<Second.ResponseFromSecondServer> (next to logger.info("sucess");).
- You should signal errors received from one server to the other (the relation between the onError(...) methods should be analogous to the one between the onCompleted() methods).
- You should handle cancellations by the first server using setOnCancelHandler(...) and propagate them to the second.
So generally speaking (and as @SergiiTkachenko also pointed out), receiving a given callback (onNext, onError, onCompleted, "onCancel") from one of the servers should trigger issuing the corresponding call to the other server (after "translating" the argument where needed).
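The callback mapping described above can be sketched roughly as follows. This is only an illustration, not the real service: to stay runnable without the gRPC dependency and the generated classes, it uses a local Observer interface with the same three callbacks as io.grpc.stub.StreamObserver, and a hypothetical StreamingCall interface standing in for stub.streamingCall(...). The names RelaySketch, relay, mapRequest and mapResponse are assumptions made for this example, and cancellation is not modeled.

```java
import java.util.function.Function;

final class RelaySketch {

    /** Local stand-in with the same callbacks as io.grpc.stub.StreamObserver. */
    public interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical stand-in for a bidi-streaming stub method such as stub.streamingCall(...). */
    public interface StreamingCall<ReqT, RespT> {
        Observer<ReqT> start(Observer<RespT> responseObserver);
    }

    /**
     * Opens ONE call to the second server per call from the first server and
     * forwards every callback to the opposite side, mapping messages on the way.
     */
    public static <Req1, Resp1, Req2, Resp2> Observer<Req1> relay(
            Observer<Resp1> toFirst,
            StreamingCall<Req2, Resp2> secondServer,
            Function<Req1, Req2> mapRequest,
            Function<Resp2, Resp1> mapResponse) {

        // Started once, at the beginning of the outer method, not inside onNext().
        Observer<Req2> toSecond = secondServer.start(new Observer<Resp2>() {
            @Override public void onNext(Resp2 value) { toFirst.onNext(mapResponse.apply(value)); }
            @Override public void onError(Throwable t) { toFirst.onError(t); }
            @Override public void onCompleted() { toFirst.onCompleted(); }
        });

        // Callbacks coming from the first server are mirrored to the second one.
        return new Observer<Req1>() {
            @Override public void onNext(Req1 value) { toSecond.onNext(mapRequest.apply(value)); }
            @Override public void onError(Throwable t) { toSecond.onError(t); }
            @Override public void onCompleted() { toSecond.onCompleted(); }
        };
    }
}
```

In the real MiddleService, toFirst would correspond to responseObserver, secondServer.start(...) to stub.streamingCall(...), and the two mappers to mapToSecondRequest and mapToFirstResponse.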
Finally, you should respect both servers' readiness using methods from CallStreamObserver: disableAutoInboundFlowControl(), request(1), isReady() and setOnReadyHandler(...). You can always cast your outbound StreamObservers to CallStreamObservers. More specifically, you should cast responseObserver to ServerCallStreamObserver and requestObserver to ClientCallStreamObserver.
This should be implemented criss-cross:
- You should request(1) message from a given server at the end of processing a message from it (in onNext()) if the other server isReady.
- Receiving an "onReady" callback from one server should trigger request(...)-ing 1 message from the other server.
As far as I remember, after returning the observer from the outer method you should receive initial "onReady" callbacks from both servers, which will put everything in motion. However, I'm not 100% sure about the callback from the second server and cannot verify it at the moment: in case you don't receive the initial callback from it, simply request 1 initial message from the first server before returning the observer.
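As a rough illustration of the criss-cross rule, here is a minimal sketch of one direction of the relay. It does not use the gRPC classes: Outbound and Inbound are hypothetical stand-ins for the parts of CallStreamObserver mentioned above (isReady(), onNext(...) and request(int)), and Pump, onMessage and onSinkReady are names invented for this example. It assumes automatic inbound flow control has already been disabled on the source side.

```java
import java.util.function.Function;

final class FlowControlSketch {

    /** Stand-in for the outbound side of a call (subset of CallStreamObserver). */
    public interface Outbound<T> {
        boolean isReady();
        void onNext(T value);
    }

    /** Stand-in for CallStreamObserver.request(int): asks a peer for more inbound messages. */
    public interface Inbound {
        void request(int count);
    }

    /** One direction of the relay: messages arrive from the source and are forwarded to the sink. */
    public static final class Pump<InT, OutT> {
        private final Inbound source;
        private final Outbound<OutT> sink;
        private final Function<InT, OutT> mapper;

        public Pump(Inbound source, Outbound<OutT> sink, Function<InT, OutT> mapper) {
            this.source = source;
            this.sink = sink;
            this.mapper = mapper;
        }

        /** To be called from onNext() of the observer that receives messages from the source. */
        public void onMessage(InT message) {
            sink.onNext(mapper.apply(message));
            // Ask the source for the next message only if the other side can take it.
            if (sink.isReady()) {
                source.request(1);
            }
        }

        /** To be called from the sink's "onReady" handler. */
        public void onSinkReady() {
            source.request(1);
        }
    }
}
```

A full relay would presumably use two such pumps, one per direction: the first with the first server as source and the second server as sink, the other the opposite way round. A production version would likely also need to avoid requesting a second message while one is still being processed; that is left out here.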
Answered By - morgwai