# Getting Started With Hive

This is a follow-up to the last two posts I made about querying HBase and Hive. The setup for this was a bit trickier than I would have liked, so I’m documenting my entire process for three reasons.

  1. To remind myself for the next time I have to do this.
  2. To help someone else get started.
  3. In the hope that someone will know a better way and help me improve this.

### Installing Hadoop and Hive

This was the easiest part of the installation process. I used the Cloudera distribution of Hadoop for everything in this stack. There are a few ways to go about getting the Cloudera Distribution of Hadoop (CDH for short) on your machine. The Hadoop Quick Start is a good place to get started.

#### Java

I’m pretty sure we’ve covered this before, but you need to have Java 1.6 installed in order to use any part of the Hadoop ecosystem.
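If you’re not sure what you have installed, a quick check from the terminal will tell you; on OS X, `/usr/libexec/java_home` also prints the path you’ll want for `JAVA_HOME` later on.
```
# Confirm that Java 1.6 is what you get by default
java -version

# On OS X, print the path to the current JDK home
/usr/libexec/java_home
```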

#### Windows

If you’re using Windows, I recommend using the Hadoop VMWare image that Cloudera makes available. If you want to run Hadoop natively on Windows, you need to set up Cygwin to create a UNIX-like environment on your local machine. Setting up Cygwin can be a bit tricky the first time you do it. Luckily, you’re not alone and documentation does exist: Setting up a Single-Node Hadoop “Cluster” on Windows XP.

#### Linux

Linux users have it the easiest out of any platform, in my opinion. Most of the documentation assumes that you’re running some UNIX-based operating system. If you’re willing to use the CDH version of Hadoop, you can tell your package manager to talk to the Cloudera repository. Cloudera has thoughtfully provided documentation for you in the installation guide. You issue a few commands and you’ll have a working Hadoop setup on your machine in no time.
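As a rough sketch, on a Debian-based system the process looks something like the following. The repository line, key URL, and package name change between CDH releases, so take the real values from the Cloudera installation guide rather than from my memory:
```
# Point apt at the Cloudera repository (release name and URL are
# examples; confirm them against the CDH installation guide)
echo "deb http://archive.cloudera.com/debian lucid-cdh3 contrib" | \
  sudo tee /etc/apt/sources.list.d/cloudera.list

# Import Cloudera's package signing key
curl -s http://archive.cloudera.com/debian/archive.key | sudo apt-key add -

# Install Hadoop itself
sudo apt-get update
sudo apt-get install hadoop-0.20
```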

#### Other *NIX

This is the route that I took with OS X. You could also do the same on Linux if you want to use the stock distribution of Hadoop. The Hadoop Quick Start guide covers this in some detail, but here’s what I did:

    wget http://archive.cloudera.com/cdh/3/hadoop-0.20.2+737.tar.gz
    wget http://archive.cloudera.com/cdh/3/hive-0.5.0+32.tar.gz
    wget http://archive.cloudera.com/cdh/3/pig-0.7.0+9.tar.gz

I copied these three files into my /opt folder and uncompressed them. I renamed the hadoop, hive, and pig folders to use a cdh_ prefix. This makes it easier for me to identify where the installation came from, just in case I need to upgrade or raise a stink with someone.
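In concrete terms, the unpack-and-rename step is nothing fancy; something along these lines, where the cdh_ names are just my own convention and the paths are assumptions to adjust for your machine:
```
# Copy the tarballs into /opt and unpack them
sudo cp hadoop-0.20.2+737.tar.gz hive-0.5.0+32.tar.gz pig-0.7.0+9.tar.gz /opt/
cd /opt
sudo tar -xzf hadoop-0.20.2+737.tar.gz
sudo tar -xzf hive-0.5.0+32.tar.gz
sudo tar -xzf pig-0.7.0+9.tar.gz

# Rename the folders with a cdh_ prefix so the origin is obvious
sudo mv hadoop-0.20.2+737 cdh_hadoop
sudo mv hive-0.5.0+32 cdh_hive
sudo mv pig-0.7.0+9 cdh_pig
```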

### Configuration

Nothing is ever quite as easy as it seems at first, right? Now that we have our software installed, we need to supply some configuration information. Hadoop has three modes.

  1. Standalone In standalone mode – nothing runs as a service or daemon. This requires no configuration on our part, but it does make things trickier with Hive because only one Hive database connection can exist at a time in standalone mode.
  2. Pseudo-distributed This is what we’re aiming for. In pseudo-distributed mode everything is running on one machine as a service process. What we’re actually doing (sort of) is configuring our Hadoop installation to a be a one node cluster. It’s not reliable or failure proof (or a good idea), but it does let us develop on our workstations and laptops instead of buying a lot of servers for the house.
  3. Fully distributed This is a real live cluster running on many servers. Books are being written about this.

Hadoop is, by default, set up to run in standalone mode. Having a standalone Hadoop/HDFS installation is nice, but it doesn’t do what we need in the long term.

There are Linux packages for CDH that will set up your server to run in pseudo-distributed mode. If you used one of those, you can skip ahead. Personally, I’d stick around just so you can read up on what you’re missing and so you make me feel better (I’ll know if you skip ahead).
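For the record, on CDH the pseudo-distributed configuration ships as its own package; the name below is what I remember from CDH3, so double-check it against the documentation for your release:
```
# Installs a ready-made pseudo-distributed configuration on CDH
# (package name may differ between CDH releases)
sudo apt-get install hadoop-0.20-conf-pseudo
```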

#### core-site.xml

The core-site.xml file is used for machine-specific configuration details. The default configuration lives in core-default.xml. Settings in core-default.xml are overridden by core-site.xml, so there’s no reason to go messing around in there. If you want to check out the contents of the default file, it’s located in build/classes/core-default.xml. Once you’ve opened, or created, your own core-site.xml, you want to put the appropriate information in there. These are the settings I changed; a sketch of the resulting file follows the descriptions.

##### fs.default.name

This setting tells Hadoop to use HDFS running on the local host for storage. It could also point at the local filesystem or at something in Amazon S3.

##### mapred.tasktracker.tasks.maximum

We want to set this to a sane value because this is the maximum number of MapReduce tasks that Hadoop will fire up. If we set this value too high, Hadoop could theoretically start up a large number of tasks and overwhelm our CPU.

##### dfs.replication

Non-relational data stores like Hadoop get their resiliency by replicating data throughout the cluster; this setting controls how many copies HDFS keeps of each block. Since we’re running in pseudo-distributed mode (it’s a cluster with only one node), we want to set this to 1.
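Putting those three settings together, a core-site.xml for this setup looks roughly like the sketch below. The property names come straight from the headings above; the HDFS URL and the task maximum are placeholder values to adjust for your own machine:
```
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>fs.default.name</name>
        <!-- point Hadoop at HDFS on the local machine -->
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>mapred.tasktracker.tasks.maximum</name>
        <!-- placeholder: pick a value that suits your CPU count -->
        <value>8</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <!-- one copy of each block; we only have one node -->
        <value>1</value>
    </property>
</configuration>
```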

#### hadoop-env.sh

I changed a few variables in my hadoop-env.sh file based on settings I found in the Hadoop Wiki article Running Hadoop on OS X 10.5 64-bit. You can set these environment variables anywhere in your operating system, but since they’re Hadoop-specific, it’s probably best to set them in the hadoop-env.sh file. Based on my own experience, I did not change the HADOOP_CLASSPATH variable. Here’s what the first 15 lines of my hadoop-env.sh look like:
```
# Set Hadoop-specific environment variables here.

# The only required environment variable is JAVA_HOME. All others are
# optional. When running a distributed configuration it is best to
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.

# The java implementation to use. Required.
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home/

# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"

# The maximum amount of heap to use, in MB. Default is 1000.
export HADOOP_HEAPSIZE=2000
```
I’ve also added some additional configuration to my `.profile` that makes working with Hadoop and Hive a lot easier:
```
export CLASSPATH=$CLASSPATH:/Library/PostgreSQL/9.0/share/java/postgresql-9.0-801.jdbc4.jar
export JAVA_HOME=$(/usr/libexec/java_home)

export HADOOP_HOME=/opt/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/conf
export PATH=$HADOOP_HOME/bin:$PATH

export HIVE_HOME=/opt/hive
export PATH=$HIVE_HOME/bin:$PATH

export PIG_HOME=/opt/pig
export PIGDIR=$PIG_HOME
export PATH=$PIG_HOME/bin:$PATH

export HIVE_AUX_JARS_PATH=/Library/PostgreSQL/9.0/share/java/

# CLASSPATH variables don't seem to like symbolic links on OS X;
# setting up the CLASSPATH here makes it easy to change things around
# in the future as I upgrade between Hadoop/Hive/HBase/Pig versions
hadoop_dir=$HADOOP_HOME;
hadoop_version=0.20.2+737;
pig_dir=$PIG_HOME;
pig_version=0.7.0+16;

export CLASSPATH=$CLASSPATH:${hadoop_dir}/hadoop-${hadoop_version}-core.jar:${hadoop_dir}/hadoop-${hadoop_version}-tools.jar:${hadoop_dir}/hadoop-${hadoop_version}-ant.jar:${hadoop_dir}/lib/commons-logging-1.0.4.jar:${pig_dir}/pig-${pig_version}.jar:${pig_dir}/pig-${pig_version}-core.jar
```


### One Last Step

Before we can start anything running on Hadoop, we have to get our namenode ready. The namenode is the part of Hadoop that manages metadata for the distributed filesystem: it knows where all of the data is stored across HDFS. Without a running namenode, we can’t find the files we’re storing. Before we get started, we have to format our filesystem. This process creates empty directories to be used for the namenode’s data. Datanodes aren’t involved because they can be added and removed from the cluster dynamically.
```
hadoop namenode -format
```
You’ll see some output from this command and, once it’s all said and done, you’ll have a formatted namenode that’s ready to go.

### Starting Hadoop

Once you have everything set up and installed, you should be able to start Hadoop by running `start-all.sh` from the command prompt. You should see some messages that tell you a bunch of nodes are being started with output going to a specific directory. [![](http://facility9.com/wp-content/uploads/2010/12/Starting-Hadoop.jpg "Starting Hadoop")](http://facility9.com/wp-content/uploads/2010/12/Starting-Hadoop.jpg)
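Beyond reading the start-up messages, a quick sanity check is to list the running Java processes with `jps` (it ships with the JDK); the Hadoop daemons should all show up:
```
# Lists running Java processes; after start-all.sh finishes you should
# see NameNode, DataNode, SecondaryNameNode, JobTracker, and TaskTracker
jps
```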

### Verifying That It’s Running

You can verify that everything is configured and running by executing `hadoop dfs -ls /` on the command line. You should see a basic directory listing that looks something like this image. [![](http://facility9.com/wp-content/uploads/2010/12/Verifying-Hadoop.jpg "Verifying Hadoop")](http://facility9.com/wp-content/uploads/2010/12/Verifying-Hadoop.jpg)

### What About Hive?

Astute readers will notice that we’ve only configured Hadoop and haven’t done anything to get Hive up and running. This is, unfortunately, a separate task. Hadoop and Hive are separate tools, but they’re part of the same tool chain.

#### Configuring the Metastore

Hive, like Hadoop, uses XML configuration files. But, since Hive is a database, it also needs a place to store metadata about the database – this is called the metastore. By default, Hive uses what’s called an _embedded metastore_. The problem with the default configuration is that it only allows one user or process at a time to connect to a Hive database. That isn’t good for production databases. It’s also no good if you’re like me and forget which virtual desktop you left that query window open on. We’re going to fix this.

It’s possible to store the metastore’s information in a persistent external database. Hive comes configured to use the Derby database; however, any JDBC-compliant database can be used by setting a few options. This is precisely what we’re going to do. There’s a delicious irony in storing one database’s configuration settings in another database. Since I’m a huge fan of PostgreSQL, that’s what we’re going to use to get started.

##### Creating the Metastore User

I don’t want Hive talking to PostgreSQL as me, or any other random user account, so we’re going to set up a new user.
```
CREATE USER hadoop WITH PASSWORD 'hadoop';
```

##### Creating the Metastore Database

Now that we have our user, let’s create the database:
```
CREATE DATABASE hadoop WITH OWNER = hadoop;
```


##### Database Driver

Of course, we also need a database driver. Up in the changes to my `.profile` I set up a `HIVE_AUX_JARS_PATH` variable. This is a `:`-delimited path where we can tell Hive to look for extra functionality to load. This folder specifically contains the PostgreSQL JDBC driver. You can download the appropriate PostgreSQL JDBC driver from the [PostgreSQL JDBC driver site](http://jdbc.postgresql.org/documentation/head/index.html). Appropriate documentation is also there. It’s pretty much as simple as copying the JAR file into an appropriate directory.
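In practice that means copying the driver JAR into the directory that `HIVE_AUX_JARS_PATH` points at; something like the following, with the file name adjusted to whatever driver version you actually downloaded:
```
# Drop the PostgreSQL JDBC driver where HIVE_AUX_JARS_PATH can find it
# (the jar name below matches the version referenced in my .profile)
sudo cp postgresql-9.0-801.jdbc4.jar /Library/PostgreSQL/9.0/share/java/
```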

#### hive-site.xml

Now that we’ve created our metastore user and database, as well as installed a JDBC driver, the last thing to do is to make some changes in `hive-site.xml`. The `hive-site.xml` file overrides the settings in `hive-default.xml` – just like the Hadoop configuration we did earlier. One thing to keep in mind is that you can override Hadoop configuration in this file as well. We’re not going to, but it’s good to know that it’s possible.
```
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>hive.root.logger</name>
        <value>INFO,console</value>
    </property>
    <property>
        <name>org.jpox.autoCreateSchema</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.metastore.local</name>
        <value>true</value>
        <description>controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:postgresql://localhost:5432/hadoop</value>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>org.postgresql.Driver</value>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>hadoop</value>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>hadoop</value>
    </property>
</configuration> 
```
The `javax.jdo.option.*` settings are where we set up our metastore database connection. Everything else should be, largely, self-explanatory.

### Creating Tables

Let’s go ahead and create a table in Hive:
```
CREATE TABLE leagues (
  league_id STRING,
  league_name STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'; 
```
This creates a table named `leagues` with two columns: `league_id` and `league_name`. The last two lines might look a bit funny to people familiar with SQL. They are specific to the Hive Query Language (HiveQL); they say that each row in the underlying data file will be stored as tab-delimited text.
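The load statements later in this post also expect `teams`, `ballparks`, `personnel`, and `raw_standings` tables, which are created the same way. As an illustration, here’s a guess at what the `personnel` table could look like, based on the columns the Pig script below produces; treat it as a sketch rather than the exact definition from my scripts:
```
CREATE TABLE personnel (
  last_name STRING,
  first_name STRING,
  person_id STRING,
  debut_date STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t';
```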

### Getting Data Into Hive

We’ve configured Hadoop and Hive and we’ve created a table. How do we get data into the table? That’s a trickier matter. We’re going to be using sample data from [Retrosheet](http://www.retrosheet.org/gamelogs/index.html) to populate our Hive database. You should start the download and keep reading; it takes a bit of time.

### Loading Data With Pig

[Pig](http://pig.apache.org/) is a way to execute data flows written in a language called _Pig Latin_. In this use case, I’ve written very primitive data clean-up scripts using Pig Latin (it would have been just as easy, if not easier, to write them in Ruby or Perl). The first section of this Pig Latin script registers the [PiggyBank](http://wiki.apache.org/pig/PiggyBank) contributed functionality and creates convenient aliases for the functions. We use `DEFINE` to set up the aliases so we don’t have to type the fully qualified function name every time we want to use it. It’s a lot easier to type `replace(state, 'Ohio', 'OH')` than it is to type `org.apache.pig.piggybank.evaluation.string.REPLACE(state, 'Ohio', 'OH')`.
```
REGISTER /opt/pig/contrib/piggybank/java/piggybank.jar;
-- Create a function alias because nobody wants to type in the full function 
-- name over and over again.
DEFINE replace org.apache.pig.piggybank.evaluation.string.REPLACE(); 
DEFINE substring org.apache.pig.piggybank.evaluation.string.SUBSTRING(); 
DEFINE s_split org.apache.pig.piggybank.evaluation.string.Split(); 
DEFINE reverse org.apache.pig.piggybank.evaluation.string.Reverse();



-- Read the first CSV file and dump it as a tab-separated file
league_codes = LOAD 'league-codes.csv'
USING PigStorage(',')
AS (league_id:chararray, league_name);

STORE league_codes 
INTO 'league-codes' 
USING PigStorage('\t');



names = LOAD 'retrosheet-CurrentNames.csv'
USING PigStorage(',')
AS (franchise_id:chararray,
    historical_franchise_id:chararray,
    league_id:chararray,
    division:chararray,
    location_name:chararray,
    nickname:chararray,
    alt_nickname:chararray,
    first_game_date:chararray,
    last_game_date:chararray,
    city:chararray,
    state:chararray);

-- We don't need to explicitly specify the PigStorage mechanism.
-- Pig will use tab separated files by default.
STORE names
INTO 'team_names';



-- It's our first transformation:
-- 1) Load the data from the text file.
-- 2) Remove double-quote characters from the data.
-- 3) Write the data to the filesystem.
ballparks = LOAD 'retrosheet-ballparkcodes.txt'
USING PigStorage(',')
AS (park_id:chararray,
    park_name:chararray,
    park_nickname:chararray,
    city:chararray,
    state:chararray,
    start_date:chararray,
    end_date:chararray,
    league_id:chararray,
    notes:chararray);

formatted_parks = foreach ballparks GENERATE
  park_id,
  park_name,
  park_nickname,
  city,
  state,
  start_date,
  end_date,
  league_id,
  replace(notes, '"', '');

STORE formatted_parks
INTO 'ballparks';



personnel = LOAD 'retrosheet-umpire.coach.ids.txt'
USING PigStorage(',')
AS (last_name:chararray,
    first_name:chararray,
    person_id:chararray,
    debut_date:chararray
    );

-- the debut date is the only weirdly formatted date in here, so we'll reformat
-- it to be sortable in YYYYMMDD format
--
-- I had originally intended to do something with this, but I never
-- got around to writing it. As it stands, it's a good way to show 
-- how the string splitting function works (it returns an array)
-- and how that can be combined with flatten to turn it into a tuple
split_personnel = FOREACH personnel GENERATE
  last_name,
  first_name,
  person_id,
  flatten(s_split(debut_date, '/'))
AS (  
    month:chararray,
    day:chararray,
    year:chararray);

-- These subsequent x_, y_, z_ stages load our personnel 
-- relations into intermediate relations while we're doing
-- work on the data. This could have been done in a single
-- step, but it's much easier to read this way.
x_personnel = FOREACH split_personnel GENERATE
  last_name,
  first_name,
  person_id,
  reverse(CONCAT('000', month)) AS month, 
  reverse(CONCAT('000', day)) AS day, 
  year;

y_personnel = FOREACH x_personnel GENERATE
  last_name,
  first_name,
  person_id,
  reverse(substring($3, 0, 2)) AS month,
  reverse(substring($4, 0, 2)) AS day,
  year;

formatted_personnel = FOREACH y_personnel GENERATE
  last_name,
  first_name,
  person_id,
  CONCAT(CONCAT(year, month), day) AS debut;

STORE formatted_personnel
INTO 'personnel'; 
```
You might think that each of the `STORE` statements would produce a file named `personnel` or `ballparks`. Unfortunately, you’d be wrong. You see, Pig turns our Pig Latin script into a MapReduce job on the fly. The final output is written into the appropriately named folder in a part file. The part files have names like `part-m-00000`. This tells us that it’s part of a MapReduce job, specifically the **m**ap phase, and it’s the first part. If there were multiple map operations performed (if we had a full cluster), there would be multiple files with incrementing part numbers.

There is a second Pig Latin program that loads the standings; I’ve left it out of the article due to the sheer length of the code. However, it’s a relatively trivial script and should be easy enough to follow. The interesting thing to note is that the Pig Latin program to load the standings makes use of variable substitution. The driver script for this whole process, `load_standings.sh`, passes parameters to Pig on the command line and they are interpreted at run time. For more information on Pig and Pig Latin, you should check out the [Pig project page](http://pig.apache.org/) or the [Pig Latin Cookbook](http://pig.apache.org/docs/r0.7.0/cookbook.html).

Since we’ve formatted our data, how do we load it? It’s pretty simple to do with a Hive `LOAD` statement:
```
LOAD DATA LOCAL INPATH 'personnel/part-m-00000'
OVERWRITE INTO TABLE personnel ;

LOAD DATA LOCAL INPATH 'team_names/part-m-00000'
OVERWRITE INTO TABLE teams ;

LOAD DATA LOCAL INPATH 'ballparks/part-m-00000'
OVERWRITE INTO TABLE ballparks ;

LOAD DATA LOCAL INPATH 'league-codes/part-m-00000'
OVERWRITE INTO TABLE leagues ; 
```
Because the data for the standings is separated out by year in the [Retrosheet](http://www.retrosheet.org/gamelogs/index.html) data set, we have to drive the insertion through a shell script:
```
echo "  loading Box Scores..."
for d in `ls -d stats/*`
  do
    year=${d:6:4}

    echo "    Loading box scores for $year"
    for f in `ls $d/part*`
    do
      `hive -e "LOAD DATA LOCAL INPATH '$f' INTO TABLE raw_standings PARTITION (year='$year'); "`;
    done;
  done; 
```
This loops through the output folders for the standings one at a time and loads the data into Hive. The `raw_standings` table uses partitioning to separate out the data by year. Similar to partitioning in a relational database, this lets us store data with the same partition key in the same physical file, which improves query performance when we’re querying a limited range of data. In this case, we’re only partitioning by year, but it’s possible to partition on multiple columns for greater granularity.

To improve long-term performance further, we’ve gone ahead and created a second table, `rc_standings`. The `rc_standings` table is partitioned by year, but it’s also using a `ROW FORMAT` setting to create an _RCFile_ – a columnar file format. The data in this table will be stored in a column-oriented layout, which makes it possible to skip reading columns that aren’t used in our query. With a table as wide as `rc_standings`, that’s going to be beneficial for query performance. In order to get the data into the RCFile table, we have to copy it using a second driver in the shell script:
```
for d in `ls -d stats/*`
  do
    year=${d:6:4}
    # This only overwrites the $year partition in the standings table
    # We also make use of the REGEX Column Specification to avoid pulling back the
    #  partitioning column(s) in the query. 
    #  See http://wiki.apache.org/hadoop/Hive/LanguageManual/Select#REGEX_Column_Specification
    #  for more about the REGEX Column Specification.
    hive -S -e "INSERT OVERWRITE TABLE rc_standings PARTITION (year=$year) SELECT \`(year)?+.+\` FROM raw_standings WHERE year = $year;"
  done; 
```
Loading the standings took an incredible amount of time on my test system, despite using an SSD and having 8GB of memory available. I’m not sure why, and I would love to know what I did wrong, so feel free to sound off in the comments so we can make this process faster for people getting started.
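For reference, declaring a partitioned, RCFile-backed table combines a `PARTITIONED BY` clause with the columnar SerDe and storage format. The sketch below is trimmed down to a handful of columns (the real `rc_standings` table is much wider), so treat the column list as illustrative and check the syntax against your Hive version:
```
-- Trimmed-down sketch of a partitioned RCFile table; the real
-- rc_standings table has many more columns than this
CREATE TABLE rc_standings_sketch (
  home_team_id STRING,
  home_league_id STRING,
  home_score INT,
  visiting_score INT
)
PARTITIONED BY (year STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS RCFILE;
```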

### Querying Hive

Once we have the data loaded, querying becomes possible. To connect to Hive, open up a command prompt and type `hive`. You should see a blank hive prompt waiting for your commands. [![](http://facility9.com/wp-content/uploads/2010/12/hive-prompt.jpg "hive prompt")](http://facility9.com/wp-content/uploads/2010/12/hive-prompt.jpg) Let’s take a look at the contents of a single table. Type `select * from leagues;` and hit enter. You should see something like the following: [![](http://facility9.com/wp-content/uploads/2010/12/query-01.jpg "query 01")](http://facility9.com/wp-content/uploads/2010/12/query-01.jpg) Let’s try something trickier and use a `JOIN`.
```
select l.league_name, t.franchise_id, t.first_game_date, t.last_game_date, t.nickname
from leagues l
join teams t on l.league_id = t.league_id; 
```
[![](http://facility9.com/wp-content/uploads/2010/12/query-02.jpg "query 02")](http://facility9.com/wp-content/uploads/2010/12/query-02.jpg) So far, so good. These have both been very small queries. What if we want to look at some aggregated data? What then? It turns out that it’s just as easy to do as any other query.
```
select l.league_name, s.year, avg(s.home_score)
from rc_standings s
join leagues l on s.home_league_id = l.league_id
group by l.league_name, s.year; 
```
[![](http://facility9.com/wp-content/uploads/2010/12/query-03.jpg "query 03")](http://facility9.com/wp-content/uploads/2010/12/query-03.jpg) Once again, Hive is responding exactly as we would expect. That’s because HiveQL was designed to be as close to SQL as possible. The Hive engine is transforming our HiveQL into MapReduce jobs in the back end. Depending on the type of queries that we’re running, our queries may execute on as few as one node in the cluster or on as many as all nodes in the cluster. While HiveQL looks like SQL, some additions have been made that make sense for Hive – `SORT BY` guarantees the order of results within a single reducer, while `ORDER BY` guarantees the order of results in the total output. There is also a `DISTRIBUTE BY` clause that determines how our data will be distributed to the reducers that are handling the query.
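As a quick illustration of the difference, using columns from the standings table: `DISTRIBUTE BY` controls which reducer each row is sent to, and `SORT BY` orders rows within each reducer, so the combination below gives per-league ordering without forcing the single global sort that `ORDER BY` would.
```
-- Rows with the same home_league_id go to the same reducer,
-- and each reducer's output is sorted by home_score.
-- Using ORDER BY home_score instead would push the entire result
-- set through a single reducer to get one total ordering.
SELECT home_league_id, home_score
FROM rc_standings
DISTRIBUTE BY home_league_id
SORT BY home_score DESC;
```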

### Resources

The Pig Latin, shell script, and HiveQL files used in this article can be found [here](http://facility9presentations.s3.amazonaws.com/hive-tutorial.zip), on Amazon S3. The source data can be found [here](http://facility9presentations.s3.amazonaws.com/hive-tutorial-source-data.7z). Please note that you’ll need [7-zip](http://www.7-zip.org/) to get to the source data. The data, by the way, was obtained free of charge and is copyrighted by Retrosheet. For more information, please contact [www.retrosheet.org](http://www.retrosheet.org/).