[Bug]: RouterQueryEngine _aquery throws RuntimeErrors with more than one Vectorstore #18779


Open
JonasMedu opened this issue May 19, 2025 · 3 comments
Labels
bug Something isn't working triage Issue needs to be triaged/prioritized

Comments

@JonasMedu

JonasMedu commented May 19, 2025

Bug Description

Related: #17349

The method _aquery in the class RouterQueryEngine creates aquery coroutines and then delegates them to run_async_tasks, which tries to run them in another event loop, generating RuntimeErrors.

lines 211 to 223:

if len(result.inds) > 1:
    [...]
        tasks.append(selected_query_engine.aquery(query_bundle))
    responses = run_async_tasks(tasks)

I assume the error appears when the RouterQueryEngine uses more than one vector store or collection; that is indeed when a RuntimeError occurs on my end and in the linked issue. In my case the error is only thrown when using a front end (see below; I replaced the front-end interaction with print statements).

The method run_async_tasks generates a list of coroutines, which is then executed via asyncio_run. The method asyncio_run is used to execute the coroutines concurrently.
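For illustration, here is a minimal standalone sketch (not llama-index code) of why starting a new event loop from inside an already running one fails; the "Detected nested async" hint in the log below points at the same mechanism:

import asyncio

async def inner() -> int:
    return 42

async def outer() -> int:
    # Starting a second event loop while this coroutine's loop is running fails:
    # "RuntimeError: asyncio.run() cannot be called from a running event loop"
    return asyncio.run(inner())

asyncio.run(outer())  # raises the RuntimeError described above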

I suggest relying on the newer task group functionality to run the coroutines directly in the RouterQueryEngine.
The implementation below shows how the tasks are grouped together and executed before acombine_responses generates a summary based on the results of all selected collections. Because the coroutines are awaited on the already running event loop, no second loop needs to be created and the nested-async guard is never hit.

asyncio.TaskGroup.create_task() is a newer alternative leveraging structured concurrency; it allows waiting for a group of related tasks with strong safety guarantees.

import asyncio
[...]
if len(result.inds) > 1:
    task_list = []
    # All selected aquery coroutines run on the current event loop;
    # the TaskGroup waits for them before the block exits.
    async with asyncio.TaskGroup() as tg:
        for i, engine_ind in enumerate(result.inds):
            log_str = f"Selecting query engine {engine_ind}: {result.reasons[i]}."
            logger.info(log_str)
            if self._verbose:
                print_text(log_str + "\n", color="pink")
            selected_query_engine = self._query_engines[engine_ind]
            task_list.append(tg.create_task(selected_query_engine.aquery(query_bundle)))
    responses = [task.result() for task in task_list]
    if len(responses) > 1:
        final_response = await acombine_responses(
            self._summarizer, responses, query_bundle
        )
    else:
        final_response = responses[0]

I am using Python 3.12.9.

Version

0.12.35

Steps to Reproduce

  1. Make a Qdrant instance available to your Python runtime (e.g. via a Docker container); a quick connectivity check is sketched after this list.
    For reference, see the LlamaIndex Qdrant example.
  2. Insert your OpenAI key.
  3. Install the dependencies (see the imports in the script below).
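Before running the script, a quick connectivity check can confirm that Qdrant is reachable (a minimal sketch, assuming a local instance on the default port 6333, e.g. started from the official qdrant/qdrant Docker image):

from qdrant_client import QdrantClient

# Assumes Qdrant is listening on localhost:6333 (the default).
client = QdrantClient(host="localhost", port=6333)
print(client.get_collections())  # lists the existing collections if the instance is reachable

The reproduction script itself: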
import asyncio
import os

from llama_index.core.llama_dataset import download_llama_dataset
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core import StorageContext
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.core import Settings
from llama_index.core.workflow import Context
from llama_index.core.agent.workflow import (
    FunctionAgent,
    AgentWorkflow,
    AgentOutput,
    AgentInput,
)
from llama_index.core.selectors import LLMMultiSelector
from llama_index.core.indices.base import BaseQueryEngine
from llama_index.core.tools import FunctionTool

from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.query_engine import RouterQueryEngine
from qdrant_client import QdrantClient, AsyncQdrantClient

# 1. First load the environment variables for your LLMs. I hide the Mistral API key
#    and the llama-index Settings init behind my own module:
#    from config_loader import LLM
# The line below is copied from the Qdrant example.
os.environ["OPENAI_API_KEY"] = "your_api_key_here"
# Download the benchmark dataset if it is not already on disk.

data_collection = 'Uber10KDataset2021'
col_name_2 = "".join([data_collection, "_copy"])
# if the data does not exist, download it
if not os.path.exists(f'./data/{data_collection}'):
    rag_dataset, documents = download_llama_dataset(data_collection, './data')
else:
    documents = SimpleDirectoryReader(f'./data/{data_collection}').load_data()


def init_collection_1_and_2():
    client = QdrantClient(host="localhost", port=6333)    
    vector_store1 = QdrantVectorStore(client=client, collection_name=data_collection)
    if vector_store1._collection_exists(data_collection):
        print(f"collection {data_collection} exists")
    else:    
        storage_context1 = StorageContext.from_defaults(vector_store=vector_store1)
        index1 = VectorStoreIndex.from_documents(
            documents,
            storage_context=storage_context1,
        )
        print(index1.as_query_engine().query("What is the total revenue of Uber in 2021?"))

    
    vector_store2 = QdrantVectorStore(client=client, collection_name=col_name_2)
    if vector_store2._collection_exists(col_name_2):
        print(f"collection {col_name_2} exists")
    else:
        storage_context2 = StorageContext.from_defaults(vector_store=vector_store2)
        index2 = VectorStoreIndex.from_documents(
            documents,
            storage_context=storage_context2,
        )
        print(index2.as_query_engine().query("What is the total revenue of Uber in 2021?"))

    # test if it works
def build_workflow_and_context():
    aqdrant_client = AsyncQdrantClient(host="localhost", port=6333)

    def get_query_engine(collection_name: str) -> BaseQueryEngine:    
        vector_store: QdrantVectorStore = QdrantVectorStore(aclient=aqdrant_client, collection_name=collection_name)
        index: VectorStoreIndex = VectorStoreIndex.from_vector_store( vector_store)
        return index.as_query_engine(use_async=True)

    query_engine_tools = [
        QueryEngineTool(
            query_engine=get_query_engine(data_collection),
            metadata=ToolMetadata(
                name=data_collection,
                description="Public data for {data_collection}",
        )),
        QueryEngineTool(
            query_engine=get_query_engine(col_name_2),
            metadata=ToolMetadata(
                name=col_name_2,
                description="Public data for {data_collection}",
        )),
    ]


    router_engine = RouterQueryEngine(
        selector=LLMMultiSelector.from_defaults(
            prompt_template_str = "Given a user query, select a collection."
            ),
        query_engine_tools=query_engine_tools,
        verbose=True,
    )

    async def query_router_engine(query: str) -> str:
        response = await router_engine.aquery(query)
        return str(response)

    retrieve_tool = FunctionTool.from_defaults(
        async_fn=query_router_engine,
        name="RagAgent",
        description=f"The RagAgent for Uber finance data  collections."
    )
    response_agent = FunctionAgent(
        name="GeneralAgent",
        description="The GeneralAgent communicates with the user",
        system_prompt="""
            You are the GeneralAgent. Your task is to query the available knowledge resources. Make sure to leverage all available collections.
        """,
        tools=[retrieve_tool]
    )
    workflow = AgentWorkflow(
        agents=[response_agent],
        root_agent=response_agent.name
    )
    ctx = Context(workflow=workflow)
    return workflow, ctx


async def main():
    user_request = "What is the total revenue for Uber in 2021 ?"

    workflow, ctx = build_workflow_and_context()
    
    handler = workflow.run(
        user_msg=user_request,
        ctx=ctx,
        verbose=True
    )
    async for event in handler.stream_events():
        if isinstance(event, AgentInput):
            print(f"========{event.current_agent_name}:=========>")
            print(event.input)
            print("=================<")
        if isinstance(event, AgentOutput) and event.response.content:
            print("<================>")
            print(f"{event.current_agent_name}: {event.response.content}")
            print("<================>")



if __name__ == "__main__":
    init_collection_1_and_2()
    asyncio.run(main())

Relevant Logs/Tracebacks

See the error message "Detected nested async. Please use nest_asyncio.apply() to allow nested event loops.Or, use async entry methods like aquery(), aretriever, achat, etc." in the verbose log.

collection Uber10KDataset2021 exists
collection Uber10KDataset2021_copy exists
Keyword arguments are not supported when 'run()' is invoked with the 'start_event' parameter. These keyword arguments will be ignored: {'verbose': True}
========GeneralAgent:=========>
[ChatMessage(role=<MessageRole.SYSTEM: 'system'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='\n            You are the GeneralAgent. Your task is to query the available knowledge resources. Make sure to leverage all available collections.\n        ')]), ChatMessage(role=<MessageRole.USER: 'user'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='What is the total revenue for Uber in 2021 ?')])]
=================<
Selecting query engine 0: This choice is selected because it best fits the criteria of the user query..
Selecting query engine 1: This choice is selected because it provides additional relevant information to the user query..
========GeneralAgent:=========>
[ChatMessage(role=<MessageRole.SYSTEM: 'system'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='\n            You are the GeneralAgent. Your task is to query the available knowledge resources. Make sure to leverage all available collections.\n        ')]), ChatMessage(role=<MessageRole.USER: 'user'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='What is the total revenue for Uber in 2021 ?')]), ChatMessage(role=<MessageRole.ASSISTANT: 'assistant'>, additional_kwargs={'tool_calls': [ToolCall(function=FunctionCall(name='RagAgent', arguments='{"query": "What is the total revenue for Uber in 2021 ?"}'), id='w8jKyCeKk', type=None, index=0)]}, blocks=[TextBlock(block_type='text', text='')]), ChatMessage(role=<MessageRole.TOOL: 'tool'>, additional_kwargs={'tool_call_id': 'w8jKyCeKk'}, blocks=[TextBlock(block_type='text', text='Detected nested async. Please use nest_asyncio.apply() to allow nested event loops.Or, use async entry methods like `aquery()`, `aretriever`, `achat`, etc.')])]
=================<
Selecting query engine 0: This collection is selected because it matches the user's query criteria..
Selecting query engine 1: This collection is selected because it contains relevant information to the user's query..
========GeneralAgent:=========>
[ChatMessage(role=<MessageRole.SYSTEM: 'system'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='\n            You are the GeneralAgent. Your task is to query the available knowledge resources. Make sure to leverage all available collections.\n        ')]), ChatMessage(role=<MessageRole.USER: 'user'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='What is the total revenue for Uber in 2021 ?')]), ChatMessage(role=<MessageRole.ASSISTANT: 'assistant'>, additional_kwargs={'tool_calls': [ToolCall(function=FunctionCall(name='RagAgent', arguments='{"query": "What is the total revenue for Uber in 2021 ?"}'), id='w8jKyCeKk', type=None, index=0)]}, blocks=[TextBlock(block_type='text', text='')]), ChatMessage(role=<MessageRole.TOOL: 'tool'>, additional_kwargs={'tool_call_id': 'w8jKyCeKk'}, blocks=[TextBlock(block_type='text', text='Detected nested async. Please use nest_asyncio.apply() to allow nested event loops.Or, use async entry methods like `aquery()`, `aretriever`, `achat`, etc.')]), ChatMessage(role=<MessageRole.ASSISTANT: 'assistant'>, additional_kwargs={'tool_calls': [ToolCall(function=FunctionCall(name='RagAgent', arguments='{"query": "What is the total revenue for Uber in 2021 ?"}'), id='xI20fyhbU', type=None, index=0)]}, blocks=[TextBlock(block_type='text', text='')]), ChatMessage(role=<MessageRole.TOOL: 'tool'>, additional_kwargs={'tool_call_id': 'xI20fyhbU'}, blocks=[TextBlock(block_type='text', text='Detected nested async. Please use nest_asyncio.apply() to allow nested event loops.Or, use async entry methods like `aquery()`, `aretriever`, `achat`, etc.')])]
=================<
<================>
GeneralAgent: The total revenue for Uber in 2021 was $17.45 billion.
@JonasMedu JonasMedu added bug Something isn't working triage Issue needs to be triaged/prioritized labels May 19, 2025
@logan-markewich
Collaborator

@JonasMedu task groups were added in Python 3.11. Since we support Python 3.9+, we cannot use them
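For reference, a minimal, self-contained sketch of the same idea on Python 3.9+: asyncio.gather awaits the coroutines concurrently on the current event loop, so neither a TaskGroup (3.11+) nor a nested asyncio.run() is needed. The fake_aquery helper below is a hypothetical stand-in for selected_query_engine.aquery(query_bundle):

import asyncio

async def fake_aquery(name: str, delay: float) -> str:
    # Hypothetical stand-in for selected_query_engine.aquery(query_bundle).
    await asyncio.sleep(delay)
    return f"response from {name}"

async def route(collection_names):
    # Await all "query engines" concurrently on the already running loop.
    return await asyncio.gather(*(fake_aquery(n, 0.1) for n in collection_names))

print(asyncio.run(route(["Uber10KDataset2021", "Uber10KDataset2021_copy"])))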

@logan-markewich
Collaborator

Also, the code provided is not enough to reproduce. What vector store are you using?

@JonasMedu
Author

@logan-markewich, thank you for your response.
I created an example. Unfortunately I was not able to isolate the async error, so the code above is close to the real issue but also requires the dependencies.
