Sunday, September 7, 2014

Resource Pooling in Java

Resource pooling is the most important when we are dealing with multi-threaded, concurrent applications or multi-tier application which deal with limited resources to effectively manage them. We have APIs for pooling different resources like database connections, messages processing etc. Here, we see very simple object pooling with an example using Apache Common Pooling (ACP).

Context

A multi-threaded application accessing an resource (limited - only 3 in this case). This takes a bit long time to complete the action. I will simulate the case with a thread which takes 8s to execute.
package com.test.thread;

public class MyThread implements Runnable
{
    private boolean running = false;

    public void run()
    {
        int time = 4;   
        while(time > 0)
        {
            try
            {
                Thread.sleep(2000);
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            } 
            time--;
        }
    }

    public boolean isRunning()
    {
        return running;
    }

    public void setRunning(boolean running)
    {
        this.running = running;
    }
}

Without Pooling

Consumer

The consumer uses this resources for completing a task. These are real-time consumers (not countable) which access the resource at any time.
package com.test.thread;

public class Consumer implements Runnable
{

    int i;

    public Consumer(int i)
    {
        this.i = i;
    }
    public void run()
    {
        MyThread m = null;
        try
        {
            m = new MyThread();
            System.out.println("Started " + i);
            Thread t = new Thread(m);
            t.start();
            while (t.isAlive());
            System.out.println("Completed " + i);
        } catch (Exception e)
        {
            System.out.println("Exception while getting the object " + i);
            e.printStackTrace();
        }
    }
}

Application

Assume that at any point of time we have 10 consumers accessing the MyThread. As one consumer is using one MyThread, there will be 10 MyThreads' created.
package com.test.thread.appl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.test.thread.Consumer;
import com.test.thread.MyThread;

public class Main
{

    public static void main(String[] args) throws Exception
    {
        ExecutorService e = Executors.newFixedThreadPool(10);
        List<Future> f = new ArrayList<Future>();
        
        for(int i=0;i<10;i++)
        {
            System.out.println("Getting the object "+i);
            Future a = e.submit(new Consumer(i));
            f.add(a);
        }
        
        label: for(Future s : f)
        {
            if(!s.isDone())
                continue label;
        }
    }
}

With Pooling

This will work as long as we don't have a limit on number of MyThread. Suppose if we limit number of MyThread to 3. Our application may run into problems because of resource shortage. To effectively handle this, we use Resource Pooling. There will be no change in the Consumer except that it gets the object from the pool instead of creating.

Pool Factory

package com.test.thread;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class MyThreadFactory extends BasePooledObjectFactory<MyThread>
{

    @Override
    public MyThread create() throws Exception
    {
        return new MyThread();
    }

    @Override
    public PooledObject<MyThread> wrap(MyThread arg0)
    {
        return new DefaultPooledObject<MyThread>(arg0);
    }
    
    @Override
    public boolean validateObject(PooledObject<MyThread> p)
    {
        return p.getObject().isRunning();
    }
    
    @Override
    public void activateObject(PooledObject<MyThread> p) throws Exception
    {
        p.getObject().setRunning(true);
    }
    
    @Override
    public void passivateObject(PooledObject<MyThread> p) throws Exception
    {
        p.getObject().setRunning(false);
    }

}

Consumer

Now change the consumer to load MyThread from pool.
package com.test.thread;

import org.apache.commons.pool2.impl.GenericObjectPool;

public class Consumer implements Runnable
{

    GenericObjectPool<MyThread> pool;
    int i;

    public Consumer(int i, GenericObjectPool<MyThread> pool)
    {
        this.i = i;
        this.pool = pool;
    }

    public void run()
    {
        MyThread m = null;
        try
        {
            m = pool.borrowObject();
            System.out.println("Started " + i);
            Thread t = new Thread(m);
            t.start();
            while (t.isAlive());
            System.out.println("Completed " + i);
        } catch (Exception e)
        {
            System.out.println("Exception while getting the object " + i);
            e.printStackTrace();
        }
        if (m != null)
            pool.returnObject(m);
    }
}

Points to remember: 

  • We need to get the object from the pool using borrowObject. 
  • When we finished using, it should be returned to the pool using returnObject. Otherwise we may run into resource unavailability. 
  • borrowObject can wait for specific time if the resource is not free. borrowObject(millis)

Application

package com.test.thread.appl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import com.test.thread.Consumer;
import com.test.thread.MyThread;
import com.test.thread.MyThreadFactory;

public class Main
{

    public static void main(String[] args) throws Exception
    {
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(3);
        config.setBlockWhenExhausted(true);
        config.setMaxWaitMillis(30 * 1000);
        
        MyThreadFactory factory = new MyThreadFactory();
        GenericObjectPool<MyThread> pool = new GenericObjectPool<MyThread>(factory, config);
        
        ExecutorService e = Executors.newFixedThreadPool(10);
        List<Future> f = new ArrayList<Future>();
        
        for(int i=0;i<10;i++)
        {
            System.out.println("Getting the object "+i);
            Future a = e.submit(new Consumer(i,pool));
            f.add(a);
        }
        
        label: for(Future s : f)
        {
            if(!s.isDone())
                continue label;
        }
    }

}

Points to remember

  • Configuration of the pool can be set using GenericObjectPoolConfig (max Total, max time to wait etc.) 
  • Factory is used to retrieve and put back the resource. 
  • activateObject is called once the object is retrieved from the pool 
  • passivateObject reset the object after the object returns to the pool. 
  • Pool validates the object each time before fetching it. 
  • If all the resources are busy, GenericObjectPool waits for maxWaitMillis to get a resource. If not, throws an exception. The pool can be blocked by setting : blockWhenExhausted

Execution

When we run the above example, GenericObjectPool will block after 3 MyThread's are created. Once one of them returns to the object, will be resumed.
Getting the object 0
Getting the object 1
Getting the object 2
Getting the object 3
Getting the object 4
Getting the object 5
Getting the object 6
Getting the object 7
Getting the object 8
Getting the object 9
Started 0
Started 1
Started 2
Completed 0
Started 3
Completed 1
Completed 2
Started 5
Started 4
Exception while getting the object 6
Exception while getting the object 8
Exception while getting the object 7
Exception while getting the object 9
java.util.NoSuchElementException: Timeout waiting for idle object
 .....
Completed 3
Completed 5
Completed 4

Points to remember

When we look at the output, 
  • Only 3 objects were returned from the pool and it's blocked while getting next. 
  • Once one of them is completed, next is started. 
  • If the wait time is completed and no object found, throws an exception NoSuchElementException 
  • Most important point is we should clearly identify the maximum wait time when we are writing
Please read through the javadoc and examples for more details on the official site.

Happy Learning!!!