An On Ingest parser parses a file while it is being written to the cluster. Use the following guidelines to add an On Ingest parser:
Create your own parser class implementing the com.queryio.hadoop.hdfs.userdefinedtags.IUserDefinedParser interface. To implement this interface, include queryio-plugins.jar in your classpath. The JAR file can be found at $INSTALL_HOME/services/hadoop-2.0.2-alpha/share/hadoop/common/queryio-plugins.jar
The definition of the interface is as follows:
    /**
     * @return List of CustomTag
     */
    public List<UserDefinedTag> getCustomTagList();

    /**
     * Provide tableName, column names and column sql datatypes of fields being
     * parsed by this parser for a particular filetype.
     *
     * @return List of ColumnMetaData
     */
    TableMetadata getTableMetaData(String fileExtension);

    /**
     * Read InputStream and parse the content to extract CustomTags.
     *
     * @param is
     *            InputStream of file
     * @throws Exception
     */
    void parseStream(InputStream is, String fileExtension) throws Exception;

    /**
     * Specifies if the framework needs to verify database schema for the tags
     * being inserted.
     *
     * @return
     */
    boolean updateDbSchema();

    void parse(Reader reader, TableMetadata tableMetadata, Metadata metadata) throws Exception;
Create a jar file containing the class which you just implemented.
The example code below shows a parser that uses Apache Tika to extract metadata from various file types.
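    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.parser.AutoDetectParser;
    import org.xml.sax.helpers.DefaultHandler;

    // IUserDefinedTagParser, UserDefinedTag and TableMetadata are provided by queryio-plugins.jar.
    public class FileContentParser implements IUserDefinedTagParser {

        // Assumption: Apache Commons Logging. The original snippet does not show how LOG is declared.
        private static final Log LOG = LogFactory.getLog(FileContentParser.class);

        // Tika parser that detects the document type automatically.
        private final AutoDetectParser parser = new AutoDetectParser();

        // Table metadata keyed by file extension. The original snippet does not show how
        // this map is filled; populate it for every file type the parser supports.
        private final Map<String, TableMetadata> map = new HashMap<String, TableMetadata>();

        // Tags extracted so far.
        private final List<UserDefinedTag> list = new ArrayList<UserDefinedTag>();

        @Override
        public void parseStream(InputStream is, String fileExtension) throws Exception {
            TableMetadata tableMetadata = map.get(fileExtension);
            if (tableMetadata == null) {
                // No table is registered for this extension, so no tag could be stored.
                LOG.debug("No table metadata registered for extension: " + fileExtension);
                return;
            }

            DefaultHandler handler = new DefaultHandler(); // document body is ignored
            Metadata metadata = new Metadata();

            LOG.debug("TikaCustomTagParser is now parsing document.");
            parser.parse(is, handler, metadata);

            for (String name : metadata.names()) {
                String value = metadata.get(name);

                // Turn the Tika key into a column name: every run of non-alphanumeric
                // characters (including "-", "." and " ") becomes "_", then upper-case.
                String key = name.replaceAll("[^a-zA-Z0-9]+", "_").toUpperCase();

                // Keep only non-empty values whose column is defined for this file type.
                if (tableMetadata.getColumnMetadataByColumnName(key) != null
                        && value != null && !value.isEmpty()) {
                    this.list.add(new UserDefinedTag(key, value));
                    LOG.debug("Parser found tag: " + key + "\t" + value);
                }
            }

            LOG.debug("Total tags found by TikaCustomTagParser: " + this.list.size());
        }

        @Override
        public List<UserDefinedTag> getCustomTagList() {
            return list;
        }

        @Override
        public void setFilterExpression(String expression) {
        }

        @Override
        public TableMetadata getTableMetaData(String fileExtension) {
            return map.get(fileExtension);
        }

        @Override
        public boolean updateDbSchema() {
            return false;
        }

        // parse(Reader, TableMetadata, Metadata) from the interface listing is not shown in this example.
    }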
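The key normalization step in parseStream can be tried on its own, without Tika or the QueryIO classes. The following is a minimal sketch of that step only; the class name ColumnNames and the method toColumnName are hypothetical names chosen for illustration, and Locale.ROOT is an addition here (the example parser calls toUpperCase() with the default locale) to keep the result independent of the JVM locale.

```java
import java.util.Locale;

// Hypothetical helper, not part of queryio-plugins.jar: isolates the
// "metadata key -> column name" conversion used by the example parser.
final class ColumnNames {

    private ColumnNames() {
    }

    /**
     * Replaces every run of characters outside [a-zA-Z0-9] with a single
     * underscore and upper-cases the result.
     */
    static String toColumnName(String metadataKey) {
        return metadataKey.replaceAll("[^a-zA-Z0-9]+", "_").toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toColumnName("Content-Type"));       // CONTENT_TYPE
        System.out.println(toColumnName("meta:creation-date")); // META_CREATION_DATE
    }
}
```

Because the regular expression already matches hyphens, dots, and spaces, a single replaceAll call is enough to produce a name that can be compared against the column names in the table metadata.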
You must provide metadata for every file type your parser supports through getColumnMetaDataList(). If the table for an on-ingest file type has the same name as a table already present in the QueryIO database and the two tables have different schemas, you must either change the table name or delete the existing table from the database.
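The naming rule above can be illustrated with plain Java collections. This is only a sketch of the condition being described, not how QueryIO performs the check: the class SchemaCheck, the method conflicts, the representation of a schema as a column-name to SQL-type map, and the case-insensitive comparison of table names are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the rule: same table name + different schema = conflict.
final class SchemaCheck {

    private SchemaCheck() {
    }

    /**
     * Returns true when both tables share a name but their column definitions
     * (column name -> SQL type) differ. Table names are compared
     * case-insensitively, which is an assumption of this sketch.
     */
    static boolean conflicts(String parserTable, Map<String, String> parserColumns,
                             String existingTable, Map<String, String> existingColumns) {
        return parserTable.equalsIgnoreCase(existingTable)
                && !parserColumns.equals(existingColumns);
    }

    public static void main(String[] args) {
        Map<String, String> parserColumns = new LinkedHashMap<String, String>();
        parserColumns.put("AUTHOR", "VARCHAR(255)");
        parserColumns.put("TITLE", "VARCHAR(255)");

        Map<String, String> existingColumns = new LinkedHashMap<String, String>();
        existingColumns.put("AUTHOR", "VARCHAR(255)");

        // Same name, different columns: rename the table or delete the existing one.
        System.out.println(conflicts("PDF", parserColumns, "PDF", existingColumns));
    }
}
```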