Ingestion with Spring Integration

Posted on January 6, 2014 by James Ji

It is well known these days that big data platforms can process data at scale.  But before you can enjoy that efficiency, you need to ingest the data.  Several tools on the market help with ingestion; the two best known are probably Storm and Flume.  However, there are cases where you have to build your own ingestion system to meet all of your requirements.  In those cases, Spring Integration might be the right tool for the job.

If you have been in the Java world long enough, you have almost certainly heard of Spring, but maybe not of Spring Integration.  I was first introduced to Spring Integration while designing an ingestion system for a client.  According to the project page, Spring Integration enables “lightweight messaging within Spring-based applications and supports integration with external systems with declarative adapters”[1].  I was amazed by how much it simplified our code.

To add Spring Integration to your project, put the following snippet in your pom.xml:

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-core</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

At the time of writing, several versions were available. This article is based on v2.2.6, since that is the version I worked with. Changes in newer versions are described in their reference documents.

Now let’s look at the main components of Spring Integration.  At a high level, it follows a typical pipes-and-filters architecture with three main components: the message, the message channel and the endpoint.  A message is a generic wrapper for any Java object. It combines a header and a payload: the header holds metadata as key-value pairs, and the payload is the actual content you want to pass along.  A message channel is the pipe in this architecture.  Generally you don’t do much with a channel, because its only responsibility is to pass messages from one endpoint to another.  An endpoint plays the role of the “filter” in the architecture, but it can do more than filter.  Spring Integration defines seven types of endpoints: transformer, filter, router, splitter, aggregator, service activator and channel adapter. Their names describe fairly well what each one does.  For more details, see the Spring Integration reference documentation.
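
To make the three components concrete, here is a minimal plain-Java sketch of the pipes-and-filters idea.  It deliberately does not use Spring Integration's own classes: SketchMessage, SketchChannel, SketchEndpoint and PipesAndFiltersSketch are hypothetical names made up for illustration, and the real framework offers far more (header handling, pollers, many channel types).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a message: metadata headers plus a payload.
final class SketchMessage {
    final Map<String, Object> headers;
    final Object payload;

    SketchMessage(Object payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(new HashMap<String, Object>(headers));
    }
}

// Hypothetical endpoint: anything that can receive a message.
interface SketchEndpoint {
    void handle(SketchMessage message);
}

// Hypothetical channel: a pipe that only hands messages to its subscriber.
final class SketchChannel {
    private SketchEndpoint subscriber;

    void subscribe(SketchEndpoint endpoint) {
        this.subscriber = endpoint;
    }

    void send(SketchMessage message) {
        if (subscriber != null) {
            subscriber.handle(message);
        }
    }
}

final class PipesAndFiltersSketch {
    // Wires: input channel -> filter endpoint -> output channel -> collecting endpoint.
    static List<Object> run(List<String> fileNames) {
        final SketchChannel input = new SketchChannel();
        final SketchChannel output = new SketchChannel();
        final List<Object> delivered = new ArrayList<Object>();

        // Filter endpoint: forwards only messages whose payload ends with ".zip".
        input.subscribe(new SketchEndpoint() {
            @Override
            public void handle(SketchMessage m) {
                if (m.payload.toString().endsWith(".zip")) {
                    output.send(m);
                }
            }
        });
        // Terminal endpoint: records the payloads it receives.
        output.subscribe(new SketchEndpoint() {
            @Override
            public void handle(SketchMessage m) {
                delivered.add(m.payload);
            }
        });

        for (String name : fileNames) {
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("source", "sketch"); // arbitrary example header
            input.send(new SketchMessage(name, headers));
        }
        return delivered;
    }
}
```

Calling PipesAndFiltersSketch.run with the names a.zip, b.tgz and c.zip returns only the two .zip names.  The channels never inspect the payload; all of the decision logic lives in the endpoints, which is the division of labor described above.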

The following example shows how Spring Integration can save time when developing a simple ingestion system.  We have a folder containing a number of .zip and .tgz archives, and the ingestion system should pick up all of them. The .zip archives all go straight into MongoDB[2]. The .tgz archives are first checked for corruption, and only the intact ones are ingested.  For the sake of simplicity, we assume a corrupted file has the string “corrupt” embedded in its filename (e.g., myfile_corrupt.tgz).

The first step is to create a Maven project in Eclipse and put the following dependencies into its pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>ThinkBig</groupId>
  <artifactId>SIExample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
      <version>2.2.6.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mongodb</artifactId>
      <version>2.2.6.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-file</artifactId>
      <version>2.2.6.RELEASE</version>
    </dependency>
    <!-- commons-io is used to read the file content into a byte array -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-io</artifactId>
      <version>1.3.2</version>
    </dependency>
  </dependencies>
</project>

The second step, as in other Spring projects, is to set up the components we’ll use in an XML file.  I created the XML file Spring-Integration.xml with the following content:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-file="http://www.springframework.org/schema/integration/file"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:int-mongo="http://www.springframework.org/schema/integration/mongodb"
   xsi:schemaLocation="
       http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/integration/mongodb http://www.springframework.org/schema/integration/mongodb/spring-integration-mongodb-2.2.xsd">

   <!-- file poller who keeps looking at a folder and picks up new landed files -->
   <int-file:inbound-channel-adapter id="filePoller" directory="/Users/jamesji/testFiles/samples" channel="filesChannel">
       <int:poller fixed-rate="1"/>
   </int-file:inbound-channel-adapter>

   <int:channel id="filesChannel"/>
   
   <!-- A router to route different files to different channels using expression-->
   <int:router input-channel="filesChannel" expression="payload.getName().substring(payload.getName().lastIndexOf('.'))">
       <int:mapping value=".tgz" channel="tgzFilterChannel"/>
       <int:mapping value=".zip" channel="toMongoDBAdapter"/>
   </int:router>
   
   <int:channel id="tgzFilterChannel"/>
   
   <!-- filter to check if it is a corrupted file -->
   <int:filter id="tgzFilter" input-channel="tgzFilterChannel" output-channel="toMongoDBAdapter" expression="!payload.getName().contains('corrupt')"></int:filter> 
           
   <int:channel id="toMongoDBAdapter"/>
   
   <!-- MongoDB adapter, read file contents and write them into MongoDB -->
   <int-mongo:outbound-channel-adapter id="mongoDB" channel="toMongoDBAdapter"
       collection-name="files" mongo-converter="converter" mongodb-factory="mongoDbFactory" 
   />
   
   <bean id="converter" class="com.TBA.SIExample.MongoConverterImple"/>
   <bean id="mongoDbFactory" class="org.springframework.data.mongodb.core.SimpleMongoDbFactory">
       <constructor-arg>
           <bean class="com.mongodb.Mongo"/>
       </constructor-arg>
       <constructor-arg value="newdb"/>
   </bean>
</beans>

This XML file defines a handful of components and how they connect to each other.  The source is a file adapter that keeps watching the folder named in its directory attribute and passes every file it sees there to the next component, the router.  The router sends messages to different channels depending on the filename extension.  All .zip files are sent directly to the MongoDB outbound channel adapter, while .tgz files first go through a filter, which checks whether the filename contains the string “corrupt” and passes on only the files that do not. Note that even though there are fewer than 50 lines of configuration here, they capture several business rules, and the data flow is easy to follow.  Even better, if you have the Spring Tool Suite plugin installed in Eclipse, there is an integration-graph tab that illustrates the pipe layout graphically (as shown below).

[Screenshot: integration-graph view of the pipe layout]
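
For readers less familiar with SpEL, the router and filter expressions in the XML can be read as ordinary Java.  The sketch below is only an illustration under my own assumptions: RoutingSketch and its method names are hypothetical, and returning null for "nobody receives this file" is a simplification of the sketch, not a statement about how the framework treats unmapped or rejected messages.

```java
import java.io.File;

final class RoutingSketch {
    // Mirrors the router expression: the text from the last '.' to the end of the name.
    // Unlike the bare expression, a name without a dot yields "" here instead of
    // letting substring(-1) throw StringIndexOutOfBoundsException.
    static String extensionOf(File file) {
        String name = file.getName();
        int dot = name.lastIndexOf('.');
        return dot < 0 ? "" : name.substring(dot);
    }

    // Mirrors the filter expression: accept a file unless its name contains "corrupt".
    static boolean looksIntact(File file) {
        return !file.getName().contains("corrupt");
    }

    // Returns the name of the channel a file would end up on,
    // or null if nothing receives it in this sketch.
    static String destinationFor(File file) {
        String ext = extensionOf(file);
        if (".zip".equals(ext)) {
            return "toMongoDBAdapter";
        }
        if (".tgz".equals(ext)) {
            return looksIntact(file) ? "toMongoDBAdapter" : null;
        }
        return null; // no mapping for this extension
    }
}
```

One thing the sketch makes visible is the edge case of a file with no extension at all: lastIndexOf returns -1 and substring(-1) throws, so it may be worth guarding against such files if the watched folder can contain them.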

After all this XML, let’s get our hands on some Java code.  To use the MongoDB outbound channel adapter here, we need to implement the MongoConverter interface.  The interface has several methods, but since we only care about storing data in this example, only two of them matter.  For all other methods, we just return null for now.  The implementation is fairly simple. In the write method, the source is the object passed in and the sink is the object that will be stored in MongoDB. In this example, the source is a File object; its filename and content are stored in MongoDB.

package com.TBA.SIExample;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.commons.io.IOUtils;
import org.springframework.core.convert.ConversionService;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.core.mapping.SimpleMongoMappingContext;

import com.mongodb.DBObject;
import com.mongodb.DBRef;

public class MongoConverterImple implements MongoConverter {

    @Override
    public MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> getMappingContext() {
        return new SimpleMongoMappingContext();
    }

    // Copies the filename and raw bytes of the incoming File into the object to be stored.
    @Override
    public void write(Object source, DBObject sink) {
        if (source instanceof File) {
            File file = (File) source;
            sink.put("filename", file.getName());
            FileInputStream fin = null;
            try {
                fin = new FileInputStream(file);
                sink.put("fileContent", IOUtils.toByteArray(fin));
            } catch (IOException e) {
                // On a read failure, only the filename ends up in the sink.
                e.printStackTrace();
            } finally {
                // Null-safe close that swallows any IOException.
                IOUtils.closeQuietly(fin);
            }
        }
    }

    // The remaining MongoConverter methods are omitted here; as noted above, they just return null.
}
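
If you want to check the shape of the stored record without a running MongoDB, the same two fields can be built with nothing but the JDK.  This is a sketch under assumptions of my own: FileDocumentSketch is a hypothetical helper, a plain Map stands in for the DBObject sink, and Files.readAllBytes (Java 7 and later) replaces the commons-io call used in the converter.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.LinkedHashMap;
import java.util.Map;

final class FileDocumentSketch {
    // Builds the two fields the converter stores: "filename" and "fileContent".
    // A plain Map stands in for the MongoDB DBObject here.
    static Map<String, Object> toDocument(File source) throws IOException {
        Map<String, Object> doc = new LinkedHashMap<String, Object>();
        doc.put("filename", source.getName());
        // Files.readAllBytes opens, reads and closes the file in one call.
        doc.put("fileContent", Files.readAllBytes(source.toPath()));
        return doc;
    }
}
```

Unlike the converter, this helper lets the IOException propagate instead of printing it, so a caller can decide whether a file that cannot be read should be skipped or retried.  Reading the whole file into memory is fine for small archives but is worth reconsidering for very large ones.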

The next Java class we need to write is the driver. This class comes from the Spring-Integration-in-Action samples[3]. The main thing to know is that it loads the XML file we defined above and initializes each component. It then enters a while loop so that it becomes a long-running process, and it stops only when you type “q”.

package com.TBA.SIExample;

import java.util.Scanner;

import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public final class Main {

    private Main() { }

    public static void main(final String... args) {

        // Load the XML configuration and start every component defined in it.
        final AbstractApplicationContext context =
                new ClassPathXmlApplicationContext("Spring-Integration.xml");

        context.registerShutdownHook();

        final Scanner scanner = new Scanner(System.in);

        // Keep the process alive until the user types 'q'. Each token is
        // consumed so that input other than 'q' does not stall the loop.
        while (scanner.hasNext()) {
            if ("q".equals(scanner.next())) {
                break;
            }
        }

        scanner.close();
        context.close();

        System.exit(0);
    }
}
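
One detail about wait-for-“q” loops of this kind: Scanner.hasNext(pattern) only peeks at the next token and never advances past it, so a loop has to consume tokens itself if it should get past input other than “q”.  Here is a small sketch of that pattern that can be tried without a console; QuitLoopSketch and waitForQuit are hypothetical names, not part of the sample project.

```java
import java.util.Scanner;

final class QuitLoopSketch {
    // Consumes tokens until "q" is read or the input ends.
    // Returns how many tokens were read before stopping (not counting "q").
    static int waitForQuit(Scanner scanner) {
        int ignored = 0;
        while (scanner.hasNext()) {
            String token = scanner.next();
            if ("q".equals(token)) {
                break;
            }
            ignored++;
        }
        return ignored;
    }
}
```

Passing new Scanner("hello world q later") returns 2, because the two words before “q” are read and discarded and the loop stops at “q”.  With System.in, hasNext() blocks while waiting for input, so the loop does not spin.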

And that is it. We have built a simple ingestion system with two Java classes and one XML file.  The beauty of Spring Integration is that it provides many working components right out of the box, and newer versions add more adapters to use as data sources and sinks.  Please feel free to reach out if you have any questions.

James Ji
Data Engineer at Think Big

References

[1] http://projects.spring.io/spring-integration/

[2] http://www.mongodb.org/

[3] https://github.com/spring-projects/Spring-Integration-in-Action
