# Scalable Distributed Inverted List Indexes in Disaggregated Memory

Implementation of distributed inverted list indexing techniques under memory disaggregation. This is the source code of the paper "Scalable Distributed Inverted List Indexes in Disaggregated Memory", published and presented at SIGMOD'24.

Quick Start • Setup • Usage • Data Preprocessing
To cite the paper:

```bibtex
@article{10.1145/3654974,
  author    = {Widmoser, Manuel and Kocher, Daniel and Augsten, Nikolaus},
  title     = {Scalable Distributed Inverted List Indexes in Disaggregated Memory},
  journal   = {Proc. ACM Manag. Data},
  publisher = {Association for Computing Machinery},
  year      = {2024},
  volume    = {2},
  number    = {3},
  url       = {https://doi.org/10.1145/3654974},
  doi       = {10.1145/3654974}
}
```
## Quick Start

This section shows how to run a simple two-machine setup on a small example dataset, together with the expected program outputs. For a more detailed description of the commands and requirements, we refer to the corresponding sections below. The following assumptions are made to run the code (on one compute node and one memory node):

- both machines are within the same InfiniBand network,
- the required packages are installed on both machines,
- `clang++` is installed as the C++ compiler,
- the IP addresses are adjusted accordingly (cf. Nodes),
- `cluster1` is the compute node and `cluster2` the memory node,
- the directory `/mnt/dbgroup-share/example/` is accessible from both nodes (e.g., via NFS) and contains the files in `example/` from this repository (an exemplary binary index file and some random queries).
Clone and compile the code on both machines (without hugepages; setting up hugepages is described here):

```bash
git clone https://github.com/DatabaseGroup/rdma-inverted-index.git
cd rdma-inverted-index
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_CXX_FLAGS="-DNOHUGEPAGES" ..
make
```
Preprocessing: on one of the machines, partition the dataset upfront (`-s block` selects the block-based partitioning strategy, `-n 1` a single memory node, and `-b 1024` a block size of 1024 bytes; cf. Data Preprocessing below):

```bash
data_processing/partitioner -i /mnt/dbgroup-share/example/example-lists.dat -o /mnt/dbgroup-share/example/ -s block -n 1 -b 1024
```
On the memory node, run:

```bash
numactl --membind=1 --cpunodebind=1 ./block_index --is-server --num-clients 1
```
On the compute node, run:

```bash
numactl --membind=1 ./block_index --initiator --index-dir /mnt/dbgroup-share/example/ --query-file /mnt/dbgroup-share/example/example-queries.txt --servers cluster2 --threads 4 --operation intersection --block-size 1024
```
The output on the compute node should be similar to

```
============================================================
 CLIENT
============================================================
connect to: [cluster2]
is initiator: true
TCP port: 1234
IB port: 1
max outstanding CQEs: 16
max send work requests: 1024
max receive work requests: 1024
============================================================
index directory: /mnt/dbgroup-share/example/
query file: /mnt/dbgroup-share/example/example-queries.txt
operation: intersection
number of threads: 4
threads pinned: true
block size: 1024
============================================================
1 device(s) found
Selected device: mlx4_0
num_cores: 16
physical cores per socket: 8
hyperthreading disabled
interleaved policy
client id: 0
number of clients: 1
connect to server cluster2
pairing: 533 -- 534
[STATUS]: pinned main thread to core 8
[STATUS]: exchange information with compute nodes
[STATUS]: receive access tokens of remote memory regions
[STATUS]: read meta data and assign remote pointers
[STATUS]: read queries
size of queries: 24132 Bytes
[STATUS]: allocate worker threads and read buffers
allocated ALIGNED MEM (no hugepage) at 140104859615296 with buffer size 262144
1 device(s) found
Selected device: mlx4_0
pairing: 534 -- 535
1 device(s) found
Selected device: mlx4_0
pairing: 535 -- 536
1 device(s) found
Selected device: mlx4_0
pairing: 536 -- 537
1 device(s) found
Selected device: mlx4_0
pairing: 537 -- 538
[STATUS]: run worker threads
[STATUS]: pinned thread 1 to core 0
[STATUS]: pinned thread 2 to core 9
[STATUS]: pinned thread 3 to core 1
query 0 [read] (len=2): [9 51]
query 100 [read] (len=2): [536 683]
query 200 [read] (len=3): [238 276 990]
query 300 [read] (len=4): [68 124 707 791]
query 400 [read] (len=4): [346 405 607 899]
query 500 [read] (len=5): [90 187 425 575 739]
query 600 [read] (len=5): [538 849 851 883 932]
query 700 [read] (len=6): [182 439 664 875 878 931]
query 800 [read] (len=7): [51 276 320 340 341 423 552]
query 900 [read] (len=8): [92 112 240 371 537 552 700 788]
[STATUS]: join compute threads
t0 processed queries: 337, READ lists: 0.387253, polling: 0, operation: 0
t1 processed queries: 245, READ lists: 0.475905, polling: 0, operation: 0
t2 processed queries: 257, READ lists: 0.374502, polling: 0, operation: 0
t3 processed queries: 161, READ lists: 0.406743, polling: 0, operation: 0
[STATUS]: gather query statistics
[STATUS]: gather timings
statistics:
{
  "allocated_read_buffers_size": 274432,
  "catalog_size": 8000,
  "mb_per_sec": 2907.1562137754227,
  "meta": {
    "algorithm": "block-based",
    "block_size": 1024,
    "compute_nodes": 1,
    "compute_threads": 4,
    "hyperthreading": "false",
    "index_directory": "example",
    "memory_nodes": 1,
    "operation": "intersection",
    "query_file": "example-queries.txt",
    "threads_pinned": "true"
  },
  "num_insert_queries": 0,
  "num_queries": 1000,
  "num_read_queries": 1000,
  "num_result": 0,
  "queries_per_sec": 564081,
  "rdma_reads_in_bytes": 5153792,
  "timings": {
    "query_c0": 1.772795,
    "query_total": 1.772795
  },
  "total_index_buffer_size": 1024000,
  "total_initial_index_size": 1024000,
  "universe_size": 1000
}
```
The output on the memory node should be similar to

```
============================================================
 SERVER
============================================================
num clients: 1
TCP port: 1234
IB port: 1
max outstanding CQEs: 16
max send work requests: 1024
max receive work requests: 1024
============================================================
1 device(s) found
Selected device: mlx4_0
num_cores: 32
physical cores per socket: 8
hyperthreading enabled
interleaved policy
pairing: 534 -- 533
[STATUS]: pinned main thread to core 8
[STATUS]: receive index file location
index file: /mnt/dbgroup-share/example/block1024_m1_of1_index.dat
index file size: 1024000
allocated ALIGNED MEM (no hugepage) at 140400725594176 with buffer size 1024000
[STATUS]: read index into memory
index size: 1024000
total index buffer size: 1024000
[STATUS]: register memory and distribute access token
[STATUS]: connect QPs of compute threads
pairing: 535 -- 534
pairing: 536 -- 535
pairing: 537 -- 536
pairing: 538 -- 537
[STATUS]: idle
{"allocate_index_buffer":0.023072,"read_file":0.675024,"read_index_into_memory":0.685865}
```
## Setup

All our experiments were conducted on a 9-node cluster with five compute nodes and four memory nodes. Each compute node has two Intel Xeon E5-2630 v3 2.40GHz processors with 16 cores (8 cores each; hyperthreading enabled), and the memory nodes have two physical Intel Xeon E5-2603 v4 1.70GHz processors with 12 cores (6 cores each). All machines run Debian 10 Buster with a Linux 4.19 kernel, are equipped with 96GB of main memory, and have a Mellanox ConnectX-3 NIC connected to an 18-port SX6018/U1 InfiniBand switch (FDR 56Gbps). For RDMA support, we have installed the `MLNX_OFED 4.9-x LTS` Linux RDMA driver.
### Requirements

The following C++ libraries and Unix packages are required to compile the code. Note that `ibverbs` (the RDMA library) is Linux-only. The code also compiles without InfiniBand network cards.

- `ibverbs`
- `boost` (to support `boost::program_options` for CLI parsing)
- `pthreads` (for multithreading)
- oneTBB (for concurrent data structures)
- a C++ compiler that supports C++17 (we used `clang++-12`)
- `cmake`
- `numactl`
For instance, to install the requirements on Debian, run the following command:

```bash
apt-get -y install clang libboost-all-dev libibverbs1 libibverbs-dev numactl cmake libtbb-dev git git-lfs python3-venv
```
### Nodes

Adjust the IP addresses of the cluster nodes accordingly in `rdma-library/library/utils.cc` (lines 11 to 22 at `8f251cb`):

```c++
std::map<str, str> node_to_ip{
    {"cluster1", "10.10.5.20"},
    {"cluster2", "10.10.5.21"},
    {"cluster3", "10.10.5.22"},
    {"cluster4", "10.10.5.23"},
    {"cluster5", "10.10.5.24"},
    {"cluster6", "10.10.5.25"},
    {"cluster7", "10.10.5.26"},
    {"cluster8", "10.10.5.27"},
    {"cluster9", "10.10.5.28"},
    {"cluster10", "10.10.5.29"},
};
```
After cloning the repository and installing the requirements, the code must be compiled on all cluster nodes:

```bash
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang++ ..
make
```
## Usage

The following index executables exist:

- `term_index` (read-only term-based index),
- `document_index` (read-only document-based index),
- `block_index` (read-only block-based index),
- `dynamic_block_index` (block-based index that supports updates).

A cluster node can either be a memory node or a compute node; exactly one compute node must be the initiator.
To reduce the number of address translations, it is recommended to allocate hugepages on all cluster nodes:

```bash
echo n > /sys/devices/system/node/node1/hugepages/hugepages-2048kB/nr_hugepages
```

where `n` is the number of hugepages. To run the index without hugepages, use `-DNOHUGEPAGES` as an additional compiler flag (as in the Quick Start above).
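For instance, allocating 1024 hugepages of 2 MiB each reserves 2 GiB of hugepage-backed memory on NUMA node 1 (the page count is illustrative; size it to your index and buffers):

```bash
# Illustrative sizing: 1024 pages x 2 MiB = 2 GiB on NUMA node 1.
echo 1024 > /sys/devices/system/node/node1/hugepages/hugepages-2048kB/nr_hugepages
# Verify how many pages were actually allocated:
cat /sys/devices/system/node/node1/hugepages/hugepages-2048kB/nr_hugepages
```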
Run on the initiator:

```bash
numactl --membind=1 <executable> --initiator --index-dir <index-binary-directory> --query-file <query-file> --servers <memory-nodes> --clients <compute-nodes> --threads <num-threads> --operation <operation> --block-size <block-size>
```

- `<executable>` is either `term_index`, `document_index`, `block_index`, or `dynamic_block_index`
- `<index-binary-directory>` is the directory where the index binary files are stored (cf. Data Preprocessing below)
- `<query-file>` is the file that contains the queries (cf. Data Preprocessing below)
- `<memory-nodes>` is a list of memory nodes (separated by whitespace)
- `<compute-nodes>` is a list of compute nodes (excluding the initiator)
- `<num-threads>` is the number of compute threads per compute node
- `<operation>` is the operation performed for read queries: either `intersection` or `union`
- `<block-size>` is the size of a block (relevant only for `block_index` and `dynamic_block_index`)
Run on the remaining compute nodes:

```bash
numactl --membind=1 <executable> --servers <memory-nodes>
```

Run on the memory nodes:

```bash
numactl --membind=1 --cpunodebind=1 <executable> --is-server --num-clients <num-compute-nodes>
```

- `<num-compute-nodes>` is the number of compute nodes that will connect to the memory node

A complete multi-node example is sketched below.
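For example, a run of the block-based index with two memory nodes (`cluster2`, `cluster3`), the initiator on `cluster1`, and one additional compute node (`cluster4`) could look as follows (paths and parameter values are illustrative):

```bash
# On the memory nodes cluster2 and cluster3 (two compute nodes will connect):
numactl --membind=1 --cpunodebind=1 ./block_index --is-server --num-clients 2

# On the compute node cluster4:
numactl --membind=1 ./block_index --servers cluster2 cluster3

# On the initiator cluster1:
numactl --membind=1 ./block_index --initiator \
  --index-dir /mnt/dbgroup-share/twitter/ \
  --query-file /mnt/dbgroup-share/twitter/queries.txt \
  --servers cluster2 cluster3 --clients cluster4 \
  --threads 8 --operation intersection --block-size 1024
```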
The following CLI options can be adjusted:

```
Allowed options:
  -h [ --help ]                    Show help message
  -s [ --is-server ]               Program acts as server if set
  --servers arg                    A list of server nodes to which a client
                                   connects, e.g., "cluster3"
  --clients arg                    A list of client nodes to which the
                                   initiator connects, e.g., "cluster4 cluster5"
  -i [ --initiator ]               Program acts as initiating client if set
  -c [ --num-clients ] arg (=1)    Number of clients that connect to each
                                   server (relevant only for server nodes)
  --port arg (=1234)               TCP port
  --ib-port arg (=1)               Port of infiniband device
  --max-poll-cqes arg (=16)        Number of outstanding RDMA operations
                                   allowed (hardware-specific)
  --max-send-wrs arg (=1024)       Maximum number of outstanding send work
                                   requests
  --max-receive-wrs arg (=1024)    Maximum number of outstanding receive work
                                   requests
  -d [ --index-dir ] arg           Location of the partitioned index files.
  -q [ --query-file ] arg          Input file containing queries for the index.
  -t [ --threads ] arg             Number of threads per compute node
  -o [ --operation ] arg           Operation performed on lists: either
                                   "intersection" or "union".
  -p [ --disable-thread-pinning ]  Disables pinning compute threads to
                                   physical cores if set.
  -b [ --block-size ] arg (=1024)  Block size in bytes (only used by
                                   [dynamic_]block_index).
```
## Data Preprocessing

⏰ For CCNEWS and TWITTER, we provide preprocessed binary files since processing them takes quite a while. Links and instructions can be found below. SSB and TOY can easily be reproduced by following the steps listed below.

We have used the following datasets in our experiments:
### TWITTER

Download and extract the dataset from https://twitter.mpi-sws.org/ and run

```bash
python3 scripts/twitter/extract.py links-anon.txt > twitter-lists.txt
python3 scripts/twitter/reassign_ids.py twitter-lists.txt > twitter-lists-reassigned.txt
```

to create the index file. For further binary processing (see below), manually add the universe size and the number of lists to the top of the file. The queries are generated using the `create_popular_queries.py` script.
⏰ The preprocessed binary index file and the corresponding queries can be found here (with statistics and a detailed description). To download them, run the following commands:

```bash
git clone https://frosch.cosy.sbg.ac.at/datasets/sets/twitter-mpi.git
cd twitter-mpi
git lfs pull
tar -xvf twitter-mpi.tar.zst
```
### SSB

To create the tables, we provide a script that uses the SSB-DB generator (`dbgen -s 1 -T a`):

```bash
cd scripts
bash ssb/create_tables.sh
```

To create the index file and the queries, run the following:

```bash
python3 -m venv venv
source venv/bin/activate
python3 -m pip install -r requirements.txt
cd ssb
python3 ssb.py > ssb-lists.txt
python3 generate_ssb_queries.py <num-queries> > ssb-queries.txt
deactivate
```
### CCNEWS

The CCNEWS data can be downloaded from https://doi.org/10.48610/1dcb974 (the compressed `.ciff` file). With the ciff tool, the lists can be extracted. The queries are given and must be converted.
⏰ The preprocessed binary index file and the corresponding queries can be found here (with statistics and a detailed description). To download them, run the following commands:

```bash
git clone https://frosch.cosy.sbg.ac.at/datasets/sets/ccnews.git
cd ccnews
git lfs pull
tar -xvf cc-news-en.tar.zst
```
### TOY

We have used

```bash
python3 scripts/uniform.py 2000000 100000 50 100 200
```

to create 2M random documents containing 100 terms on average, the `scripts/index_from_documents.py` script to convert the documents to lists, and

```bash
python3 scripts/uniform.py 100000 100000 2 5 10
```

to create uniform random queries with an average size of 5. The documents must be converted to lists, and the queries prefixed with `r:` (see below).
To create a binary index file, the dataset file should have the following input format:

```
<universe size>
<number of lists> (the same as the universe size in many cases)
<list id>: <list entries separated by whitespace>
...
```

List ids should be consecutive. For instance:

```
29497691
29497691
0: 345422 651571 915383
1: 494792 537875 1066751 1095655 1358056
...
```
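As a sketch, the following Python snippet (a hypothetical helper, not part of the repository scripts) writes a toy dataset in this format:

```python
# Sketch: write lists in the expected plain-text input format.
# The lists and the universe size below are illustrative.
lists = {
    0: [345422, 651571, 915383],
    1: [494792, 537875, 1066751, 1095655, 1358056],
}
universe_size = 29497691  # illustrative value

with open("toy-lists.txt", "w") as f:
    f.write(f"{universe_size}\n")
    f.write(f"{len(lists)}\n")
    for list_id in sorted(lists):  # list ids must be consecutive
        f.write(f"{list_id}: " + " ".join(map(str, lists[list_id])) + "\n")
```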
Using `data_processing/serializer -i <input-file> -o <output-file>`, we get the following output format (as binary output, all 32-bit integers):

```
<universe-size><number-of-lists>
<list-id><list-size><list-entry-1>...
```
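To sanity-check a serialized file, a small Python script along these lines can decode it (a sketch; it assumes the 32-bit integers are stored in little-endian byte order):

```python
# Sketch: dump a serialized binary index (all values 32-bit unsigned integers).
import struct
import sys

with open(sys.argv[1], "rb") as f:
    universe_size, num_lists = struct.unpack("<II", f.read(8))
    print(f"universe size: {universe_size}, number of lists: {num_lists}")
    for _ in range(num_lists):
        list_id, list_size = struct.unpack("<II", f.read(8))
        entries = struct.unpack(f"<{list_size}I", f.read(4 * list_size))
        print(f"{list_id}:", *entries)
```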
Finally, we must partition the index upfront such that our algorithms can deal with it (they simply load the partitioned binary index and meta files rather than partitioning the index each time on their own):

```bash
data_processing/partitioner -i <binary-input-file> -o <output-path> -s <strategy> -n <num-nodes> [-b <block-size>] [-a -q <query-file>]
```
- `<binary-input-file>` is the output of `serializer`, i.e., the entire serialized binary index.
- `<output-path>` is a path to which the partitioned index binary files are written.
- `<strategy>` is the partitioning strategy: either `term`, `document`, or `block`.
- `<num-nodes>` is the number of memory nodes, i.e., the number of partitions.
- `<block-size>` is the block size in bytes (used only if the strategy is `block`, default is 2048).
- `-a` partitions only accessed lists (given in the query file `<query-file>`), currently only implemented for the `block`-based strategy.
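For instance, a document-based partitioning across four memory nodes could be created as follows (paths are illustrative):

```bash
data_processing/partitioner -i /data/twitter-lists.dat -o /data/twitter/ -s document -n 4
```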
The output directory (including its files) must be accessible by all compute nodes (for meta data) and memory nodes (for index data), e.g., stored on a network file storage.
The content of a query file must be as follows:

```
r: <term_1> ... <term_n>
i: <doc-id> <term>
```

- `r:` indicates a read query (computes the intersection between the lists given by the terms)
- `i:` indicates an insert query (inserts the document id into the list represented by the term)
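For example, a query file with two read queries and one insert query could look like this (the values are illustrative):

```
r: 9 51
r: 182 439 664
i: 345422 17
```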
For inserts, we create 95% of the index and use the remaining 5% for insert queries (drawn at random). First, re-create the documents from the binary index file by using `./create_documents > <doc_file>`. Then, with `./draw_documents_and_create_index`, we randomly draw 5% of the documents, store them in a separate file, and build a 95% binary index. Finally, the script `mix_queries.py` mixes read and insert queries. With `split_inserts.py`, we can split the long insert queries into multiple single-term queries. Please note that `create_documents.cc` and `draw_documents_and_create_index.cc` must be adjusted, respectively (TODO: CLI options):
In `create_documents.cc`:

```c++
Dataset dataset = Dataset::SSDB;
str prefix = "/mnt/dbgroup-share/mwidmoser/data/";
```

In `rdma-inverted-index/src/data_processing/update_queries/draw_documents_and_create_index.cc` (lines 19 to 21 at `8f251cb`):

```c++
Dataset dataset = Dataset::SSDB;
str prefix = "/mnt/dbgroup-share/mwidmoser/data/index/raw_index_and_document_files/";
str binary_prefix = "/mnt/dbgroup-share/mwidmoser/data/index/serialized/";
```