[Bug]: Human-in-the-loop not working correctly with Nested Workflow #18848

Open
main-2983 opened this issue May 26, 2025 · 7 comments
Labels
bug (Something isn't working), triage (Issue needs to be triaged/prioritized)

Comments

@main-2983

Bug Description

I tried to replicate this example for human-in-the-loop, but with a nested workflow. The run fails with a timeout: llama_index.core.workflow.errors.WorkflowTimeoutError: Operation timed out after 10.0 seconds

Version

0.12.37

Steps to Reproduce

First, I rewrote HumanInTheLoopWorkflow so that it accepts another workflow:

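from llama_index.core.workflow import Event, StartEvent, StopEvent, Workflow, step
from llama_index.core.workflow import HumanResponseEvent, InputRequiredEvent

# Event1 is not shown in the report; assumed here to be a plain Event subclass.
class Event1(Event):
    pass

# HITLWorkflow (shown next) must be defined before this class, because the
# annotation on step1 is evaluated when the method is defined.
class HumanInTheLoopWorkflow2(Workflow):
    @step
    async def step0(self, ev: StartEvent) -> Event1:
        return Event1()

    @step
    async def step1(self, ev: Event1, wf: HITLWorkflow) -> StopEvent:
        # Runs the nested workflow directly; its events stay on its own stream.
        result = await wf.run()

        return StopEvent(result)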

The actual human-in-the-loop sub-workflow:

class HITLWorkflow(Workflow):
    @step
    async def step1(self, ev: StartEvent) -> InputRequiredEvent:
        return InputRequiredEvent(prefix="Enter a number: ")

    @step
    async def fin(self, ev: HumanResponseEvent) -> StopEvent:
        return StopEvent(ev.response)

Then I run the workflow with this snippet:

async def main():
    # workflow should work with streaming
    workflow = HumanInTheLoopWorkflow2()
    sub_wf = HITLWorkflow()

    workflow.add_workflows(wf=sub_wf)

    handler = workflow.run()

    async for event in handler.stream_events():
        print(event)
        if isinstance(event, InputRequiredEvent):
            # here, we can handle human input however you want
            # this means using input(), websockets, accessing async state, etc.
            # here, we just use input()
            response = input(event.prefix)
            handler.ctx.send_event(HumanResponseEvent(response=response))

    final_result = await handler

    print(final_result)


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())

Relevant Logs/Tracebacks

Exception in callback Dispatcher.span.<locals>.wrapper.<locals>.handle_future_result(span_id='Workflow.run...-6e45d4fb0827', bound_args=<BoundArguments ()>, instance=<__main__.HIT...0015750096090>, context=<_contextvars...0015750097C80>)(<WorkflowHandler cancelled>) at E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py:276
handle: <Handle Dispatcher.span.<locals>.wrapper.<locals>.handle_future_result(span_id='Workflow.run...-6e45d4fb0827', bound_args=<BoundArguments ()>, instance=<__main__.HIT...0015750096090>, context=<_contextvars...0015750097C80>)(<WorkflowHandler cancelled>) at E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py:276>
Traceback (most recent call last):
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\asyncio\events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 286, in handle_future_result
    exception = future.exception()
                ^^^^^^^^^^^^^^^^^^
asyncio.exceptions.CancelledError
Exception in callback Dispatcher.span.<locals>.wrapper.<locals>.handle_future_result(span_id='Workflow.run...-701142d08a97', bound_args=<BoundArguments ()>, instance=<__main__.Hum...0015750095B10>, context=<_contextvars...0015750096180>)(<WorkflowHand...0.0 seconds')>) at E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py:276
handle: <Handle Dispatcher.span.<locals>.wrapper.<locals>.handle_future_result(span_id='Workflow.run...-701142d08a97', bound_args=<BoundArguments ()>, instance=<__main__.Hum...0015750095B10>, context=<_contextvars...0015750096180>)(<WorkflowHand...0.0 seconds')>) at E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py:276>
Traceback (most recent call last):
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\asyncio\events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 288, in handle_future_result
    raise exception
  File "E:\Code\PyCharm\Projects\RAGDemo\main.py", line 64, in main
    final_result = await handler
                   ^^^^^^^^^^^^^
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\workflow\workflow.py", line 410, in _run_workflow
    raise WorkflowTimeoutError(msg)
llama_index.core.workflow.errors.WorkflowTimeoutError: Operation timed out after 10.0 seconds
Task exception was never retrieved
future: <Task finished name='Task-23' coro=<Workflow.run.<locals>._run_workflow() done, defined at E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\workflow\workflow.py:352> exception=InvalidStateError('invalid state')>
Traceback (most recent call last):
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\workflow\workflow.py", line 410, in _run_workflow
    raise WorkflowTimeoutError(msg)
llama_index.core.workflow.errors.WorkflowTimeoutError: Operation timed out after 10.0 seconds

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\workflow\workflow.py", line 414, in _run_workflow
    result.set_exception(e)
asyncio.exceptions.InvalidStateError: invalid state
Traceback (most recent call last):
  File "E:\Code\PyCharm\Projects\RAGDemo\main.py", line 72, in <module>
    asyncio.run(main())
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\asyncio\base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "E:\Code\PyCharm\Projects\RAGDemo\main.py", line 64, in main
    final_result = await handler
                   ^^^^^^^^^^^^^
  File "E:\Code\Miniconda\conda\envs\llmapi\Lib\site-packages\llama_index\core\workflow\workflow.py", line 410, in _run_workflow
    raise WorkflowTimeoutError(msg)
llama_index.core.workflow.errors.WorkflowTimeoutError: Operation timed out after 10.0 seconds

main-2983 added the bug and triage labels on May 26, 2025

dosubot bot commented May 26, 2025

The timeout error you’re seeing (WorkflowTimeoutError: Operation timed out after 10.0 seconds) is due to the default timeout set on LlamaIndex workflows. By default, each workflow (including nested ones) has a 10-second timeout, which can be too short for human-in-the-loop steps that wait for user input.

To resolve this, you should explicitly set the timeout parameter to None (for no timeout) or to a higher value when you instantiate your workflows. For example:

workflow = HumanInTheLoopWorkflow2(timeout=None)
sub_wf = HITLWorkflow(timeout=None)

This approach is supported by both the documentation and similar issues reported by other users. Setting timeout=None in the workflow constructor prevents the workflow from timing out while it waits for human input or other long-running steps. This matters especially for nested workflows, because each one manages its own timeout independently. An example of this solution appears in a related issue and in the documentation for nested workflows.

If you want a finite timeout, you can set it to a higher value (e.g., timeout=60 for 60 seconds).

In summary:
Set timeout=None (or a higher value) in both your main and nested workflow constructors to prevent the timeout error in human-in-the-loop scenarios.


@main-2983
Author

main-2983 commented May 26, 2025

With timeout=None, the process does not progress at all: the InputRequiredEvent prefix ("Enter a number: ") is not even printed in the terminal.
@dosubot

@logan-markewich
Collaborator

@main-2983 the two workflows aren't really connected. Your outer workflow needs to expose the events of the inner workflow.


dosubot bot commented May 26, 2025

When setting timeout=None, the workflow should wait indefinitely for tasks to complete, which is the intended way to support human-in-the-loop steps that require user input. However, your observation that the process does not progress and the InputRequiredEvent prefix is not printed suggests that something is blocking or not triggering the event loop as expected.

Here’s what is known and what you can try:

  • The LlamaIndex workflow engine uses asyncio.wait with the provided timeout. If timeout=None, it should wait forever for tasks to complete, and human-in-the-loop examples (like ChooseYourOwnAdventureWorkflow) do set timeout=None successfully in some cases.
  • There are no documented extra configuration steps required for timeout=None, and the official documentation and examples do not mention any caveats for this setting.
  • The only related issue found (#18773) suggests setting timeout=None but does not resolve or address the problem of the workflow not progressing.
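
The asyncio.wait behaviour mentioned in the first bullet can be checked with the standard library alone. This is a minimal sketch, not LlamaIndex code: `wait_for_human` and `run_with_timeout` are hypothetical names, and the 0.2 second sleep stands in for a step blocked on user input. It shows that asyncio.wait does not raise when the timeout elapses; it returns the unfinished tasks, so whoever calls it has to turn that into an error.

```python
import asyncio


async def wait_for_human(delay):
    # Hypothetical stand-in for a step that is blocked on user input.
    await asyncio.sleep(delay)
    return "human reply"


async def run_with_timeout(timeout):
    task = asyncio.create_task(wait_for_human(0.2))
    # asyncio.wait never raises on timeout; it hands back the pending tasks.
    done, pending = await asyncio.wait({task}, timeout=timeout)
    if pending:
        # Clean up the unfinished work before reporting the timeout.
        for unfinished in pending:
            unfinished.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        return f"timed out after {timeout} seconds"
    return task.result()


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(0.05)))  # finite timeout shorter than the wait
    print(asyncio.run(run_with_timeout(None)))  # None waits until the task finishes
```

With a finite timeout shorter than the simulated wait, the task is still pending when asyncio.wait returns; with timeout=None the call only returns once the task has finished.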

Potential causes and workarounds:

  1. Event Loop/Nesting Issues:
    If you are running this code in an environment that already has an event loop (e.g., Jupyter, some IDEs), asyncio.run() may not work as expected. In the official example, nest_asyncio.apply() is used to allow nested event loops. Try adding:

    import nest_asyncio
    nest_asyncio.apply()

    before running your main function.

  2. Handler Initialization:
    Double-check that you are awaiting the correct handler and that the workflow is started as intended. Sometimes, if the handler is not properly awaited or the event stream is not consumed, events may not be triggered.

  3. Debug Logging:
    Add print/logging statements before and after each major step (especially before yielding or printing the InputRequiredEvent) to see where the process is stalling.

  4. Finite Timeout as a Diagnostic:
    Try setting a large finite timeout (e.g., timeout=600) instead of None to see if the behavior changes. If it works, it may indicate an edge case or bug with how indefinite timeouts are handled in your environment or version.

  5. Version/Environment:
    There may be a version-specific or environment-specific bug. The official examples are designed for recent LlamaIndex versions and standard Python environments. If you are running in a non-standard environment, try running the minimal example in a plain Python script.

Summary:
There is no documented workaround or known bug for this specific issue in the official docs or issues. The official pattern is to use timeout=None for human-in-the-loop, and it should work as shown in the ChooseYourOwnAdventureWorkflow example. If it does not, it is likely due to an environment or event loop issue. Try the suggestions above, especially using nest_asyncio if you are in a notebook or interactive environment.

If the problem persists, consider sharing a minimal reproducible example and details about your runtime environment for further investigation.


@main-2983
Author

@main-2983 the two workflows aren't really connected. Your outer workflow needs to expose the events of the inner workflow

@logan-markewich Can you please provide some examples? I found nothing in the llama-index documentation.

@main-2983
Author

@logan-markewich sorry for tagging you again, but could you provide an example of how to "expose the events of the inner workflow" in a nested workflow? Or could you tell me which documentation or code I should explore to understand this concept?

@logan-markewich
Collaborator

@main-2983 What I mean is something like this:

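from llama_index.core.workflow import Context

class HumanInTheLoopWorkflow2(Workflow):
    @step
    async def step0(self, ev: StartEvent) -> Event1:
        return Event1()

    @step
    async def step1(self, ctx: Context, ev: Event1, wf: HITLWorkflow) -> StopEvent:
        # Do not just run the workflow
        # result = await wf.run()
        handler = wf.run()
        # Loop variable renamed so it no longer shadows the step's `ev` argument.
        async for inner_ev in handler.stream_events():
            if isinstance(inner_ev, InputRequiredEvent):
                # Re-emit the inner request on the outer stream and wait for the reply.
                response_ev = await ctx.wait_for_event(HumanResponseEvent, waiter_event=inner_ev)
                # Pass the human response down to the inner workflow.
                handler.ctx.send_event(response_ev)
                break
        result = await handler

        return StopEvent(result)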
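
The relay idea in the snippet above can also be modelled without LlamaIndex, which may help show why the original version stalls. The following is a toy sketch built only on asyncio queues: `ToyHandler`, `inner_run`, `outer_run_unconnected`, `outer_run_relaying` and `drive` are all hypothetical names, and nothing here reflects how the library is implemented internally. Each run has its own event stream, so a request emitted on the inner stream never reaches a caller that only reads the outer one unless the outer run forwards it.

```python
import asyncio


class ToyHandler:
    """Hypothetical stand-in for a run handle: one outgoing stream, one inbox."""

    def __init__(self):
        self.stream = asyncio.Queue()  # events the run emits to its caller
        self.inbox = asyncio.Queue()   # events the caller sends back in
        self.result = None


async def inner_run(handler):
    # The inner run asks for input on its OWN stream, then waits for a reply.
    await handler.stream.put("InputRequired")
    handler.result = await handler.inbox.get()
    await handler.stream.put(None)  # sentinel: stream finished


async def outer_run_unconnected(outer, inner):
    # Mirrors the original report: await the inner run and ignore its stream.
    await inner_run(inner)
    outer.result = inner.result
    await outer.stream.put(None)


async def outer_run_relaying(outer, inner):
    # Mirrors the suggested fix: forward the request up and the reply down.
    inner_task = asyncio.create_task(inner_run(inner))
    while (event := await inner.stream.get()) is not None:
        if event == "InputRequired":
            await outer.stream.put(event)      # expose it to the outer caller
            reply = await outer.inbox.get()    # wait for the human's answer
            await inner.inbox.put(reply)       # hand it to the inner run
    await inner_task
    outer.result = inner.result
    await outer.stream.put(None)


async def drive(outer_run, answer, timeout=0.2):
    # The caller only ever reads the OUTER stream, like main() in the report.
    outer, inner = ToyHandler(), ToyHandler()
    task = asyncio.create_task(outer_run(outer, inner))

    async def consume():
        while (event := await outer.stream.get()) is not None:
            if event == "InputRequired":
                await outer.inbox.put(answer)
        return outer.result

    try:
        return await asyncio.wait_for(consume(), timeout)
    except asyncio.TimeoutError:
        return "stalled"
    finally:
        task.cancel()


if __name__ == "__main__":
    print(asyncio.run(drive(outer_run_unconnected, "42")))  # prints "stalled"
    print(asyncio.run(drive(outer_run_relaying, "42")))     # prints "42"
```

In the unconnected variant the request sits unread on the inner stream, which matches the symptom reported earlier in the thread: no prompt is ever printed and the run never finishes.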

2 participants