[Feature Request] Ingestion management APIs for pull-based ingestion #17442

Closed
varunbharadwaj opened this issue Feb 24, 2025 · 1 comment · Fixed by #18332
Labels
enhancement Enhancement or improvement to existing feature or request Indexing Indexing, Bulk Indexing and anything related to indexing

Comments

@varunbharadwaj
Contributor

varunbharadwaj commented Feb 24, 2025

Is your feature request related to a problem? Please describe

Create ingestion management APIs for pull-based ingestion that cover the following:

  1. Pause and resume ingestion.
  2. Update offset.
  3. Get ingestion state.
  4. Update error strategy (update_settings API will be used)

As a separate effort, we also plan to support custom consumer configurations and add an API to update consumer config.

Describe the solution you'd like

The following APIs will be created.

Pause Ingestion

POST /<index>/ingestion/_pause

Path parameters:
index: the index for which ingestion needs to be paused.

Description:
This API pauses ingestion for a given index. Ingestion is paused on all shards of the index. The API updates the cluster state to reflect the pause and also runs the pause operation on each shard. The response includes request-level and shard-level acknowledgements.

Response:

{
    "acknowledged": true/false,
    "shards_acknowledged": true/false,
    "error": "error message if any",
    "failures": {
        "indexName": [
            {
                "shard": 0,
                "error": "error message"
            }
        ]
    }
}
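
As a rough illustration of how a client might use the proposed pause endpoint, here is a minimal Python sketch. It only builds the request (nothing is sent) and flattens a response of the shape shown above. The helper names `build_pause_request` and `summarize_ack`, the base URL, and the sample values are hypothetical and not part of the proposal.

```python
from urllib import request


def build_pause_request(base_url: str, index: str) -> request.Request:
    """Build (but do not send) a POST to the proposed /<index>/ingestion/_pause endpoint."""
    url = f"{base_url.rstrip('/')}/{index}/ingestion/_pause"
    return request.Request(url, method="POST")


def summarize_ack(body: dict) -> dict:
    """Reduce a pause/resume style response to an overall flag plus per-shard errors."""
    failed = [
        (index_name, failure["shard"], failure["error"])
        for index_name, shard_failures in body.get("failures", {}).items()
        for failure in shard_failures
    ]
    ok = (
        bool(body.get("acknowledged"))
        and bool(body.get("shards_acknowledged"))
        and not failed
    )
    return {"ok": ok, "failed_shards": failed}


# Hypothetical response in which the cluster state update succeeded
# but one shard did not acknowledge the pause.
sample_response = {
    "acknowledged": True,
    "shards_acknowledged": False,
    "failures": {"my-index": [{"shard": 0, "error": "timeout"}]},
}
pause_request = build_pause_request("http://localhost:9200", "my-index")
pause_summary = summarize_ack(sample_response)
```

Treating the call as successful only when both acknowledgement flags are true and no shard reported a failure is one possible client policy, not something the proposal mandates.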

Resume Ingestion

POST /<index>/ingestion/_resume

Path parameters:
index: the index for which ingestion needs to be resumed.

Description:
This API resumes ingestion for a given index on all of its shards. Optionally, a list of reset settings can be provided for a subset of shards. If reset settings are provided, the corresponding consumers are reset first, and ingestion is then resumed. Ingestion is resumed only if all consumers are reset successfully.

The resume operation first updates the cluster state, after which each shard is resumed. The response indicates request-level and shard-level acknowledgements.

Request:

{
  "reset_settings": [
    {
       "shard": 0,
       "mode": "offset/timestamp",
       "value": "1"
    }
  ]
}

Response:

{
    "acknowledged": true/false,
    "shards_acknowledged": true/false,
    "error": "error message if any",
    "failures": {
        "indexName": [
            {
                "shard": 0,
                "error": "error message"
            }
        ]
    }
}
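
The request body for the resume call could be assembled along these lines. This is a sketch under assumptions: the helper name `build_resume_body` is hypothetical, sending an empty object when no reset is needed is a guess (the proposal only says reset settings are optional), and the values are serialized as strings because the example request above shows `"value": "1"`.

```python
VALID_MODES = {"offset", "timestamp"}  # the two modes named in the proposed request


def build_resume_body(resets=None) -> dict:
    """Build the body for POST /<index>/ingestion/_resume.

    `resets` is an iterable of (shard, mode, value) tuples. Passing nothing
    yields an empty body, which is an assumption about the no-reset case.
    """
    if not resets:
        return {}
    settings = []
    for shard, mode, value in resets:
        if mode not in VALID_MODES:
            raise ValueError(f"unsupported reset mode: {mode!r}")
        if not isinstance(shard, int) or shard < 0:
            raise ValueError(f"invalid shard id: {shard!r}")
        # The example request carries the value as a string, so mirror that here.
        settings.append({"shard": shard, "mode": mode, "value": str(value)})
    return {"reset_settings": settings}


# Reset shard 0 to offset 1 before resuming; other shards resume where they stopped.
resume_body = build_resume_body([(0, "offset", 1)])
```

Validating the mode on the client side only catches typos early; the server would still need to reject invalid resets, since ingestion resumes only if every consumer reset succeeds.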

Get Ingestion State

GET /<index>/ingestion/_state

Path parameters:
index: the index for which ingestion state needs to be returned.
shards: optional; a list of shards for which state should be returned.

Description:
This API returns the current state of ingestion for the provided index. Optionally, a list of shards can be provided to limit the result to those shards.
The response is paginated: next_page_token is included when more pages remain.

Response:

{
    "_shards": {
        "total": 1,
        "successful": 1,
        "failed": 0,
        "failures": [
            {
                "shard": 0,
                "index": "my-index",
                "status": "INTERNAL_SERVER_ERROR",
                "reason": {
                    "type": "timeout_exception",
                    "reason": "error message"
                }
            }
        ]
    },
    "next_page_token": "page token if not on last page",
    "ingestion_state": {
        "indexName": [
            {
                "shard": 0,
                "poller_state": "POLLING",
                "error_policy": "DROP",
                "poller_paused": false
            }
        ]
    }
}
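
A client that wants the state of every shard has to follow `next_page_token` until it is absent. The sketch below shows one way to do that in Python. The proposal does not say how the token is passed back to the API, so the hypothetical `fetch_page` callable hides that detail; the helper name `collect_ingestion_state` and the in-memory pages are likewise illustrative only.

```python
def collect_ingestion_state(fetch_page):
    """Gather shard states and shard failures across all pages.

    `fetch_page(token)` is a caller-supplied function returning one response
    dict of the proposed shape; `token` is None for the first page.
    """
    states, failures, token = [], [], None
    while True:
        page = fetch_page(token)
        failures.extend(page.get("_shards", {}).get("failures", []))
        for index_name, shard_states in page.get("ingestion_state", {}).items():
            for shard_state in shard_states:
                states.append({"index": index_name, **shard_state})
        token = page.get("next_page_token")
        if not token:  # a missing or empty token marks the last page
            break
    return states, failures


# Two fake pages standing in for real responses, keyed by the token that requests them.
fake_pages = {
    None: {
        "_shards": {"total": 1, "successful": 1, "failed": 0, "failures": []},
        "next_page_token": "t1",
        "ingestion_state": {
            "my-index": [{"shard": 0, "poller_state": "POLLING",
                          "error_policy": "DROP", "poller_paused": False}]
        },
    },
    "t1": {
        "_shards": {"total": 1, "successful": 1, "failed": 0, "failures": []},
        "ingestion_state": {
            "my-index": [{"shard": 1, "poller_state": "POLLING",
                          "error_policy": "DROP", "poller_paused": True}]
        },
    },
}
all_states, all_failures = collect_ingestion_state(lambda token: fake_pages[token])
paused_shards = [s["shard"] for s in all_states if s["poller_paused"]]
```

Injecting `fetch_page` keeps the paging logic independent of the HTTP client and of whatever parameter name the final API uses for the token.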

Update Error Strategy

The update_settings API will be used to update the error strategy. An IndexSettingsHandler for "index.ingestion_source.error_strategy" will be registered in the IngestionEngine, and it will update the error strategy in the poller and writer threads.
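
For illustration, an update of the error strategy through the existing index settings endpoint (`PUT /<index>/_settings`) might be prepared as follows. The helper name `build_error_strategy_request`, the base URL, and the value `"drop"` are assumptions; the proposal names the setting key but does not list the accepted values (the state response above shows `DROP` as an error policy).

```python
import json
from urllib import request

ERROR_STRATEGY_SETTING = "index.ingestion_source.error_strategy"  # key named in the proposal


def build_error_strategy_request(base_url: str, index: str, strategy: str) -> request.Request:
    """Build (but do not send) a PUT /<index>/_settings request that changes the error strategy."""
    body = json.dumps({ERROR_STRATEGY_SETTING: strategy}).encode("utf-8")
    return request.Request(
        f"{base_url.rstrip('/')}/{index}/_settings",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# "drop" is an assumed value, chosen to match the DROP policy in the state example.
strategy_request = build_error_strategy_request("http://localhost:9200", "my-index", "drop")
strategy_payload = json.loads(strategy_request.data)
```

Reusing the settings endpoint means no dedicated API is needed for this operation; the registered settings handler would pick up the change on the engine side.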

Related component

Indexing

Describe alternatives you've considered

No response

Additional context

No response

@krisfreedain
Member

[Catch All Triage - 1, 2

2 participants