Tuesday, April 12, 2016

HDFS File Watcher

What is a File Watcher?

A file watcher is a mechanism that continuously monitors one or more folders for new files. When a new file arrives, the file watcher triggers a custom action, such as executing another program or job on the newly arrived file.

What is an HDFS File Watcher?

A file watcher that provides the same file/folder monitoring functionality, but for HDFS (the Hadoop Distributed File System).

Why an HDFS File Watcher?

There are a number of file watchers already available that let us monitor files and folders on various file systems and environments, but for HDFS there is no such ready-made watcher.
Many of us have faced a scenario where we wished we had one, so today we are going to build exactly that.
Hadoop 2.6 introduced DFSInotifyEventInputStream, which you can use to write a custom file watcher. You get an instance of it from HdfsAdmin and then call .take() or .poll() to receive the events. Event types include CREATE, UNLINK (delete), and APPEND, which should cover what you're looking for.
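
Before the full walkthrough, here is a minimal sketch of that API, just to show the shape of it (the URI hdfs://namenode:8020 is a placeholder for your own NameNode):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.EventBatch;

public class TakeVsPoll {
    public static void main(String[] args) throws Exception {
        // "hdfs://namenode:8020" is a placeholder; point it at your cluster.
        HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://namenode:8020"), new Configuration());
        DFSInotifyEventInputStream stream = admin.getInotifyEventStream();
        EventBatch batch = stream.take();    // blocks until at least one event arrives
        // EventBatch polled = stream.poll(); // non-blocking: returns null if nothing is queued
        System.out.println("first txid = " + batch.getTxid());
    }
}

take() blocks until an event is available, while poll() returns immediately (null when the queue is empty), so pick whichever fits your scheduling model.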

How to create an HDFS File Watcher?

To create the HDFS File Watcher, I'm creating a Maven project and will walk through it below.
Prerequisites:
  1. Basic knowledge of Java
  2. Basic understanding of Maven
  3. Java 1.7 or above (the build below targets 1.7)
  4. HDFS
  5. Any IDE (Eclipse is used here)

Steps to create HDFS File Watcher

  1. Open the Eclipse IDE, go to File -> New -> Other, and select Maven Project.

[Screenshot: New Project wizard with Maven Project selected]
Click on Next ->
  2. Select the Maven archetype.

[Screenshot: archetype selection]
Click on Next ->
  3. Name your project (group id and artifact id).

[Screenshot: group id and artifact id]
Click on Finish.
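
By the way, if you prefer the command line over the Eclipse wizard, the equivalent project can be generated with the Maven archetype plugin (the group and artifact ids below match this walkthrough, and I'm assuming the quickstart archetype; adjust to taste):

mvn archetype:generate -DgroupId=com.bectortapu.HdfsFileWatcher \
    -DartifactId=HdfsFileWatcher \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DinteractiveMode=false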
  4. Update App.java with the code below.

package com.bectortapu.HdfsFileWatcher;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
import org.apache.hadoop.hdfs.inotify.Event.UnlinkEvent;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public class App {

    public static void main(String[] args)
            throws IOException, InterruptedException, MissingEventsException {

        if (args.length < 1) {
            System.err.println("Usage: App <hdfs:URI> [<Txn_id>]");
            System.exit(1);
        }

        // Optional second argument: the transaction id to start reading from.
        long lastReadTxid = 0;
        if (args.length > 1) {
            lastReadTxid = Long.parseLong(args[1]);
        }
        System.out.println("lastReadTxid = " + lastReadTxid);

        // HdfsAdmin exposes the inotify event stream (requires HDFS superuser).
        HdfsAdmin admin = new HdfsAdmin(URI.create(args[0]), new Configuration());
        DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream(lastReadTxid);

        // Block on take() forever, printing each batch of events as it arrives.
        while (true) {
            EventBatch batch = eventStream.take();
            System.out.println("TxId = " + batch.getTxid());

            for (Event event : batch.getEvents()) {
                System.out.println("event type = " + event.getEventType());
                switch (event.getEventType()) {
                    case CREATE:
                        CreateEvent createEvent = (CreateEvent) event;
                        System.out.println("  path = " + createEvent.getPath());
                        System.out.println("  owner = " + createEvent.getOwnerName());
                        System.out.println("  ctime = " + createEvent.getCtime());
                        break;
                    case UNLINK:
                        UnlinkEvent unlinkEvent = (UnlinkEvent) event;
                        System.out.println("  path = " + unlinkEvent.getPath());
                        System.out.println("  timestamp = " + unlinkEvent.getTimestamp());
                        break;
                    case APPEND:
                    case CLOSE:
                    case RENAME:
                    default:
                        break;
                }
            }
        }
    }
}
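
In a real watcher you will usually replace the println calls with an action, which is the whole point of a file watcher. Here is a minimal sketch of a handler you could call from the CREATE branch above (the watched directory /data/incoming/ and the script /opt/jobs/process.sh are hypothetical):

import java.io.IOException;
import org.apache.hadoop.hdfs.inotify.Event.CreateEvent;

public class CreateHandler {
    // Kick off an external job for every new file under the watched directory.
    static void onCreate(CreateEvent event) throws IOException {
        if (event.getPath().startsWith("/data/incoming/")) {       // hypothetical watched path
            new ProcessBuilder("/opt/jobs/process.sh", event.getPath()) // hypothetical script
                    .inheritIO()
                    .start(); // fire-and-forget; add waitFor() and error handling as needed
        }
    }
}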

  5. After updating App.java, make sure to add the following dependencies and repositories to your pom.xml.

<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.4.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0-cdh5.4.4</version>
  </dependency>
  <dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.1.3</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
  </dependency>
</dependencies>
<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
      </plugin>
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>2.3.2</version>
      <configuration>
        <source>1.7</source>
        <target>1.7</target>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <version>2.4</version>
      <configuration>
        <outputDirectory>${basedir}/target</outputDirectory>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.2</version>
      <configuration>
        <shadedArtifactAttached>false</shadedArtifactAttached>
        <outputFile>target/hdfs-file-watcher-uber.jar</outputFile>
        <artifactSet>
          <includes>
            <include>*:*</include>
          </includes>
        </artifactSet>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.bectortapu.HdfsFileWatcher.App</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
<repositories>
  <repository>
    <id>cdh.repo</id>
    <url>https://repository.cloudera.com/cloudera/cloudera-repos/</url>
    <name>Cloudera Repositories</name>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
  <repository>
    <id>cdh.snapshots.repo</id>
    <url>https://repository.cloudera.com/artifactory/libs-snapshot-local</url>
    <name>Cloudera Snapshots Repository</name>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
    <releases>
      <enabled>false</enabled>
    </releases>
  </repository>
  <repository>
    <id>central</id>
    <url>http://repo1.maven.org/maven2/</url>
    <releases>
      <enabled>true</enabled>
    </releases>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
</repositories>

You have created your HDFS File Watcher successfully. It's time to deploy and test it.

  6. Right-click on your project and build it as shown in the image below.
[Screenshot: project build menu]

  7. Define your goals while building the project, for example:

[Screenshot: Maven build goals]
Click on Run and verify that the jar is created in the target folder.
[Screenshot: jar in target folder]
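
If you prefer building from a terminal instead of the Eclipse run configuration, the same build is simply:

mvn clean package

With the shade plugin configured as above, this also writes a runnable uber jar to target/hdfs-file-watcher-uber.jar.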

  8. Here we go! It's time to run the jar and see the output. To do this, use the command below:

java -jar HdfsFileWatcher-0.0.1-SNAPSHOT.jar <hdfs:URI> <Txn_id>
Here Txn_id is optional; if you don't specify it, the watcher starts reading from Txn_id = 0.
Make sure you run it as the hdfs user, since the admin interface requires HDFS superuser privileges.
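
To verify the watcher works end to end, trigger some events from another terminal (the HDFS paths here are just examples):

hdfs dfs -put /etc/hosts /tmp/watch-test.txt    # shows up as CREATE (and CLOSE) events
hdfs dfs -rm -skipTrash /tmp/watch-test.txt     # shows up as an UNLINK event

Note that without -skipTrash, a plain -rm moves the file to the trash directory when trash is enabled, which the watcher reports as a RENAME rather than an UNLINK.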
