Sunday, September 27, 2015

Apache Camel-JPA

Camel JPA lets you route entities as messages, reading from and writing to a database through the Java Persistence API (JPA). Let's quickly look at the configuration required to make Camel route messages (in fact, entities) using camel-jpa.

Dependency

Add the following dependency along with camel-core to enable Camel JPA:
<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-jpa</artifactId>
	<version>2.15.3</version>
</dependency> 

Component

Create the JPA component using the class org.apache.camel.component.jpa.JpaComponent and inject the entity manager factory (and a transaction manager, if required):
<bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
	<property name="entityManagerFactory" ref="entityManagerFactory" />
	<property name="transactionManager" ref="transactionManager" />
</bean>

Endpoint

The JPA endpoint URI consists of the component name, the entity class, and options:
jpa://<fully-qualified-entity-class-name>?option1=value1&option2=value2...

A Few Notable Parameters

  • entityType: Overrides the entity class name given in the URI. If not set, the class name from the URI is used.
  • consumeDelete: When true, the table row is deleted after processing. Set it to false to keep the row.
  • consumer.query: Custom query to consume data from the table.
  • consumer.namedQuery: Custom named query to consume data from the table.
  • consumer.nativeQuery: Custom native query to consume data from the table.
  • consumer.delay: Interval in milliseconds between two polls.
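
As a rough illustration of how the URI format and the options above fit together, here is a minimal sketch in plain Java. The class JpaUriSketch and its jpaUri method are hypothetical helpers written for this post, not part of Camel; Camel itself only needs the resulting string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class JpaUriSketch {

    // Hypothetical helper (not a Camel API): joins the entity class name
    // and the options into a JPA endpoint URI string.
    public static String jpaUri(String entityClassName, Map<String, String> options) {
        if (options.isEmpty()) {
            return "jpa://" + entityClassName;
        }
        // Options are written as key=value pairs, separated by '&' and prefixed by '?'.
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "jpa://" + entityClassName + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("consumer.namedQuery", "selectQuery");
        options.put("consumer.delay", "10000");
        options.put("consumeDelete", "false");
        System.out.println(jpaUri("com.test.entity.CamelTest", options));
    }
}
```

When such a URI is placed inside a Spring XML attribute, each & has to be written as &amp;.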

Reading from Endpoint

Selection

By default, the JPA consumer reads all the rows in the table. To read a specific set of rows, set the selection with consumer.query, consumer.namedQuery or consumer.nativeQuery.

Preserve Rows

After a row is read from the table, the endpoint deletes the processed row. To keep the row, set the parameter consumeDelete to false.
With consumeDelete set to false, the same row may be read again on the next poll because it still exists in the table. To prevent that, add a method to the entity that changes a specific value (for example, a status column) and annotate it with @Consumed. The method runs after the entity has been processed, so a query that filters on that value no longer selects the row.
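
To make the callback idea concrete, the following is a simplified, self-contained sketch of the mechanism in plain Java. It is not Camel's implementation: the AfterConsume annotation is a hypothetical stand-in for Camel's @Consumed, Row is a cut-down entity, and runConsumedCallbacks only imitates what a consumer might do once an entity has been processed.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class ConsumedSketch {

    // Hypothetical stand-in for Camel's @Consumed annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface AfterConsume {
    }

    // Cut-down entity: only the status value matters for this sketch.
    public static class Row {
        private String status = "P";

        public String getStatus() {
            return status;
        }

        @AfterConsume
        public void markDone() {
            status = "D"; // a query filtering on status = 'P' would now skip this row
        }
    }

    // Invokes every public no-argument method that carries the marker annotation.
    public static void runConsumedCallbacks(Object entity) {
        for (Method method : entity.getClass().getMethods()) {
            if (method.isAnnotationPresent(AfterConsume.class)) {
                try {
                    method.invoke(entity);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Callback failed: " + method.getName(), e);
                }
            }
        }
    }

    public static void main(String[] args) {
        Row row = new Row();
        runConsumedCallbacks(row);
        System.out.println(row.getStatus());
    }
}
```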

Sending to Endpoint

When entities are sent to the endpoint one by one, each entity is written to the table as one row. To write a list of entities in one go, use an aggregator. In that case, set the extra parameter entityType to java.util.List so that all the entities in the list are persisted to the table.

Example

Let's read rows from a table and write them back into the same table with a few values changed.

Table

mysql> desc camel_test;
+---------+----------+------+-----+---------+-------+
| Field   | Type     | Null | Key | Default | Extra |
+---------+----------+------+-----+---------+-------+
| seq_no  | int(11)  | NO   |     | NULL    |       |
| status  | char(4)  | NO   |     | NULL    |       |
| column1 | char(20) | YES  |     | NULL    |       |
| column2 | char(40) | YES  |     | NULL    |       |
| column3 | int(11)  | YES  |     | NULL    |       |
| column4 | datetime | YES  |     | NULL    |       |
+---------+----------+------+-----+---------+-------+

Configuration

The example uses Hibernate and Spring JPA with MySQL.

persistence.xml

<persistence xmlns="http://java.sun.com/xml/ns/persistence" version="1.0">
    <!--Persistence Unit for Mysql database-->
    <persistence-unit name="testMysql" transaction-type="RESOURCE_LOCAL">
        <provider>org.hibernate.ejb.HibernatePersistence</provider>
        <properties>
            <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL5InnoDBDialect"/>
            <property name="hibernate.show_sql" value="true"/>
        </properties>
    </persistence-unit>
</persistence>

Datasource and Entity manager

<bean id="mysqltestDataSource"
	class="org.springframework.jdbc.datasource.DriverManagerDataSource">
	<property name="driverClassName" value="${jdbc.driverClassName}" />
	<property name="url" value="${jdbc.testurl}" />
	<property name="username" value="${jdbc.username}" />
	<property name="password" value="${jdbc.password}" />
</bean>

<context:property-placeholder location="classpath:jdbc.properties" />

<bean id="entityManagerFactory"
	class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
	<property name="dataSource" ref="mysqltestDataSource" />
	<property name="persistenceUnitName" value="testMysql" />
</bean>

Camel JPA

<bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
	<property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>

Entity Class

package com.test.entity;
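import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.NamedQuery;
import javax.persistence.Table;

import org.apache.camel.component.jpa.Consumed;
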
@Entity
@Table(name = "camel_test")
@NamedQuery(name = "selectQuery", query = "select m from CamelTest m where m.status = 'P'")
public class CamelTest
{
    @Id
    @Column(name="seq_no")
    private Integer seq;
    
    @Column(name="status")
    private String status;
    
    @Column(name="column1")
    private String column1;
    
    @Column(name="column2")
    private String column2;
    
    @Column(name="column3")
    private Integer column3;
    
    @Column(name="column4")
    private Date column4;

    //Getters and Setters here
    
    @Consumed
    public void afterConsume()
    {
        setStatus("D");
    }
}

Points to be noted

  • The class is a JPA entity with one named query, which the JPA endpoint uses to read data from the table.
    • It reads the rows whose status is 'P'.
  • Only one extra method is added, annotated with @Consumed. It is executed after the entity is processed.
    • After Camel JPA has processed the record, the method changes the status to 'D' so that the row is not read again.

Finally, the Camel Route

<camel:camelContext id="camelContext">
	<camel:route>
		<camel:from uri="jpa://com.test.entity.CamelTest?consumer.namedQuery=selectQuery&amp;consumer.delay=10000&amp;consumeDelete=false" />
		<camel:to uri="direct:sample" />
	</camel:route>
</camel:camelContext>

Points to be noted

  • jpa is the JPA Component of Camel JPA
  • com.test.entity.CamelTest is the entity class created corresponding to camel_test table (as above)
  • Three options are set
    • consumeDelete is set to false so that the entity is preserved after processing
    • consumer.delay is set to 10000 milliseconds so that the table is polled every 10 seconds
    • consumer.namedQuery is set to read a specific set of rows (see the NamedQuery annotation on the entity).
The above route reads rows from the table using the named query (selectQuery) with a polling delay of 10 seconds and maps them to entities. Each entity is sent to direct:sample as its own message. Once an entity has been processed, the @Consumed method in the entity class runs and changes the status column from P to D, so that the next poll does not pick up the same row.

Read in a batch

To read the rows in a batch, use an aggregator.

Aggregator

package com.test.camel.aggregator;
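import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

import com.test.entity.CamelTest;
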
public class SampleAggregator implements AggregationStrategy
{
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
    {
        List<CamelTest> list = null;
        if(oldExchange == null || oldExchange.getIn() == null)
        {
            list = new ArrayList<CamelTest>();
            list.add(newExchange.getIn().getBody(CamelTest.class));
        }
        else
        {
            list = oldExchange.getIn().getBody(List.class);
            list.add(newExchange.getIn().getBody(CamelTest.class));
        }
        newExchange.getIn().setBody(list);
        return newExchange;
    }
}

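The modified route with the aggregator is shown below. The strategyRef value aggregateStrategy is expected to be the id of a SampleAggregator bean defined in the Spring context.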

<camel:route>
	<camel:from uri="jpa://com.test.entity.CamelTest?consumer.namedQuery=selectQuery&amp;consumer.delay=10000" />
	<camel:aggregate strategyRef="aggregateStrategy" completionInterval="2000" completionSize="20" >
		<camel:correlationExpression>
			<camel:constant>true</camel:constant>	
		</camel:correlationExpression>
		<camel:to uri="direct:sample" />
	</camel:aggregate>
</camel:route>
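
The size-based part of this batching can be sketched outside Camel in plain Java. The class BatchSketch below is a hypothetical illustration, not the aggregator used by the route: it folds items into a list the same way SampleAggregator does and closes a batch when it reaches the given size. The time-based completion (completionInterval) is not modelled; the leftover items are simply returned as a final, smaller batch.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSketch {

    // Groups items into batches of at most batchSize, in arrival order.
    public static <T> List<List<T>> batch(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = null; // plays the role of the old exchange's body
        for (T item : items) {
            if (current == null) {
                current = new ArrayList<>(); // first item of a new batch
            }
            current.add(item);
            if (current.size() == batchSize) {
                batches.add(current); // batch is complete, start a new one
                current = null;
            }
        }
        if (current != null) {
            batches.add(current); // incomplete batch left at the end
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 45; i++) {
            items.add(i);
        }
        // Print the size of each batch produced for 45 items and a batch size of 20.
        for (List<Integer> batch : batch(items, 20)) {
            System.out.println(batch.size());
        }
    }
}
```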

Write to Table in Batch

Modify the same route to write a duplicate of each entry back into the table with a changed sequence number. See the processor below.

Processor

package com.test.camel.processor;
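import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

import com.test.entity.CamelTest;
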
public class SampleProcessor implements Processor
{
    public void process(Exchange exchange) throws Exception
    {
        //Processing goes here. Set the output to same as input. (Dummy Processing) 
        List<CamelTest> list = exchange.getIn().getBody(List.class);
        
        for(CamelTest t : list)
        {
            t.setSeq(t.getSeq() + 1000); 
        }
        exchange.getOut().setBody(list);     
    }
}

  • Processor: the processor changes the primary key of each entity before it is written back (otherwise we get a primary key violation error). This is only for the purpose of the example; it adds 1000 to each sequence number.

Modified Route

<camel:route>
	<camel:from uri="jpa://com.test.entity.CamelTest?consumer.namedQuery=selectQuery&amp;consumer.delay=10000" />
	<camel:aggregate strategyRef="aggregateStrategy" completionInterval="2000" completionSize="20" >
		<camel:correlationExpression>
			<camel:constant>true</camel:constant>	
		</camel:correlationExpression>
		<camel:process ref="processor" />	
		<camel:to uri="jpa://com.test.entity.CamelTest?entityType=java.util.List" />
	</camel:aggregate>
</camel:route>
  • The URI on the route (camel:to) sets the parameter entityType to java.util.List so that a list of entities is saved back to the table. (See Sending to Endpoint above.)
Happy Learning!!!