Writing an On Ingest Parser

An On Ingest parser parses a file while it is being written to the cluster. Use the following guidelines to add an On Ingest parser:

Create your own parser class implementing the com.queryio.hadoop.hdfs.userdefinedtags.IUserDefinedParser interface. To implement this interface, include queryio-plugins.jar in your classpath. The JAR file can be found at $INSTALL_HOME/services/hadoop-2.0.2-alpha/share/hadoop/common/queryio-plugins.jar.

The definition of the interface is as follows:

	/**
	 * @return list of UserDefinedTag entries extracted by this parser
	 */
	List<UserDefinedTag> getCustomTagList();

	/**
	 * Provide the table name, column names and column SQL datatypes of the
	 * fields parsed by this parser for a particular file type.
	 * 
	 * @return TableMetadata for the given file extension
	 */
	TableMetadata getTableMetaData(String fileExtension);

	/**
	 * Read the InputStream and parse its content to extract custom tags.
	 * 
	 * @param is
	 *            InputStream of the file
	 * @param fileExtension
	 *            extension of the file being ingested
	 * @throws Exception
	 */
	void parseStream(InputStream is, String fileExtension) throws Exception;

	/**
	 * Specifies whether the framework needs to verify the database schema for
	 * the tags being inserted.
	 * 
	 * @return true if the framework should verify the database schema
	 */
	boolean updateDbSchema();

	void setFilterExpression(String expression);

	void parse(Reader reader, TableMetadata tableMetadata, Metadata metadata) throws Exception;
	

Create a JAR file containing the class you just implemented.

The example code below shows a parser that uses Apache Tika to extract metadata from various file types.

import java.io.InputStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.xml.sax.helpers.DefaultHandler;

// QueryIO plugin classes (IUserDefinedParser, UserDefinedTag, TableMetadata);
// adjust the imports to match the packages in queryio-plugins.jar.
import com.queryio.hadoop.hdfs.userdefinedtags.*;

public class FileContentParser implements IUserDefinedParser {
	// Logger; commons-logging is used here, but any logging framework will do.
	private static final Log LOG = LogFactory.getLog(FileContentParser.class);

	// Tika parser that auto-detects the file type and extracts its metadata.
	private final AutoDetectParser parser = new AutoDetectParser();

	// Table metadata registered per supported file extension (see the note below the example).
	private final Map<String, TableMetadata> map = new HashMap<String, TableMetadata>();

	// Tags extracted from the parsed stream.
	private final List<UserDefinedTag> list = new ArrayList<UserDefinedTag>();

	@Override
	public void parseStream(InputStream is, String fileExtension) throws Exception {
		TableMetadata tableMetadata = map.get(fileExtension);
		if (tableMetadata == null) {
			LOG.debug("No table metadata registered for extension: " + fileExtension);
			return;
		}
		// DefaultHandler discards the document body; only the metadata is of interest here.
		DefaultHandler handler = new DefaultHandler();
		Metadata metadata = new Metadata();
		LOG.debug("FileContentParser is now parsing document.");
		parser.parse(is, handler, metadata);

		for (String name : metadata.names()) {
			String value = metadata.get(name);
			// Normalize the Tika metadata name into a column name: replace every run of
			// non-alphanumeric characters with '_' and upper-case the result.
			String key = name.replaceAll("[^a-zA-Z0-9]+", "_").toUpperCase();
			// Keep only tags that map to a column declared in the table metadata.
			if (tableMetadata.getColumnMetadataByColumnName(key) != null) {
				if (value != null && !value.isEmpty()) {
					this.list.add(new UserDefinedTag(key, value));
					LOG.debug("Parser found tag: " + key + "\t" + value);
				}
			}
		}
		LOG.debug("Total tags found by FileContentParser: " + this.list.size());
	}

	@Override
	public List<UserDefinedTag> getCustomTagList() {
		return list;
	}

	@Override
	public void setFilterExpression(String expression) {
		// This example does not apply any filter expression.
	}

	@Override
	public TableMetadata getTableMetaData(String fileExtension) {
		return map.get(fileExtension);
	}

	@Override
	public boolean updateDbSchema() {
		return false;
	}

	@Override
	public void parse(Reader reader, TableMetadata tableMetadata, Metadata metadata) throws Exception {
		// Not used by this example; all parsing is done in parseStream().
	}
}
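
To try the parser outside the cluster before packaging it, you can feed it the InputStream of a local file and print the tags it collects. The sketch below is a minimal, hypothetical test driver: the class name FileContentParserTest, the sample.pdf path and the "pdf" extension are illustrative, and it assumes table metadata has been registered for that extension (see the note below) and that UserDefinedTag prints something readable from toString().

import java.io.FileInputStream;
import java.io.InputStream;

import com.queryio.hadoop.hdfs.userdefinedtags.*; // adjust to match queryio-plugins.jar

public class FileContentParserTest {
	public static void main(String[] args) throws Exception {
		FileContentParser parser = new FileContentParser();
		// The parser's metadata map must already contain a TableMetadata entry
		// for the "pdf" extension; otherwise no tags will be collected.
		InputStream is = new FileInputStream("sample.pdf");
		try {
			parser.parseStream(is, "pdf");
		} finally {
			is.close();
		}
		for (UserDefinedTag tag : parser.getCustomTagList()) {
			System.out.println(tag); // assumes UserDefinedTag has a readable toString()
		}
	}
}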

You must provide metadata for all the file types your parser supports through getTableMetaData(). If the on-ingest table for a file type has the same name as a table already present in the QueryIO database, and the two tables have different schemas, you must either change the table name or delete the existing table from the database.
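
Note that the example parser normalizes every Tika metadata name before matching it against the table's columns: each run of non-alphanumeric characters becomes an underscore and the result is upper-cased. The column names you declare must use this normalized form. The helper below (the ColumnNames class and toColumnName method are illustrative names, not part of queryio-plugins.jar) shows the mapping:

public class ColumnNames {
	// How Tika metadata names map to the column names the example parser looks up:
	//   "Content-Type"  -> "CONTENT_TYPE"
	//   "dc:creator"    -> "DC_CREATOR"
	//   "Last-Modified" -> "LAST_MODIFIED"
	public static String toColumnName(String tikaName) {
		return tikaName.replaceAll("[^a-zA-Z0-9]+", "_").toUpperCase();
	}
}

Declare these normalized names, with appropriate SQL datatypes, in the TableMetadata you return for each supported file extension.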



Copyright © 2018 QueryIO Corporation. All Rights Reserved.

QueryIO, "Big Data Intelligence" and the QueryIO Logo are trademarks of QueryIO Corporation. Apache, Hadoop and HDFS are trademarks of The Apache Software Foundation.