Issue while connecting to GCP using Secrets / Impersonation Chain in DataprocCreateClusterOperator #13042
Unanswered · abhishekshenoy asked this question in Q&A
We are using Airflow 2.0.0b2, and our Airflow runs on GKE clusters.
The executor pods run with a service account that has permission to access secrets in Google Secret Manager.
When I pass the secret name (prefix and all correctly set) as gcp_conn_id, I see that although the secret is fetched correctly, it is not used to create the cluster in Dataproc. Instead, the default account in the pod, which does not have the Dataproc Admin role, is used to create the cluster.
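As a sanity check, a snippet like the sketch below can confirm that the Secret Manager backend resolves the secret to a connection URI; the conn id `my_dataproc_conn` is a placeholder, and `get_conn_uri` comes from the google provider's `CloudSecretManagerBackend`:

```python
# Sketch: verify the Secret Manager backend can resolve a connection.
# Assumes a secret named airflow-connections-my_dataproc_conn exists;
# "my_dataproc_conn" is a placeholder conn id, not a real one.
from airflow.providers.google.cloud.secrets.secret_manager import CloudSecretManagerBackend

backend = CloudSecretManagerBackend(connections_prefix="airflow-connections", sep="-")
print(backend.get_conn_uri("my_dataproc_conn"))  # should print the connection URI
```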
Task definition is below:
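In sketch form (placeholder values throughout; only the cluster name matches the log further down):

```python
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

# Placeholder cluster config; the real one is not reproduced here.
CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id="my-gcp-project",         # placeholder
    region="us-central1",                # placeholder
    cluster_name="first-cluster-setup",  # matches the log below
    cluster_config=CLUSTER_CONFIG,
    gcp_conn_id="my_dataproc_conn",      # conn id backed by the Secret Manager secret
)
```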
I get the below exception:
I know I am missing something here. I tried retrieving the connection and passing the connection URI string to gcp_conn_id using templates, but since gcp_conn_id is not a templated field, the variable was not resolved.
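This can be confirmed by inspecting the operator's template_fields; gcp_conn_id is not listed there, so any Jinja expression passed to it is used verbatim:

```python
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

# Only the fields listed here are rendered by Jinja before execution;
# gcp_conn_id is not among them, so a templated value stays an unrendered string.
print(DataprocCreateClusterOperator.template_fields)
```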
I am still unable to establish a connection when I try gcp_conn_id with connections retrieved from secrets.
I then moved from using secrets to using impersonation_chain, granting the service account that my pods run with the serviceAccountTokenCreator role on the service account to be impersonated.
The task definition below works fine in my local Docker setup, where my tasks run with the LocalExecutor, and I am able to create a cluster.
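In sketch form (again, placeholder values throughout; the service account email is not my real one):

```python
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

# Placeholder cluster config, as in the earlier sketch.
CLUSTER_CONFIG = {"master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"}}

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id="my-gcp-project",         # placeholder
    region="us-central1",                # placeholder
    cluster_name="first-cluster-setup",
    cluster_config=CLUSTER_CONFIG,
    # Placeholder email; the pod's service account must hold
    # serviceAccountTokenCreator on this account.
    impersonation_chain="dataproc-admin@my-gcp-project.iam.gserviceaccount.com",
)
```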
The same DAG, when run on my dev setup, throws the exception below. I do not understand why it is looking in Secret Manager for the google_cloud_default connection. Any help would unblock me in setting up a Dataproc flow using Airflow. I have attached the file with the whole exception stack trace.
impersonation_chain_exception.log
[2020-12-13 07:31:00,542] {taskinstance.py:1018} INFO - Starting attempt 1 of 2
[2020-12-13 07:31:00,542] {taskinstance.py:1019} INFO -
[2020-12-13 07:31:00,640] {taskinstance.py:1038} INFO - Executing <Task(DataprocCreateClusterOperator): create_cluster> on 2020-12-13T07:30:40.273638+00:00
[2020-12-13 07:31:00,645] {standard_task_runner.py:50} INFO - Started process 14 to run task
[2020-12-13 07:31:00,650] {standard_task_runner.py:74} INFO - Running: ['airflow', 'tasks', 'run', '4_spark_submit_dataproc', 'create_cluster', '2020-12-13T07:30:40.273638+00:00', '--job-id', '203', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/templates/4_spark_submit_dataproc.py', '--cfg-path', '/tmp/tmpeqcdvbvb']
[2020-12-13 07:31:00,651] {standard_task_runner.py:75} INFO - Job 203: Subtask create_cluster
[2020-12-13 07:31:01,189] {logging_mixin.py:103} INFO - Running <TaskInstance: 4_spark_submit_dataproc.create_cluster 2020-12-13T07:30:40.273638+00:00 [running]> on host 4sparksubmitdataproccreatecluster-8d0cc477352847c6a93d4b2568051
[2020-12-13 07:31:01,550] {taskinstance.py:1230} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=airflow@example.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=4_spark_submit_dataproc
AIRFLOW_CTX_TASK_ID=create_cluster
AIRFLOW_CTX_EXECUTION_DATE=2020-12-13T07:30:40.273638+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2020-12-13T07:30:40.273638+00:00
[2020-12-13 07:31:01,550] {dataproc.py:603} INFO - Creating cluster: first-cluster-setup
[2020-12-13 07:31:02,030] {secret_manager_client.py:89} ERROR - Google Cloud API Call Error (PermissionDenied): No access for Secret ID airflow-connections-google_cloud_default.
Did you add 'secretmanager.versions.access' permission?
[2020-12-13 07:31:02,103] {taskinstance.py:1396} ERROR - (psycopg2.errors.UndefinedColumn) column connection.description does not exist
LINE 1: ...id, connection.conn_type AS connection_conn_type, connection...
^
[SQL: SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.description AS connection_description, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted
FROM connection
WHERE connection.conn_id = %(conn_id_1)s
LIMIT %(param_1)s]
[parameters: {'conn_id_1': 'google_cloud_default', 'param_1': 1}]
(Background on this error at: http://sqlalche.me/e/13/f405)
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.UndefinedColumn: column connection.description does not exist
LINE 1: ...id, connection.conn_type AS connection_conn_type, connection...
^