Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 144 additions & 1 deletion airbyte-integrations/connectors/source-gmail/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ check:
stream_names:
- messages

concurrency_level:
type: ConcurrencyLevel
default_concurrency: "{{ config.get('num_workers', 5) }}"
Comment thread
pnilan marked this conversation as resolved.
max_concurrency: 10

definitions:
streams:
profile:
Expand Down Expand Up @@ -42,6 +47,7 @@ definitions:
http_method: GET
request_parameters:
includeSpamTrash: "{{ config['include_spam_and_trash'] }}"
q: "{% if config.get('start_date') %}after:{{ format_datetime(config['start_date'], '%s') }}{% endif %}"
record_selector:
type: RecordSelector
extractor:
Expand Down Expand Up @@ -117,6 +123,14 @@ definitions:
schema:
$ref: "#/schemas/labels_details"
messages:
# Public, full-refresh stream. `users.messages.list` returns `{id,
# threadId}` stubs with no `internalDate`, so there is no record-level
# cursor value available on this stream. Exposing it as incremental
# would silently drop every record when selected standalone
# (DatetimeBasedCursor._is_within_daterange_boundaries returns False +
# WARN for records missing the cursor field). The repeat-sync
# efficiency gain is instead realised inside `messages_details` via
# the internal `_messages_for_details` parent.
type: DeclarativeStream
name: messages
primary_key:
Expand All @@ -129,6 +143,7 @@ definitions:
http_method: GET
request_parameters:
includeSpamTrash: "{{ config['include_spam_and_trash'] }}"
q: "{% if config.get('start_date') %}after:{{ format_datetime(config['start_date'], '%s') }}{% endif %}"
record_selector:
type: RecordSelector
extractor:
Expand All @@ -154,6 +169,62 @@ definitions:
type: InlineSchemaLoader
schema:
$ref: "#/schemas/messages"
# INTERNAL — do not add to the top-level `streams:` list. This stream
# exists only so `messages_details` has a parent whose cursor can
# receive migrated child state via `incremental_dependency: true`,
# letting repeat syncs bound the `messages` list call by the last-seen
# `internalDate` instead of re-crawling from the configured start.
#
# The `cursor_field: internalDate` on this internal parent is unusual:
# list-endpoint responses don't populate it, so the cursor only ever
# advances via state migration from the child. That is acceptable here
# because (a) this stream is never surfaced to users, so standalone
# selection is impossible, and (b) the child's state write is the only
# driver of the parent's cursor value by design. See pnilan's review
# on PR #76431 for the full rationale.
_messages_for_details:
$ref: "#/definitions/streams/messages"
name: _messages_for_details
retriever:
type: SimpleRetriever
requester:
$ref: "#/definitions/base_requester"
path: messages
http_method: GET
request_parameters:
includeSpamTrash: "{{ config['include_spam_and_trash'] }}"
q: "after:{{ stream_interval.start_time }}"
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path:
- messages
paginator:
type: DefaultPaginator
page_token_option:
type: RequestOption
inject_into: request_parameter
field_name: pageToken
page_size_option:
type: RequestOption
field_name: maxResults
inject_into: request_parameter
pagination_strategy:
type: CursorPagination
page_size: 100
cursor_value: "{{ response.get(\"nextPageToken\", {}) }}"
stop_condition: "{{ not response.get(\"nextPageToken\", {}) }}"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: internalDate
cursor_datetime_formats:
- "%s"
datetime_format: "%s"
start_datetime:
type: MinMaxDatetime
datetime: "{{ config.get('start_date', '2004-04-01T00:00:00Z') }}"
datetime_format: "%Y-%m-%dT%H:%M:%SZ"
messages_details:
type: DeclarativeStream
name: messages_details
Expand All @@ -176,8 +247,19 @@ definitions:
- type: ParentStreamConfig
parent_key: id
partition_field: message_id
incremental_dependency: true
Comment thread
pnilan marked this conversation as resolved.
stream:
$ref: "#/definitions/streams/messages"
$ref: "#/definitions/streams/_messages_for_details"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: internalDate
cursor_datetime_formats:
- "%ms"
datetime_format: "%s"
start_datetime:
type: MinMaxDatetime
datetime: "{{ config.get('start_date', '2004-04-01T00:00:00Z') }}"
datetime_format: "%Y-%m-%dT%H:%M:%SZ"
schema_loader:
type: InlineSchemaLoader
schema:
Expand All @@ -195,6 +277,7 @@ definitions:
http_method: GET
request_parameters:
includeSpamTrash: "{{ config['include_spam_and_trash'] }}"
q: "{% if config.get('start_date') %}after:{{ format_datetime(config['start_date'], '%s') }}{% endif %}"
record_selector:
type: RecordSelector
extractor:
Expand Down Expand Up @@ -261,6 +344,42 @@ definitions:
access_token_name: access_token
refresh_request_body: {}
token_refresh_endpoint: https://accounts.google.com/o/oauth2/token
error_handler:
type: DefaultErrorHandler
backoff_strategies:
# Gmail usually returns a `Retry-After` header on 429 and on 403
# quota-saturation responses, but not always. `WaitTimeFromHeader`
# reads it when present; `ExponentialBackoffStrategy` is the
# fallback when the header is absent so the sync still backs off
# instead of retrying immediately.
- type: WaitTimeFromHeader
header: Retry-After
- type: ExponentialBackoffStrategy
factor: 5
response_filters:
- type: HttpResponseFilter
action: RATE_LIMITED
http_codes:
- 429
Comment thread
pnilan marked this conversation as resolved.
# Gmail returns HTTP 403 with `error.errors[0].reason` set to
# `rateLimitExceeded` or `userRateLimitExceeded` on quota-unit
# saturation. The default CDK 403 mapping is FAIL/config_error
# (e.g. wrong OAuth scope, Gmail API disabled, revoked token),
# which would terminate the sync as an auth failure. We match
# *only* rate-limit 403s via `predicate` so auth-failure 403s
# keep failing fast.
#
# `HttpResponseFilter` matchers are OR'd, not AND'd, so we must
# not pair this with `http_codes: [403]` — that would match any
# 403 regardless of the predicate. Also, `error_message_contains`
# scans the top-level `error.message` ("User-rate limit
# exceeded."), not the nested `error.errors[0].reason`, so the
# reason string must be checked explicitly.
# See https://developers.google.com/gmail/api/guides/handle-errors.
- type: HttpResponseFilter
action: RATE_LIMITED
predicate: >-
{{ 'rateLimitExceeded' in (response.get('error', {}).get('errors', [{}]) | first).get('reason', '') }}

