Skip to content

Commit 06e24b7

Browse files
authored
Add quickwit client to benchmark (#1329)
### What problem does this PR solve?

* Add a Quickwit client to the benchmark.
* Add the Tantivy benchmark dataset (wiki-articles).

### Type of change

- [x] Other (please describe): update benchmark
1 parent 3bd70ca commit 06e24b7

File tree

9 files changed

+399
-2
lines changed

9 files changed

+399
-2
lines changed

docs/references/benchmark.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ docker run -d --name qdrant --network host -v $HOME/qdrant/storage:/qdrant/stora
6262

6363
mkdir -p $HOME/infinity
6464
docker run -d --name infinity -v $HOME/infinity:/var/infinity --ulimit nofile=500000:500000 --network=host infiniflow/infinity:0.1.0
65+
66+
mkdir -p $HOME/quickwit
67+
docker run -d --rm -v $HOME/quickwit/qwdata:/quickwit/qwdata -p 127.0.0.1:7280:7280 quickwit/quickwit run
6568
```
6669

6770
4. Run Benchmark:
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
from typing import Any
2+
import json
3+
from typing import List
4+
import os
5+
import h5py
6+
import uuid
7+
import logging
8+
import requests
9+
import sys
10+
11+
from .base_client import BaseClient
12+
13+
14+
class WrapQuickwitClient:
    """Thin HTTP wrapper around the Quickwit REST endpoints the benchmark uses.

    Every method issues a single ``requests`` call against ``base_url``.
    HTTP error statuses (4xx/5xx) are raised as ``requests.HTTPError`` via
    ``raise_for_status()``, except where a 404 is an expected answer.
    """

    def __init__(self, base_url):
        """Remember the server root URL, e.g. ``http://localhost:7280``."""
        self.base_url = base_url

    @staticmethod
    def _decode(response):
        """Raise on an HTTP error status, otherwise return the decoded JSON body.

        Any non-error status is treated as success, so a server that answers
        200 instead of 201 (or the reverse) no longer causes the body to be
        dropped silently. Returns ``None`` when the response has no body.
        """
        response.raise_for_status()
        return response.json() if response.content else None

    def create_index(self, index_config):
        """Create an index from a YAML index configuration.

        ``index_config`` is sent verbatim as the request body (the caller
        passes the raw bytes of the YAML file). Returns the decoded JSON
        response; raises ``requests.HTTPError`` on failure.
        """
        response = requests.post(
            f"{self.base_url}/api/v1/indexes",
            headers={'Content-Type': 'application/yaml'},
            data=index_config,
        )
        return self._decode(response)

    def index_exists(self, index):
        """Return ``True`` if ``index`` exists and ``False`` on a 404.

        Any other error status raises ``requests.HTTPError``.
        """
        response = requests.get(f"{self.base_url}/api/v1/indexes/{index}")
        if response.status_code == 404:
            return False
        response.raise_for_status()
        return True

    def delete_index(self, index):
        """Delete ``index``.

        Returns the decoded JSON response, or ``{"error": "Index not found"}``
        when the server answers 404. Other error statuses raise
        ``requests.HTTPError``.
        """
        response = requests.delete(f"{self.base_url}/api/v1/indexes/{index}")
        if response.status_code == 404:
            return {"error": "Index not found"}
        return self._decode(response)

    def upload_batch(self, index, data):
        """Post one ingest request for ``index`` with ``data`` as the body.

        ``commit=force`` is part of the URL so the call asks the server to
        commit the batch as part of the request. Returns the decoded JSON
        response; raises ``requests.HTTPError`` on failure.
        """
        bulk_url = f'{self.base_url}/api/v1/{index}/ingest?commit=force'
        response = requests.post(bulk_url, data=data)
        return self._decode(response)

    def search(self, index, query):
        """Run ``query`` (a dict) against the Elasticsearch-compatible search API.

        The query is serialised to JSON and sent as the body of a GET request.
        Returns the decoded JSON response; raises ``requests.HTTPError`` on
        failure.
        """
        search_url = f"{self.base_url}/api/v1/_elastic/{index}/_search/"
        response = requests.get(
            search_url,
            headers={'Content-Type': 'application/json'},
            data=json.dumps(query),
        )
        return self._decode(response)
70+
71+
72+
class QuickwitClient(BaseClient):
    """Benchmark client that loads a dataset into Quickwit and runs full-text queries."""

    # Upper bound on one ingest request body; Quickwit does not accept
    # batches larger than 10 MB.
    _MAX_BATCH_BYTES = 10 * 1024 * 1024

    def __init__(self, conf_path: str) -> None:
        """Parse the JSON benchmark configuration and keep it for later calls.

        Reads ``connection_url`` and ``name`` from the configuration; ``name``
        is used as the Quickwit index id for all requests.
        """
        BaseClient.__init__(self, conf_path)
        with open(conf_path, "r") as f:
            self.data = json.load(f)
        self.client = WrapQuickwitClient(base_url=self.data["connection_url"])
        self.table_name = self.data["name"]
        # Relative paths in the configuration are resolved against the
        # directory two levels above this file.
        self.path_prefix = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        logging.getLogger("quickwit_transport").setLevel(logging.WARNING)

    def upload_batch(self, actions: str):
        """Send one newline-delimited JSON payload to the configured index."""
        self.client.upload_batch(self.table_name, actions)

    def _upload_documents(self, documents, max_docs=None):
        """Serialise ``documents`` to NDJSON and upload them in bounded batches.

        A batch is flushed before it would reach ``_MAX_BATCH_BYTES`` or,
        when ``max_docs`` is given, before it would hold more than
        ``max_docs`` documents. An empty batch is never sent; a single
        document that is itself over the byte limit is still sent on its own.
        """
        lines = []
        size = 0
        for document in documents:
            line = json.dumps(document) + "\n"
            # json.dumps escapes non-ASCII characters by default, so the
            # character count equals the number of bytes on the wire.
            line_size = len(line)
            over_bytes = size + line_size >= self._MAX_BATCH_BYTES
            over_docs = max_docs is not None and len(lines) >= max_docs
            if lines and (over_bytes or over_docs):
                self.upload_batch("".join(lines))
                lines = []
                size = 0
            lines.append(line)
            size += line_size
        if lines:
            self.upload_batch("".join(lines))

    @staticmethod
    def _read_json_lines(dataset_path):
        """Yield one decoded document per non-blank line of a JSON-lines file."""
        with open(dataset_path, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                if line.strip():
                    yield json.loads(line)
                if i % 1000000 == 0 and i != 0:
                    logging.info(f"row {i}")

    def _read_tsv_rows(self, dataset_path):
        """Yield one ``{field name: value}`` dict per well-formed tab-separated row.

        Field names come from ``index.doc_mapping.field_mappings`` in the
        configuration. Rows whose column count differs are logged and skipped.
        """
        field_names = [
            field["name"]
            for field in self.data["index"]["doc_mapping"]["field_mappings"]
        ]
        with open(
            dataset_path, "r", encoding="utf-8", errors="replace"
        ) as data_file:
            for i, line in enumerate(data_file):
                row = line.strip().split("\t")
                if len(row) != len(field_names):
                    logging.info(
                        f"row = {i}, row_len = {len(row)}, not equal headers len, skip"
                    )
                    continue
                yield dict(zip(field_names, row))

    def upload(self):
        """Recreate the index and ingest the configured dataset.

        Deletes the index if it already exists, creates it from the YAML file
        at ``index_config_path``, downloads the dataset when it is missing
        locally (this needs ``data_link`` in the configuration), then uploads
        it according to the file extension.

        Raises:
            TypeError: if the dataset extension is not supported.
        """
        if self.client.index_exists(index=self.table_name):
            self.client.delete_index(index=self.table_name)

        index_config_path = os.path.join(self.path_prefix, self.data["index_config_path"])
        with open(index_config_path, 'rb') as file:
            yaml_data = file.read()
        self.client.create_index(index_config=yaml_data)

        batch_size = self.data["insert_batch_size"]
        dataset_path = os.path.join(self.path_prefix, self.data["data_path"])
        if not os.path.exists(dataset_path):
            self.download_data(self.data["data_link"], dataset_path)
        _, ext = os.path.splitext(dataset_path)

        if ext == ".json":
            # Batches here are limited by payload size only.
            self._upload_documents(self._read_json_lines(dataset_path))
        elif ext == ".hdf5" and self.data["mode"] == "vector":
            vector_name = self.data["vector_name"]
            with h5py.File(dataset_path, "r") as f:
                # The ingest body must be text, so each row becomes a plain
                # JSON document rather than an Elasticsearch-style action.
                # NOTE(review): assumes rows of f["train"] expose .tolist()
                # (NumPy arrays) - confirm against a real vector dataset.
                self._upload_documents(
                    ({vector_name: row.tolist()} for row in f["train"]),
                    max_docs=batch_size,
                )
        elif ext == ".csv":
            self._upload_documents(
                self._read_tsv_rows(dataset_path), max_docs=batch_size
            )
        else:
            raise TypeError("Unsupported file type")

    def get_fulltext_query_content(self, query: str, is_and: bool = False) -> Any:
        """Build the search request body for ``query`` against the ``body`` field.

        With ``is_and`` every whitespace-separated term becomes a mandatory
        ``match`` clause; otherwise the text is passed as a ``query_string``
        query and the hits are sorted by ``_score``.
        """
        if is_and:
            terms = query.split()
            return {
                "query": {
                    "bool": {"must": [{"match": {"body": term}} for term in terms]}
                }
            }
        return {
            "query": {
                "query_string": {
                    "query": query,
                    "fields": [
                        "body"
                    ]
                }
            },
            "sort": ["_score"]
        }

    def setup_clients(self, num_threads=1):
        """Create one HTTP wrapper per worker and store them in ``self.clients``."""
        self.clients = [
            WrapQuickwitClient(self.data["connection_url"])
            for _ in range(num_threads)
        ]

    def do_single_query(self, query_id, client_id) -> list[Any]:
        """Run query ``query_id`` with client ``client_id``.

        Returns a list of ``(0, sort_value)`` tuples, one per hit, where
        ``sort_value`` is the first entry of the hit's ``sort`` array.
        ``self.queries`` is not assigned in this class and must be set before
        this is called.

        Raises:
            TypeError: if the configured mode is not ``"fulltext"``.
        """
        query = self.queries[query_id]
        client = self.clients[client_id]
        if self.data["mode"] != "fulltext":
            raise TypeError("Unsupported data mode {}".format(self.data["mode"]))

        body = self.get_fulltext_query_content(query)
        body["size"] = self.data["topK"]
        result = client.search(
            index=self.table_name,
            query=body,
        )
        return [
            # todo add _id
            (0, hit["sort"][0])
            for hit in result["hits"]["hits"]
        ]
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
{
2+
"name": "elasticsearch_tantivy",
3+
"app": "elasticsearch",
4+
"app_path": "servers/elasticsearch/",
5+
"connection_url": "http://localhost:9200",
6+
"data_path": "datasets/tantivy/wiki-articles.json",
7+
"insert_batch_size": 8192,
8+
"query_path": "datasets/tantivy/operations.txt",
9+
"result_path": "datasets/tantivy/elasticsearch_result.jsonl",
10+
"mode": "fulltext",
11+
"topK": 10,
12+
"index": {
13+
"settings": {
14+
"index.number_of_shards": 5,
15+
"index.number_of_replicas": 0,
16+
"index.requests.cache.enable": false
17+
},
18+
"mappings": {
19+
"_source": {
20+
"enabled": true
21+
},
22+
"dynamic": "strict",
23+
"properties": {
24+
"url": {
25+
"type": "text"
26+
},
27+
"title": {
28+
"type": "text"
29+
},
30+
"body": {
31+
"type": "text"
32+
}
33+
}
34+
}
35+
}
36+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"name": "infinity_tantivy",
3+
"app": "infinity",
4+
"host": "127.0.0.1:23817",
5+
"data_path": "datasets/tantivy/wiki-articles.json",
6+
"data_link": "http://192.168.200.183:8000/wiki-articles.json.bz2",
7+
"insert_batch_size": 8192,
8+
"query_path": "datasets/tantivy/operations.txt",
9+
"result_path": "datasets/tantivy/infinity_result.jsonl",
10+
"query_link": "to_be_set",
11+
"mode": "fulltext",
12+
"topK": 10,
13+
"use_import": false,
14+
"schema": {
15+
"url": {"type": "varchar", "default":""},
16+
"title": {"type": "varchar", "default":""},
17+
"body": {"type": "varchar", "default":""}
18+
},
19+
"index": {
20+
"url": {
21+
"type": "text"
22+
},
23+
"title": {
24+
"type": "text"
25+
},
26+
"body": {
27+
"type": "text"
28+
}
29+
}
30+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"name": "quickwit_enwiki",
3+
"app": "quickwit",
4+
"app_path": "servers/quickwit/",
5+
"connection_url": "http://localhost:7280",
6+
"data_path": "datasets/enwiki/enwiki.csv",
7+
"insert_batch_size": 8192,
8+
"query_path": "datasets/enwiki/operations.txt",
9+
"result_path": "datasets/enwiki/quickwit_result.jsonl",
10+
"mode": "fulltext",
11+
"topK": 10,
12+
"index_config_path": "configs/quickwit_enwiki_index.yaml",
13+
"index": {
14+
"version": "0.8",
15+
"index_id": "quickwit_enwiki",
16+
"doc_mapping": {
17+
"mode": "lenient",
18+
"field_mappings": [
19+
{ "name": "doctitle", "type": "text" },
20+
{ "name": "docdate", "type": "text" },
21+
{ "name": "body", "type": "text" }
22+
]
23+
}
24+
}
25+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
version: 0.8
2+
3+
index_id: quickwit_enwiki
4+
5+
doc_mapping:
6+
field_mappings:
7+
- name: doctitle
8+
type: text
9+
tokenizer: default
10+
record: position
11+
stored: true
12+
- name: docdate
13+
type: text
14+
tokenizer: default
15+
record: position
16+
stored: true
17+
- name: body
18+
type: text
19+
tokenizer: default
20+
record: position
21+
stored: true
22+
23+
24+
search_settings:
25+
default_search_fields: [body]
26+
27+
indexing_settings:
28+
commit_timeout_secs: 10
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"name": "quickwit_tantivy",
3+
"app": "quickwit",
4+
"app_path": "servers/quickwit/",
5+
"connection_url": "http://localhost:7280",
6+
"data_path": "datasets/tantivy/wiki-articles.json",
7+
"insert_batch_size": 8192,
8+
"query_path": "datasets/tantivy/operations.txt",
9+
"result_path": "datasets/tantivy/quickwit_result.jsonl",
10+
"mode": "fulltext",
11+
"topK": 10,
12+
"index_config_path": "configs/quickwit_tantivy_index.yaml",
13+
"index": {
14+
"version": "0.8",
15+
"index_id": "quickwit_tantivy",
16+
"doc_mapping": {
17+
"mode": "lenient",
18+
"field_mappings": [
19+
{ "name": "url", "type": "text" },
20+
{ "name": "title", "type": "text" },
21+
{ "name": "body", "type": "text" }
22+
]
23+
}
24+
}
25+
}

0 commit comments

Comments
 (0)