Issue
I've seen an Apache Flink program with a class that implements SinkFunction and periodically uploads data without any synchronization primitive.
Is this considered dangerous?
Since the API documentation reads
"Writes the given value to the sink. This function is called for every record."
I assume this function can be called by multiple threads on the same instance, which could cause a race condition without a locking mechanism or a concurrent data structure. Is this the right interpretation?
private List<Record> bufferedRecords = new ArrayList<>();

@Override
public void invoke(Record record, Context context) throws Exception {
    bufferedRecords.add(record);
    if (bufferedRecords.size() == batchSize) {
        writeRecords(bufferedRecords); // flush the batch to the external system
        bufferedRecords.clear();       // List has no empty(); clear() resets the buffer
    }
}
Follow-up: to make invoke thread-safe, wrapping the entire method body in a lock seems sufficient. Is there a better way to handle this without sacrificing the properties that bufferedRecords never grows beyond batchSize and that no records are missed or duplicated?
Solution
Each user-defined function in Flink is only ever invoked by a single thread. You usually get one copy of such a function (deserialized via Serializable) per subtask/thread, precisely to avoid costly synchronization.
So your sink function is safe as written. However, when you buffer values, you need to put them into Flink's state if you rely on Flink's fault tolerance for exact results; otherwise a failure loses whatever is still sitting in bufferedRecords. If you use checkpointing, you should also be aware that no checkpoint can complete while writeRecords(bufferedRecords) blocks inside invoke.
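For reference, here is a minimal sketch of how the buffered batch could be kept in Flink's operator state so it survives a restart, following the CheckpointedFunction pattern used in Flink's documentation. The Record type, batchSize, and writeRecords are placeholders carried over from the question, and BufferingSink is an assumed class name; this is one possible shape, not the only way to do it.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingSink extends RichSinkFunction<Record> implements CheckpointedFunction {

    private final int batchSize;
    private final List<Record> bufferedRecords = new ArrayList<>();
    private transient ListState<Record> checkpointedState;

    public BufferingSink(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void invoke(Record record, Context context) throws Exception {
        // Only one task thread per parallel subtask calls invoke(), so no lock is needed.
        bufferedRecords.add(record);
        if (bufferedRecords.size() >= batchSize) {
            writeRecords(bufferedRecords); // placeholder for the question's batch upload
            bufferedRecords.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist whatever is still buffered so it is not lost on failure.
        checkpointedState.clear();
        for (Record record : bufferedRecords) {
            checkpointedState.add(record);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Record> descriptor =
                new ListStateDescriptor<>("buffered-records", Record.class);
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // On recovery, refill the in-memory buffer from the last checkpoint.
        if (context.isRestored()) {
            for (Record record : checkpointedState.get()) {
                bufferedRecords.add(record);
            }
        }
    }

    private void writeRecords(List<Record> records) throws Exception {
        // Placeholder: upload the batch to the external system.
    }
}

With this in place, records that were buffered but not yet written are restored after a failure instead of being dropped. Whether the external write itself is exactly-once still depends on writeRecords being idempotent or transactional.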
Answered By - Arvid Heise