Issue
I'm facing an issue with Spring Kafka which is that it cannot access state store from process event I added that particular store into topology/streams.
method 1:
@Component
@RequiredArgsConstructor
@EnableKafkaStreams
@Order(2)
public class TimelineVersionUpdatedStream implements EventStream {
private static final Logger logger =
LoggerFactory.getLogger(TimelineVersionUpdatedStream.class);
@Autowired
private StreamConfiguration configuration;
@Autowired
private TimeLineChangesCaptureService timeLineChangesCaptureService;
@Autowired
public void TimelineVersionUpdatedProccess(StreamsBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
final SpecificAvroSerde<TimelineVersionUpdated> timelineVersionUpdatedSpecificAvroSerde = new SpecificAvroSerde<>();
timelineVersionUpdatedSpecificAvroSerde.configure(getSerdeConfig(), false);
final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
paymentChangedSpecificAvroSerde.configure(getSerdeConfig(), false);
KeyValueStoreBuilder paymentStoreBuilder = new KeyValueStoreBuilder(
Stores.persistentKeyValueStore("demo-store-2"),
stringSerde,
paymentChangedSpecificAvroSerde,
new SystemTime());
KStream<String, TimelineVersionUpdated> stream = builder.stream(
Topics.MOS_BUDGET_TIMELINE_VERSION,
Consumed.with(
stringSerde,
timelineVersionUpdatedSpecificAvroSerde
));
StreamsBuilder stateStore = builder.addStateStore(paymentStoreBuilder);
stream.process(new ProcessorSupplier<>() {
@Override
public Processor<String, TimelineVersionUpdated> get() {
return new Processor<>() {
private ProcessorContext context;
private KeyValueStore<String, PaymentChanged> store;
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
this.store = context.getStateStore("demo-store-2");
}
@Override
public void process(String s, TimelineVersionUpdated timelineVersionUpdated) {
logger.info("TimelineVersionUpdatedStream.TimelineVersionUpdatedProccess record key {} value{}", s, timelineVersionUpdated.toString());
if (timelineVersionUpdated == null) {
return;
}
timeLineChangesCaptureService.captureTimeLineChanges(timelineVersionUpdated, store);
}
@Override
public void close() {
}
};
}
});
Topology topology = builder.build();
logger.info("{}", topology.describe().toString());
}
when I ran the above code I'm getting the below exception:
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-PROCESSOR-0000000001
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:127) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:879) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:234) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:494) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.2.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-PROCESSOR-0000000001 has no access to StateStore demo-store-2 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:149) ~[kafka-streams-2.7.2.jar:na]
at au.com.mybudget.mos.mostimelinekafkaetl.transport.stream.TimelineVersionUpdatedStream$1$1.init(TimelineVersionUpdatedStream.java:92) ~[classes/:na]
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.init(ProcessorAdapter.java:57) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$init$0(ProcessorNode.java:120) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:117) ~[kafka-streams-2.7.2.jar:na]
... 7 common frames omitted
Then I'm trying to add a store like below: Method 2:
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
final Serde<String> stringSerde = Serdes.String();
final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
paymentChangedSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()), false);
return factoryBean -> {
try {
final StreamsBuilder streamsBuilder = factoryBean.getObject();
streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store-demo-3"),
stringSerde,
paymentChangedSpecificAvroSerde
));
} catch (Exception e) {
logger.error("StreamsBuilderFactoryBeanCustomizer exception:{}", e.getMessage());
}
};
}
Then trying to access that store from the process but end up getting the same exception.
Kindly help to understand the issue.
Solution
Adding a state store to a Topology
is just the first step but it does not make it available: in order to allow a Processor
to use a state store, you must connect both.
The simplest way is to pass in the state store name when adding the Processor
:
stream.process(..., "storeName");
Answered By - Matthias J. Sax
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.