Skip to content

ES|QL: Add support for LOOKUP JOIN on aliases #128519

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128519.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128519
summary: Add support for LOOKUP JOIN on aliases
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_RERANK_ADDED = def(9_080_0_00);
public static final TransportVersion SETTINGS_IN_DATA_STREAMS_DRY_RUN = def(9_081_0_00);
public static final TransportVersion ML_INFERENCE_SAGEMAKER_CHAT_COMPLETION = def(9_082_0_00);
public static final TransportVersion JOIN_ON_ALIASES = def(9_083_0_00);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.replaceValueInMatch("Size", 49, "Test flamegraph from test-events")
task.skipTest("esql/90_non_indexed/fetch", "Temporary until backported")
task.skipTest("esql/63_enrich_int_range/Invalid age as double", "TODO: require disable allow_partial_results")
task.skipTest("esql/191_lookup_join_on_datastreams/data streams not supported in LOOKUP JOIN", "Added support for aliases in JOINs")
})

tasks.named('yamlRestCompatTest').configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.RangeFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.internal.AliasFilter;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -46,6 +47,7 @@
*/
public abstract class QueryList {
protected final SearchExecutionContext searchExecutionContext;
protected final AliasFilter aliasFilter;
protected final MappedFieldType field;
protected final Block block;
@Nullable
Expand All @@ -54,10 +56,12 @@ public abstract class QueryList {
protected QueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block,
OnlySingleValueParams onlySingleValueParams
) {
this.searchExecutionContext = searchExecutionContext;
this.aliasFilter = aliasFilter;
this.field = field;
this.block = block;
this.onlySingleValueParams = onlySingleValueParams;
Expand All @@ -74,7 +78,7 @@ int getPositionCount() {
* Returns a copy of this query list that only returns queries for single-valued positions.
* That is, it returns `null` queries for either multivalued or null positions.
* <p>
* Whenever a multi-value position is encountered, whether in the input block or in the queried index, a warning is emitted.
* Whenever a multi-value position is encountered, whether in the input block or in the queried index, a warning is emitted.
* </p>
*/
public abstract QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage);
Expand All @@ -93,6 +97,17 @@ final Query getQuery(int position) {

Query query = doGetQuery(position, firstValueIndex, valueCount);

if (aliasFilter != null && aliasFilter != AliasFilter.EMPTY) {
BooleanQuery.Builder builder = new BooleanQuery.Builder();
builder.add(query, BooleanClause.Occur.FILTER);
try {
builder.add(aliasFilter.getQueryBuilder().toQuery(searchExecutionContext), BooleanClause.Occur.FILTER);
query = builder.build();
} catch (IOException e) {
throw new UncheckedIOException("Error while building query for alias filter", e);
}
}

if (onlySingleValueParams != null) {
query = wrapSingleValueQuery(query);
}
Expand Down Expand Up @@ -138,7 +153,12 @@ private Query wrapSingleValueQuery(Query query) {
* using only the {@link ElementType} of the {@link Block} to determine the
* query.
*/
public static QueryList rawTermQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block) {
public static QueryList rawTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block
) {
IntFunction<Object> blockToJavaObject = switch (block.elementType()) {
case BOOLEAN -> {
BooleanBlock booleanBlock = (BooleanBlock) block;
Expand Down Expand Up @@ -170,17 +190,22 @@ public static QueryList rawTermQueryList(MappedFieldType field, SearchExecutionC
case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block");
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
};
return new TermQueryList(field, searchExecutionContext, block, null, blockToJavaObject);
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, blockToJavaObject);
}

/**
* Returns a list of term queries for the given field and the input block of
* {@code ip} field values.
*/
public static QueryList ipTermQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, BytesRefBlock block) {
public static QueryList ipTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
BytesRefBlock block
) {
BytesRef scratch = new BytesRef();
byte[] ipBytes = new byte[InetAddressPoint.BYTES];
return new TermQueryList(field, searchExecutionContext, block, null, offset -> {
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, offset -> {
final var bytes = block.getBytesRef(offset, scratch);
if (ipBytes.length != bytes.length) {
// Lucene only supports 16-byte IP addresses; even IPv4 is encoded in 16 bytes
Expand All @@ -195,10 +220,16 @@ public static QueryList ipTermQueryList(MappedFieldType field, SearchExecutionCo
* Returns a list of term queries for the given field and the input block of
* {@code date} field values.
*/
public static QueryList dateTermQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, LongBlock block) {
public static QueryList dateTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
LongBlock block
) {
return new TermQueryList(
field,
searchExecutionContext,
aliasFilter,
block,
null,
field instanceof RangeFieldMapper.RangeFieldType rangeFieldType
Expand All @@ -210,8 +241,13 @@ public static QueryList dateTermQueryList(MappedFieldType field, SearchExecution
/**
* Returns a list of geo_shape queries for the given field and the input block.
*/
public static QueryList geoShapeQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block) {
return new GeoShapeQueryList(field, searchExecutionContext, block, null);
public static QueryList geoShapeQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block
) {
return new GeoShapeQueryList(field, searchExecutionContext, aliasFilter, block, null);
}

private static class TermQueryList extends QueryList {
Expand All @@ -220,11 +256,12 @@ private static class TermQueryList extends QueryList {
private TermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block,
OnlySingleValueParams onlySingleValueParams,
IntFunction<Object> blockValueReader
) {
super(field, searchExecutionContext, block, onlySingleValueParams);
super(field, searchExecutionContext, aliasFilter, block, onlySingleValueParams);
this.blockValueReader = blockValueReader;
}

Expand All @@ -233,6 +270,7 @@ public TermQueryList onlySingleValues(Warnings warnings, String multiValueWarnin
return new TermQueryList(
field,
searchExecutionContext,
aliasFilter,
block,
new OnlySingleValueParams(warnings, multiValueWarningMessage),
blockValueReader
Expand Down Expand Up @@ -264,10 +302,11 @@ private static class GeoShapeQueryList extends QueryList {
private GeoShapeQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block,
OnlySingleValueParams onlySingleValueParams
) {
super(field, searchExecutionContext, block, onlySingleValueParams);
super(field, searchExecutionContext, aliasFilter, block, onlySingleValueParams);

this.blockValueReader = blockToGeometry(block);
this.shapeQuery = shapeQuery();
Expand All @@ -278,6 +317,7 @@ public GeoShapeQueryList onlySingleValues(Warnings warnings, String multiValueWa
return new GeoShapeQueryList(
field,
searchExecutionContext,
aliasFilter,
block,
new OnlySingleValueParams(warnings, multiValueWarningMessage)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void testQueries() throws Exception {
var inputTerms = makeTermsBlock(List.of(List.of("b2"), List.of("c1", "a2"), List.of("z2"), List.of(), List.of("a3"), List.of()))
) {
MappedFieldType uidField = new KeywordFieldMapper.KeywordFieldType("uid");
QueryList queryList = QueryList.rawTermQueryList(uidField, directoryData.searchExecutionContext, inputTerms);
QueryList queryList = QueryList.rawTermQueryList(uidField, directoryData.searchExecutionContext, AliasFilter.EMPTY, inputTerms);
assertThat(queryList.getPositionCount(), equalTo(6));
assertThat(queryList.getQuery(0), equalTo(new TermQuery(new Term("uid", new BytesRef("b2")))));
assertThat(queryList.getQuery(1), equalTo(new TermInSetQuery("uid", List.of(new BytesRef("c1"), new BytesRef("a2")))));
Expand Down Expand Up @@ -153,7 +154,12 @@ public void testRandomMatchQueries() throws Exception {
}).toList();

try (var directoryData = makeDirectoryWith(directoryTermsList); var inputTerms = makeTermsBlock(inputTermsList)) {
var queryList = QueryList.rawTermQueryList(directoryData.field, directoryData.searchExecutionContext, inputTerms);
var queryList = QueryList.rawTermQueryList(
directoryData.field,
directoryData.searchExecutionContext,
AliasFilter.EMPTY,
inputTerms
);
int maxPageSize = between(1, 256);
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(
blockFactory,
Expand Down Expand Up @@ -190,8 +196,12 @@ public void testQueries_OnlySingleValues() throws Exception {
List.of(List.of("b2"), List.of("c1", "a2"), List.of("z2"), List.of(), List.of("a3"), List.of("a3", "a2", "z2", "xx"))
)
) {
QueryList queryList = QueryList.rawTermQueryList(directoryData.field, directoryData.searchExecutionContext, inputTerms)
.onlySingleValues(warnings(), "multi-value found");
QueryList queryList = QueryList.rawTermQueryList(
directoryData.field,
directoryData.searchExecutionContext,
AliasFilter.EMPTY,
inputTerms
).onlySingleValues(warnings(), "multi-value found");
// pos -> terms -> docs
// -----------------------------
// 0 -> [b2] -> []
Expand Down
Loading
Loading