Evaldas Buinauskas for Vinted. Originally published at vinted.engineering.
Vinted Search Scaling Chapter 5: Divide and Conquer

Elasticsearch is really fast out of the box, but it doesn’t know your data model or access patterns. It’s no secret that Elasticsearch powers our catalogue, but with the ever-increasing amount of listings and queries, we’ve had a number of problems to solve.

In the middle of 2020, we noticed an increasing number of failing search requests during peak hours. The node-exporter metrics in Grafana suggested nothing was really wrong and that the cluster load was fairly balanced, but it clearly wasn’t. Only after logging in to the actual servers did we realise that the load distribution was uneven across the Elasticsearch nodes running there.

Server load distribution

Following documented guidelines for optimizing cluster and JVM settings, we experimented with shard sizes and replica counts while keeping to the number_of_shards * (number_of_replicas + 1) < #data_nodes recommendation. But this gave us no significant improvements, so we had to approach the issue with a different strategy.
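To make that rule of thumb concrete: on a hypothetical cluster with 20 data nodes, an index with 4 primary shards and 2 replicas produces 4 * (2 + 1) = 12 shard copies, staying below the number of data nodes. The index name and numbers below are illustrative only, not our production settings:

PUT /listings_en
{
  "settings": {
    "index": {
      "number_of_shards": 4,
      "number_of_replicas": 2
    }
  }
}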

We decided to apply the ‘divide and conquer’ strategy by creating separate indices for specific types of queries. These could live in a different cluster, have different numbers of shards or replicas, or use different index-time sorting. We started by redirecting all of our /_count queries to another cluster, which significantly reduced the portion of failing requests. We then realised we had discovered something important:

Vinted supports multilingual search and uses an index-per-language strategy. We also allow users to browse our catalogue by applying filters without entering any search text. We created a version of the listings index to support these kinds of queries. By doing that, we could put all of the listings into a single index, stop analysing textual fields, and redirect all browsing queries to it.
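As a sketch of what such a browsing index could look like: since these queries only filter and sort, textual fields can be mapped as keyword instead of analysed text. The field names below are hypothetical, not our actual schema:

PUT /listings_browse
{
  "mappings": {
    "properties": {
      "title":      { "type": "keyword" },
      "brand_id":   { "type": "integer" },
      "catalog_id": { "type": "integer" },
      "price":      { "type": "scaled_float", "scaling_factor": 100 },
      "created_at": { "type": "date" }
    }
  }
}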

Surprisingly, this helped distribute cluster load more evenly even though the new indices lived in the same cluster. The extra disk space and increased complexity were a worthy tradeoff. We decided to go even further and partition this index by date to see whether we could control data growth more easily, improve query performance, and reduce refresh times. Splitting the index by date allowed us to leverage the shard-skipping technique. For example, say a search query only covers the last two weeks of data; Elasticsearch would optimise it by querying only the latest monthly partition and skipping the shards of all the older ones.
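As a rough illustration (the index pattern and field names are hypothetical), a browsing query restricted to recent listings could look like the request below. During the pre-filter phase, Elasticsearch can tell from each shard’s minimum and maximum created_at values that older monthly partitions cannot match the range filter, and skips their shards entirely:

GET /listings_browse-*/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term":  { "brand_id": 123 } },
        { "range": { "created_at": { "gte": "now-2w" } } }
      ]
    }
  },
  "sort": [
    { "created_at": "desc" }
  ]
}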

You may already know that we use Kafka Connect for our data ingestion, and it comes with Single Message Transforms (SMTs), allowing us to transform messages as they flow through Connect. Two of these SMTs were particularly interesting:

Timestamp Router allows constructing a new topic name from the record’s original topic and its timestamp, with the ability to format the timestamp.

Regex Router allows updating the record’s topic using the configured regular expression and replacement string.

To use these routers, our records – and tombstone messages – must contain a fixed timestamp. Otherwise, we would never be able to ingest data into Elasticsearch consistently. By default, each new record is produced with the current timestamp, but ruby-kafka allows it to be overwritten with any value. We believed that overwriting it with the listing’s creation time would work. We also considered trying ILM and data streams but quickly rejected them, as both of those techniques target append-only data.
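A minimal sketch of that override with ruby-kafka (the topic, variable names, and values are illustrative, not our actual pipeline code):

require "kafka"

kafka    = Kafka.new(["kafka-broker:9092"], client_id: "listings-indexer")
producer = kafka.producer

listing_id         = 123
listing_created_at = Time.utc(2020, 7, 1)                 # the listing's original creation time
listing_json       = '{"id":123,"title":"Denim jacket"}'  # serialized listing document

# Produce the listing with a fixed timestamp (its creation time) instead of the
# default Time.now, so the Timestamp Router always routes the same listing to
# the same monthly topic.
producer.produce(
  listing_json,
  topic: "listings",
  key: listing_id.to_s,
  create_time: listing_created_at
)

# Tombstones (nil payloads) must carry the same fixed timestamp, otherwise a
# delete could be routed to a different monthly topic than the original record.
producer.produce(nil, topic: "listings", key: listing_id.to_s, create_time: listing_created_at)

producer.deliver_messages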

We built a small application that mimics our ingestion pipeline, but with a couple of changes:

  1. Overrides create_time with a fixed value for each key
  2. Uses Timestamp Router to partition records into monthly partitions
{"transforms":"TimestampRouter","transforms.TimestampRouter.timestamp.format":"yyyy.MM","transforms.TimestampRouter.topic.format":"${topic}-${timestamp}","transforms.TimestampRouter.type":"org.apache.kafka.connect.transforms.TimestampRouter"}

The config change is fairly simple but very powerful – it’ll suffix the record’s topic with the year and month using its timestamp.

Timestamp Router results

This is great, but we have lots of listings from the past, and creating a monthly bucket for very few records would be expensive. To combat this, we used the Regex Router to put all the partitions from before 2019 into a single old one, leaving newer items untouched.

{"transforms":"TimestampRouter,MergeOldListings","transforms.TimestampRouter.timestamp.format":"yyyy.MM","transforms.TimestampRouter.topic.format":"${topic}-${timestamp}","transforms.TimestampRouter.type":"org.apache.kafka.connect.transforms.TimestampRouter","transforms.MergeOldListings.regex":"(?-mix:(.+)-((2010|2011|2012|2013|2014|2015|2016|2017|2018).*))","transforms.MergeOldListings.replacement":"$1-old","transforms.MergeOldListings.type":"org.apache.kafka.connect.transforms.RegexRouter"}

Timestamp Router partitioned results

After confirming this worked as expected, we rolled it out in our application, which was fairly simple:

  1. Create new topics for our listings (we don’t want to affect the current ingestion pipeline)
  2. Write to both old and new topics at the same time
  3. Use the listing’s create_time when sending records to the new topics
  4. Test ingestion with the new topics and routing strategy
  5. Switch to new topics once we confirm that ingestion works as expected

The initial switch to partitioned indices didn’t work out, so we had to tailor our queries for them and tweak the number of shards and replicas. After plenty of unsuccessful attempts, we finally arrived at a version that works great.

Metrics after implementation

We managed not only to shave 20 ms off the P99 latency but also saw other great results:

  1. Query and request cache hit ratio is above 90% due to older indices refreshing less often
  2. Having load split between multiple indices results in a more even load across the whole cluster, so we no longer have hot nodes
  3. Queries for the top-k latest items hit fewer index shards

In retrospect, this abnormal behavior was most likely caused by a bug in the Adaptive Replica Selection formula. But at the time, we mitigated it by creatively applying the divide and conquer strategy, which helps us to this day.
