-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Support system ingest pipelines for bulk update operations #18277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Support system ingest pipelines for bulk update operations #18277
Conversation
Signed-off-by: Andy Qin <[email protected]>
❌ Gradle check result for f8169f3: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
❌ Gradle check result for 94653c1: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: Andy Qin <[email protected]>
94653c1
to
959b59f
Compare
❌ Gradle check result for 959b59f: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java
Outdated
Show resolved
Hide resolved
// This means the pipeline is only executed on the partial doc, which may not contain all fields. | ||
// System ingest pipelines and processors should handle these cases individually. | ||
boolean indexRequestHasPipeline = false; | ||
switch (updateRequest.getType()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we simplify the logic as:
If(updateRequest.upsertRequest()!=null){
indexRequestHasPipeline |= ingestService.resolvePipelines(actionRequest, updateRequest.upsertRequest(), metadata);
}
if(updateRequest.doc() != null){
if(updateRequest.docAsUpsert()){
indexRequestHasPipeline |= ingestService.resolvePipelines(actionRequest, updateRequest.doc(), metadata);
}else{
indexRequestHasPipeline |= ingestService. resolveSystemIngestPipeline(actionRequest, updateRequest.doc(), metadata);
}
}
Seems currently we don't care about the script and it just complicate the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense
BulkItemResponse.Failure failure = new BulkItemResponse.Failure( | ||
indexRequest.index(), | ||
indexRequest.id(), | ||
e, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's say our update request has doc and upsert. The getIndexWriteRequest
function will return upsert. What if the upsert is processed successfully but we failed to process the doc? In that case we will attach the exception to the wrong request.
So I think we need to pass the inner slot to this function to handle the update request properly. If both doc and upsert failed ideally I think we should return both failures. But it will require significant changes so we can just take one for now. But we should map the failure to the request properly for the update request case.
Similar comment to the markItemAsDropped function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in case it is should be okay to use getIndexWriteRequest
because the failed request is used to create a single BulkItemResponse.Failure
using indexRequest.index()
, indexRequest.id()
, and indexRequest.opType()
. Since the doc and upsert are from the same parent UpdateRequest
, they should share the same index
, id
, and opType
already as well so the same information would be retrieved either child request. Looking at BulkItemResponse.Failure
the failure information does not contain detailed information mapping to a specific index request other than those above shared values as well.
Agree that ideally we should be able to surface both exceptions to the user in the case where both fail but it would require significant refactoring, maybe it's a task we could take on for #17742. From user perspective though, regardless if one or both fail, they need to retry the request anyways.
onCompletion.accept(originalThread, null); | ||
List<IndexRequest> childIndexRequests = new ArrayList<>(); | ||
if (actionRequest instanceof UpdateRequest updateRequest) { | ||
childIndexRequests.addAll(updateRequest.getChildIndexRequests()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider to add this logic to getIndexWriteRequest
and let it return a list of requests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered this as well, it requires some minor refactoring wherever we call getIndexWriteRequest
to make it work (changing null checks to isEmpty checks and extracting the inner element of the list). The way I see it, UpdateRequests (and the case where an update has two child index requests) are the edge case that we handle separately, otherwise its fine to call getIndexWriteRequest
directly. So its okay to have it separate.
if (IngestService.NOOP_PIPELINE_NAME.equals(systemPipelineId) == false) { | ||
pipelinesInfoList.add(new IngestPipelineInfo(systemPipelineId, IngestPipelineType.SYSTEM_FINAL)); | ||
} | ||
if (pipelineId != null && IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to check pipelineId != null
since null doesn't equal NOOP_PIPELINE_NAME.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case where pipelineId = null
then:
IngestService.NOOP_PIPELINE_NAME.equals(null)
evaluates tofalse
false == false
evaluates totrue
which enters the block
So then null pipeline is added to the pipelinesInfoList which causes an NPE later on.
The null check avoids this NPE, so I think it's needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ic I think you are right.
@@ -1196,7 +1242,8 @@ private Pipeline getPipeline( | |||
// In very edge case it is possible the cache is invalidated after we resolve the | |||
// pipeline. So try to resolve the system ingest pipeline again here. | |||
if (indexPipeline == null) { | |||
final String newPipelineId = resolveSystemIngestPipeline(actionRequest, indexRequest, state.metadata()); | |||
resolveSystemIngestPipeline(actionRequest, indexRequest, state.metadata()); | |||
final String newPipelineId = indexRequest.getSystemIngestPipeline(); | |||
// set it as NOOP to avoid duplicated execution after we switch back to the write thread | |||
indexRequest.setSystemIngestPipeline(NOOP_PIPELINE_NAME); | |||
indexPipeline = systemIngestPipelineCache.getSystemIngestPipeline(newPipelineId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I have an issue in getSystemIngestPipeline
that it should not mark the input as NonNull. Could you help remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain a little more? What's the issue and which input is being marked non-null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In getSystemIngestPipeline
I mark the pipeline id as non-null but I think it's ok to allow to pass a null. Remove the non-null check can make it more resilient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it, fixed in latest revision
❌ Gradle check result for b2c9db0: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
b2c9db0
to
fbafb82
Compare
❌ Gradle check result for fbafb82: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
fbafb82
to
2341708
Compare
❌ Gradle check result for 2341708: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: Andy Qin <[email protected]>
2341708
to
f5aa904
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #18277 +/- ##
============================================
+ Coverage 72.44% 72.53% +0.09%
- Complexity 67256 67397 +141
============================================
Files 5488 5488
Lines 310994 311063 +69
Branches 45212 45228 +16
============================================
+ Hits 225288 225624 +336
+ Misses 67297 67088 -209
+ Partials 18409 18351 -58 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Description
Support system ingest pipelines for bulk update operations
Update Request Type Classification
Pipeline Resolution Enhancement
resolveSystemIngestPipeline
to enable resolving only the system ingest pipeline while setting the others to NOOPSlot Management
innerSlot
to track individual child index requests within anupdate operationRelated Issues
Resolves #18276
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.