Wednesday, December 2, 2015

What you see is NOT what you get !!!

Recently, I troubleshot two common issues with Elasticsearch queries. Queries that had been working suddenly stopped returning data for certain filter criteria, even though, looking at all the records for the type, the filter should have matched and returned data. We often forget that Elasticsearch returns the original document that was submitted for indexing, but while indexing, analyzers are applied to the fields in the document. So the field value we see in the document is not exactly the same as the indexed value.

Consider the following simple example:
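A minimal sketch of the two requests, assuming an index named company and a type named employee (the names and the use of curl are illustrative):

# 1. Index an employee document with displayName "Ajey Dudhe"
curl -XPUT 'localhost:9200/company/employee/1' -d '
{
  "displayName": "Ajey Dudhe"
}'

# 2. Retrieve all the employee documents
curl -XPOST 'localhost:9200/company/employee/_search' -d '
{
  "query": { "match_all": {} }
}'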

In the first request we create an employee record with displayName as "Ajey Dudhe".
In the second request we retrieve all the employee records.

In the search response, under _source, the document is returned as-is. But does Elasticsearch see the value of displayName as "Ajey Dudhe"? The answer is no. In this example the document is indexed with the default analyzer, so the string value is broken into tokens and lower-cased, and that is the value Elasticsearch actually sees. To look at the value as Elasticsearch sees it, we can use the fielddata_fields option while fetching the documents, as follows:
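For example, the same search with fielddata_fields added (same assumed index and field names as above):

curl -XPOST 'localhost:9200/company/employee/_search' -d '
{
  "query": { "match_all": {} },
  "fielddata_fields": ["displayName"]
}'

# Each hit now carries a fields section next to _source, roughly:
#   "_source": { "displayName": "Ajey Dudhe" },
#   "fields":  { "displayName": ["ajey", "dudhe"] }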

Notice that the fields section in the response now has the values as seen by ES, i.e. tokenized and lower-cased. In case a value is not indexed by ES at all, it will not appear under fields.

Wednesday, April 22, 2015

Taming the fielddata

Introduction

One of the bottlenecks in scaling up Elasticsearch is fielddata. The term fielddata does not refer to the data itself but to the data structures and caching that Elasticsearch builds for efficient look-ups. Apart from consuming a considerable chunk of memory, fielddata also impacts query performance. The impact is clearly visible when you have millions of documents to query. One of the recommended solutions is to scale out, but even with multiple nodes you need to make sure every single node is fine-tuned, otherwise you may have to keep adding a considerable number of nodes as the data set grows. In the following sections we discuss how to optimize fielddata usage, which in turn should improve memory usage and query performance on a single node.

Use doc_values = true

The main issue with fielddata is that it consumes a huge amount of memory. Setting doc_values = true on a field in the mapping tells Elasticsearch to store the fielddata on the file system instead of in memory. However, there is currently one limitation for string fields: only not_analyzed string fields can use this option. This means you cannot use doc_values = true on fields that have analyzers (lower-casing etc.) defined on them. That becomes a problem in some cases, e.g. when you need to provide case-insensitive search on a field. Here the suggested approach is to use a multi-field mapping, i.e. one analyzed and one not_analyzed field. But the moment we have an analyzed field we cannot use doc_values = true on it, which means its fielddata will be in memory. To handle this we need to use a transform script that performs operations like converting the field value to lower-case. In case you need a full analyzer, find out how you can invoke the analyzers explicitly from your transform script.
The documentation notes that doc_values improves memory usage at some possible cost to query performance. In our testing, however, aggregation query performance increased drastically, probably because the fielddata was now being served from the file system cache.
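As a rough sketch of such a mapping (the index, type and field names and the Groovy transform are illustrative assumptions; Groovy scripting must be enabled for the transform):

curl -XPUT 'localhost:9200/myindex' -d '
{
  "mappings": {
    "employee": {
      "transform": {
        "script": "ctx._source[\"displayName\"] = ctx._source[\"displayName\"]?.toLowerCase()",
        "lang": "groovy"
      },
      "properties": {
        "displayName": {
          "type": "string",
          "index": "not_analyzed",
          "doc_values": true
        }
      }
    }
  }
}'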

Handle sorting

Sorting is another feature where fielddata is used. When a sort is specified on a particular field, Elasticsearch needs to find the terms for each document, i.e. it needs fielddata. To avoid this we can use script-based sorting. The script can take the field name as a parameter and return the field value from the _source. Even though there is overhead in accessing the source, we observed around 50% improvement using script-based sorting.
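A rough sketch of such a script-based sort, assuming Groovy scripting is enabled (index, type and field names are illustrative):

curl -XPOST 'localhost:9200/myindex/employee/_search' -d '
{
  "query": { "match_all": {} },
  "sort": {
    "_script": {
      "type": "string",
      "order": "asc",
      "script": "_source[field]",
      "params": { "field": "displayName" }
    }
  }
}'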


Aggregation

Aggregation is the main feature where fielddata is required. Elasticsearch provides the Scripted Metric Aggregation, but using a script did not help here. It is best to avoid aggregation queries if they are not required; for example, use a script filter if you need to query for distinct documents.

Friday, January 30, 2015

Get distinct\unique values using search instead of aggregation - Part 2

The previous post described how a custom script filter can be used to filter out duplicate documents in the result set. However, that approach does not work with multiple shards. To handle this case we need a plugin implementation that hooks into Elasticsearch so we can filter out the duplicate documents fetched from multiple shards. But before that, let's have a quick look at how distributed search works.

Distributed search

Elasticsearch documentation covers distributed search very well. Following is the summary:
  • Distributed search is executed in two phases: the Query phase and the Fetch phase.
  • Query phase
    • The coordinating node sends the search request to all the shards.
    • Each shard executes the search and sends the list of document IDs and the sort values back to the coordinating node.
      • For example, get documents where file.folder = 'c:\windows' sorted by file.name
      • In the above case, say shard 1 matches three documents with file names notepad.exe, textpad.exe and wordpad.exe. The returned values would be:
        • DocID: some_doc_id_1, Sort values: [notepad.exe]
        • DocID: some_doc_id_2, Sort values: [textpad.exe]
        • DocID: some_doc_id_3, Sort values: [wordpad.exe]
    • All shards return similar data back to the coordinating node.
    • The coordinating node now has list of fields on which sorting needs to be done and also the actual values for these fields per document.
    • This node then merges all the results into a single list.
    • The list is sorted using the sort values returned by each shard for each document.
    • The final list is prepared by applying pagination (from & size) parameters.
  • Fetch phase
    • Once the final list is prepared, the coordinating node sends a request to get the required documents from each shard using the document IDs, i.e. it sends a multi-get request.

Remove duplicate documents in distributed search

As seen above, to filter out duplicate documents in a distributed search we need to hook into the QUERY phase: once the document IDs & sort values have been retrieved from all the shards, we remove the duplicate documents there. The steps to achieve this are as follows:
  • Apart from document IDs & sort values we also need the values of the primary keys that identify a document as unique in the result set.
  • Once we have the above information we can remove duplicate documents using the primary keys.

Passing back the primary key values in query phase

In the QUERY phase only a document's ID and sort values are passed back; the _source value is not. So the only option for passing back the primary key values is to use the sort values. For this we can use a custom script for sorting. This custom script will be similar to the custom filter mentioned in the previous post.
  • The script will take primary key field names as parameter.
  • The script will form a key by concatenating the values of primary key fields.
  • In the search request we add this script-based sort criterion at the end of the existing sort fields, if any.
  • This way, for each document, we get the key value during the QUERY phase itself on the coordinating node (a request sketch follows this list).
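A sketch of such a request; an inline Groovy script is shown here for illustration in place of the actual custom sort script, and the index, type and field names are assumptions:

curl -XPOST 'localhost:9200/myindex/files/_search' -d '
{
  "query": { "term": { "file_hash": "hash_value" } },
  "sort": [
    { "file_name": { "order": "asc" } },
    {
      "_script": {
        "type": "string",
        "order": "asc",
        "script": "_source[key1] + \"|\" + _source[key2]",
        "params": { "key1": "file_name", "key2": "file_folder" }
      }
    }
  ]
}'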

Removing duplicate documents

Now we need to hook into the QUERY phase using the plugin approach. Thanks to the reference plugin implementation here. By looking at this sample plugin I was able to deduce that the final sorting of documents from all shards happens in the sortDocs() method of org.elasticsearch.search.controller.SearchPhaseController. This class is used deep inside the search execution and there is no direct way to hook into it, so I had to follow the same approach as the reference plugin implementation, i.e. implement a custom action. In the plugin the required classes and methods were extended\overridden, and finally the class extending SearchPhaseController overrides the sortDocs() method. There we remove the duplicate documents from across the shards and then call the original sortDocs() method.

Source code

You can start with the JUnit tests.

Limitations

  • This approach extends ES classes and methods which may change in the future.
  • One major limitation is that the totalHits returned by the search request will not be accurate when pagination is used. Each shard returns totalHits as the total number of documents matching the filter, but the actual documents returned can be fewer when pagination is applied, e.g. the total matches may be 100 while the pagination criteria ask for only 10 results. In this case totalHits will be 100, and we do not know whether the remaining 90 documents are duplicates or not. But this should not be an issue if we can treat totalHits as an approximate count.

Sunday, January 11, 2015

Get distinct\unique values using search instead of aggregation - Part 1

Problem statement

While dealing with NoSQL datastores, the key aspect of schema design is de-normalization or, in other words, defining your schema as per the query requirements. But you cannot keep de-normalizing for each and every use case. One such example is fetching unique records in a query. Relational databases handle this use case by providing DISTINCT or an equivalent keyword. Unfortunately, Elasticsearch does not have such built-in support.

Use case

For example, say you have a files type which stores file information as well as information about the machine on which the file is seen, i.e. file_hash, file_name, file_folder, file_size, machine_name, machine_ip etc. With the above type defined in ES we can have the following queries around it:
  1. Given a file_hash get all the machines on which it is seen.
  2. Given a file_hash get all the unique combinations of (file_name & file_folder) across all machines.
One of the solutions is to de-normalize, i.e. create additional types in ES to store the data such that we get unique records for a given file_hash using a normal search request.
Another common approach is to use aggregation. An aggregation returns the count of unique terms, and we can also use nested aggregations to get unique values over multiple fields as required in #2; a sketch of such a request follows.
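For example, a nested terms aggregation for #2 could look roughly like this (index and type names are assumptions):

curl -XPOST 'localhost:9200/myindex/files/_search' -d '
{
  "size": 0,
  "query": { "term": { "file_hash": "hash_value" } },
  "aggs": {
    "unique_names": {
      "terms": { "field": "file_name", "size": 0 },
      "aggs": {
        "unique_folders": {
          "terms": { "field": "file_folder", "size": 0 }
        }
      }
    }
  }
}'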
A third option is to use the Script Filter support from ES, described below.

Using Script Filter

Elasticsearch supports using custom scripts as filters. To handle the above use case we can use the script support as follows:
  • Define a custom script filter; for this discussion assume it is called AcceptDistinctDocumentScriptFilter.
  • This custom filter takes in a list of primary keys as input.
  • These primary keys are the fields whose values will be used to determine uniqueness of records.
  • Now, instead of using aggregation we use normal search request and pass the custom script filter to the request.
  • If the search already has filter\query criteria defined then append the custom filter using a logical AND operator.
  • Following is an example using pseudo syntax (a JSON sketch of the actual request follows this list):
    • if the request is: 
      • select * from myindex where file_hash = 'hash_value'
    • then append the custom filter as:   
      • select * from myindex where file_hash = 'hash_value' AND AcceptDistinctDocumentScriptFilter(params= ['file_name', 'file_folder'])
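A sketch of what the actual request could look like, assuming AcceptDistinctDocumentScriptFilter is registered as a native script under the name accept_distinct_document (the registration and parameter names are assumptions):

curl -XPOST 'localhost:9200/myindex/files/_search' -d '
{
  "query": {
    "filtered": {
      "query": { "term": { "file_hash": "hash_value" } },
      "filter": {
        "script": {
          "script": "accept_distinct_document",
          "lang": "native",
          "params": { "fields": ["file_name", "file_folder"] }
        }
      }
    }
  }
}'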

Custom filter implementation

  • The filter derives from org.elasticsearch.script.AbstractSearchScript and overrides the run() method.
  • Also define the factory for creating the instance of the custom script.
  • From the factory pass the list of primary keys to the custom filter.
  • The run() method is called for every record matching the original filter\query criteria. 
  • We get the values for the primary keys from the current document. 
  • We concatenate the values to form a key used to determine whether the document has already been seen; for example, we can keep the keys in a Java Set<String>.
  • If set.add() returns true then the document is seen for the first time and we return true from the run() method.
  • However, if set.add() returns false then the document was already in the set and hence we return false. Returning false causes the filter\query criteria to fail and the record is rejected.

Pros & Cons

  • Pros
    • Works with existing filter and hence on filtered data.
    • Is fast compared to using an aggregation.
    • Takes care of pagination i.e. pagination is applied on filtered unique results.
  • Cons
    • Since we are storing key values in the script, memory usage will be higher. But this should not be an issue when we use the filter along with pagination.

Limitation

The above approach works only with a single shard, not with multiple shards. For example, say there are 3 shards and, with a page size of 3, each shard returns the following unique values:
  • Shard 1: [A, B, C]
  • Shard 2: [P, B, R]
  • Shard 3: [X, C, Z]
Now when the results from all the shards are merged and sorted, the result set would be [A, B, B, C, C, P, R, X, Z], and if we request only 3 results in the page we will get [A, B, B], i.e. duplicate results.
This can be handled by writing an ES plugin which hooks into the phase where results from different shards are merged. The next POC will be around this.

Source code


Summary

Even though the above approach currently has the multiple-shards limitation, it is worth exploring because of the performance gain seen when the number of documents is in the millions. Another observation was around the fielddata (the cached data): with aggregation, ES seems to load into memory almost all the fields specified in the aggregation, while the fielddata memory used with the filter approach is much lower.

Part 2

Part 2 covers the approach to handle the distributed search request.