Sunday, January 11, 2015

Get distinct/unique values using search instead of aggregation - Part 1

Problem statement

When working with NoSQL datastores, the key aspect of schema design is de-normalization or, in other words, defining your schema according to the query requirements. But you cannot keep de-normalizing for each and every use case. One such example is fetching unique records in a query. Relational databases handle this use case with DISTINCT or an equivalent keyword. Unfortunately, Elasticsearch does not have such built-in support.

Use case

For example, say you have a files type that stores file information as well as information about the machine on which the file is seen, i.e. file_hash, file_name, file_folder, file_size, machine_name, machine_ip, etc. With the above type defined in ES, we can have the following queries around it:
  1. Given a file_hash, get all the machines on which it is seen.
  2. Given a file_hash, get all the unique combinations of (file_name & file_folder) across all machines.
One solution is to de-normalize, i.e. create additional types in ES to store the data such that we get unique records for a given file_hash using a normal search request.
Another common approach is to use aggregation. A terms aggregation returns the unique terms along with their document counts, and we can also use nested aggregations to get unique values on multiple fields as required in #2.
A third option is to use the Script Filter support in ES, described below.
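
To make query #2 concrete, here is a minimal plain-Java sketch of the result we are after, computed over in-memory records rather than in Elasticsearch. The FileRecord and DistinctUseCase classes and the sample data are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for one document of the "files" type.
class FileRecord {
    final String fileHash;
    final String fileName;
    final String fileFolder;
    final String machineName;

    FileRecord(String fileHash, String fileName, String fileFolder, String machineName) {
        this.fileHash = fileHash;
        this.fileName = fileName;
        this.fileFolder = fileFolder;
        this.machineName = machineName;
    }
}

class DistinctUseCase {
    // Query #2: unique (file_name, file_folder) pairs for one file_hash,
    // in first-seen order, regardless of which machine reported them.
    static List<List<String>> uniqueNameAndFolder(List<FileRecord> records, String fileHash) {
        Set<List<String>> seen = new LinkedHashSet<>();
        for (FileRecord r : records) {
            if (r.fileHash.equals(fileHash)) {
                seen.add(Arrays.asList(r.fileName, r.fileFolder));
            }
        }
        return new ArrayList<>(seen);
    }

    // Made-up sample data: the same file seen on several machines.
    static List<FileRecord> sample() {
        return Arrays.asList(
            new FileRecord("h1", "a.exe", "/opt/tools", "m1"),
            new FileRecord("h1", "a.exe", "/opt/tools", "m2"),
            new FileRecord("h1", "b.exe", "/tmp", "m3"),
            new FileRecord("h2", "c.exe", "/tmp", "m1"));
    }

    public static void main(String[] args) {
        // Three documents match h1, but only two distinct (name, folder) pairs exist.
        System.out.println(uniqueNameAndFolder(sample(), "h1"));
    }
}
```

A plain search on file_hash would return all three matching documents here; the goal of the approaches below is to get back only the two distinct combinations.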

Using Script Filter

Elasticsearch supports using custom scripts as filters. To handle the above use case we can use the script support as follows:
  • Define a custom script filter. For this discussion, assume it is called AcceptDistinctDocumentScriptFilter.
  • This custom filter takes in a list of primary keys as input.
  • These primary keys are the fields whose values will be used to determine uniqueness of records.
  • Now, instead of using aggregation, we use a normal search request and pass the custom script filter to the request.
  • If the search already has filter/query criteria defined, then append the custom filter using the logical AND operator.
  • The following is an example using pseudo syntax:
    • if the request is: 
      • select * from myindex where file_hash = 'hash_value'
    • then append the custom filter as:   
      • select * from myindex where file_hash = 'hash_value' AND AcceptDistinctDocumentScriptFilter(params= ['file_name', 'file_folder'])

Custom filter implementation

  • The filter derives from org.elasticsearch.script.AbstractSearchScript
  • It overrides the run() method.
  • Also define the factory for creating the instance of the custom script.
  • From the factory, pass the list of primary keys to the custom filter.
  • The run() method is called for every record matching the original filter/query criteria.
  • We get the values for the primary keys from the current document.
  • We concatenate the values to form a key that is used to determine whether the document was already seen. For example, say we use a Java Set<String>.
  • If set.add() returns true, the document is seen for the first time and we return true from the run() method.
  • However, if set.add() returns false, the key was already in the set and hence we return false. Returning false causes the filter/query criteria to fail, and the record is rejected.
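
The steps above can be sketched in plain Java as follows. This is a stand-alone illustration of the de-duplication logic only, not the actual filter: it does not extend AbstractSearchScript so that it can run without Elasticsearch on the classpath. The class name DistinctDocumentFilterSketch, the accept() method (standing in for the body of run()), and the "|" separator are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the custom script filter. In the real filter this
// state would live in the AbstractSearchScript subclass, and accept() would be
// the logic inside run(), reading field values from the current document.
class DistinctDocumentFilterSketch {
    private final List<String> primaryKeys;            // fields that define uniqueness
    private final Set<String> seenKeys = new HashSet<>();

    DistinctDocumentFilterSketch(List<String> primaryKeys) {
        this.primaryKeys = primaryKeys;
    }

    // Returns true the first time a key combination is seen, false afterwards.
    boolean accept(Map<String, Object> document) {
        StringBuilder key = new StringBuilder();
        for (String field : primaryKeys) {
            // Assumed separator: "|" is taken not to occur in the field values,
            // so that ("a", "bc") and ("ab", "c") produce different keys.
            key.append(document.get(field)).append('|');
        }
        return seenKeys.add(key.toString());
    }

    // Small helper to build a sample document as a field-to-value map.
    static Map<String, Object> doc(String fileName, String fileFolder, String machineName) {
        Map<String, Object> d = new HashMap<>();
        d.put("file_name", fileName);
        d.put("file_folder", fileFolder);
        d.put("machine_name", machineName);
        return d;
    }

    public static void main(String[] args) {
        DistinctDocumentFilterSketch filter =
            new DistinctDocumentFilterSketch(Arrays.asList("file_name", "file_folder"));
        System.out.println(filter.accept(doc("a.exe", "/opt/tools", "m1"))); // first sighting
        System.out.println(filter.accept(doc("a.exe", "/opt/tools", "m2"))); // same key, other machine
        System.out.println(filter.accept(doc("b.exe", "/tmp", "m3")));       // new key
    }
}
```

Note that the set grows with the number of distinct keys seen during one request, which is where the memory cost mentioned under Cons comes from.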

Pros & Cons

  • Pros
    • Works with an existing filter and hence on filtered data.
    • Is faster than using an aggregation.
    • Takes care of pagination, i.e. pagination is applied to the filtered unique results.
  • Cons
    • Since the script stores the key values, memory usage is higher. But this should not be an issue when the filter is used along with pagination.


The above approach works only with a single shard, not with multiple shards. For example, say there are 3 shards and each shard returns the following unique values with a page size of 3:
  • Shard 1: [A, B, C]
  • Shard 2: [P, B, R]
  • Shard 3: [X, C, Z]
Now, when the results from all the shards are merged and sorted, the result set is [A, B, B, C, C, P, R, X, Z]. If we request only 3 results per page, we get [A, B, B], i.e. duplicate results.
This can be handled by writing an ES plugin that hooks into the phase where results from different shards are merged. The next POC will be around this.
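
The multi-shard problem can be reproduced with a small plain-Java simulation. This is only a sketch: the ShardMergeSketch class and its mergeAndPage() method are hypothetical, and the merge is simplified to "concatenate, sort, take the first page", which is an assumption about the merge phase rather than the actual Elasticsearch code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ShardMergeSketch {
    // Merge the per-shard pages, sort them, and keep the first pageSize hits.
    // Each shard de-duplicated only its own documents, so nothing here removes
    // duplicates that appear on different shards.
    static List<String> mergeAndPage(List<List<String>> shardPages, int pageSize) {
        List<String> merged = new ArrayList<>();
        for (List<String> page : shardPages) {
            merged.addAll(page);
        }
        Collections.sort(merged);
        return new ArrayList<>(merged.subList(0, Math.min(pageSize, merged.size())));
    }

    public static void main(String[] args) {
        List<List<String>> shards = Arrays.asList(
            Arrays.asList("A", "B", "C"),   // Shard 1
            Arrays.asList("P", "B", "R"),   // Shard 2
            Arrays.asList("X", "C", "Z"));  // Shard 3
        System.out.println(mergeAndPage(shards, 9)); // [A, B, B, C, C, P, R, X, Z]
        System.out.println(mergeAndPage(shards, 3)); // [A, B, B]
    }
}
```

Each per-shard list is free of duplicates, yet the merged page contains B twice, because no shard knows which keys the other shards have already accepted.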

Source code


Even though the above approach currently has the multiple-shard limitation, it is worth exploring because of the performance gain seen when the number of documents is in the millions. Another observation concerns the field data (the cached data): with aggregation, ES seems to load into memory almost all the fields specified in the aggregation, while field data memory usage with the filter is much lower.

Part 2

Part 2 covers the approach for handling distributed search requests.
