Querying Riak – Key Filters and MapReduce

A while back we talked about getting faster writes with Riak. Since then, I’ve been quiet on the Riak front. Let’s take a look at how we can get data out of Riak, especially since I went to great pains to throw all of that data into Riak as fast as my little laptop could manage.

Key filtering is a new feature in Riak that makes it much easier to restrict queries to a subset of the data. Prior to Riak 0.13, it was necessary to write MapReduce jobs that scanned through every key in a bucket. The problem is that those MapReduce jobs load both the key and the value into memory, and if we have a lot of data, that causes a huge performance hit. Instead of loading all of the data, key filtering lets us look at the keys themselves. We’re pre-processing the data before we get to our actual query. This is good because 1) software should do as little work as possible and 2) Riak doesn’t have secondary indexes to make querying faster.

Here’s how it works: Riak holds all keys in memory, but the data remains on disk. The key filtering code scans the keys in memory on the nodes in our cluster. If any keys match our criteria, Riak passes them along to any map phases waiting down the pipe. I’ve written the sample code in Ruby, but this functionality is available through any client.

The Code

We’re using data loaded with load_animal_data.rb. The test script itself can be found in mr_filter.rb. Once again, we’re using the taxoboxes data set.

The Results

```
             user     system      total        real
mr       0.060000   0.030000   0.090000 ( 20.580278)
filter   0.000000   0.000000   0.000000 (  0.797387)
```
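
Those timings are straight out of Ruby’s Benchmark module. If you don’t feel like reading mr_filter.rb, the harness boils down to something like this sketch, where run_mapreduce and run_key_filter are hypothetical stand-ins for submitting the two jobs shown below:

```
require 'benchmark'

# Time the full-bucket MapReduce job against the key-filtered job.
# run_mapreduce and run_key_filter are hypothetical stand-ins for
# submitting the two queries shown in the next two sections.
Benchmark.bm(7) do |b|
  b.report('mr')     { run_mapreduce }
  b.report('filter') { run_key_filter }
end
```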

MapReduce

First, the MapReduce query:

```
{"inputs":"animals",
 "query":[
   {"map":{"language":"javascript",
           "keep":false,
           "source":"function(o) { if (o.key.indexOf('spider') != -1) return [1]; else return []; }"}},
   {"reduce":{"language":"javascript",
              "keep":true,
              "name":"Riak.reduceSum"}}]}
```

We iterate over every key-value pair in the animals bucket and look for keys containing the word ‘spider’. Whenever we find one, the map phase returns a single-element array containing the number 1. Once the map phase is done, the built-in function Riak.reduceSum sums the values from the previous map phase. We’re generating a count of the records that match – how many spiders do we really have?
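
If you’d rather poke at this by hand than run the bundled script, here’s a minimal sketch that POSTs the job to Riak’s HTTP /mapred endpoint. The run_job helper and the default port 8098 are my assumptions, not anything lifted from mr_filter.rb:

```
require 'net/http'
require 'json'

# POST a MapReduce job to Riak's HTTP interface and parse the reply.
def run_job(job, host = '127.0.0.1', port = 8098)
  http = Net::HTTP.new(host, port)
  response = http.post('/mapred', job.to_json,
                       'Content-Type' => 'application/json')
  JSON.parse(response.body)
end

mr_job = {
  'inputs' => 'animals',
  'query'  => [
    { 'map' => {
        'language' => 'javascript',
        'keep'     => false,
        'source'   => "function(o) { if (o.key.indexOf('spider') != -1) return [1]; else return []; }"
    } },
    { 'reduce' => {
        'language' => 'javascript',
        'keep'     => true,
        'name'     => 'Riak.reduceSum'
    } }
  ]
}

puts run_job(mr_job).inspect  # => a one-element array holding the spider count
```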

Key Filtering

The key filtering query doesn’t look that much different:

```
{"inputs":{"bucket":"animals",
           "key_filters":[["matches","spider"]]},
 "query":[
   {"map":{"language":"javascript",
           "keep":false,
           "source":"function(o) { return [1]; }"}},
   {"reduce":{"language":"javascript",
              "keep":true,
              "name":"Riak.reduceSum"}}]}
```

The map phase has been greatly simplified to just return [1] on success, and the search criteria have moved into the inputs portion of the query. The big difference is in the performance: the key filter query is 26 times faster. This is a simple example, but a 26x improvement is nothing to scoff at. What it really means is that the rest of our MapReduce job works on a much smaller subset of the data which, ultimately, makes things faster for us.
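
To try it with the run_job sketch from the previous section, only the inputs change:

```
filter_job = {
  'inputs' => {
    'bucket'      => 'animals',
    'key_filters' => [['matches', 'spider']]
  },
  'query' => [
    { 'map' => {
        'language' => 'javascript',
        'keep'     => false,
        'source'   => 'function(o) { return [1]; }'
    } },
    { 'reduce' => {
        'language' => 'javascript',
        'keep'     => true,
        'name'     => 'Riak.reduceSum'
    } }
  ]
}

puts run_job(filter_job).inspect  # same count as before, just much faster
```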

A Different Way to Model Data

Now that we have our querying basics out of the way, let’s look at this problem from a different perspective; let’s say we’re tracking stock performance over time. In a relational database we might have a number of tables, notably a table to track stocks and a table to track daily_trade_volume. Theoretically, we could do the same thing in Riak with some success, but it would incur a lot of overhead. Instead, we can use a natural key to locate our data. Depending on how we want to store the data, this could look something like YYYY-MM-DD-ticker_symbol. I’ve created a script to load data from stock exchange data. For my tests, I only loaded the data for stocks that began with Q. There’s a lot of data in this data set, so I kept things to a minimum in order to make this quick.
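
Building the natural key is a one-liner. Here’s a sketch – the method and field names are my own, and the loading script may differ:

```
require 'date'

# Compose a natural key of the form YYYY-MM-DD-ticker_symbol.
def stock_key(date, ticker)
  "#{date.strftime('%Y-%m-%d')}-#{ticker}"
end

stock_key(Date.new(2010, 2, 1), 'QTM')  # => "2010-02-01-QTM"
```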

Since our data also contains the stock exchange identifier, we could even go one step further and include the exchange in our key. That would be helpful if we were querying based on the exchange.
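
That might look something like the sketch below – the bucket name and exchange code are hypothetical – with a starts_with key filter doing the restriction:

```
# Prefix the key with the exchange so jobs can be scoped to one exchange.
def exchange_stock_key(exchange, date, ticker)
  "#{exchange}-#{date.strftime('%Y-%m-%d')}-#{ticker}"
end

# inputs for a job restricted to a single (hypothetical) exchange
inputs = {
  'bucket'      => 'stocks',
  'key_filters' => [['starts_with', 'NASDAQ']]
}
```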

If you take a look at mr_stocks.rb you’ll see that we’re setting up a query to filter stocks by the symbol QTM and then aggregate the total trade volume by month. The map phase creates an object holding the stock volume traded in the month and returns it in a single-element array. We use the Riak.mapValuesJson function to map the raw data coming in from Riak to a proper JavaScript object, and we get the month that we’re looking at by parsing the key. This is easy enough to do because we have a well-defined key format.

```
function(o, keyData, arg) {
  var data = Riak.mapValuesJson(o)[0];
  var month = o.key.split('-').slice(0,2).join('-');
  var obj = {};
  obj[month] = data.stock_volume;
  return [ obj ];
}
```

If we were to look at this output we would see a lot of rows of unaggregated data. While that is interesting, we want to look at trending for stock trades for QTM over all time. To do this we create a reduce function that will sum up the output of the map function. This is some pretty self-explanatory JavaScript:

```
function(values, arg) {
  // Start from an empty accumulator so every value passes through parseInt.
  return [ values.reduce(function(acc, item) {
    for (var month in item) {
      if (acc[month]) {
        acc[month] += parseInt(item[month]);
      } else {
        acc[month] = parseInt(item[month]);
      }
    }

    return acc;
  }, {}) ]
}
```

Okay, so that might not actually be as self-explanatory as anyone would like. The JavaScript reduce method is a newer one. It accumulates a single result (the acc variable) over all elements in the array. You could use this to get a sum, an average, or whatever you want. One other thing to note is that we use parseInt. We probably don’t have to use it, but it’s a good idea. Why? Riak is not aware of our data structures. We just store arrays of bytes in Riak – it could be a picture, it could be text, it could be a gzipped file – Riak doesn’t care. JavaScript only knows that it’s a string. So, when we want to do mathematical operations on our data, it’s probably wise to use parseInt and parseFloat.
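
Putting it all together from Ruby, the whole job might be wired up like this. This is a sketch of the general shape, not mr_stocks.rb verbatim – the ends_with filter and the stocks bucket name are my guesses – and it reuses the run_job helper from earlier:

```
map_source = <<-JS
  function(o, keyData, arg) {
    var data = Riak.mapValuesJson(o)[0];
    var month = o.key.split('-').slice(0,2).join('-');
    var obj = {};
    obj[month] = data.stock_volume;
    return [ obj ];
  }
JS

reduce_source = <<-JS
  function(values, arg) {
    return [ values.reduce(function(acc, item) {
      for (var month in item) {
        if (acc[month]) { acc[month] += parseInt(item[month]); }
        else            { acc[month]  = parseInt(item[month]); }
      }
      return acc;
    }, {}) ]
  }
JS

# Keys end in the ticker symbol, so ends_with narrows the input to QTM.
stock_job = {
  'inputs' => {
    'bucket'      => 'stocks',
    'key_filters' => [['ends_with', 'QTM']]
  },
  'query' => [
    { 'map'    => { 'language' => 'javascript', 'keep' => false,
                    'source'   => map_source } },
    { 'reduce' => { 'language' => 'javascript', 'keep' => true,
                    'source'   => reduce_source } }
  ]
}

puts run_job(stock_job).inspect  # one object mapping "YYYY-MM" to total volume
```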

Where to Now?

Right now you probably have a lot of data loaded, and you have a couple of options. There are two scripts on GitHub to remove the stock data and the animal data from your Riak cluster. That’s a pretty boring option, though – what can you learn from deleting your data and shutting down your Riak cluster? Not a whole lot. Instead, you should open up mr_stocks.rb and take a look at how it works. It should be pretty easy to modify the map and reduce functions to output total trade volume for the month, average volume per day, and average price per day. Give it a shot and see what you come up with. If you have questions or run into problems, hit up the comments, the Riak Developer Mailing List, or the #riak IRC room on irc.freenode.net if you need immediate, real-time help with your problem.