Monday, February 27, 2017

ThreadLocal in Java

ThreadLocal

ThreadLocal is a Java Class which allows to create a local variable for each thread which can be accessible by itself only. That means the value created or assigned by one thread is not visible to other threads.
Let's look at this using an example

Example:

Creating a thread with Class name ThreadExample. The class has a static ThreadLocal variable, which will be set inside the run() method and accessed couple of times. Check below.
package com.test;

public class ThreadExample implements Runnable
{
    private static ThreadLocal<String> t = new ThreadLocal<String>();

    private String i;

    ThreadExample(String i)
    {
        this.i = i;
    }

    public void run()
    {
        t.set(i + " " + Math.random());
        try
        {
            System.out.println(t.get());
            Thread.sleep(10000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        System.out.println(t.get());
    }
}

From the main(), two threads created of type ThreadExample and called. See below
package com.test;

public class Application
{
    public static void main(String... strings) throws Exception
    {
        ThreadExample thread = new ThreadExample("ThreadLocal");

        Thread t1 = new Thread(thread); // Creating first thread
        Thread t2 = new Thread(thread); // Creating second thread

        t1.start(); // Starting
        t2.start(); // Starting

        t1.join(); //Waiting for the thread to complete
        t2.join(); //Waiting for the thread to complete

    }
}

Output

Output of the above example is
ThreadLocal 0.7293783856054876
ThreadLocal 0.42852459712457325
ThreadLocal 0.7293783856054876
ThreadLocal 0.42852459712457325

Points to be noted

  • ThreadLocal class is a generic type. You can set any type of Object value in it. In the above example, its String.
  • You can set the value using .set() method and read it using .get() method.
  • Each thread has it's own value inside the variable in ThreadExample. (You can find that in the output above)
  • Even though the variable is static, is not changed by two threads because it's a type of ThreadLocal. Each thread had it's own value in t. It works in the same way, if it's not static also. Just to show it's importance, i made it static.
  • If the variable is not ThreadLocal, both threads might have updated and accessed same value.

InheritableThreadLocal

InheritableThreadLocal extends ThreadLocal, which works in the same way as ThreadLocal except that the value inside it can be accessible by the thread's child as well. 

Thanks for reading. Happy Learning!!!


Friday, January 1, 2016

Java 8 - Streams (Part - II)

In the previous post, We learnt - What are Streams, How to create, How to Use and What are the operations that can be performed against them. Now we will continue on operations to get insights about the Streams.

Terminal and Intermediate Operations

Operations on Streams are two types. One is Intermediate Operation which returns a stream on which we can perform another operation. Second is Terminal Operation which returns a result other than stream. In the last post, we saw forEach operation which is a terminal operation, whereas sort operation which is a intermediate operation because it returns an another stream. Now, we will see some other important operations (both terminal and intermediate operations).

Count

Count operation is a Terminal operation which returns the number of the elements in the current Stream.
List<String> list = Arrays.asList("a1","a2","b","b","c1","c2");
System.out.println(list.stream().collect()); // This prints the value of 6

Reduce

Reduce operation (Terminal operation) reduces the elements in a stream using an operation. See this
List<String> list = Arrays.asList("a1","a2","b","b","c1","c2");
 list.stream().reduce((s1,s2) -> s1 + "-"+ s2 ).ifPresent(System.out::println); //prints a1-a2-b-b-c1-c2

Filter

Filter operation (Intermediate Operation) filters some of the elements in the stream based on the operation passed to it.
List<String> list = Arrays.asList("a1","a2","b","b","c1","c2");
list.stream().filter(a -> a.startsWith("a")).forEach(System.out::println); //prints a1 and a2

Match

Match Operation (Terminal Operation) returns boolean value based on the operation associated with it. We can do three match operations : anyMatch, allMatch and noneMatch. Each of these returns a value (true or false) based on the match criteria.
List<String> list = Arrays.asList("a1","a2","b","b","c1","c2");
System.out.println(list.stream().allMatch((a) -> a.startsWith("a")));  // false
System.out.println(list.stream().anyMatch((a) -> a.startsWith("a")));  // true
System.out.println(list.stream().noneMatch((a) -> a.startsWith("d"))); // true

Map

Map Operation (Intermediate Operation) converts each of the element in stream into another object via the operation passed to it. When this map operation combined with other operations like filter, reduce, forEach etc.. gives trivial results.
See below an example of how to make all the elements in the stream which start with "b" to upper case.
List<String> list = Arrays.asList("apple","biscuit","blah","cupcake","cat");
list.stream().filter(a -> a.startsWith("b")).map(String::toUpperCase).forEach(System.out::println);

Collect

Collect is one of the most important feature/method of the Stream class. Collect is used to convert the Stream into a List, Set or Map. Collect method accepts a Collector which can perform operations like Supplier, Combiner, Accumulator or Finisher. Let's look at some of them.
List<String> list = Arrays.asList("a1","a2","b","b","c1","c2");
Set<String> set = list.stream().collect(Collectors.toSet());
set.stream().forEach(System.out::println);
Above, we are trying to convert a given stream into a Set (As we know the property of Set - it doesn't hold duplicate values. So b will be dropped when it prints the values on the console.
Look at, another example how to group the stream using collect method
List<String> list = Arrays.asList("a1","a2","b","b","c1","c2");
Map<Integer,List<String>> map = list.stream().collect(Collectors.groupingBy((String p) -> p.length()));
System.out.println(map);
The above code prints the data to console as
{1=[b, b], 2=[a1, a2, c1, c2]}
This also helps to summarize the operations like min, max, average, count etc. Let's see how to do that
List<String> list = Arrays.asList("a1","a2","b","b","c1","c2");
IntSummaryStatistics stats = list.stream().collect(Collectors.summarizingInt((String t) -> t.length()));
System.out.println(stats); // This will print IntSummaryStatistics{count=6, sum=10, min=1, average=1.666667, max=2}
So far we have seen some built-in Collectors, now we will see how to create a collector with all of it's features (Supplier, Combiner, Accumulator and Finisher)
List<String> list = Arrays.asList("a1","a2","b","b","c1","c2"); 
Collector<String, StringJoiner, String> sCollector = 
      Collector.of(() -> new StringJoiner(" | "),        // supplier
      (j, p) -> j.add(p),                               // accumulator
      (j1, j2) -> j1.merge(j2),                         // combiner
      StringJoiner::toString);                          // finisher
        
String str = list.stream().collect(sCollector);
System.out.println(str); // Prints a1 | a2 | b | b | c1 | c2
The output simply explains what is being done using the streams (merging the strings using StringJoiner with pipe as a delimiter). This works more better if we are using it on objects with some calculations on it instead of strings.

Parallel Streams

Parallel Operations in Java made easier after the introduction of Fork/Join framework. To do that, we need to implement the same in our programming. In Streams it's very easy, just use .parallelStream to get the parallel stream instead of .stream in each of the examples above and make parallel operations. So, depending on the type of operation and requirement either we can call a sequential stream (using stream() method) or parallel stream (using parallelStream() method). Try replacing all the above examples with parallelStream and see the results.

Happy Learning!!!

Saturday, November 28, 2015

Java 8 - Streams (Part - I)

Stream represents a sequence of elements which can calculate or compute on-demand. Stream is an interface like an Iterator but it can do parallel processing. In other words, we can say Stream is a lazy collection where the values are computed on-demand.

How to Create

Streams are defined in java.util.stream package. Stream can be obtained using various options but a simple way is to get Stream is from Collection Interface. A default method defined as stream in Collection Interface to get it from the respective collection built. This is one of the best example to explain why the default methods are introduced in Java 8. See my previous post for more details on default methods. Collection Interface has two default methods for streams
  • stream(): Returns a stream associated with the collection for processing
  • parallelStream(): Same as stream method but returns for parallel processing. 
Stream here is the generic type. There are few streams defined for the primitive types as IntStream, DoubleStream, LongStream etc.

How to Use

As mentioned earlier - Streams are like Iterators or more than Iterators. Streams can be used to find an element, get first element, sort the elements etc. Apart from using streams on Collections, they can be used to operate on Paths, files, range of numbers etc. We will see few of them with examples

Streaming Files

BufferedReader class has got a new method lines() to return the lines in that file as Stream. See below
        try (FileReader fr = new FileReader("/tmp/sample.txt");
                BufferedReader br = new BufferedReader(fr))
        {
            br.lines().forEach(System.out::println);
        }
The above code returns the Stream which represents the sequence of lines in the file /tmp/sample.txt and, prints them. To add to it, Stream has got a method forEach to iterate over it. The Files class has also has a method lines() to read the Path as stream.
        try (Stream<String> st = Files.lines(Paths.get("/tmp/sample.txt")))
        {
            st.forEach(System.out::println);
        }

Streaming Patterns

Pattern can also return a Stream object with the matched values. Lets see that
        Pattern p = Pattern.compile("-");
        p.splitAsStream("5-13-93").forEach(System.out::println);
This will split the pattern 5-13-93 into three integers (5,13 and 93) and prints them.

Streaming Range

Stream interface has method to find range of primitive types (Int, Double or Long) as a Stream itself. We can use the respective Stream Interface (IntStream, DoubleStream and LongStream) to get the range associated with it. See - for example
IntStream.range(10, 20).forEach(System.out::println);

Stream Functions

Stream provides various methods to operate on Collections. We will discuss few of them here with examples. Most of the methods of Stream interface take lambda expression as argument (i.e. a method name - See Lambda Expressions for more details).

of:

This is a static method defined in Stream Interface to create a generic Stream. The method exists in other specific Streams like Double, Long etc to create its own type.
Stream<String> stream = Stream.of("Veeresh", "Blog", "Stream")

forEach:

The function will iterate through the Stream. All the above example has got forEach with prints to console

sort:

This method sorts the Stream elements associated with it in natural order (i.e. ascending order). See here
Stream.of("Veeresh", "Blog", "Stream").sorted().forEach(System.out::println);
The above code prints the sorted names. sort() is a very lazy function, means it doesn't effect until you call any other method after/before sort. In the above example - When the sort method was executed nothing changed on the Stream. Once the foreach method was called - the sorting was done.

Apart from these - stream has few other methods which are more powerful and useful which reduces developer effort, Increases readability and reduces LOC. We will discuss those in the next post. For now - this is it.

Happy Learning!!!!

Sunday, October 11, 2015

Default Methods in Java

Default Method is one of the features of Java 8. Default Method is
  • A method - which has a keyword default before it
  • Has to be defined only in Interface
  • Must be implementation. 
  • Can be overridden in Implemented Classes but has to be defined without default keyword.
  • An Interface can have multiple default methods
Default method may break the contract of an Interface because technically interface should consists of all abstract methods and no implementations. The main reason behind the concept of default method is to add steam method to collections (especially to interface) without breaking the implementation classes.

Example

  • As mentioned earlier, default method is a method with implementation in an Interface
public interface AInterface 
{
    public default void print()
    {
        System.out.println("Default Method in Interface A");
    }
}
  • If a class implements it, then it may or may not override the default method along with it's own methods.
public class AImplementation implements AInterface
{
    public void otherMethod()
    {
        System.out.println("New Method Defined in AImplementation");
    }
}
  • Calling a default is same as like other methods as below
AImplementation a = new AImplementation();
a.print();
a.otherMethod();
  • As multiple interfaces can be implemented by one class, but there is a probability of same default method with the same name, defined in both interfaces. Let's say another interface with a default method print like below
public interface BInterface
{
    default void print()
    {
        System.out.println("Default Method in Interface B"); 
    }
}
  • Now, if a class implements both AInterface and BInterface, which has same default method print, then compiler throws an error saying "Duplicate default methods named print with the parameters () and () are inherited from the types BInterface and AInterface". So, print method has to be overridden in the implemented Class like below
public class CImplementation implements AInterface, BInterface
{
    @Override
    public void print()
    {
        System.out.println("Default Method in Interface A");
    }
}

More details on the default methods and its use in Java will be explained in the next post.

Happy Learning!!!!

Sunday, September 27, 2015

Apache Camel-JPA

Camel JPA allows to route the Entities as Messages to read and write using Java Persistence Architecture (JPA). Let's quickly look at the configuration required to make Camel to route the messages (in fact entities) using camel-jpa.

Dependency

Add the following dependency along with camel-core to enable Camel JPA
<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-jpa</artifactId>
	<version>2.15.3</version>
</dependency> 

Component

Create JPA component using the class org.apache.camel.component.jpa.JpaComponent and inject Entity Manager Factory (and Transaction Manager if required)
<bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
	<property name="entityManagerFactory" ref="entityManagerFactory" />
	<property name="transactionManager" ref="transactionManager" />
</bean>

EndPoint

Format of the JPA Endpoint URI to be defined as (Component, Entity Class and Options)
jpa://<fully-classified-entity-class-name?option1=value1&option2=value2.."

Few Notable Parameters 

  • entityType: Default value is Class Name of the entity. The option will override the entity class name.
  • consumeDelete: The table row will be deleted after processing. In order to avoid deleting - set it to false. 
  • consumer.query: Custom query to consume the data from the table. 
  • consumer.namedQuery: Custom named query to consume the data from the table.
  • consumer.nativeQuery: Custom native query to consume the data form the table.
  • consumer.delay: Polling interval in milli-seconds between two polls.

Reading from Endpoint

Selection

By default, JPA will read all the rows in the table. To read specific set of rows in the table - use consumer.query (, consumer.nativeQuery or consumer.namedQuery) to set the selection. 

Preserve Rows

After the row is read from the table, the endpoint deletes the processed row. In order to avoid deleting the row, set the parameter consumeDelete to false. 
If we set this parameter - there is a possibility of reading the same row again because the row still exists on the table. In order to avoid the same row to be picked again - we can update the row by creating a method (in the entity and changing the specific value) by annotating it with @Consumed. 

Sending to Endpoint

Send entity by entity to an endpoint, it writes row by row into the table. We can use aggregator to write a list of entities into the endpoint. In this case, we need to set an extra parameter : entityType as java.util.List to make all the rows persisted to the table. 

Example

Let's read rows from a table and write into the same table by changing few parameters.

Table

mysql> desc camel_test;
+---------+----------+------+-----+---------+-------+
| Field   | Type     | Null | Key | Default | Extra |
+---------+----------+------+-----+---------+-------+
| seq_no  | int(11)  | NO   |     | NULL    |       |
| status  | char(4)  | NO   |     | NULL    |       |
| column1 | char(20) | YES  |     | NULL    |       |
| column2 | char(40) | YES  |     | NULL    |       |
| column3 | int(11)  | YES  |     | NULL    |       |
| column4 | datetime | YES  |     | NULL    |       |
+---------+----------+------+-----+---------+-------+

Configuration

We will see the example using Hibernate and Spring JPA with MYSQL

persitance.xml

<persistence xmlns="http://java.sun.com/xml/ns/persistence" version="1.0">
    <!--Persistence Unit for Mysql database-->
    <persistence-unit name="testMysql" transaction-type="RESOURCE_LOCAL">
        <provider>org.hibernate.ejb.HibernatePersistence</provider>
        <properties>
            <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL5InnoDBDialect"/>
            <property name="hibernate.show_sql" value="true"/>
        </properties>
    </persistence-unit>
</persistence>

Datasource and Entity manager

<bean id="mysqltestDataSource"
	class="org.springframework.jdbc.datasource.DriverManagerDataSource">
	<property name="driverClassName" value="${jdbc.driverClassName}" />
	<property name="url" value="${jdbc.testurl}" />
	<property name="username" value="${jdbc.username}" />
	<property name="password" value="${jdbc.password}" />
</bean>

<context:property-placeholder location="classpath:jdbc.properties" />

<bean id="entityManagerFactory"
	class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
	<property name="dataSource" ref="mysqltestDataSource" />
	<property name="persistenceUnitName" value="testMysql" />
</bean>

Camel JPA

<bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
	<property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>

Entity Class

package com.test.entity;
//Imports here.. 
@Entity
@Table(name = "camel_test")
@NamedQuery(name = "selectQuery", query = "select m from CamelTest m where m.status = 'P'")
public class CamelTest
{
    @Id
    @Column(name="seq_no")
    private Integer seq;
    
    @Column(name="status")
    private String status;
    
    @Column(name="column1")
    private String column1;
    
    @Column(name="column2")
    private String column2;
    
    @Column(name="column3")
    private Integer column3;
    
    @Column(name="column4")
    private Date column4;

    //Getters and Setters here
    
    @Consumed
    public void afterConsume()
    {
        setStatus("D");
    }
}

Points to be noted

  • The class is a JPA entity with one Named Query which will be used by JPA endpoint to read the data from the Table.
    • It reads the data from table whose status is 'P'
  • Only one extra method added which annotated with @Consumed (This will be executed after the entity is processed.
    •  After Camel JPA processing the record, the entity is changing the status to 'D' to avoid it from further reading

Finally Camel Route

<camel:camelContext id="camelContext">
	<camel:route>
		<camel:from uri="jpa://com.test.entity.CamelTest?consumer.namedQuery=selectQuery&amp;consumer.delay=10000&amp;consumeDelete=false" />
		<camel:to uri="direct:sample" />
	</camel:route>
</camel:camelContext>

Points to be noted

  • jpa is the JPA Component of Camel JPA
  • com.test.entity.CamelTest is the entity class created corresponding to camel_test table (as above)
  • Three options are set
    • consumeDelete to false to make the entity preserved after processing
    • consumer.dealy is set to 10 seconds to poll the table for every 10 seconds
    • consumer.namedQuery is set to read a specific set of rows (see NamedQuery annotation on the Entity). 
The above route just reads the rows from the table using the named query (selectQuery) with a polling delay of 10 secs, maps the entities, executes the @Consumed method (which modifies the status column from P to D so that the next poll will not pick the same row)  in the entity class after processing and finally sends the processed message to direct:sample (one by one)

Read in a batch

To read the rows in a batch, use aggregator

Aggreagor

package com.test.camel.aggregator;
//Imports
public class SampleAggregator implements AggregationStrategy
{
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
    {
        List<CamelTest> list = null;
        if(oldExchange == null || oldExchange.getIn() == null)
        {
            list = new ArrayList<CamelTest>();
            list.add(newExchange.getIn().getBody(CamelTest.class));
        }
        else
        {
            list = oldExchange.getIn().getBody(List.class);
            list.add(newExchange.getIn().getBody(CamelTest.class));
        }
        newExchange.getIn().setBody(list);
        return newExchange;
    }
}

Modified Route as below with Aggregator

<camel:route>
	<camel:from uri="jpa://com.test.entity.CamelTest?consumer.namedQuery=selectQuery&amp;consumer.delay=10000" />
	<camel:aggregate strategyRef="aggregateStrategy" completionInterval="2000" completionSize="20" >
		<camel:correlationExpression>
			<camel:constant>true</camel:constant>	
		</camel:correlationExpression>
		<camel:to uri="direct:sample" />
	</camel:aggregate>
</camel:route>

Write to Table in Batch

Modify the same route to write a duplicate entry into the table by changing the Sequence Number. See the processor below

Processor

package com.test.camel.processor;
//Imports
public class SampleProcessor implements Processor
{
    public void process(Exchange exchange) throws Exception
    {
        //Processing goes here. Set the output to same as input. (Dummy Processing) 
        List<CamelTest> list = exchange.getIn().getBody(List.class);
        
        for(CamelTest t : list)
        {
            t.setSeq(t.getSeq() + 1000); 
        }
        exchange.getOut().setBody(list);     
    }
}

  • Processor - processor is used here to change the primary key of the entities to write it back (Other we get primary key violation error) - this is just for the example purpose - added 1000 for each sequence number.

Modified Route

<camel:route>
	<camel:from uri="jpa://com.test.entity.CamelTest?consumer.namedQuery=selectQuery&amp;consumer.delay=10000" />
	<camel:aggregate strategyRef="aggregateStrategy" completionInterval="2000" completionSize="20" >
		<camel:correlationExpression>
			<camel:constant>true</camel:constant>	
		</camel:correlationExpression>
		<camel:process ref="processor" />	
		<camel:to uri="jpa://com.test.entity.CamelTest?entityType=java.util.List" />
	</camel:aggregate>
</camel:route>
  • URI on the route (camel:to) has a parameter called entityType as java.util.List to save a list of entities back to table. (See Sending to Endpoint above)
Happy Learning!!!