fix(openrouter): merge fragmented reasoning_details in streaming #36401

Xi Zhang (X-iZhang) wants to merge 4 commits into langchain-ai:master
…i-turn

During streaming, `AIMessageChunk.__add__` list-concatenates `reasoning_details` in `additional_kwargs`, fragmenting a single entry into many. When `_convert_message_to_dict()` serializes conversation history back to the OpenRouter API, these fragments cause a `BadRequestResponseError`. Add `_merge_reasoning_details()`, which merges consecutive entries sharing the same `type` and `index` (streaming fragments) while preserving distinct entries (legitimate non-streaming data). Metadata from later fragments (e.g. `signature`) is preserved in the merged result.

Closes langchain-ai#36400
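For reference, a minimal plain-Python sketch of the fragmentation described above (no langchain imports; the chunk shapes and text values are illustrative, not the exact OpenRouter payload):

```python
# Each streamed chunk carries a one-element reasoning_details list;
# concatenating those lists across chunks leaves one entry per
# fragment instead of one merged entry.
chunks = [
    {"reasoning_details": [{"type": "reasoning.text", "index": 0, "text": "Th"}]},
    {"reasoning_details": [{"type": "reasoning.text", "index": 0, "text": "ink"}]},
    {"reasoning_details": [{"type": "reasoning.text", "index": 0, "text": "ing"}]},
]

accumulated: list = []
for chunk in chunks:
    # Mirrors how AIMessageChunk.__add__ list-concatenates additional_kwargs
    accumulated = accumulated + chunk["reasoning_details"]

print(len(accumulated))  # 3 fragments where the API expects a single entry
```

Serializing `accumulated` back to the API as-is is what triggers the `BadRequestResponseError`.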
```python
text_key = (
    "text" if "text" in entry else "content" if "content" in entry else None
)
```
I think this can be refactored: too much nested if/else. I know this is very Pythonic, but it's not very readable. Your call?
```python
while i < len(details):
    entry = details[i]
    if not isinstance(entry, dict):
        merged.append(entry)
        i += 1
        continue

    entry_type = entry.get("type", "")
    entry_index = entry.get("index")
    text_key = (
        "text" if "text" in entry else "content" if "content" in entry else None
    )

    # Only merge if entry has both a text field and an index.
    # Without index we cannot distinguish fragments from distinct entries.
    if text_key is None or entry_index is None:
        merged.append(entry)
        i += 1
        continue

    # Merge consecutive fragments (same type + same index)
    texts = [entry.get(text_key, "")]
    base = {k: v for k, v in entry.items() if k != text_key}
    i += 1
    while i < len(details):
        nxt = details[i]
        if (
            isinstance(nxt, dict)
            and nxt.get("type") == entry_type
            and nxt.get("index") == entry_index
        ):
            texts.append(nxt.get(text_key, "") or "")
            # Preserve metadata from later fragments (e.g. signature)
            base.update(
                {k: v for k, v in nxt.items() if k != text_key and v is not None}
            )
            i += 1
        else:
            break

    base[text_key] = "".join(texts)
    merged.append(base)
```
Wow, this is such a long while loop; I was wondering where it ends.
```python
while i < len(details):
    nxt = details[i]
    if (
        isinstance(nxt, dict)
        and nxt.get("type") == entry_type
        and nxt.get("index") == entry_index
    ):
        texts.append(nxt.get(text_key, "") or "")
        # Preserve metadata from later fragments (e.g. signature)
        base.update(
            {k: v for k, v in nxt.items() if k != text_key and v is not None}
        )
        i += 1
```
Can this be decomposed into another function or so? With two while loops I lose focus trying to understand what exactly it is doing. Your call?
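One possible decomposition, sketched under assumptions (the helper name `_collect_fragments` and its signature are hypothetical, not from the PR): the inner loop becomes a helper that consumes consecutive matching fragments and returns the merged entry plus the position to resume scanning.

```python
from typing import Any, Tuple


def _collect_fragments(
    details: list,
    start: int,
    entry_type: str,
    entry_index: Any,
    text_key: str,
    base: dict,
) -> Tuple[dict, int]:
    """Consume fragments at details[start:] matching (type, index).

    `base` is the first fragment (a copy); its text is joined with the
    texts of the consumed fragments. Returns (merged entry, next index).
    """
    texts = [base.pop(text_key, "")]
    i = start
    while i < len(details):
        nxt = details[i]
        if not (
            isinstance(nxt, dict)
            and nxt.get("type") == entry_type
            and nxt.get("index") == entry_index
        ):
            break
        texts.append(nxt.get(text_key, "") or "")
        # Preserve metadata from later fragments (e.g. signature)
        base.update({k: v for k, v in nxt.items() if k != text_key and v is not None})
        i += 1
    base[text_key] = "".join(texts)
    return base, i


# Usage sketch: merge the fragment at position 0 with its continuation.
entry = {"type": "reasoning.text", "index": 0, "text": "Hello"}
details = [
    entry,
    {"type": "reasoning.text", "index": 0, "text": ", world"},
    {"type": "other", "index": 1, "text": "x"},
]
merged_entry, next_i = _collect_fragments(
    details, 1, "reasoning.text", 0, "text", dict(entry)
)
print(merged_entry, next_i)
```

The outer loop then reads as "classify entry, delegate merging, append", which keeps each loop short enough to follow at a glance.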
Hi Mason Daugherty (@mdrxy), just wanted to check if this is on your radar. Happy to make any changes if needed. Thanks!

Thanks, hoping to review soon.
Description

Fixes #36400

During streaming, `AIMessageChunk.__add__` list-concatenates `reasoning_details` in `additional_kwargs`, fragmenting a single entry into many. When `_convert_message_to_dict()` serializes conversation history back to the OpenRouter API for the next turn, these fragmented entries cause a `BadRequestResponseError`.

Changes

- `_merge_reasoning_details()` helper that merges consecutive entries sharing the same `type` and `index` (streaming fragments) while preserving distinct entries (legitimate non-streaming data)
- Metadata from later fragments (e.g. `signature`) is preserved in the merged result
- Entries without an `index` are never merged (safe for non-streaming responses)
- `_merge_reasoning_details()` is called in `_convert_message_to_dict()` before serializing `reasoning_details`

Why merge instead of drop?

Non-streaming users (`invoke()`) rely on `reasoning_details` for structured metadata (`type`, `signature`, `format`, `index`). Dropping it entirely would be a regression. This approach fixes streaming while preserving non-streaming functionality, similar to `langchain-openai`'s `_implode_reasoning_blocks()`.

Test plan