Sunday, September 27, 2015

Apache Camel-JPA

Camel JPA lets you route entities as messages, reading from and writing to a database through the Java Persistence API (JPA). Let's quickly look at the configuration required to make Camel route messages (in fact, entities) using camel-jpa.

Dependency

Add the following dependency, along with camel-core, to enable Camel JPA:
<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-jpa</artifactId>
	<version>2.15.3</version>
</dependency> 

Component

Create the JPA component using the class org.apache.camel.component.jpa.JpaComponent and inject the entity manager factory (and a transaction manager, if required):
<bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
	<property name="entityManagerFactory" ref="entityManagerFactory" />
	<property name="transactionManager" ref="transactionManager" />
</bean>

EndPoint

A JPA endpoint URI consists of the component name, the entity class and the options:
jpa://<fully-qualified-entity-class-name>?option1=value1&option2=value2..

Few Notable Parameters 

  • entityType: Overrides the entity class name taken from the URI. Defaults to that class name.
  • consumeDelete: When true (the default), the table row is deleted after it is processed. Set it to false to keep the row.
  • consumer.query: Custom query to consume the data from the table.
  • consumer.namedQuery: Custom named query to consume the data from the table.
  • consumer.nativeQuery: Custom native query to consume the data from the table.
  • consumer.delay: Polling interval in milliseconds between two polls.
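
To make the URI format concrete, here is a minimal sketch in plain Java that assembles a JPA endpoint URI from an entity class name and a map of options. JpaUriSketch and buildUri are hypothetical names used only for illustration; they are not part of Camel. Note that inside a Spring XML file each & separator has to be written as &amp;.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration only; not part of Camel.
final class JpaUriSketch {

    // Builds "jpa://<entity class>?name=value&name=value" from the given options.
    static String buildUri(String entityClassName, Map<String, String> options) {
        if (options.isEmpty()) {
            return "jpa://" + entityClassName;
        }
        // Options are joined with '&' and prefixed with '?'.
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "jpa://" + entityClassName + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("consumer.namedQuery", "selectQuery");
        options.put("consumer.delay", "10000");
        options.put("consumeDelete", "false");
        System.out.println(buildUri("com.test.entity.CamelTest", options));
    }
}
```

The option names used in main are the ones listed above; the entity class name is the one from the example later in this post.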

Reading from Endpoint

Selection

By default, the JPA consumer reads all the rows in the table. To read a specific set of rows, set the selection with consumer.query, consumer.namedQuery or consumer.nativeQuery.

Preserve Rows

After a row is read from the table, the endpoint deletes the processed row. To keep the row, set the parameter consumeDelete to false.
With consumeDelete set to false, the same row can be read again on the next poll because it still exists in the table. To prevent that, add a method to the entity that changes a specific value (for example, a status column) and annotate it with @Consumed. The consumer query can then exclude rows that carry the changed value.
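
The effect of the status flag can be simulated in plain Java. The sketch below does not use Camel or JPA and is not how camel-jpa is implemented internally; Row, afterConsume and poll are hypothetical stand-ins for the entity, its @Consumed method and one consumer poll.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the polling behaviour; it does not use Camel or JPA.
final class ConsumedFlagSketch {

    // Stand-in for an entity row with a status column.
    static final class Row {
        final int seqNo;
        String status;

        Row(int seqNo, String status) {
            this.seqNo = seqNo;
            this.status = status;
        }

        // Plays the role of the @Consumed callback: mark the row as done.
        void afterConsume() {
            status = "D";
        }
    }

    // One poll: select rows with status 'P', "process" them, then run the callback.
    static List<Integer> poll(List<Row> table) {
        List<Integer> consumed = new ArrayList<>();
        for (Row row : table) {
            if ("P".equals(row.status)) {
                consumed.add(row.seqNo);
                row.afterConsume();
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, "P"));
        table.add(new Row(2, "D"));
        table.add(new Row(3, "P"));
        System.out.println(poll(table)); // first poll picks rows 1 and 3
        System.out.println(poll(table)); // second poll finds no row left in status 'P'
    }
}
```

The rows stay in the list after both polls, which mirrors consumeDelete=false: nothing is deleted, but the changed status keeps the selection from matching the same rows twice.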

Sending to Endpoint

When entities are sent to the endpoint one by one, they are written to the table row by row. We can use an aggregator to send a list of entities to the endpoint instead. In that case, we need to set an extra parameter, entityType=java.util.List, so that all the entities in the list are persisted to the table.

Example

Let's read rows from a table and write them back into the same table after changing a few values.

Table

mysql> desc camel_test;
+---------+----------+------+-----+---------+-------+
| Field   | Type     | Null | Key | Default | Extra |
+---------+----------+------+-----+---------+-------+
| seq_no  | int(11)  | NO   |     | NULL    |       |
| status  | char(4)  | NO   |     | NULL    |       |
| column1 | char(20) | YES  |     | NULL    |       |
| column2 | char(40) | YES  |     | NULL    |       |
| column3 | int(11)  | YES  |     | NULL    |       |
| column4 | datetime | YES  |     | NULL    |       |
+---------+----------+------+-----+---------+-------+

Configuration

We will build the example using Hibernate and Spring JPA with MySQL.

persistence.xml

<persistence xmlns="http://java.sun.com/xml/ns/persistence" version="1.0">
    <!--Persistence Unit for Mysql database-->
    <persistence-unit name="testMysql" transaction-type="RESOURCE_LOCAL">
        <provider>org.hibernate.ejb.HibernatePersistence</provider>
        <properties>
            <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL5InnoDBDialect"/>
            <property name="hibernate.show_sql" value="true"/>
        </properties>
    </persistence-unit>
</persistence>

Datasource and Entity manager

<bean id="mysqltestDataSource"
	class="org.springframework.jdbc.datasource.DriverManagerDataSource">
	<property name="driverClassName" value="${jdbc.driverClassName}" />
	<property name="url" value="${jdbc.testurl}" />
	<property name="username" value="${jdbc.username}" />
	<property name="password" value="${jdbc.password}" />
</bean>

<context:property-placeholder location="classpath:jdbc.properties" />

<bean id="entityManagerFactory"
	class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
	<property name="dataSource" ref="mysqltestDataSource" />
	<property name="persistenceUnitName" value="testMysql" />
</bean>

Camel JPA

<bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
	<property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>

Entity Class

package com.test.entity;
//Imports here.. 
@Entity
@Table(name = "camel_test")
@NamedQuery(name = "selectQuery", query = "select m from CamelTest m where m.status = 'P'")
public class CamelTest
{
    @Id
    @Column(name="seq_no")
    private Integer seq;
    
    @Column(name="status")
    private String status;
    
    @Column(name="column1")
    private String column1;
    
    @Column(name="column2")
    private String column2;
    
    @Column(name="column3")
    private Integer column3;
    
    @Column(name="column4")
    private Date column4;

    //Getters and Setters here
    
    @Consumed
    public void afterConsume()
    {
        setStatus("D");
    }
}

Points to be noted

  • The class is a JPA entity with one named query, which the JPA endpoint uses to read data from the table.
    • It reads the rows whose status is 'P'
  • Only one extra method is added, annotated with @Consumed. It is executed after the entity is processed.
    • After Camel JPA processes the record, the method changes the status to 'D' so that the row is not read again

Finally Camel Route

<camel:camelContext id="camelContext">
	<camel:route>
		<camel:from uri="jpa://com.test.entity.CamelTest?consumer.namedQuery=selectQuery&amp;consumer.delay=10000&amp;consumeDelete=false" />
		<camel:to uri="direct:sample" />
	</camel:route>
</camel:camelContext>

Points to be noted

  • jpa is the JPA component of Camel JPA
  • com.test.entity.CamelTest is the entity class that corresponds to the camel_test table (as above)
  • Three options are set
    • consumeDelete is set to false to preserve the entity after processing
    • consumer.delay is set to 10000 milliseconds, so the table is polled every 10 seconds
    • consumer.namedQuery is set to read a specific set of rows (see the NamedQuery annotation on the entity).
The above route reads the rows from the table using the named query (selectQuery) with a polling delay of 10 seconds, maps them to entities and sends them to direct:sample one by one. After each entity is processed, its @Consumed method is executed, which changes the status column from P to D so that the next poll does not pick up the same row.

Read in a batch

To read the rows in a batch, use an aggregator.

Aggregator

package com.test.camel.aggregator;
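import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

import com.test.entity.CamelTest;

public class SampleAggregator implements AggregationStrategy
{
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
    {
        List<CamelTest> list;
        if(oldExchange == null || oldExchange.getIn() == null)
        {
            // First exchange of the batch: start a new list.
            list = new ArrayList<CamelTest>();
        }
        else
        {
            // Later exchanges: reuse the list built so far.
            list = oldExchange.getIn().getBody(List.class);
        }
        // Append the incoming entity and carry the list forward as the body.
        list.add(newExchange.getIn().getBody(CamelTest.class));
        newExchange.getIn().setBody(list);
        return newExchange;
    }
}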

Modified Route as below with Aggregator

<camel:route>
	<camel:from uri="jpa://com.test.entity.CamelTest?consumer.namedQuery=selectQuery&amp;consumer.delay=10000" />
	<camel:aggregate strategyRef="aggregateStrategy" completionInterval="2000" completionSize="20" >
		<camel:correlationExpression>
			<camel:constant>true</camel:constant>	
		</camel:correlationExpression>
		<camel:to uri="direct:sample" />
	</camel:aggregate>
</camel:route>
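
The route above completes a batch either when 20 entities have been collected (completionSize) or when the 2000 ms interval fires (completionInterval). The size-based half of that rule can be sketched in plain Java. This is only an illustration of the grouping, not Camel's aggregator; BatchSketch and batches are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of size-based batching; Camel's aggregator is not used here.
final class BatchSketch {

    // Groups items into lists of at most completionSize elements, in arrival order.
    static <T> List<List<T>> batches(List<T> items, int completionSize) {
        if (completionSize <= 0) {
            throw new IllegalArgumentException("completionSize must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (current.size() == completionSize) {
                // Batch is full: release it and start a new one.
                result.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            // A partial batch; in the route it would be released by completionInterval.
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Five items with a batch size of two give two full batches and one partial batch.
        System.out.println(batches(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```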

Write to Table in Batch

Modify the same route to write a duplicate entry into the table with a changed sequence number. See the processor below.

Processor

package com.test.camel.processor;
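import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

import com.test.entity.CamelTest;

public class SampleProcessor implements Processor
{
    @SuppressWarnings("unchecked")
    public void process(Exchange exchange) throws Exception
    {
        // The aggregator delivers the batch as a List of CamelTest entities.
        List<CamelTest> list = exchange.getIn().getBody(List.class);

        // Shift each primary key so the entities are written as new rows.
        for(CamelTest t : list)
        {
            t.setSeq(t.getSeq() + 1000);
        }
        exchange.getOut().setBody(list);
    }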
}

  • Processor: the processor changes the primary key of each entity before it is written back (otherwise we get a primary key violation error). It adds 1000 to each sequence number, purely for the sake of the example.

Modified Route

<camel:route>
	<camel:from uri="jpa://com.test.entity.CamelTest?consumer.namedQuery=selectQuery&amp;consumer.delay=10000" />
	<camel:aggregate strategyRef="aggregateStrategy" completionInterval="2000" completionSize="20" >
		<camel:correlationExpression>
			<camel:constant>true</camel:constant>	
		</camel:correlationExpression>
		<camel:process ref="processor" />	
		<camel:to uri="jpa://com.test.entity.CamelTest?entityType=java.util.List" />
	</camel:aggregate>
</camel:route>
  • The URI on the route (camel:to) sets the parameter entityType to java.util.List so that a list of entities is saved back to the table (see Sending to Endpoint above).
Happy Learning!!!

Sunday, September 13, 2015

Apache Camel - Simple Routing Example

This post is a continuation of the previous post, Apache Camel overview. In this post, we will see how to define a sample route using Apache Camel (integrated with Spring).

Dependencies

Apache Camel works easily with Spring. The following dependencies are required to make Apache Camel work with Spring:
  <dependency>
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-core</artifactId>
       <version>2.15.3</version>
  </dependency>
  <dependency>
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-spring</artifactId>
       <version>2.15.3</version>
  </dependency>
camel-core is the core dependency, and camel-spring is required for the Spring integration.

Spring Configuration

To add Camel to the Spring configuration, add the following:

Namespace

xmlns:camel="http://camel.apache.org/schema/spring"

Schema Location

xsi:schemaLocation="http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd"

Combined Spring XML

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
              http://www.springframework.org/schema/beans/spring-beans.xsd
              http://camel.apache.org/schema/spring 
              http://camel.apache.org/schema/spring/camel-spring.xsd">

</beans>         

Camel Context and Route

The Camel context is the Camel runtime. In Spring, we define the Camel context first and then define the routes within it.
Problem Statement: Read a file from a directory, process it and copy the processed file to a destination folder.

Define the Processor

package com.test.camel.processor;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SampleProcessor implements Processor
{
    private static final Logger LOG = LoggerFactory.getLogger(SampleProcessor.class);

    public void process(Exchange exchange) throws Exception
    {
        LOG.info("Input is {}", exchange.getIn().getBody());
        //Processing goes here. Set the output to same as input. (Dummy Processing) 
        exchange.setOut(exchange.getIn());
    }

}

Points to be noted:

  • A processor is a class that implements org.apache.camel.Processor.
  • It implements a single method, process, which takes one argument (Exchange). The exchange carries the in and out messages, any exception raised, and a few other properties related to the route and its Camel context.

Define the Bean

     <bean id="processor" class="com.test.camel.processor.SampleProcessor" />

Define the Route

 <camel:camelContext id="camelContext">
      <camel:route id="file-route">
            <camel:from uri="file:/tmp/input.test" />
            <camel:process ref="processor" />
            <camel:to uri="file:/tmp/output.test" />
      </camel:route>
  </camel:camelContext>

Points to be noted:

  • The route is defined inside the Spring configuration using the XML DSL. It can also be defined in Java using RouteBuilder. (We will concentrate only on the XML DSL.)
  • Define the route inside the Camel context.
  • A very basic route consists of From (where to fetch from), Processor (which processes the input) and To (where to write the processed input).
  • The processor is optional; omit it if no processing is needed.
  • In this route (file-route), files are read from the directory /tmp/input.test and written into the directory /tmp/output.test after processing.
  • The Camel context and its routes start automatically once the Spring application context is loaded and started. So, once the Spring configuration file is loaded, Camel looks for files in the input directory.
  • The Camel context and its routes are shut down once the application context is closed.

Exception Handling

Apache Camel provides try, catch and (optionally) finally blocks that can be defined within a route to handle exceptions raised while routing or processing. The above route can be redefined with try-catch blocks to handle exceptions:
 <camel:camelContext id="camelContext">
      <camel:route id="file-route">
            <camel:from uri="file:/tmp/input.test" />
            <camel:doTry>
                 <camel:process ref="processor" />
                 <camel:to uri="file:/tmp/output.test" />
                 <camel:doCatch>
                      <camel:exception>java.lang.Exception</camel:exception>
                      <camel:log message="Exception while processing "></camel:log>
                      <camel:to uri="file:/tmp/error.test" />
                 </camel:doCatch>
            </camel:doTry>
      </camel:route>
  </camel:camelContext>
In the above case, if an exception occurs while routing or processing, the exception is caught, a log entry is written and the file is sent to another directory, /tmp/error.test.
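
The control flow of the doTry/doCatch route is the same as an ordinary Java try-catch. The sketch below is a plain-Java analogy, not Camel code; TryCatchRouteSketch and route are hypothetical names, and the method simply returns the directory the message would end up in.

```java
import java.util.function.UnaryOperator;

// Plain-Java analogy of the doTry/doCatch route; no Camel classes are involved.
final class TryCatchRouteSketch {

    // Runs the processor and returns the directory the message is routed to.
    static String route(String body, UnaryOperator<String> processor) {
        try {
            processor.apply(body);
            return "/tmp/output.test";
        } catch (Exception e) {
            // Corresponds to the log and the error endpoint inside doCatch.
            System.err.println("Exception while processing " + e.getMessage());
            return "/tmp/error.test";
        }
    }

    public static void main(String[] args) {
        // A processor that succeeds, then one that fails.
        System.out.println(route("data", s -> s));
        System.out.println(route("data", s -> {
            throw new IllegalStateException("boom");
        }));
    }
}
```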

Full Spring Configuration

The complete Spring configuration file is as below:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:camel="http://camel.apache.org/schema/spring" 
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans 
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://camel.apache.org/schema/spring 
           http://camel.apache.org/schema/spring/camel-spring.xsd">

     <bean id="processor" class="com.test.camel.processor.SampleProcessor" />

     <camel:camelContext id="camelContext">
         <camel:route id="file-route">
              <camel:from uri="file:/tmp/input.test" />
              <camel:doTry>
                   <camel:process ref="processor" />
                   <camel:to uri="file:/tmp/output.test" />
                   <camel:doCatch>
                        <camel:exception>java.lang.Exception</camel:exception>
                        <camel:log message="Exception while processing "></camel:log>
                        <camel:to uri="file:/tmp/error.test" />
                   </camel:doCatch>
              </camel:doTry>
         </camel:route>
     </camel:camelContext>
</beans>
         
In the next post, we will see more details about routing. Happy Learning!!!!