Skip to content

Commit dfdd6ec

Browse files
fulmicotonphilippemnoel
authored andcommitted
Fixes bug that causes out-of-order sstable key. (quickwit-oss#2445)
The previous way to address the problem was to replace \u{0000} with 0 in different places. This logic had several flaws: Done on the serializer side (like it was for the columnar), there was a collision problem. If a document in the segment contained a json field with a \0 and antoher doc contained the same json field but `0` then we were sending the same field path twice to the serializer. Another option would have been to normalizes all values on the writer side. This PR simplifies the logic and simply ignore json path containing a \0, both in the columnar and the inverted index. Closes quickwit-oss#2442
1 parent 9ae65bc commit dfdd6ec

File tree

8 files changed

+91
-47
lines changed

8 files changed

+91
-47
lines changed

columnar/src/columnar/writer/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::net::Ipv6Addr;
88

99
use column_operation::ColumnOperation;
1010
pub(crate) use column_writers::CompatibleNumericalTypes;
11+
use common::json_path_writer::JSON_END_OF_PATH;
1112
use common::CountingWriter;
1213
pub(crate) use serializer::ColumnarSerializer;
1314
use stacker::{Addr, ArenaHashMap, MemoryArena};
@@ -247,6 +248,7 @@ impl ColumnarWriter {
247248
}
248249
pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
249250
let mut serializer = ColumnarSerializer::new(wrt);
251+
250252
let mut columns: Vec<(&[u8], ColumnType, Addr)> = self
251253
.numerical_field_hash_map
252254
.iter()
@@ -260,7 +262,7 @@ impl ColumnarWriter {
260262
columns.extend(
261263
self.bytes_field_hash_map
262264
.iter()
263-
.map(|(term, addr)| (term, ColumnType::Bytes, addr)),
265+
.map(|(column_name, addr)| (column_name, ColumnType::Bytes, addr)),
264266
);
265267
columns.extend(
266268
self.str_field_hash_map
@@ -287,6 +289,12 @@ impl ColumnarWriter {
287289
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
288290
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
289291
for (column_name, column_type, addr) in columns {
292+
if column_name.contains(&JSON_END_OF_PATH) {
293+
// Tantivy uses b'0' as a separator for nested fields in JSON.
294+
// Column names with a b'0' are not simply ignored by the columnar (and the inverted
295+
// index).
296+
continue;
297+
}
290298
match column_type {
291299
ColumnType::Bool => {
292300
let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);

columnar/src/columnar/writer/serializer.rs

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::io;
22
use std::io::Write;
33

4+
use common::json_path_writer::JSON_END_OF_PATH;
45
use common::{BinarySerializable, CountingWriter};
56
use sstable::value::RangeValueWriter;
67
use sstable::RangeSSTable;
@@ -18,13 +19,8 @@ pub struct ColumnarSerializer<W: io::Write> {
1819
/// code.
1920
fn prepare_key(key: &[u8], column_type: ColumnType, buffer: &mut Vec<u8>) {
2021
buffer.clear();
21-
// Convert 0 bytes to '0' string, as 0 bytes are reserved for the end of the path.
22-
if key.contains(&0u8) {
23-
buffer.extend(key.iter().map(|&b| if b == 0 { b'0' } else { b }));
24-
} else {
25-
buffer.extend_from_slice(key);
26-
}
27-
buffer.push(0u8);
22+
buffer.extend_from_slice(key);
23+
buffer.push(JSON_END_OF_PATH);
2824
buffer.push(column_type.to_code());
2925
}
3026

@@ -97,18 +93,3 @@ impl<'a, W: io::Write> io::Write for ColumnSerializer<'a, W> {
9793
self.columnar_serializer.wrt.write_all(buf)
9894
}
9995
}
100-
101-
#[cfg(test)]
102-
mod tests {
103-
use super::*;
104-
105-
#[test]
106-
fn test_prepare_key_bytes() {
107-
let mut buffer: Vec<u8> = b"somegarbage".to_vec();
108-
prepare_key(b"root\0child", ColumnType::Str, &mut buffer);
109-
assert_eq!(buffer.len(), 12);
110-
assert_eq!(&buffer[..10], b"root0child");
111-
assert_eq!(buffer[10], 0u8);
112-
assert_eq!(buffer[11], ColumnType::Str.to_code());
113-
}
114-
}

src/core/json_utils.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
1+
use common::json_path_writer::{JSON_END_OF_PATH, JSON_PATH_SEGMENT_SEP};
22
use common::{replace_in_place, JsonPathWriter};
33
use rustc_hash::FxHashMap;
44

@@ -83,6 +83,9 @@ fn index_json_object<'a, V: Value<'a>>(
8383
positions_per_path: &mut IndexingPositionsPerPath,
8484
) {
8585
for (json_path_segment, json_value_visitor) in json_visitor {
86+
if json_path_segment.as_bytes().contains(&JSON_END_OF_PATH) {
87+
continue;
88+
}
8689
json_path_writer.push(json_path_segment);
8790
index_json_value(
8891
doc,

src/indexer/index_writer.rs

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -815,8 +815,9 @@ mod tests {
815815
use crate::indexer::NoMergePolicy;
816816
use crate::query::{QueryParser, TermQuery};
817817
use crate::schema::{
818-
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, NumericOptions,
819-
TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED, STRING, TEXT,
818+
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
819+
NumericOptions, Schema, TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED,
820+
STRING, TEXT,
820821
};
821822
use crate::store::DOCSTORE_CACHE_CAPACITY;
822823
use crate::{
@@ -2378,21 +2379,21 @@ mod tests {
23782379

23792380
#[test]
23802381
fn test_bug_1617_2() {
2381-
assert!(test_operation_strategy(
2382+
test_operation_strategy(
23822383
&[
23832384
IndexingOp::AddDoc {
23842385
id: 13,
2385-
value: Default::default()
2386+
value: Default::default(),
23862387
},
23872388
IndexingOp::DeleteDoc { id: 13 },
23882389
IndexingOp::Commit,
23892390
IndexingOp::add(30),
23902391
IndexingOp::Commit,
23912392
IndexingOp::Merge,
23922393
],
2393-
true
2394+
true,
23942395
)
2395-
.is_ok());
2396+
.unwrap();
23962397
}
23972398

23982399
#[test]
@@ -2490,4 +2491,46 @@ mod tests {
24902491

24912492
Ok(())
24922493
}
2494+
2495+
#[test]
2496+
fn test_bug_2442_reserved_character_fast_field() -> crate::Result<()> {
2497+
let mut schema_builder = schema::Schema::builder();
2498+
let json_field = schema_builder.add_json_field("json", FAST | TEXT);
2499+
2500+
let schema = schema_builder.build();
2501+
let index = Index::builder().schema(schema).create_in_ram()?;
2502+
let mut index_writer = index.writer_for_tests()?;
2503+
index_writer.set_merge_policy(Box::new(NoMergePolicy));
2504+
2505+
index_writer
2506+
.add_document(doc!(
2507+
json_field=>json!({"\u{0000}B":"1"})
2508+
))
2509+
.unwrap();
2510+
index_writer
2511+
.add_document(doc!(
2512+
json_field=>json!({" A":"1"})
2513+
))
2514+
.unwrap();
2515+
index_writer.commit()?;
2516+
2517+
Ok(())
2518+
}
2519+
2520+
#[test]
2521+
fn test_bug_2442_reserved_character_columnar() -> crate::Result<()> {
2522+
let mut schema_builder = Schema::builder();
2523+
let options = JsonObjectOptions::from(FAST).set_expand_dots_enabled();
2524+
let field = schema_builder.add_json_field("json", options);
2525+
let index = Index::create_in_ram(schema_builder.build());
2526+
let mut index_writer = index.writer_for_tests().unwrap();
2527+
index_writer
2528+
.add_document(doc!(field=>json!({"\u{0000}": "A"})))
2529+
.unwrap();
2530+
index_writer
2531+
.add_document(doc!(field=>json!({format!("\u{0000}\u{0000}"): "A"})))
2532+
.unwrap();
2533+
index_writer.commit().unwrap();
2534+
Ok(())
2535+
}
24932536
}

src/indexer/mod.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -145,15 +145,27 @@ mod tests_mmap {
145145
}
146146
}
147147
#[test]
148-
fn test_json_field_null_byte() {
149-
// Test when field name contains a zero byte, which has special meaning in tantivy.
150-
// As a workaround, we convert the zero byte to the ASCII character '0'.
151-
// https://github.com/quickwit-oss/tantivy/issues/2340
152-
// https://github.com/quickwit-oss/tantivy/issues/2193
153-
let field_name_in = "\u{0000}";
154-
let field_name_out = "0";
155-
test_json_field_name(field_name_in, field_name_out);
148+
fn test_json_field_null_byte_is_ignored() {
149+
let mut schema_builder = Schema::builder();
150+
let options = JsonObjectOptions::from(TEXT | FAST).set_expand_dots_enabled();
151+
let field = schema_builder.add_json_field("json", options);
152+
let index = Index::create_in_ram(schema_builder.build());
153+
let mut index_writer = index.writer_for_tests().unwrap();
154+
index_writer
155+
.add_document(doc!(field=>json!({"key": "test1", "invalidkey\u{0000}": "test2"})))
156+
.unwrap();
157+
index_writer.commit().unwrap();
158+
let reader = index.reader().unwrap();
159+
let searcher = reader.searcher();
160+
let segment_reader = searcher.segment_reader(0);
161+
let inv_indexer = segment_reader.inverted_index(field).unwrap();
162+
let term_dict = inv_indexer.terms();
163+
assert_eq!(term_dict.num_terms(), 1);
164+
let mut term_bytes = Vec::new();
165+
term_dict.ord_to_term(0, &mut term_bytes).unwrap();
166+
assert_eq!(term_bytes, b"key\0stest1");
156167
}
168+
157169
#[test]
158170
fn test_json_field_1byte() {
159171
// Test when field name contains a '1' byte, which has special meaning in tantivy.

src/indexer/path_to_unordered_id.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ impl PathToUnorderedId {
3838
#[cold]
3939
fn insert_new_path(&mut self, path: &str) -> u32 {
4040
let next_id = self.map.len() as u32;
41-
self.map.insert(path.to_string(), next_id);
41+
let new_path = path.to_string();
42+
self.map.insert(new_path, next_id);
4243
next_id
4344
}
4445

src/postings/json_postings_writer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
5959
/// The actual serialization format is handled by the `PostingsSerializer`.
6060
fn serialize(
6161
&self,
62-
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
62+
ordered_term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
6363
ordered_id_to_path: &[&str],
6464
ctx: &IndexingContext,
6565
serializer: &mut FieldSerializer,
@@ -69,7 +69,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
6969
term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0));
7070
let mut prev_term_id = u32::MAX;
7171
let mut term_path_len = 0; // this will be set in the first iteration
72-
for (_field, path_id, term, addr) in term_addrs {
72+
for (_field, path_id, term, addr) in ordered_term_addrs {
7373
if prev_term_id != path_id.path_id() {
7474
term_buffer.truncate_value_bytes(0);
7575
term_buffer.append_path(ordered_id_to_path[path_id.path_id() as usize].as_bytes());

src/schema/term.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -249,12 +249,8 @@ impl Term {
249249
#[inline]
250250
pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] {
251251
let len_before = self.0.len();
252-
if bytes.contains(&0u8) {
253-
self.0
254-
.extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b }));
255-
} else {
256-
self.0.extend_from_slice(bytes);
257-
}
252+
assert!(!bytes.contains(&JSON_END_OF_PATH));
253+
self.0.extend_from_slice(bytes);
258254
&mut self.0[len_before..]
259255
}
260256
}

0 commit comments

Comments
 (0)