Skip to content

Commit a5ebf18

Browse files
committed
Fix Recommendations AI catalog outputs and sklearn pins for 3.14
1 parent 60723cd commit a5ebf18

File tree

4 files changed

+23
-11
lines changed

4 files changed

+23
-11
lines changed

sdks/python/apache_beam/examples/inference/sklearn_examples_requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@
2020
# However, newer sklearn is needed for testing on newer Python version
2121
scikit-learn==1.0.2; python_version < '3.11'
2222
# bump sklearn version when new Python version is supported
23-
scikit-learn==1.7.1; python_version >= '3.11'
23+
scikit-learn==1.7.1; python_version >= '3.11' and python_version < '3.14'
24+
scikit-learn==1.7.2; python_version >= '3.14'

sdks/python/apache_beam/ml/gcp/recommendations_ai.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,19 @@ def get_recommendation_user_event_client():
7979

8080

8181
class CreateCatalogItem(PTransform):
82-
"""Creates catalogitem information.
83-
The ``PTransform`` returns a PCollectionTuple with a PCollections of
84-
successfully and failed created CatalogItems.
82+
"""Creates catalog item records.
83+
84+
The ``PTransform`` returns a ``PCollectionTuple`` of successfully created
85+
catalog items (``created_catalog_items``) and failures
86+
(``failed_catalog_items``).
8587
8688
Example usage::
8789
88-
pipeline | CreateCatalogItem(
89-
project='example-gcp-project',
90-
catalog_name='my-catalog')
90+
result = (
91+
pipeline
92+
| CreateCatalogItem(
93+
project='example-gcp-project', catalog_name='my-catalog'))
94+
created = result.created_catalog_items
9195
"""
9296
def __init__(
9397
self,
@@ -123,13 +127,15 @@ def expand(self, pcoll):
123127
raise ValueError(
124128
"""GCP project name needs to be specified in "project" pipeline
125129
option""")
126-
return pcoll | ParDo(
130+
pardo = ParDo(
127131
_CreateCatalogItemFn(
128132
self.project,
129133
self.retry,
130134
self.timeout,
131135
self.metadata,
132136
self.catalog_name))
137+
return pcoll | pardo.with_outputs(
138+
FAILED_CATALOG_ITEMS, main='created_catalog_items')
133139

134140

135141
class _CreateCatalogItemFn(DoFn):

sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,12 @@ def test_CreateCatalogItem(self):
6565
return_value=self._mock_client):
6666
p = beam.Pipeline()
6767

68-
_ = (
68+
create_outputs = (
6969
p | "Create data" >> beam.Create([self._catalog_item])
7070
| "Create CatalogItem" >>
7171
recommendations_ai.CreateCatalogItem(project="test"))
72+
_ = create_outputs.created_catalog_items | beam.combiners.Count.Globally()
73+
_ = create_outputs.failed_catalog_items | beam.combiners.Count.Globally()
7274

7375
result = p.run()
7476
result.wait_until_finish()

sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,14 @@ def test_create_catalog_item(self):
7777

7878
with TestPipeline(is_integration_test=True) as p:
7979
RecommendationAIIT.test_ran = True
80-
output = (
80+
create_outputs = (
8181
p | 'Create data' >> beam.Create([CATALOG_ITEM])
8282
| 'Create CatalogItem' >>
83-
recommendations_ai.CreateCatalogItem(project=GCP_TEST_PROJECT)
83+
recommendations_ai.CreateCatalogItem(project=GCP_TEST_PROJECT))
84+
output = (
85+
create_outputs.created_catalog_items
8486
| beam.ParDo(extract_id) | beam.combiners.ToList())
87+
_ = create_outputs.failed_catalog_items | beam.combiners.Count.Globally()
8588

8689
assert_that(output, equal_to([[CATALOG_ITEM["id"]]]))
8790

0 commit comments

Comments
 (0)