
[python] pipeline.wait_for_idle blocks indefinitely if there are any malformed events in input connectors #4340

@vladimir-vg

Description


Describe the bug

If any input connector contains malformed events, pipeline.wait_for_idle() never returns; it blocks indefinitely.

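The reason is that this method considers the pipeline idle only when the total number of input records equals the total number of processed records. That never happens if at least one record is malformed, because such a record is counted as input but never as processed. The relevant lines in wait_for_idle() are: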

```python
equal = total_input_records == total_processed_records
prev = (total_input_records, total_processed_records)
if unchanged and equal:
    ...  # remainder of the idle check omitted
```
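To see why the loop never exits, the condition can be simulated in isolation. This is a minimal sketch, not the library's code: it assumes that `unchanged` means neither counter changed since the previous poll (the excerpt does not show how it is computed) and that a malformed record is counted in `total_input_records` but never in `total_processed_records`. It also ignores the sleep between polls and any minimum idle duration. `idle_check` and `polls_until_idle` are hypothetical helpers.

```python
def idle_check(prev, total_input_records, total_processed_records):
    """Return (is_idle, new_prev) using the same condition as the excerpt.

    Assumption: `unchanged` compares the current counters with the previous poll.
    """
    unchanged = prev == (total_input_records, total_processed_records)
    equal = total_input_records == total_processed_records
    prev = (total_input_records, total_processed_records)
    return unchanged and equal, prev


def polls_until_idle(samples, max_polls=100):
    """Feed (input, processed) samples to idle_check and return the poll index
    at which the pipeline is first reported idle, or None if it never is."""
    prev = (None, None)
    for i in range(max_polls):
        # Once the samples run out, the counters stay at their last value,
        # as they would after ingestion has finished.
        sample = samples[min(i, len(samples) - 1)]
        idle, prev = idle_check(prev, *sample)
        if idle:
            return i
    return None


# 10 records ingested, all valid: the counters converge and stay equal.
print(polls_until_idle([(5, 3), (10, 8), (10, 10)]))  # prints 3
# 10 records ingested, 1 malformed: the processed count stalls at 9.
print(polls_until_idle([(5, 3), (10, 8), (10, 9)]))  # prints None
```

In the second case `equal` stays `False` on every poll, so only an external limit (here `max_polls`) ends the loop.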

To Reproduce

Here is how I encountered it:

  1. Create a table in PostgreSQL.
  2. Create an input connector for it in the pipeline.
  3. Set up Debezium and Kafka Connect to stream changes from PostgreSQL to the Feldera pipeline.
  4. Insert some records into the PostgreSQL table. Make sure they are ingested and stored in the Kafka topics.
  5. Change the schema of the PostgreSQL table, using completely different column names.
  6. Start the pipeline again.
  7. Call pipeline.wait_for_idle(); it never returns.

Expected behavior

I expected pipeline.wait_for_idle() to return once all input records have been ingested, even if some of them were malformed.
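An idle condition matching this expectation could count malformed records as accounted for. The sketch below is hypothetical: `Counters`, `is_idle` and especially `malformed_records` are illustrative names, not Feldera's API, and it assumes the pipeline can report how many ingested records failed to parse and that those records are included in `total_input_records`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Counters:
    """Snapshot of pipeline counters.

    `malformed_records` is hypothetical: it stands for whatever parse-error
    count the pipeline exposes, not a confirmed Feldera metric name.
    """
    total_input_records: int
    total_processed_records: int
    malformed_records: int


def is_idle(prev: Optional[Counters], cur: Counters) -> bool:
    """Idle when the counters stopped changing and every ingested record is
    either processed or known to be malformed."""
    unchanged = prev == cur
    accounted = cur.total_input_records == (
        cur.total_processed_records + cur.malformed_records
    )
    return unchanged and accounted


# 10 records ingested, 9 processed, 1 malformed, stable across two polls.
print(is_idle(Counters(10, 9, 1), Counters(10, 9, 1)))  # prints True
# Same counters, but the malformed record is not reported: still waiting.
print(is_idle(Counters(10, 9, 0), Counters(10, 9, 0)))  # prints False
```

With one malformed record out of ten, this check succeeds once the counters are stable, whereas the condition quoted in the description keeps waiting because 10 != 9.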

Context

  • Feldera Version: 0.98.0
  • Feldera Python lib Version: 0.98.0

Metadata



    Labels

    connectors: Issues related to the adapters/connectors crate
