The previous post described how a custom script filter can be used to filter out duplicate documents in the result set. However, that approach does not work with multiple shards. To handle this case we need a plugin that hooks into Elasticsearch at the point where duplicate documents fetched from multiple shards can be filtered out. But before that, let's have a quick look at how distributed search works.
The Elasticsearch documentation covers distributed search very well. The following is a summary:
- Distributed search is executed in two phases: the query phase and the fetch phase.
- Query phase
  - The coordinating node sends the search request to all the shards.
  - Each shard executes the search and sends a list of document IDs and their sort values back to the coordinating node.
    - For example, get documents where file.folder = 'c:\windows' sorted by file.name.
    - In this case, say shard 1 matches three documents with the file names notepad.exe, textpad.exe and wordpad.exe. The values it returns would be:
      - DocID: some_doc_id_1, Sort values: [notepad.exe]
      - DocID: some_doc_id_2, Sort values: [textpad.exe]
      - DocID: some_doc_id_3, Sort values: [wordpad.exe]
  - All shards return similar data to the coordinating node.
  - The coordinating node now has the list of fields to sort on and the actual values of those fields for each document.
  - This node then merges all the results into a single list.
  - The list is sorted using the sort values returned by each shard for each document.
  - The final list is prepared by applying the pagination parameters (from and size).
- Fetch phase
  - Once the final list is prepared, the coordinating node requests the required documents from each shard by document ID, i.e. it sends a multi-get request.
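The merge step on the coordinating node can be pictured with a small simulation. This is only a sketch in plain Python, not Elasticsearch code: the second shard, its document IDs and the function name merge_and_page are made up for illustration, and each shard's list is assumed to arrive already sorted.

```python
import heapq

# Hypothetical query-phase results: (sort_values, doc_id) pairs per shard,
# each list already sorted by the shard itself.
shard_1 = [(["notepad.exe"], "some_doc_id_1"),
           (["textpad.exe"], "some_doc_id_2"),
           (["wordpad.exe"], "some_doc_id_3")]
shard_2 = [(["calc.exe"], "some_doc_id_4"),
           (["regedit.exe"], "some_doc_id_5")]

def merge_and_page(shard_results, from_, size):
    """Merge the sorted per-shard lists by sort values, then apply from/size."""
    merged = list(heapq.merge(*shard_results, key=lambda hit: hit[0]))
    return merged[from_:from_ + size]

# Only the IDs on the requested page go into the fetch phase.
page = merge_and_page([shard_1, shard_2], from_=1, size=3)
ids_to_fetch = [doc_id for _, doc_id in page]
```

Note that only IDs and sort values are needed to build the page; the document bodies are requested afterwards.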
Remove duplicate documents in distributed search
As seen above, to filter out duplicate documents in a distributed search we need to hook into the QUERY phase. Once the document IDs and sort values have been retrieved from all the shards, we can remove the duplicates at that point. The steps to achieve this are as follows:
- Apart from the document IDs and sort values, we also need the values of the primary keys that identify a document as unique in the result set.
- Once we have this information, we can remove duplicate documents using the primary keys.
Passing back the primary key values in query phase
In the QUERY phase, only a document's ID and sort values are passed back; the _source value is not. So the only option for passing back the primary key values is through the sort values. For this we can use a custom script for sorting. This script is similar to the custom filter described in the previous post.
- The script takes the primary key field names as a parameter.
- The script forms a key by concatenating the values of the primary key fields.
- In the search request we add this script-based sort criterion after the existing sort fields, if any.
- This way, for each document, the key value is available on the coordinating node during the QUERY phase itself.
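To make the idea concrete, here is a rough simulation of what a shard would send back once the key is appended as the last sort value. It is plain Python rather than the actual sort script; the field names, the "|" separator and the function names build_key and query_phase_hit are assumptions chosen for illustration.

```python
def build_key(doc, key_fields, separator="|"):
    """Concatenate the values of the primary key fields into one string."""
    return separator.join(str(doc.get(field, "")) for field in key_fields)

def query_phase_hit(doc_id, doc, sort_fields, key_fields):
    """Simulate a shard's reply: doc ID plus sort values, with the key last."""
    sort_values = [doc[field] for field in sort_fields]
    sort_values.append(build_key(doc, key_fields))  # script-based sort value
    return {"doc_id": doc_id, "sort": sort_values}

# Hypothetical document, sorted by name and keyed on folder + name.
doc = {"name": "notepad.exe", "folder": "c:\\windows", "size": 1024}
hit = query_phase_hit("some_doc_id_1", doc, ["name"], ["folder", "name"])
```

Because the key is the last sort criterion, it does not change the ordering defined by the earlier sort fields; it only breaks ties between otherwise equal documents.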
Removing duplicate documents
Now we need to hook into the QUERY phase using a plugin. Thanks to the reference plugin implementation here. By looking at this sample plugin I was able to deduce that the final sorting of docs from all shards happens in the sortDocs() method of org.elasticsearch.search.controller.SearchPhaseController. This class is used deep inside Elasticsearch and there is no direct way to hook into it. So I had to follow the same approach as the reference plugin, i.e. implement a custom action. In the plugin, the required classes and methods were extended or overridden. Finally, the class extending SearchPhaseController overrides the sortDocs() method: it removes the duplicate documents from across the shards and then calls the original sortDocs() method.
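The logic of the overridden method can be sketched as follows. This is a plain Python simulation, not the plugin's Java code: sort_docs is only a stand-in for the original sortDocs(), the hit structure is hypothetical, and keeping the first occurrence of a key in shard order is an assumption of this sketch.

```python
def remove_duplicates(shard_results):
    """Drop hits whose key (the last sort value) was already seen on any shard."""
    seen = set()
    deduped = []
    for hits in shard_results:
        kept = []
        for hit in hits:
            key = hit["sort"][-1]
            if key not in seen:  # first occurrence wins (assumption)
                seen.add(key)
                kept.append(hit)
        deduped.append(kept)
    return deduped

def sort_docs(shard_results):
    """Stand-in for the original merge: flatten and sort by sort values."""
    hits = [hit for shard in shard_results for hit in shard]
    return sorted(hits, key=lambda hit: hit["sort"])

# Hypothetical shard replies; a1 and b1 carry the same key.
shard_1 = [{"doc_id": "a1", "sort": ["notepad.exe", "c:\\windows|notepad.exe"]},
           {"doc_id": "a2", "sort": ["wordpad.exe", "c:\\windows|wordpad.exe"]}]
shard_2 = [{"doc_id": "b1", "sort": ["notepad.exe", "c:\\windows|notepad.exe"]},
           {"doc_id": "b2", "sort": ["textpad.exe", "c:\\windows|textpad.exe"]}]

final = sort_docs(remove_duplicates([shard_1, shard_2]))
final_ids = [hit["doc_id"] for hit in final]
```

Removing duplicates before the merge keeps the original sorting and pagination logic untouched, which is why the override can simply delegate to the parent method afterwards.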
You can start with the junit tests.
Limitations
- This approach extends ES classes and methods, which may change in future versions.
- One major limitation is that the totalHits returned by the search request will not be accurate when pagination is used. Each shard reports totalHits as the total number of documents matching the filter, but the number of documents it actually returns can be smaller. For example, 100 documents may match while the pagination criteria ask for only 10 results. In this case totalHits will be 100, and we do not know whether the remaining 90 documents are duplicates or not. This should not be an issue if totalHits can be treated as an approximate count.
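A toy calculation shows why the count drifts. The numbers and key names below are invented for illustration: each list holds the keys of all documents matching on one shard, and only the first size hits per shard reach the coordinating node.

```python
# Hypothetical keys of all matching documents on each shard.
shard_matches = [
    ["k1", "k2", "k3", "k4"],  # shard 1
    ["k2", "k3", "k5", "k6"],  # shard 2
]
size = 2  # each shard returns only its top `size` hits

# What the coordinating node actually receives in the query phase.
returned = [matches[:size] for matches in shard_matches]

# totalHits is the sum of per-shard match counts, duplicates included.
total_hits = sum(len(matches) for matches in shard_matches)

# Duplicates can be detected only among the returned hits.
visible_unique = len({key for hits in returned for key in hits})

# The true unique count would need every matching key from every shard.
true_unique = len({key for matches in shard_matches for key in matches})
```

Here totalHits is 8 while only 6 distinct documents exist, and the coordinating node has no way to tell from the hits it received.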