Describe the bug
If there are any malformed events in input connectors, then pipeline.wait_for_idle() never finished, blocks indefinitely.
The reason is that this method decides that pipeline is in idle state only when numbers of total processed records and total input records are equal. It never gonna happen if at least one record is malformed.
equal = total_input_records == total_processed_records
prev = (total_input_records, total_processed_records)
if unchanged and equal:
To Reproduce
Here is how I encountered it:
- Create a table in PostreSQL.
- Create connector for it in pipeline.
- Set up Debezium and Kafka Connect to stream changes from PostgreSQL to Feldera Pipeline.
- Insert some records to PostgreSQL table. Make sure that they got ingested and stored in Kafka topics.
- Change schema of the PostgreSQL table, use completely different column names.
- Start pipeline again.
- Try to do
pipeline.wait_for_idle(), it never finishes.
Expected behavior
Expected pipeline.wait_for_idle() to unblock, when all input records were ingested, even if some of them were malformed.
Context (please complete the following information):
- Feldera Version: 0.98.0
- Feldera Python lib Version: 0.98.0
Describe the bug
If there are any malformed events in input connectors, then
pipeline.wait_for_idle()never finished, blocks indefinitely.The reason is that this method decides that pipeline is in idle state only when numbers of total processed records and total input records are equal. It never gonna happen if at least one record is malformed.
To Reproduce
Here is how I encountered it:
pipeline.wait_for_idle(), it never finishes.Expected behavior
Expected
pipeline.wait_for_idle()to unblock, when all input records were ingested, even if some of them were malformed.Context (please complete the following information):