A new way to build Trending filters using Elasticsearch


We’ve recently introduced a new feature to SecAlerts, the Trending filter. With this filter you can limit your search to vulnerabilities that are seeing the most activity in terms of references on social media or in the news (see the most trending vulnerabilities in the last month).

Our search index is built entirely with Elasticsearch, and the way we achieved this was a solution I didn't come across while researching how to build a Trending filter in Elasticsearch. As there seemed to be a lack of guides on this problem, I thought I would write up how we solved it rather elegantly.

The requirements we had were as follows:

  • Requirement 1: The trending query must be a filter that returns a subset of documents rather than a ranking of all documents.

  • Requirement 2: Trending can be determined over a window of time from some arbitrary date until now.

  • Requirement 3: The number of documents returned should be fairly consistent (not too many, not too few).

With this in mind, we first look at what data we have for deciding whether a document is trending. In our case we have references along with a timestamp. Here's a made-up example vulnerability to illustrate (dates are written day/month/year):

CVE-12345

  1. 2/5/2025 - 2 references

  2. 4/5/2025 - 1 reference

  3. 10/5/2025 - 1 reference

If our time window ran from the 1st to the 7th of May, this vulnerability had 3 references, which might be higher than usual. Between the 7th and the 14th of May it might no longer be trending, as there was only one reference. Expand the window to the entire month of May and it might be trending again with 4 references. Notice that the choice of time window plays a big part in whether or not a vulnerability is trending.

A document is trending when the total number of references within a time window reaches a threshold value.
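To make the effect of the window concrete, here is a minimal Python sketch that counts the example vulnerability's references in each of the three windows discussed above. The helper names `ts` and `references_in_window` are hypothetical and only used for illustration; the timestamps are derived from the calendar dates in the example.

```python
from datetime import datetime, timezone


def ts(year, month, day):
    """Epoch seconds for midnight UTC on the given date."""
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp())


# Raw data for the made-up vulnerability: (timestamp, number of references).
references = [
    (ts(2025, 5, 2), 2),
    (ts(2025, 5, 4), 1),
    (ts(2025, 5, 10), 1),
]


def references_in_window(refs, start, end):
    """Sum the references whose timestamp falls in [start, end)."""
    return sum(count for t, count in refs if start <= t < end)


first_week = references_in_window(references, ts(2025, 5, 1), ts(2025, 5, 7))
second_week = references_in_window(references, ts(2025, 5, 7), ts(2025, 5, 14))
whole_month = references_in_window(references, ts(2025, 5, 1), ts(2025, 6, 1))
print(first_week, second_week, whole_month)  # 3 1 4
```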

What is the threshold value? That is where the complexity in our approach lies. Thankfully it only needs to be calculated once per time window, so filtering each document can be very simple and quick. For now, imagine that we have this threshold value calculated. How do we use it to query our documents and determine whether each one is trending?

First, let's take our data above and turn it into a mapping in our index that can be queried against.

    {
      "trends": {
        "type": "nested",
        "properties": {
          "ts": { "type": "long" },
          "value": { "type": "integer" }
        }
      }
    }

Now you might assume our example vulnerability would be indexed like so where each timestamp is stored along with its value:

{ "trends": [ { "ts": 1743552000, "value": 2 }, { "ts": 1743724800, "value": 1 }, { "ts": 1744243200, "value": 1 } ] }

However, notice that in our requirements the time window always runs from an arbitrary start time until now. That means we can precompute the sums going back in time, so each item stores the total number of references from its timestamp onward and we never need to aggregate per document. Thus we index the document like so:

{ "trends": [ { "ts": 1743552000, "value": 4 }, { "ts": 1743724800, "value": 2 }, { "ts": 1744243200, "value": 1 } ] }

Starting from the last item, we traverse the list backward, adding each item's value to the one before it, so that the first item's value eventually becomes the total.
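The backward traversal can be sketched in Python as follows. This is a minimal illustration rather than our indexing code: the function name `to_cumulative` is hypothetical, and it assumes the trends list is already sorted by ascending timestamp.

```python
def to_cumulative(trends):
    """Replace each value with the sum of itself and every later value."""
    running = 0
    result = []
    # Walk from the newest item to the oldest, carrying the running total.
    for item in reversed(trends):
        running += item["value"]
        result.append({"ts": item["ts"], "value": running})
    result.reverse()  # restore ascending timestamp order
    return result


raw = [
    {"ts": 1743552000, "value": 2},
    {"ts": 1743724800, "value": 1},
    {"ts": 1744243200, "value": 1},
]
indexed = to_cumulative(raw)
print([item["value"] for item in indexed])  # [4, 2, 1]
```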

Now when we need to filter by trending given a start timestamp and a threshold value we simply need a nested range query:

    {
      "nested": {
        "path": "trends",
        "query": {
          "bool": {
            "must": [
              { "range": { "trends.ts": { "gte": <START_TIMESTAMP> } } },
              { "range": { "trends.value": { "gte": <THRESHOLD> } } }
            ]
          }
        }
      }
    }

This query is very quick to run as we don’t need any aggregations per document to determine the sum within a given time window.
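If you build the query in application code, a small helper keeps the two placeholders in one place. The sketch below assumes Python and a hypothetical function name, `build_trending_filter`; it only constructs the request body and does not talk to a cluster.

```python
import json


def build_trending_filter(start_timestamp, threshold):
    """Return the nested range query for a given window start and threshold."""
    return {
        "nested": {
            "path": "trends",
            "query": {
                "bool": {
                    "must": [
                        {"range": {"trends.ts": {"gte": start_timestamp}}},
                        {"range": {"trends.value": {"gte": threshold}}},
                    ]
                }
            },
        }
    }


query = build_trending_filter(1743465600, 3)
print(json.dumps(query, indent=2))
```

The resulting dictionary can be dropped into a larger bool query alongside any other filters.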

In our example document, suppose we ran the query on the 10th of May 2025 with a start time of the 1st of May and a threshold of 3. The document would match because one of the items in the trends field has both a ts greater than or equal to the start time and a value greater than or equal to the threshold. If the threshold were 6, it would not match, since no item has a value that high. If the start time were instead the 8th of May, the only trends object inside the window would be { "ts": 1744243200, "value": 1 }, and its value is below the threshold.
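The three cases above can be checked with a small Python stand-in for the nested query. This is only a sketch of the matching rule, not how Elasticsearch evaluates it; the names `ts` and `matches` are hypothetical, and the timestamps are derived from the calendar dates in the example rather than copied from the literals.

```python
from datetime import datetime, timezone


def ts(year, month, day):
    """Epoch seconds for midnight UTC on the given date."""
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp())


def matches(trends, start_timestamp, threshold):
    """True if a single trends item satisfies both range conditions."""
    return any(
        item["ts"] >= start_timestamp and item["value"] >= threshold
        for item in trends
    )


# The example document with cumulative values, as it would be indexed.
trends = [
    {"ts": ts(2025, 5, 2), "value": 4},
    {"ts": ts(2025, 5, 4), "value": 2},
    {"ts": ts(2025, 5, 10), "value": 1},
]

print(matches(trends, ts(2025, 5, 1), 3))  # True
print(matches(trends, ts(2025, 5, 1), 6))  # False
print(matches(trends, ts(2025, 5, 8), 3))  # False
```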

Now that we have the ability to query for trending documents given a start time and threshold we need to finally figure out what the threshold value is going to be.

As promised, this is where the complexity begins. We use a rather obscure Elasticsearch feature known as the scripted metric aggregation, which lets us run an aggregation with custom scripts. This is necessary because we store the trends data in a cumulative fashion, so any built-in aggregation that sums the values would be skewed. What we want to aggregate is the first value in each document's trends array on or after a given start date, which no built-in aggregation can do.

The aggregation script comes in four phases:

  • init_script — executed first before any documents are considered

  • map_script — executed once per document

  • combine_script — executed per shard to combine the values from the previous script

  • reduce_script — executed once at the end to reduce the values from the previous script (combine the shards that combined the documents)
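For orientation, here is a rough Python sketch of how the four scripts might be wired into a single search request. It rests on assumptions that are not stated in this article: that the scripted metric runs inside a nested aggregation on the trends path (so the map script sees each trend item), and that the aggregation names `trend_items` and `threshold` and the helper `build_threshold_request` are purely illustrative. The script sources are passed in as strings.

```python
def build_threshold_request(start_timestamp, scripts):
    """Build a search body that runs the four scripts over nested trend items."""
    return {
        "size": 0,  # only the aggregation result is needed, not the hits
        "aggs": {
            "trend_items": {  # hypothetical aggregation name
                "nested": {"path": "trends"},
                "aggs": {
                    "threshold": {  # hypothetical aggregation name
                        "scripted_metric": {
                            # exposed to the scripts as params.start_date
                            "params": {"start_date": start_timestamp},
                            "init_script": scripts["init"],
                            "map_script": scripts["map"],
                            "combine_script": scripts["combine"],
                            "reduce_script": scripts["reduce"],
                        }
                    }
                },
            }
        },
    }


# Placeholder sources; the real Painless scripts are shown in this article.
placeholder_scripts = {
    "init": "/* init_script source */",
    "map": "/* map_script source */",
    "combine": "/* combine_script source */",
    "reduce": "/* reduce_script source */",
}
body = build_threshold_request(1743465600, placeholder_scripts)
```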

In our init_script we initialise the state data that will be used in future phases.

    // used to store a list of largest values from the trends nested array
    state.counts = [];
    // store the start date from a script parameter
    state.start_date = params.start_date;
    // keep track of the previous largest value
    state.previous_value = 0;

These values will start to make more sense in the map_script.

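    def trend_date = doc['trends.ts'].value;
    def trend_value = doc['trends.value'].value;
    // only consider trend items inside the time window
    if (trend_date >= state.start_date) {
      // a value at least as large as the previous one marks a new document
      if (trend_value >= state.previous_value) {
        state.counts.add(trend_value);
      }
      // remember this value for the next trend item
      state.previous_value = trend_value;
    }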

This script will execute per document and per item in the trends list. Because we only want to aggregate the first value past a given start date, we must keep track of the previous value the script sees. We only add the value to our list of counts if the current value is at least as high as the previous value, meaning we've reached a new document rather than a new trend item within the same document (because the values in the trends array always go down, not up).

As a result this script will give us an array of values from the first trend item greater than the given starting date. Here’s an example of what it would look like with a few more example documents being aggregated.

state.counts = [ 3, 1, 4, 3, 2, 3, 1, 1, 1, 2, 1 ];
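To see the map phase in action, the following Python sketch replays the same logic over a few hypothetical documents. It assumes the trend items are visited in document order and uses small integers as stand-in timestamps. Note that, like the script it mirrors, it detects a new document only when that document's first in-window value is at least as large as the last in-window value seen before it.

```python
def run_map_phase(nested_items, start_date):
    """Replay the map_script logic over (timestamp, cumulative value) pairs."""
    state = {"counts": [], "previous_value": 0}
    for trend_date, trend_value in nested_items:
        if trend_date >= start_date:
            # A value that did not drop signals the start of a new document.
            if trend_value >= state["previous_value"]:
                state["counts"].append(trend_value)
            state["previous_value"] = trend_value
    return state["counts"]


# Hypothetical trend items for four documents, flattened in document order.
items = [
    (1, 5), (3, 3), (4, 1),  # document A: first value in the window is 3
    (2, 4), (5, 2),          # document B: first value in the window is 4
    (0, 6), (1, 4),          # document C: nothing inside the window
    (6, 2),                  # document D: first value in the window is 2
]
counts = run_map_phase(items, start_date=2)
print(counts)  # [3, 4, 2]
```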

In the combine_script we want to turn this into a frequency table where the value is the key and the frequency becomes the value.

    Map freq = new HashMap();
    for (c in state.counts) {
      String k = c.toString();
      if (freq.containsKey(k)) {
        freq.put(k, freq.get(k) + 1);
      } else {
        freq.put(k, 1);
      }
    }
    return freq;

We simply iterate over the list of values, converting each integer to a string to use as a key into the frequency table. If the key exists we increment it; if not, we start it at 1. The table is then returned, which passes it on to the next phase where the real magic happens.
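The same frequency table can be reproduced in Python to check the expected output for the example list of counts. This is an illustrative equivalent of the combine step, with a hypothetical function name, not the Painless script itself.

```python
def combine(counts):
    """Build a frequency table keyed by the count rendered as a string."""
    freq = {}
    for c in counts:
        key = str(c)
        freq[key] = freq.get(key, 0) + 1
    return freq


counts = [3, 1, 4, 3, 2, 3, 1, 1, 1, 2, 1]
shard_table = combine(counts)
print(shard_table)  # {'3': 3, '1': 5, '4': 1, '2': 2}
```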

Before I show the code of the reduce_script I will talk about what we’re trying to achieve. We will receive a list of frequency tables from the previous script.

Our first task is to merge these into one big frequency table. We then want to take the biggest key numerically (not by frequency) and step backwards to see how many documents we would start to include by lowering our threshold (not too many and not too few, as per Requirement 3). Once we feel our threshold lets in the right number of documents, we return the threshold value from our aggregation script.

    Map freq = new HashMap();
    int total = 0;
    int largest = 0;

    // merge the frequency maps from the previous script
    for (state in states) {
      for (entry in state.entrySet()) {
        String k = entry.getKey();
        if (freq.containsKey(k)) {
          freq.put(k, freq.get(k) + entry.getValue());
        } else {
          freq.put(k, entry.getValue());
        }
      }
    }

    // calculate the total frequencies and the largest possible threshold
    for (entry in freq.entrySet()) {
      total += entry.getValue();
      int k = Integer.parseInt(entry.getKey());
      if (k > largest) {
        largest = k;
      }
    }

    // aim for the top 25% of the documents to be considered trending
    double target = 0.25;
    double cumPct = 0;
    for (int i = largest; i > 0; i--) {
      double v = (double) freq.getOrDefault(Integer.toString(i), 0);
      double pct = v / total;
      // if we have reached our target, pick this threshold
      if (cumPct + pct >= target) {
        return i;
      }
      cumPct += pct;
    }
    return Math.max(1, largest);

Let's go over what the script does one more time. We have one big frequency table where the key is the reference count and the value is the number of documents with that exact reference count. We calculate the largest reference count and the total number of documents. We start with the largest count and loop backwards, testing how many documents would be considered trending if we lowered the threshold to that value. As soon as the cumulative share reaches the target, we return the current threshold. Otherwise we return the largest reference count, ensuring the result is never less than 1.
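As a worked example, the Python sketch below mirrors the reduce logic and applies it to two hypothetical per-shard tables that merge into the frequency table for the example counts shown earlier. The function name `reduce_threshold` and the split across shards are assumptions made for illustration.

```python
def reduce_threshold(states, target=0.25):
    """Merge per-shard frequency tables and pick the trending threshold."""
    freq = {}
    for state in states:
        for key, count in state.items():
            freq[key] = freq.get(key, 0) + count

    total = sum(freq.values())
    largest = max((int(k) for k in freq), default=0)

    # Step down from the largest count until the target share is reached.
    cum_pct = 0.0
    for i in range(largest, 0, -1):
        pct = freq.get(str(i), 0) / total
        if cum_pct + pct >= target:
            return i
        cum_pct += pct
    return max(1, largest)


# Two hypothetical shards; merged they give {'3': 3, '1': 5, '4': 1, '2': 2}.
shard_tables = [{"3": 2, "1": 3, "4": 1}, {"3": 1, "1": 2, "2": 2}]
threshold = reduce_threshold(shard_tables)
print(threshold)  # 3
```

With 11 documents in total, a threshold of 4 would admit only 1 document (about 9%), which is below the 25% target. Lowering it to 3 admits 4 documents (about 36%), so 3 is returned.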

This approach meets all our requirements and comes with the benefit of a very quick and simple query. The threshold value for a given start date can be cached, so the cost of computing it is paid very infrequently. Hopefully this helps anyone looking to implement a similar trending filter in Elasticsearch, and please reach out if you'd like any help.
