feat(source-gmail): incremental sync on messages_details, concurrency, and Gmail rate-limit handling#76431
Conversation
…d Gmail rate-limit handling Co-Authored-By: bot_apk <apk@cognition.ai>
This comment was marked as outdated.
This comment was marked as outdated.
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksPR Slash CommandsAirbyte Maintainers (that's you!) can execute the following slash commands on your PR:
📚 Show Repo GuidanceHelpful Resources
|
Co-Authored-By: bot_apk <apk@cognition.ai>
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
Co-Authored-By: bot_apk <apk@cognition.ai>
Per PR review: CompositeErrorHandler is unnecessary when the wrapped error handlers are a specialized DefaultErrorHandler + a catch-all DefaultErrorHandler. A single DefaultErrorHandler with the Retry-After backoff and 429 response filter is equivalent and cleaner. Co-Authored-By: bot_apk <apk@cognition.ai>
…manifest.py Per PR review: - The api_budget MovingWindowCallRatePolicy used a flat req/sec limit, but Gmail's quota is enforced in quota-units per method (5-100 units). The reactive Retry-After error handler already correctly backs off on 429s, so the proactive budget was redundant and unnecessarily coarse. - test_manifest.py structural assertions duplicated what test_streams.py already verifies end-to-end through the CDK. Co-Authored-By: bot_apk <apk@cognition.ai>
This comment was marked as outdated.
This comment was marked as outdated.
…ds for q=after:
Gmail's users.messages.list returns only {id, threadId} stubs; internalDate is
only in users.messages.get. The cursor therefore has to live on
messages_details, not messages. Keep incremental_dependency: true on the parent
config so the parent's q=after: can be driven by the child cursor on repeat
syncs.
Also fixes the drafts/threads q=after: filters to use unix seconds via
format_datetime — Gmail interprets after:YYYY/MM/DD as midnight PST, which
drifts ~8h for UTC users.
Changelog: link to the correct PR (#76431).
Co-Authored-By: bot_apk <apk@cognition.ai>
This comment was marked as outdated.
This comment was marked as outdated.
…te-limit handling - Give messages stream its own DatetimeBasedCursor on internalDate so incremental_dependency from messages_details is meaningful. Parent's cursor_datetime_formats is %s (unix seconds) to match what the child emits, since list responses carry no internalDate and the parent only reads state via the child's migrated value. - Reclassify HTTP 403 with rateLimitExceeded in the body as RATE_LIMITED so Gmail quota-saturation errors retry with Retry-After instead of terminating as auth failures. - Expose num_workers in the user spec (2..10, default 5) so users can throttle concurrency on lower-tier Gmail API quotas. - Add unit tests covering parent q=after advancement on resume and 403+rateLimitExceeded retry behavior. Co-Authored-By: bot_apk <apk@cognition.ai>
Co-Authored-By: bot_apk <apk@cognition.ai>
…dd ExponentialBackoff fallback Co-Authored-By: bot_apk <apk@cognition.ai>
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
… parent for messages_details
Addresses pnilan's follow-up review. Exposing 'messages' as incremental on
internalDate is unsafe because users.messages.list returns {id, threadId}
stubs with no cursor field; DatetimeBasedCursor silently drops those records
when the stream is selected standalone (no child feeds state back).
Solution: keep public 'messages' as full-refresh (restores pre-0.1.0
semantics, adds optional start_date gate), and introduce an internal
'_messages_for_details' stream whose sole role is to serve as the parent of
'messages_details' so incremental_dependency can migrate child state back
and bound repeat-sync list calls. The internal stream is not added to the
top-level streams list, so it cannot be selected directly by users.
Tests: 2 new regression guards — one asserts public 'messages' emits
list-endpoint stubs standalone with no default q, one asserts it still
injects q=after:<unix> when start_date is configured. 11/11 pass.
Co-Authored-By: bot_apk <apk@cognition.ai>
This comment was marked as outdated.
This comment was marked as outdated.
|
/ai-prove-fix
|
|
|
|
↪️ Triggering Reason: Ready-for-review PR with CI green and no prior |
Reviewing PR for connector safety and quality.
|
What
Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/16209:
Implements the audit recommendations from airbytehq/airbyte-internal-issues#16209 for
source-gmail(v0.0.50 → v0.1.0, manifest-only, community/alpha):Incremental sync on
messages_details—DatetimeBasedCursoroninternalDate.messages_detailsis the only stream whereinternalDateis actually populated (users.messages.listreturns{id, threadId}stubs only), so it is the only viable record-level cursor site.Internal
_messages_for_detailsparent stream (not surfaced to users) — incremental oninternalDate(unix seconds), withq=after:{{ stream_interval.start_time }}on the list call.messages_details'sParentStreamConfigreferences this internal stream withincremental_dependency: true, so the child's cursor state migrates back onto the parent and bounds the parent's list crawl on every repeat sync. The stream is defined underdefinitions.streams._messages_for_detailsand is deliberately not listed under the top-levelstreams:block, so users cannot select it in their catalog. An inline comment documents this constraint.This split replaces an earlier design in this PR that exposed the public
messagesstream as incremental oninternalDate. That was unsafe: list-endpoint records do not carryinternalDate, soDatetimeBasedCursor._is_within_daterange_boundarieswould have silently dropped every record (returnsFalse+WARN) whenevermessageswas selected standalone (withoutmessages_detailsin the same catalog). Keeping the publicmessagesstream full-refresh preserves pre-0.1.0 semantics exactly for users who select only that stream.Public
messagesstream stays full-refresh, with a new optionalstart_dategate on theq=parameter (only injected whenstart_dateis configured). No behavioural change for users who don't setstart_date.Server-side
q=after:filtering onthreads,drafts, and the internal_messages_for_details— unix seconds (notYYYY/MM/DD, which Gmail interprets as midnight PST and drifts by approximately 8h for non-PST users). On the internal parent, when no prior state and nostart_dateis configured,stream_interval.start_timeresolves to theMinMaxDatetimedefault2004-04-01T00:00:00Z→q=after:1080777600, which predates Gmail and is a functional no-op on first sync. On the publicmessages,threads, anddraftsstreams theqis only injected whenstart_dateis configured — preserving pre-0.1.0 request shape on upgrade.Top-level
concurrency_levelwith matching user-facingnum_workersspec field (default 5, min 2, max 10) — parallelizes detail-substream fan-out and lets users tune it down if their Gmail quota tier is low.Reactive rate-limit handling on
base_requester'sDefaultErrorHandler:backoff_strategies:WaitTimeFromHeader(Retry-After)withExponentialBackoffStrategy(factor=5)as fallback for responses that omitRetry-After.response_filters: 429 →RATE_LIMITEDviahttp_codes, and 403 →RATE_LIMITEDonly whenerror.errors[0].reason == "rateLimitExceeded"/userRateLimitExceeded, matched viapredicate. The predicate cannot be OR'd withhttp_codes: [403]becauseHttpResponseFilter's matchers are OR'd — that would reclassify auth/scope 403s as retryable and mask real config errors. Gmail's quota errors are documented as 403 with thisreasonfield, not 429: https://developers.google.com/gmail/api/guides/handle-errors.api_budget: Gmail charges per-method quota units (5–100), whichMovingWindowCallRatePolicycannot model; letting Gmail signal back-off is correct.New optional
start_datespec field — non-required; omitting it preserves full-history semantics on every stream.Scope narrowing — what is NOT in this PR
Issue #16209's recommendations 1 and 2 also call out
threads_detailsfor aninternalDate-derived cursor.threads_detailsremains full-refresh in this PR.users.threads.getreturnsmessages[*].internalDate, so a cursor driven offmax(messages[*].internalDate)is technically possible, but it requires either aCustomRecordExtractor(non-declarative) or a manifest transformation to hoist the nested max into a top-level field beforeDatetimeBasedCursorcan observe it. That is a larger change than the rest of this PR and I would prefer to do it as a follow-up so the messages-side wins can ship first. Happy to split it out earlier if reviewers prefer.How
All changes are YAML-only in
airbyte-integrations/connectors/source-gmail/manifest.yamland use built-in declarative components — no custom Python.Declarative-First Evaluation
The connector has
language:manifest-onlyandcdk:low-codetags, so this gate applies.DatetimeBasedCursoronmessages_details(cursor_datetime_formats: ["%ms"],datetime_format: "%s") and on the internal_messages_for_detailsparent (cursor_datetime_formats: ["%s"], because parent state arrives already-normalized from the child viaincremental_dependency).request_parametersusing theformat_datetimemacro (public streams, conditional onstart_date) andstream_interval.start_time(internal parent, always).ParentStreamConfig.incremental_dependency: truepointing at an internal-only parent definition.ConcurrencyLevelbound toconfig.num_workers.DefaultErrorHandlerwithWaitTimeFromHeader+ExponentialBackoffStrategy, 429 filter viahttp_codes, 403 rate-limit filter viapredicateinspectingerror.errors[0].reason.Test Coverage
Added
airbyte-integrations/connectors/source-gmail/unit_tests/(the connector previously had no unit tests).test_streams.pyuses the CDK'sYamlDeclarativeSource+requests_mock, with mocks shaped to real Gmail response contracts (list returns stubs; get populatesinternalDate):test_public_messages_standalone_is_full_refresh_and_emits_records— regression guard for pnilan's follow-up review. Asserts the publicmessagesstream emits list-endpoint stubs when selected standalone, with no defaultq=after:gate. Ifmessagesis ever re-declared as incremental oninternalDate, this test fails becauseDatetimeBasedCursorwould drop every stub.test_public_messages_injects_after_unix_seconds_when_start_date_set— asserts the publicmessagesstream injectsq=after:<unix>whenstart_dateIS configured.test_messages_details_checkpoints_on_internal_date— asserts the global cursor advances to the maxinternalDate(unix seconds) and per-partition cursors each carry a value.test_messages_parent_defaults_to_pre_gmail_epoch_when_no_start_date— asserts the internal_messages_for_detailsparent list call carriesq=after:1080777600(theMinMaxDatetimedefault resolved throughstream_interval.start_time) on a freshmessages_detailssync with no configstart_date.test_messages_parent_q_advances_with_prior_state— regression guard forincremental_dependencystate migration: seeds prior state at the child and asserts the next internal-parent list call'sq=after:uses the migrated unix-second cursor, not the default.test_messages_request_injects_after_from_start_date— withstart_date=2024-01-01T00:00:00Z, assertsq=after:1704067200on the internal parent list call.test_drafts_injects_after_unix_seconds_when_start_date_set,test_threads_injects_after_unix_seconds_when_start_date_set— avoid PST drift.test_retry_after_on_429_is_honoured— 429 +Retry-After: 7→ sleep ≥7s before a successful retry.test_retry_after_on_403_rate_limit_exceeded_is_honoured— 403 witherror.errors[0].reason = "rateLimitExceeded"is reclassified asRATE_LIMITED, sleeps onRetry-After, retries successfully, and does NOT emit aconfig_errortrace.test_non_rate_limit_403_is_not_retried— negative guard: 403 withreason = "insufficientPermissions"must NOT trigger theRetry-Afterbackoff (catches the bug wherehttp_codes: [403]is OR'd with the predicate).All 11 tests pass locally via
poetry run pytest unit_tests/test_streams.py -v.Breaking Change Evaluation
Not a breaking change:
start_date,num_workers), not required, no config migration needed._messages_for_detailsis defined underdefinitions.streamsbut deliberately not listed under the top-levelstreams:block, so it is not discoverable and cannot be selected.messages_detailsnow emits state, but there is no prior state to invalidate — first run builds up cursor state. Publicmessagesstays full-refresh (no state).→ Standard MINOR bump,
0.0.50→0.1.0.Review guide
airbyte-integrations/connectors/source-gmail/manifest.yaml:messagesis now full-refresh with an optionalstart_date-gatedq=(pre-0.1.0 semantics + new knob)._messages_for_detailsis defined but intentionally omitted from the top-levelstreams:list.messages_details'sParentStreamConfig.streampoints at_messages_for_details, withincremental_dependency: true.WaitTimeFromHeader+ExponentialBackoffStrategybackoff; 429 viahttp_codes; 403rateLimitExceededviapredicate(crucially NOThttp_codes: [403]— see inline comment).concurrency_level+spec.num_workers.airbyte-integrations/connectors/source-gmail/unit_tests/test_streams.py— 11 runtime tests viaYamlDeclarativeSource+requests_mock.airbyte-integrations/connectors/source-gmail/metadata.yaml—dockerImageTagbumped to0.1.0.docs/integrations/sources/gmail.md— changelog entry.User Impact
messages_detailssyncs become drastically cheaper on subsequent runs (only new ids / bodies since last cursor).start_dateonmessages,threads,drafts, and (implicitly)messages_details— server-side, unix seconds, no PST drift.num_workersknob.rateLimitExceededresponses are handled viaRetry-Afterwith an exponential fallback. Auth/scope 403s (e.g.insufficientPermissions) continue to fail fast as config errors — asserted by a negative test.messagesstandalone (withoutmessages_details) see the same behaviour as v0.0.50, aside from the new optionalstart_dateknob. Records are not silently dropped.Can this PR be safely reverted and rolled back?
Link to Devin session: https://app.devin.ai/sessions/c20c8aa034984b66ba7be1de599b1543