Skip to content

Commit 55b67e5

Browse files
committed
docs: Added more docstrings and documentation
1 parent 277b28d commit 55b67e5

3 files changed

Lines changed: 160 additions & 1 deletion

File tree

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
or more contributor license agreements. See the NOTICE file
3+
distributed with this work for additional information
4+
regarding copyright ownership. The ASF licenses this file
5+
to you under the Apache License, Version 2.0 (the
6+
"License"); you may not use this file except in compliance
7+
with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
software distributed under the License is distributed on an
13+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
KIND, either express or implied. See the License for the
15+
specific language governing permissions and limitations
16+
under the License.
17+
18+
.. NOTE TO CONTRIBUTORS:
19+
Please, only add notes to the Changelog just below the "Changelog" header when there are some breaking changes
20+
and you want to add an explanation to the users on how they are supposed to deal with them.
21+
The changelog is updated and maintained semi-automatically by release manager.
22+
23+
24+
IBM MQ Message Queue
25+
====================
26+
27+
.. contents::
28+
:local:
29+
:depth: 2
30+
31+
32+
IBM MQ Queue Provider
33+
---------------------
34+
35+
Implemented by :class:`~airflow.providers.ibm.mq.queues.mq.IBMMQMessageQueueProvider`
36+
37+
The IBM MQ Queue Provider is a
38+
:class:`~airflow.providers.common.messaging.providers.base_provider.BaseMessageQueueProvider`
39+
that uses IBM MQ as the underlying message queue system.
40+
41+
It allows you to send and receive messages using IBM MQ queues in your Airflow workflows
42+
via the common message queue interface
43+
:class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger`.
44+
45+
46+
.. include:: /../src/airflow/providers/ibm/mq/queues/mq.py
47+
:start-after: [START ibmmq_message_queue_provider_description]
48+
:end-before: [END ibmmq_message_queue_provider_description]
49+
50+
51+
.. _howto/triggers:IBMMQMessageQueueTrigger:
52+
53+
54+
IBM MQ Message Queue Trigger
55+
----------------------------
56+
57+
Implemented by :class:`~airflow.providers.ibm.mq.triggers.mq.AwaitMessageTrigger`
58+
59+
Inherited from
60+
:class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger`
61+
62+
Wait for a message in a queue
63+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
64+
65+
Below is an example of how you can configure an Airflow DAG to be triggered
66+
by a message arriving in an IBM MQ queue.
67+
68+
.. exampleinclude:: /../tests/system/ibm/mq/example_dag_message_queue_trigger.py
69+
:language: python
70+
:start-after: [START howto_trigger_message_queue]
71+
:end-before: [END howto_trigger_message_queue]
72+
73+
74+
How it works
75+
------------
76+
77+
1. **IBM MQ Message Queue Trigger**
78+
The ``AwaitMessageTrigger`` listens for messages from an IBM MQ queue.
79+
80+
2. **Asset and Watcher**
81+
The ``Asset`` abstracts the external entity, the IBM MQ queue in this example.
82+
The ``AssetWatcher`` associates a trigger with a name. This name helps you
83+
identify which trigger is associated with which asset.
84+
85+
3. **Event-Driven DAG**
86+
Instead of running on a fixed schedule, the DAG executes when the asset receives
87+
an update (for example, when a new message arrives in the queue).
88+
89+
For how to use the trigger, refer to the documentation of the
90+
:ref:`Messaging Trigger <howto/trigger:MessageQueueTrigger>`.

providers/ibm/mq/src/airflow/providers/ibm/mq/queues/mq.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
from urllib.parse import urlparse
2323

2424
from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider
25-
from airflow.providers.ibm.mq.hooks.mq import IBMMQHook
2625
from airflow.providers.ibm.mq.triggers.mq import AwaitMessageTrigger
2726
from airflow.providers.ibm.mq.version_compat import AIRFLOW_V_3_0_PLUS
2827

@@ -38,6 +37,29 @@
3837

3938

4039
class IBMMQMessageQueueProvider(BaseMessageQueueProvider):
40+
"""
41+
Configuration for IBM MQ integration with common-messaging.
42+
43+
[START ibmmq_message_queue_provider_description]
44+
45+
* It uses ``mq`` as scheme for identifying IBM MQ queues.
46+
* For parameter definitions take a look at
47+
:class:`~airflow.providers.ibm.mq.triggers.mq.AwaitMessageTrigger`.
48+
49+
.. code-block:: python
50+
51+
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
52+
from airflow.sdk import Asset, AssetWatcher
53+
54+
trigger = MessageQueueTrigger(
55+
queue="mq://mq_default/MY.QUEUE.NAME",
56+
)
57+
58+
asset = Asset("mq_topic_asset", watchers=[AssetWatcher(name="mq_watcher", trigger=trigger)])
59+
60+
[END ibmmq_message_queue_provider_description]
61+
"""
62+
4163
scheme = "mq"
4264

4365
def queue_matches(self, queue: str) -> bool:
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
# [START howto_trigger_message_queue]
20+
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
21+
from airflow.sdk import DAG, Asset, AssetWatcher, task
22+
23+
# Define a trigger that listens to an external message queue (IBM MQ in this case)
24+
trigger = MessageQueueTrigger(
25+
queue="mq://mq_default/MY.QUEUE.NAME",
26+
)
27+
28+
mq_topic_asset = Asset(
29+
"mq_topic_asset",
30+
watchers=[AssetWatcher(name="mq_watcher", trigger=trigger)],
31+
)
32+
33+
with DAG(dag_id="example_ibm_mq_watcher", schedule=[mq_topic_asset]) as dag:
34+
@task
35+
def process_message(**context):
36+
for event in context["triggering_asset_events"][mq_topic_asset]:
37+
# Get the message from the TriggerEvent payload
38+
print("Processing event: ", event)
39+
payload = event["payload"]
40+
print("Actual payload: ", payload)
41+
# [END howto_trigger_message_queue]
42+
43+
44+
from tests_common.test_utils.system_tests import get_test_run # noqa: E402
45+
46+
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
47+
test_run = get_test_run(dag)

0 commit comments

Comments
 (0)