Add e2e test suite for Airflow event-driven DAGs with Apache Kafka #64833
jason810496 wants to merge 7 commits into apache:main
Conversation
Pull request overview
Note: Copilot was unable to run its full agentic suite in this review.
Adds an event-driven (Kafka-backed) E2E test suite for Airflow 3 Assets/AssetWatchers and wires it into Breeze selective checks + CI workflows so the suite runs when relevant files change.
Changes:
- Introduces Kafka producer/consumer example DAGs and a new E2E test validating triggers, retries/DLQ behavior, and Kafka offsets.
- Adds Kafka docker-compose + init script and updates the E2E harness to spin up Kafka and create required topics.
- Extends Breeze selective checks and GitHub workflows to support an event_driven E2E test mode.
Reviewed changes
Copilot reviewed 14 out of 15 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| dev/breeze/src/airflow_breeze/utils/selective_checks.py | Adds a new selective-checks file group + toggle for event-driven E2E tests. |
| dev/breeze/src/airflow_breeze/commands/testing_commands.py | Extends --e2e-test-mode choices with event_driven. |
| dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt | Updates generated CLI help output artifact. |
| dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg | Updates generated CLI help output artifact (includes event_driven). |
| airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/test_event_driven.py | Adds Kafka event-driven E2E test validating DAG triggers + Kafka offsets. |
| airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/__init__.py | Adds package init for the new test module. |
| airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py | Adds list_dag_runs helper used by the new test. |
| airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven.py | Adds example producer/consumer event-driven DAGs (Assets + Kafka trigger + DLQ). |
| airflow-e2e-tests/tests/airflow_e2e_tests/constants.py | Adds Kafka docker directory path constant. |
| airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py | Adds event_driven mode setup, Kafka file copying, and topic creation. |
| airflow-e2e-tests/docker/kafka/update_run.sh | Adds broker init script for Kafka container startup. |
| airflow-e2e-tests/docker/kafka.yml | Adds Kafka broker docker-compose definition for E2E runs. |
| .github/workflows/ci-amd-arm.yml | Plumbs through the new selective-check output for event-driven E2E tests. |
| .github/workflows/airflow-e2e-tests.yml | Updates workflow input docs to include event_driven mode. |
| .github/workflows/additional-prod-image-tests.yml | Adds a new job to run event-driven E2E tests when selected. |
Force-pushed from 351c307 to f72d2aa
```python
# 5. All 9 DAG runs should succeed
for run in consumer_runs:
    assert run["state"] == "success", (
        f"Expected all consumer runs to succeed, but run {run['dag_run_id']} "
        f"has state '{run['state']}'"
    )
```
This test asserts every consumer DAG run is success, but later it expects one run to have a process_message task in failed state. In Airflow, a DagRun is marked failed if any task instance is in a failed state once the run finishes (see DagRun.update_state). Either adjust the expectations to allow one failed DagRun, or change the consumer DAG logic so the malformed message path does not leave a task instance in failed while still producing to the DLQ.
Suggested change:

```diff
-# 5. All 9 DAG runs should succeed
-for run in consumer_runs:
-    assert run["state"] == "success", (
-        f"Expected all consumer runs to succeed, but run {run['dag_run_id']} "
-        f"has state '{run['state']}'"
-    )
+# 5. Expect 8 successful consumer runs and 1 failed run for the malformed message
+successful_runs = [run for run in consumer_runs if run["state"] == "success"]
+failed_runs = [run for run in consumer_runs if run["state"] == "failed"]
+assert len(successful_runs) == 8, (
+    f"Expected 8 successful consumer runs, got {len(successful_runs)}: "
+    f"{[(run['dag_run_id'], run['state']) for run in consumer_runs]}"
+)
+assert len(failed_runs) == 1, (
+    f"Expected 1 failed consumer run for the DLQ path, got {len(failed_runs)}: "
+    f"{[(run['dag_run_id'], run['state']) for run in consumer_runs]}"
+)
```
```python
r"^providers/apache-kafka/.*",
r"^providers/common-messaging/.*",
```
The provider path regexes here don’t match the repository layout. Providers live under providers/apache/kafka/ and providers/common/messaging/, so these patterns will never match and CI selective-checks won’t enable the event-driven e2e suite when the Kafka/common-messaging providers change. Update the regexes to the correct directory paths.
Suggested change:

```diff
-r"^providers/apache-kafka/.*",
-r"^providers/common-messaging/.*",
+r"^providers/apache/kafka/.*",
+r"^providers/common/messaging/.*",
```
```python
def _copy_kafka_files(tmp_dir):
    """Copy Kafka compose file, init script, and provider source into the temp directory."""
```
This docstring says it copies “provider source” into the temp directory, but the function only copies kafka.yml and update_run.sh. Please either update the docstring to match what’s actually being copied, or add the missing copy step if provider source is required for the integration.
Suggested change:

```diff
-    """Copy Kafka compose file, init script, and provider source into the temp directory."""
+    """Copy the Kafka compose file and update script into the temp directory."""
```
```python
"""Airflow Kafka connection
AIRFLOW_CONN_KAFKA_DEFAULT='{
"conn_type": "general",
"extra": {
"bootstrap.servers": "broker:29092",
"group.id": "kafka_default_group",
"security.protocol": "PLAINTEXT",
"enable.auto.commit": false,
"auto.offset.reset": "latest"
}
}'
"""
"""Kafka Command to verify messages are being produced to the topic:

# Create Topic
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq

# Get offsets for the topic to verify messages are being produced
/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq

# List consumer groups to verify our consumer group is being registered
/bin/kafka-consumer-groups --bootstrap-server broker:29092 --list
```
These triple-quoted strings are standalone expression statements (not module docstrings), so they execute as no-ops at import time and are easy to miss/forget. Convert them to real comments, a proper module docstring, or constants referenced from documentation/tests.
Suggested change:

```diff
-"""Airflow Kafka connection
-AIRFLOW_CONN_KAFKA_DEFAULT='{
-"conn_type": "general",
-"extra": {
-"bootstrap.servers": "broker:29092",
-"group.id": "kafka_default_group",
-"security.protocol": "PLAINTEXT",
-"enable.auto.commit": false,
-"auto.offset.reset": "latest"
-}
-}'
-"""
-"""Kafka Command to verify messages are being produced to the topic:
-# Create Topic
-/bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
-/bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq
-# Get offsets for the topic to verify messages are being produced
-/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
-/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq
-# List consumer groups to verify our consumer group is being registered
-/bin/kafka-consumer-groups --bootstrap-server broker:29092 --list
+# Airflow Kafka connection
+# AIRFLOW_CONN_KAFKA_DEFAULT='{
+# "conn_type": "general",
+# "extra": {
+# "bootstrap.servers": "broker:29092",
+# "group.id": "kafka_default_group",
+# "security.protocol": "PLAINTEXT",
+# "enable.auto.commit": false,
+# "auto.offset.reset": "latest"
+# }
+# }'
+#
+# Kafka commands to verify messages are being produced to the topic:
+#
+# Create Topic
+# /bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
+# /bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq
+#
+# Get offsets for the topic to verify messages are being produced
+# /bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
+# /bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq
+#
+# List consumer groups to verify our consumer group is being registered
+# /bin/kafka-consumer-groups --bootstrap-server broker:29092 --list
+#
```
```python
"""Airflow Kafka connection
AIRFLOW_CONN_KAFKA_DEFAULT='{
"conn_type": "general",
"extra": {
"bootstrap.servers": "broker:29092",
"group.id": "kafka_default_group",
"security.protocol": "PLAINTEXT",
"enable.auto.commit": false,
"auto.offset.reset": "latest"
}
}'
"""
"""Kafka Command to verify messages are being produced to the topic:

# Create Topic
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq

# Get offsets for the topic to verify messages are being produced
/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq

# List consumer groups to verify our consumer group is being registered
/bin/kafka-consumer-groups --bootstrap-server broker:29092 --list

# Get current offsets for the consumer group to verify messages are being consumed
/bin/kafka-consumer-groups --bootstrap-server broker:29092 --describe --group group_1
"""
```
Same issue here: this is a standalone triple-quoted string (a no-op expression), not a docstring. Prefer comments or documentation in a README/rst, or fold this into the module docstring if it’s meant to be user-facing guidance.
Suggested change:

```diff
-"""Airflow Kafka connection
-AIRFLOW_CONN_KAFKA_DEFAULT='{
-"conn_type": "general",
-"extra": {
-"bootstrap.servers": "broker:29092",
-"group.id": "kafka_default_group",
-"security.protocol": "PLAINTEXT",
-"enable.auto.commit": false,
-"auto.offset.reset": "latest"
-}
-}'
-"""
-"""Kafka Command to verify messages are being produced to the topic:
-# Create Topic
-/bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
-/bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq
-# Get offsets for the topic to verify messages are being produced
-/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
-/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq
-# List consumer groups to verify our consumer group is being registered
-/bin/kafka-consumer-groups --bootstrap-server broker:29092 --list
-# Get current offsets for the consumer group to verify messages are being consumed
-/bin/kafka-consumer-groups --bootstrap-server broker:29092 --describe --group group_1
-"""
+# Airflow Kafka connection:
+# AIRFLOW_CONN_KAFKA_DEFAULT='{
+# "conn_type": "general",
+# "extra": {
+# "bootstrap.servers": "broker:29092",
+# "group.id": "kafka_default_group",
+# "security.protocol": "PLAINTEXT",
+# "enable.auto.commit": false,
+# "auto.offset.reset": "latest"
+# }
+# }'
+#
+# Kafka commands to verify messages are being produced to the topic:
+#
+# Create Topic
+# /bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
+# /bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq
+#
+# Get offsets for the topic to verify messages are being produced
+# /bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
+# /bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq
+#
+# List consumer groups to verify our consumer group is being registered
+# /bin/kafka-consumer-groups --bootstrap-server broker:29092 --list
+#
+# Get current offsets for the consumer group to verify messages are being consumed
+# /bin/kafka-consumer-groups --bootstrap-server broker:29092 --describe --group group_1
```
```python
print(f"Asset event: {event}")
process_one_message(cast("str", event.extra["payload"]))
return True
```
typing.cast expects a type as its first argument, not a string. Using cast("str", ...) defeats static checking and is inconsistent with normal typing usage. Use cast(str, ...) or (preferably) validate/cast the payload more explicitly if it can be non-string.
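For illustration, the two options side by side (a minimal sketch; the `extra` dict below stands in for the asset event's `extra` payload):

```python
from typing import cast

extra = {"payload": "fizz_buzz:3"}  # stand-in for event.extra

# Option 1: plain cast -- a hint for the type checker only, no runtime check.
payload = cast(str, extra["payload"])

# Option 2: validate explicitly when the payload may be non-string.
raw = extra["payload"]
if not isinstance(raw, str):
    raise TypeError(f"Expected str payload, got {type(raw).__name__}")
payload = raw
```

Option 2 is the safer choice here, since a Kafka message payload is external input that the type checker cannot vouch for.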
Force-pushed from 5eb37d7 to 9ef8404
- Use timedelta(seconds=1) for retry_delay instead of bare int
- Return exactly expected_count runs from _wait_for_consumer_dag_runs
- Replace fixed 30s sleep with polling for Kafka consumer group registration
- Pin --partitions 1 --replication-factor 1 for deterministic topic creation
- Add kafka.yml pattern to selective checks file group
- Remove container_name and unused JMX port from kafka.yml
- Regenerate breeze CLI doc images
Force-pushed from 9ef8404 to 8e10497
jason810496 left a comment:
The CI failure requires an additional PR to fix the AwaitMessageTrigger first:
```
airflow-triggerer-1 | 2026-05-02T15:25:00.831436Z [error ] Trigger ID 1 exited with error 'AwaitMessageTrigger' object has no attribute '_task_instance' [airflow.jobs.triggerer_job_runner] error_detail=[{'exc_type': 'AttributeError', 'exc_value': "'AwaitMessageTrigger' object has no attribute '_task_instance'", 'exc_notes': [], 'syntax_error': None, 'is_cause': False, 'frames': [{'filename': '/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 1238, 'name': 'cleanup_finished_triggers'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/greenback/_impl.py', 'lineno': 119, 'name': 'greenback_shim'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/greenback/_impl.py', 'lineno': 208, 'name': '_greenback_shim'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/greenback/_impl.py', 'lineno': 84, 'name': 'trampoline'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/outcome/_impl.py', 'lineno': 185, 'name': 'send'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 1387, 'name': 'run_trigger'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/airflow/triggers/base.py', 'lineno': 104, 'name': 'task_instance'}], 'is_group': False, 'exceptions': []}] loc=triggerer_job_runner.py:929
airflow-triggerer-1 | 2026-05-02T15:25:00.832067Z [error ] Trigger exited without sending an event. Dependent tasks will be failed. [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:929 name='ID 1'
airflow-triggerer-1 | 2026-05-02T15:25:02.858280Z [error ] Trigger ID 1 exited with error 'AwaitMessageTrigger' object has no attribute '_task_instance' [airflow.jobs.triggerer_job_runner] error_detail=[{'exc_type': 'AttributeError', 'exc_value': "'AwaitMessageTrigger' object has no attribute '_task_instance'", 'exc_notes': [], 'syntax_error': None, 'is_cause': False, 'frames': [{'filename': '/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 1238, 'name': 'cleanup_finished_triggers'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/greenback/_impl.py', 'lineno': 119, 'name': 'greenback_shim'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/greenback/_impl.py', 'lineno': 208, 'name': '_greenback_shim'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/greenback/_impl.py', 'lineno': 84, 'name': 'trampoline'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/outcome/_impl.py', 'lineno': 185, 'name': 'send'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 1387, 'name': 'run_trigger'}, {'filename': '/home/airflow/.local/lib/python3.10/site-packages/airflow/triggers/base.py', 'lineno': 104, 'name': 'task_instance'}], 'is_group': False, 'exceptions': []}] loc=triggerer_job_runner.py:929
```
parkhojeong left a comment:
I reviewed the term usage :)
```python
"""Poll until *dag_id* is registered (parsed & serialized) and reachable via the API.

On remote CI runners the DAG processor may need extra time to parse & serialize
new DAG files, so calling ``un_pause_dag`` immediately can return a 404. Retry
```
Suggested change:

```diff
-new DAG files, so calling ``un_pause_dag`` immediately can return a 404. Retry
+new Dag files, so calling ``un_pause_dag`` immediately can return a 404. Retry
```
```python
On remote CI runners the DAG processor may need extra time to parse & serialize
new DAG files, so calling ``un_pause_dag`` immediately can return a 404. Retry
until the DAG exists or *timeout* seconds elapse.
```
Suggested change:

```diff
-until the DAG exists or *timeout* seconds elapse.
+until the Dag exists or *timeout* seconds elapse.
```
```python
raise
last_error = exc
time.sleep(check_interval)
raise TimeoutError(f"DAG {dag_id} was not registered within {timeout}s. Last error: {last_error}")
```
Suggested change:

```diff
-raise TimeoutError(f"DAG {dag_id} was not registered within {timeout}s. Last error: {last_error}")
+raise TimeoutError(f"Dag {dag_id} was not registered within {timeout}s. Last error: {last_error}")
```
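The retry-until-registered behavior these snippets describe can be sketched as a generic helper (a rough sketch only; `get_dag` is a hypothetical stand-in for the API call that 404s while the DAG is still being parsed, not the PR's actual client method):

```python
import time


def wait_for_dag_registered(get_dag, dag_id, timeout=60.0, check_interval=2.0):
    """Poll ``get_dag`` until it stops raising, or raise TimeoutError.

    ``get_dag`` stands in for an API call that raises while the DAG is
    still unparsed/unserialized (e.g. a 404 from the REST API).
    """
    deadline = time.monotonic() + timeout
    last_error = None
    while time.monotonic() < deadline:
        try:
            return get_dag(dag_id)
        except Exception as exc:  # the real helper would catch a narrower error
            last_error = exc
            time.sleep(check_interval)
    raise TimeoutError(f"DAG {dag_id} was not registered within {timeout}s. Last error: {last_error}")
```

Keeping the last error in the TimeoutError message, as the PR does, makes CI-only flakes much easier to diagnose from logs.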
```python
)

def list_dag_runs(self, dag_id: str, limit: int = 100):
    """List DAG runs for a given DAG."""
```
Suggested change:

```diff
-    """List DAG runs for a given DAG."""
+    """List Dag runs for a given DAG."""
```
```python
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""E2E tests for the event-driven DAG pattern using Apache Kafka.
```
Suggested change:

```diff
-"""E2E tests for the event-driven DAG pattern using Apache Kafka.
+"""E2E tests for the event-driven Dag pattern using Apache Kafka.
```
```python
"""Return a mapping of task_id -> state for a consumer DAG run."""
response = self.airflow_client.get_task_instances(CONSUMER_DAG_ID, run_id)
```
Suggested change:

```diff
-"""Return a mapping of task_id -> state for a consumer DAG run."""
+"""Return a mapping of task_id -> state for a consumer Dag run."""
 response = self.airflow_client.get_task_instances(CONSUMER_DAG_ID, run_id)
```
```python
1. Unpause the consumer DAG so the triggerer starts the AssetWatcher.
2. Wait for the Kafka MessageQueueTrigger to begin polling.
3. Trigger the producer DAG and wait for it to succeed.
4. Wait for 9 consumer DAG runs to reach a terminal state.
5. All 9 DAG runs succeed. Verify task-level behavior:
```
Suggested change:

```diff
-1. Unpause the consumer DAG so the triggerer starts the AssetWatcher.
-2. Wait for the Kafka MessageQueueTrigger to begin polling.
-3. Trigger the producer DAG and wait for it to succeed.
-4. Wait for 9 consumer DAG runs to reach a terminal state.
-5. All 9 DAG runs succeed. Verify task-level behavior:
+1. Unpause the consumer Dag so the triggerer starts the AssetWatcher.
+2. Wait for the Kafka MessageQueueTrigger to begin polling.
+3. Trigger the producer Dag and wait for it to succeed.
+4. Wait for 9 consumer Dag runs to reach a terminal state.
+5. All 9 Dag runs succeed. Verify task-level behavior:
```
```python
# 3. Trigger producer and wait for it to complete
producer_state = self.airflow_client.trigger_dag_and_wait(PRODUCER_DAG_ID)
assert producer_state == "success", f"Producer DAG did not succeed. Final state: {producer_state}"
```
Suggested change:

```diff
-assert producer_state == "success", f"Producer DAG did not succeed. Final state: {producer_state}"
+assert producer_state == "success", f"Producer Dag did not succeed. Final state: {producer_state}"
```
```python
producer_state = self.airflow_client.trigger_dag_and_wait(PRODUCER_DAG_ID)
assert producer_state == "success", f"Producer DAG did not succeed. Final state: {producer_state}"

# 4. Wait for all 9 consumer DAG runs
```
Suggested change:

```diff
-# 4. Wait for all 9 consumer DAG runs
+# 4. Wait for all 9 consumer Dag runs
```
```python
def _create_kafka_topics(compose_instance):
    """Create Kafka topics required by the event-driven DAG."""
```
Suggested change:

```diff
-    """Create Kafka topics required by the event-driven DAG."""
+    """Create Kafka topics required by the event-driven Dag."""
```
Why
Airflow 3 introduces event-driven DAGs with Assets, AssetWatchers, and MessageQueueTrigger to enable reactive, message-driven pipelines. This PR adds a comprehensive e2e test suite to verify that Kafka-based event routing works correctly, including proper handling of malformed messages and dead-letter queues.
What
- AssetWatcher
- kafka.yaml and update_run.sh are copied from breeze --integration kafka

Was generative AI tooling used to co-author this PR?