streams:
- $ref: "#/definitions/streams/profile"
Expand Down Expand Up @@ -307,6 +426,30 @@ spec:
title: Include Spam & Trash
default: false
order: 3
num_workers:
type: integer
title: Number of concurrent workers
description: >-
Number of concurrent workers used when syncing. Higher values result
in faster syncs but may trigger rate limiting on lower-tier Gmail API
quotas. The default works well for most accounts; if you see frequent
rate-limit errors in sync logs, reduce this value.
default: 5
minimum: 2
maximum: 10
order: 4
start_date:
type: string
title: Start date
description: >-
UTC date and time in the format YYYY-MM-DDTHH:MM:SSZ. Only messages,
threads, and drafts received on or after this date will be replicated.
If not set, all historical data will be replicated.
format: date-time
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
examples:
- "2020-01-01T00:00:00Z"
order: 5
additionalProperties: true

metadata:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gmail/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: f7833dac-fc18-4feb-a2a9-94b22001edc6
dockerImageTag: 0.0.51
dockerImageTag: 0.1.0
dockerRepository: airbyte/source-gmail
githubIssueLabel: source-gmail
icon: icon.svg
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from pathlib import Path
from typing import Any, Mapping, Optional

import pytest
import yaml

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.state_builder import StateBuilder


_MANIFEST_PATH = Path(__file__).parent.parent / "manifest.yaml"


@pytest.fixture(scope="session")
def manifest() -> Mapping[str, Any]:
with open(_MANIFEST_PATH) as f:
return yaml.safe_load(f)


@pytest.fixture
def base_config() -> Mapping[str, Any]:
return {
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"client_refresh_token": "test_refresh_token",
"include_spam_and_trash": False,
}


@pytest.fixture
def config_with_start_date(base_config) -> Mapping[str, Any]:
return {**base_config, "start_date": "2024-01-01T00:00:00Z"}


def build_source(config: Mapping[str, Any], state: Optional[list] = None) -> YamlDeclarativeSource:
catalog = CatalogBuilder().build()
state = StateBuilder().build() if not state else state
return YamlDeclarativeSource(path_to_yaml=str(_MANIFEST_PATH), catalog=catalog, config=config, state=state)


@pytest.fixture(autouse=True)
def mock_oauth(requests_mock):
requests_mock.post(
"https://accounts.google.com/o/oauth2/token",
json={"access_token": "access_token", "expires_in": 3600},
)
Loading
Loading