Hadoop Hive UDTF Tutorial – Extending Apache Hive with Table Functions
Source: http://beekeeperdata.com/posts/hadoop/2015/07/26/Hive-UDTF-Tutorial.html
Author: Matthew Rathbone
Co-author: Elena Akhmatova
Part one of this series covered working with both primitive types and embedded data structures, but the UDF interfaces discussed there are limited to a single output value per input row.
In this post we will look at user defined table functions, represented by the org.apache.hadoop.hive.ql.udf.generic.GenericUDTF interface. This function type is more complex, but it allows us to output multiple rows and multiple columns for a single input (nifty!).
Code
All code and data used in this post can be found in my hive examples GitHub repository.
Demonstration Data
The table used for demonstration is called people. It has one column, name, which contains the names of individuals and couples. The data is stored in a file called people.txt:

~$ cat ./people.txt
John Smith
John and Ann White
Ted Green
Dorothy
We can upload this to Hadoop into a directory called people:

hadoop fs -mkdir people
hadoop fs -put ./people.txt people
Then load up the hive shell and create the Hive table:

CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
ESCAPED BY ''
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/matthew/people';
The Value of UDTF
The UDF and GenericUDF functions from the previous article manipulate a single row of data, return a single element, and must always return a value.
This is not convenient for all data processing tasks. Since Hive can store data of many kinds, sometimes we do not want exactly one row of output for a given input. Perhaps we wish to output a few rows per input row, or no rows at all. As an example, think of what the built-in Hive function explode can do.
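To make the idea concrete, here is a quick illustration of explode, which turns a single array-valued input into one output row per element:

```sql
-- explode produces one output row per element of the input array,
-- rather than exactly one row per input
SELECT explode(array('John', 'Ann', 'Ted')) AS name;
-- returns three rows: John, Ann, Ted
```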
Similarly, perhaps we also wish to output several columns of data, instead of simply returning a single value.
We can accomplish both of these things with a UDTF.
A Practical Example
Let's suppose that we would like to create a cleaner table of people's names. The new table will have:
- Separate columns for First Name and Surname.
- No records that do not contain both first and last names (have no separating white space).
- Separate rows for each person in a couple (e.g. "Nick and Nicole Smith").
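Before diving into the Hive API, the splitting rules above can be sketched as a small standalone Java method. This is just an illustrative sketch, independent of Hive; the class and method names (NameSplitter, splitName) are invented for this example:

```java
import java.util.ArrayList;
import java.util.List;

public class NameSplitter {

    // Split a raw name field into zero or more {first, last} pairs,
    // following the rules listed above.
    public static List<String[]> splitName(String name) {
        List<String[]> result = new ArrayList<>();
        if (name == null || name.trim().isEmpty()) {
            return result;
        }
        String[] tokens = name.trim().split("\\s+");
        if (tokens.length == 2) {
            // "John Smith" -> one row
            result.add(new String[] { tokens[0], tokens[1] });
        } else if (tokens.length == 4 && tokens[1].equals("and")) {
            // "John and Ann White" -> two rows sharing the surname
            result.add(new String[] { tokens[0], tokens[3] });
            result.add(new String[] { tokens[2], tokens[3] });
        }
        // anything else (e.g. a lone "Dorothy") produces no rows
        return result;
    }

    public static void main(String[] args) {
        for (String[] row : splitName("John and Ann White")) {
            System.out.println(row[0] + " " + row[1]);
        }
        // prints:
        // John White
        // Ann White
    }
}
```

The real implementation below follows exactly this shape, wrapped in the GenericUDTF machinery.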
To accomplish this goal, we will implement the org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
API.
We have to override 3 methods:
// in this method we specify the input and output parameters:
// the input ObjectInspectors and an output struct
abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;

// here we process an input record and write out any resulting records
abstract void process(Object[] record) throws HiveException;

// this method is called to notify the UDTF that there are no more rows to process.
// Clean-up code or additional output can be produced here.
abstract void close() throws HiveException;
Full Implementation
public class NameParserGenericUDTF extends GenericUDTF { private PrimitiveObjectInspector stringOI = null; @Override public StructObjectInspector initialize(ObjectInspector[] args) UDFArgumentException { if (args.length != 1) { throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument"); } if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) { throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter"); } // input inspectors stringOI = (PrimitiveObjectInspector) args[0]; // output inspectors -- an object with two fields! List<String> fieldNames = new ArrayList<String>(2); List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2); fieldNames.add("name"); fieldNames.add("surname"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } public ArrayList<Object[]> processInputRecord(String name){ ArrayList<Object[]> result = new ArrayList<Object[]>(); // ignoring null or empty input if (name == null || name.isEmpty()) { return result; } String[] tokens = name.split("\\s+"); if (tokens.length == 2){ result.add(new Object[] { tokens[0], tokens[1] }); }else if (tokens.length == 4 && tokens[1].equals("and")){ result.add(new Object[] { tokens[0], tokens[3] }); result.add(new Object[] { tokens[2], tokens[3] }); } return result; } @Override public void process(Object[] record) throws HiveException { final String name = stringOI.getPrimitiveJavaObject(record[0]).toString(); ArrayList<Object[]> results = processInputRecord(name); Iterator<Object[]> it = results.iterator(); while (it.hasNext()){ Object[] r = it.next(); forward(r); } } @Override public void close() throws HiveException { // do nothing } }
Please check the GitHub repository for the full code.
Code Walkthrough
The UDTF takes a string as a parameter and returns a struct with two fields. As with GenericUDF, we have to manually configure all of the input and output object inspectors that Hive needs in order to understand the inputs and outputs.
We identify a PrimitiveObjectInspector
for the input string.
stringOI = (PrimitiveObjectInspector) args[0]
Defining the output object inspectors requires us to define both field names, and the object inspectors required to read each field (in our case, both fields are strings).
List<String> fieldNames = new ArrayList<String>(2);
fieldNames.add("name");
fieldNames.add("surname");

List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
The bulk of our logic resides in the processInputRecord method, which is fairly straightforward. Separating this logic from the Hive API makes it easier to test without having to struggle with object inspectors.
Finally, once we have a result we can forward it; this registers the object as an output record for Hive to process.
while (it.hasNext()) {
    Object[] r = it.next();
    forward(r);
}
Using our function
We can build our function and use it in Hive:
mvn package
cp target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar ./ext.jar
Then use it from the hive shell:
ADD JAR ./ext.jar;

CREATE TEMPORARY FUNCTION process_names AS 'com.matthewrathbone.example.NameParserGenericUDTF';

SELECT
    adTable.name,
    adTable.surname
FROM people
    LATERAL VIEW process_names(name) adTable AS name, surname;
OK
John    Smith
John    White
Ann     White
Ted     Green
Testing
It is best to divide testing of a UDTF into two parts: testing the data-processing logic itself, and then testing the function as a whole in Hive. Testing in Hive is always recommended due to the complexity introduced by the different elements, input formats, and data.
Below is an example unit test for splitting a person's name into first name and surname; again, this can be found in full on GitHub:
public class NameParserGenericUDTFTest {

    @Test
    public void testUDTFOneSpace() {
        // set up the models we need
        NameParserGenericUDTF example = new NameParserGenericUDTF();
        ObjectInspector[] inputOI = { PrimitiveObjectInspectorFactory.javaStringObjectInspector };

        // create the actual UDF arguments
        String name = "John Smith";

        try {
            example.initialize(inputOI);
        } catch (Exception ex) {
            // initialization should not fail for a valid string inspector
        }

        ArrayList<Object[]> results = example.processInputRecord(name);
        Assert.assertEquals(1, results.size());
        Assert.assertEquals("John", results.get(0)[0]);
        Assert.assertEquals("Smith", results.get(0)[1]);
    }
}
Finishing up
By now you should be a pro at customizing Hive functions.
If you need more resources you can check out my personal blog post for a walkthrough of building regular user defined functions, or take a look at the Apache Hive Book.