Tuesday, 31 January 2012

Rethrowing Caught Exceptions in Camel

Compared to the RouteController, this post will be much simpler. I have had a number of situations where I wanted to perform some processing when an exception is thrown, but I did not want to simply catch the exception and "gobble it up". I wanted to catch the exception, do some processing and then re-throw the exception so that the exceptional flow continues. One example was that, when a specific exception is thrown, I wanted to stop the current route, give a job to Quartz to restart this route within 15 minutes, and then re-throw the exception so that the exceptional logging and flow continues (for example, rolling back the transaction).

For this we made use of the RethrowException bean, which has a very simple implementation: a single void method which takes an Exchange as a parameter. The only slightly interesting thing about this post is how we get our hands on the exception that Camel caught for us. That is quite straightforward because, luckily for us, Camel sets the caught exception as a property on the exchange with the key CamelExceptionCaught (available as the constant Exchange.EXCEPTION_CAUGHT). So the code for the RethrowException bean is:

package org.ic.camel.tools.bean.exception;

import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RethrowException {

    Logger logger = LoggerFactory.getLogger(RethrowException.class);

    public void processExchange(Exchange exchange) throws Throwable
    {
        // Camel stores the exception it caught as an exchange property
        Throwable caughtException = (Throwable) exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
        if (caughtException != null)
        {
            logger.debug("Rethrowing Exception " + caughtException.getClass());
            throw caughtException;
        }
    }
}
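
To see the pattern in isolation, here is a minimal plain-Java sketch that does not use Camel at all. The exchange is modelled as a simple Map of properties, and the names RethrowSketch, handle and log are made up for illustration; only the property key CamelExceptionCaught comes from the post. Unlike the bean above, the sketch only rethrows Exception instances, which is an assumption made to keep the method signature narrow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the bean above; no Camel classes involved.
class RethrowSketch {

    // The property key under which Camel stores the caught exception.
    static final String EXCEPTION_CAUGHT = "CamelExceptionCaught";

    // Does some side work (here it just records a line), then rethrows the
    // very same exception instance so the caller's error handling continues.
    static void handle(Map<String, Object> properties, List<String> log) throws Exception {
        Object caught = properties.get(EXCEPTION_CAUGHT);
        if (caught instanceof Exception) {
            log.add("side work done for " + caught.getClass().getSimpleName());
            throw (Exception) caught;
        }
        // Nothing was caught, so there is nothing to rethrow.
    }

    public static void main(String[] args) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(EXCEPTION_CAUGHT, new IllegalStateException("boom"));
        List<String> log = new ArrayList<>();
        try {
            handle(properties, log);
        } catch (Exception e) {
            System.out.println(log + " then rethrown: " + e.getMessage());
        }
    }
}
```

The point to notice is that the caller sees the original exception object, not a wrapper, so whatever error handling sits further up (logging, rollback) behaves as if the intermediate processing had never been there.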


Tuesday, 24 January 2012

Controlling Flows in Mule

After we saw how we can control routes in Camel, we can now take a look at how to get the same behaviour in Mule. In Mule, instead of routes we have flows. One thing to note is that a flow can be started or stopped, but there is no notion of suspending and resuming flows. Apart from flows there are the deprecated Services, which were used before Mule 3. We should handle them as well, and services do actually support pause and resume.

So what we will do is query the registry to get the service or flow. If it is a service then we can perform all 4 actions. If it is a flow, we can only perform stop and start. If we get a request for a pause, we will log a warning and perform a stop instead, whilst if we get a resume we will log a warning and perform a start.

The code of the FlowController is as follows:

package org.ic.mule.tools.processor.control;

import ...

public class FlowController implements MessageProcessor{
    ...
    @Override
    public MuleEvent process(MuleEvent event) throws MuleException {
        MuleContext muleContext = event.getMuleContext();
        MuleMessage message = event.getMessage();
        // Check if action and flow name are set as headers,
        // otherwise fall back to the defaults set on this processor
        String flowName = message.getInboundProperty(FLOW_NAME, getFlowName());
        String action = message.getInboundProperty(ACTION, getAction());
        FlowConstruct flow = muleContext.getRegistry().lookupFlowConstruct(flowName);
        if (flow == null)
        {
            throw new DefaultMuleException("No flow found with name " + flowName);
        }
        String actionTaken = null;
        logger.debug("Executing " + action + " on " + flowName);
        if (flow instanceof Service)
        {
            // Services support all four actions
            Service service = (Service) flow;
            actionTaken = performAction(service, action);
        } else
        {
            //It must implement Lifecycle!
            Lifecycle lifecycle = (Lifecycle) flow;
            actionTaken = performAction(lifecycle, action);
        }
        // Report back which action was really performed
        message.setOutboundProperty(ACTION_TAKEN, actionTaken);
        return event;
    }

    private String performAction(Lifecycle lifecycle, String action) throws MuleException {
        if (action.equals(START))
        {
            lifecycle.start();
            return action;
        } else if (action.equals(STOP))
        {
            lifecycle.stop();
            return action;
        } else if (action.equals(PAUSE))
        {
            logger.warn("Stopping instead of Pausing since don't know how to pause " + lifecycle.getClass());
            lifecycle.stop();
            return STOP;
        } else if (action.equals(RESUME))
        {
            logger.warn("Starting instead of Resuming since don't know how to resume " + lifecycle.getClass());
            lifecycle.start();
            return START;
        } else
        {
            throw new UnsupportedOperationException("Unknown action " + action
                    + ". Action should be STOP, START, PAUSE or RESUME");
        }
    }

    private String performAction(Service service, String action) throws MuleException
    {
        if (action.equals(START))
        {
            service.start();
            return action;
        } else if (action.equals(STOP))
        {
            service.stop();
            return action;
        } else if (action.equals(PAUSE))
        {
            service.pause();
            return action;
        } else if (action.equals(RESUME))
        {
            service.resume();
            return action;
        } else
        {
            throw new UnsupportedOperationException("Unknown action " + action
                    + ". Action should be STOP, START, PAUSE or RESUME");
        }
    }
    ...
}


One thing to note is that we are using the Lifecycle interface in order to control the flow. This actually can allow us to generalise our FlowController to start/stop not only flows and services but any component in the registry which implements the Lifecycle interface. This means that you will be able to control Connectors as well for example. For this we will make a minor change as follows:

Lifecycle lifecycle = muleContext.getRegistry().lookupObject(flowName);
if (lifecycle == null)
{
    throw new DefaultMuleException("No flow found with name " + flowName);
}
String actionTaken = null;
logger.debug("Executing " + action + " on " + flowName);
if (lifecycle instanceof Service)
{
    Service service = (Service) lifecycle;
    actionTaken = performAction(service, action);
} else
{
    actionTaken = performAction(lifecycle, action);
}


Even though now our FlowController can do more than just control Flows and Services, I will still leave it called the FlowController since that is its main use, LifecycleController does not really cut it in my opinion ;).

As always you can browse the FlowController code and for usage examples you can look at the test implementation and test configuration.

Tuesday, 17 January 2012

Routes Stopping and Suspending themselves in Camel

As we saw in the post "Controlling Routes in Camel", it is quite straightforward to control other routes, but what about controlling "yourself"? What if you want to stop yourself?

In such situations, things get slightly more complicated. The main problem is the DefaultShutdownStrategy which, when configured for graceful shutdown (the default), waits for all in-flight messages to finish before shutting down. Why would this be a problem? Well, if a route is trying to stop itself using the Route Controller we defined earlier, the Route Controller will call stop while there is still an in-flight exchange (the very exchange that is currently being processed by the Route Controller). This causes the DefaultShutdownStrategy to wait until the time-out before shutting down the route. Don't get me wrong, the DefaultShutdownStrategy is doing its work correctly; it is the Route Controller which needs to handle itself better.

The Camel documentation suggests that in such situations, you should remove the current exchange from the list of current in-flight messages. This might seem at first glance to have solved the problem, but it will not always work well with all transports. Polling transports like the file transport will still cause the DefaultShutdownStrategy to wait because there is still a polling thread working (the same thread that is trying to do the stopping). Also, even if you wait for the time-out, the file which caused the time-out is not removed, since technically the file has not yet been completely processed: the processing was stopped midway. In short, I could not see a clean way to do it from within the same thread.

The simplest solution that I found was to call the stop from a different thread. This will allow the "current" flow to finish normally (the exchange processing is completed) while the stopping is done in a different thread. The simplest way would be to spawn off a new thread to do the stopping, and that is what I have done here.

public void performAction(Exchange exchange) throws Exception {
    //Skipping some lines here...
    if (routeId.equals(exchange.getFromRouteId())) {
        // Run the controlling in a separate thread in order for the current
        // flow to finish
        final String finalRouteId = routeId;
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    performAction(finalRouteId, action, context);
                } catch (Exception e) {
                    // Cannot do much with this exception except log it
                    logger.warn("Exception thrown while attempting to "
                            + action + " route " + finalRouteId
                            + " asynchronously.", e);
                }
            }
        }, "RouteControllerSelf").start();
    } else {
        performAction(routeId, action, context);
    }
}


Spawning off threads like this is not ideal since you could run into situations where you end up spawning threads out of control. A further improvement is to use the ExecutorService to do this for you (similar to how the DefaultShutdownStrategy gives the task of shutting down to the ExecutorService in order to be able to support time-out).

public void performAction(Exchange exchange) throws Exception {
    //Skipping some lines here...
    if (routeId.equals(exchange.getFromRouteId())) {
        // Run the controlling in a separate thread in order for the current
        // flow to finish
        final String finalRouteId = routeId;
        context.getExecutorServiceManager()
                .newSingleThreadExecutor(this, "RouteControllerSelf")
                .execute(new Runnable() {
            @Override
            public void run() {
                try {
                    performAction(finalRouteId, action, context);
                } catch (Exception e) {
                    // Cannot do much with this exception except log it
                    logger.warn("Exception thrown while attempting to "
                            + action + " route " + finalRouteId
                            + " asynchronously.", e);
                }
            }
        });
    } else {
        performAction(routeId, action, context);
    }
}


After this change, a route should be able to gracefully stop itself using the Route Controller.
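
To make the difference between the two approaches concrete, here is a toy model in plain Java with no Camel involved. ToyRoute and SelfStopSketch are hypothetical names, and the in-flight counter with a polling stop() is only a rough stand-in for what a graceful shutdown does, not how the DefaultShutdownStrategy is implemented. Stopping inline waits out the whole time-out because the caller itself is still in flight, while handing the stop to an executor lets the caller finish first.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy stand-in for a route with a graceful shutdown; not a Camel class.
class ToyRoute {
    private final AtomicInteger inFlight = new AtomicInteger();
    private volatile boolean stopped;

    void begin() { inFlight.incrementAndGet(); }
    void end() { inFlight.decrementAndGet(); }
    boolean isStopped() { return stopped; }

    // Waits up to timeoutMillis for in-flight work to drain, then stops.
    // Returns true if the route drained in time, false if it timed out.
    boolean stop(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (inFlight.get() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        stopped = true;
        return inFlight.get() == 0;
    }
}

class SelfStopSketch {

    // The "exchange" asks for the stop on its own thread, so it is still
    // counted as in flight while stop() is waiting for it to finish.
    static boolean stopInline(ToyRoute route, long timeoutMillis) {
        route.begin();
        try {
            return route.stop(timeoutMillis);
        } finally {
            route.end();
        }
    }

    // The "exchange" only hands the stop request to another thread and then
    // finishes normally, so stop() sees the in-flight count drop to zero.
    static boolean stopViaExecutor(ToyRoute route, ExecutorService executor, long timeoutMillis) {
        Future<Boolean> result;
        route.begin();
        try {
            Callable<Boolean> task = () -> route.stop(timeoutMillis);
            result = executor.submit(task);
        } finally {
            route.end();
        }
        try {
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Stop task failed", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println("inline drained in time: " + stopInline(new ToyRoute(), 200));
            System.out.println("handed off drained in time: " + stopViaExecutor(new ToyRoute(), executor, 200));
        } finally {
            executor.shutdown();
        }
    }
}
```

The sketch reuses a single executor for all stop requests and shuts it down at the end, which is one way to keep the number of threads bounded.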

Tuesday, 10 January 2012

Controlling Routes in Camel

I have had the need to be able to control routes from different routes quite a number of times (both in Mule and Camel). Typically this is useful if you would like to be able to remotely control your routes using any of the available transports. Yes, some might say that you have JMX available to do this and even if your administrators do not want to open up any new ports on the test environment you can always overload a port already being used by the ESB and proxy for example the MX4J interface, however that is a different story.

But why not control your routes in the same way as you interact with your application? If you are sending JMS messages to your application, why not have a queue which receives control messages? Or if you are using web services, why not expose a web service which will allow you to control the routes? This is one beauty of an ESB: you have so much flexibility when it comes to the transports to use.

So what I did for Camel was create a simple bean which performs the control action (START, STOP, RESUME, SUSPEND) on a specific route, identified by its route ID.

package org.ic.camel.tools.bean.control;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.ic.camel.tools.IntegrationCocktailConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RouteController {

    public static final String START = "START";
    public static final String STOP = "STOP";
    public static final String SUSPEND = "SUSPEND";
    public static final String RESUME = "RESUME";
    public static final String PREFIX = IntegrationCocktailConstants.PREFIX
            + ".RouteController";
    public static final String ROUTE_ID = PREFIX + ".RouteId";
    public static final String ACTION_ID = PREFIX + ".ActionId";

    private String action = null;
    private String routeId = null;

    Logger logger = LoggerFactory.getLogger(RouteController.class);

    public void performAction(Exchange exchange) throws Exception {
        CamelContext context = exchange.getContext();
        // Check if route id and action are set as headers,
        // otherwise fall back to the defaults set on the bean
        String routeId = (String) exchange.getIn().getHeader(ROUTE_ID,
                getRouteId());
        String action = (String) exchange.getIn().getHeader(ACTION_ID,
                getAction());
        performAction(routeId, action, context);
    }

    private void performAction(String routeId, String action,
            CamelContext context) throws Exception {
        logger.debug("Executing " + action + " on " + routeId);
        if (action.equals(STOP)) {
            context.stopRoute(routeId);
        } else if (action.equals(START)) {
            context.startRoute(routeId);
        } else if (action.equals(SUSPEND)) {
            context.suspendRoute(routeId);
        } else if (action.equals(RESUME)) {
            context.resumeRoute(routeId);
        } else {
            throw new UnsupportedOperationException("Unknown action " + action
                    + ". Action should be STOP, START, SUSPEND or RESUME");
        }
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getRouteId() {
        return routeId;
    }

    public void setRouteId(String routeId) {
        this.routeId = routeId;
    }
}

The bean has two attributes, routeId and action, which can be set to give the bean default values, and a method which takes an Exchange as a parameter and acts as the entry point. From the exchange we attempt to get the headers "IntegrationCocktail.RouteController.RouteId" and "IntegrationCocktail.RouteController.ActionId" (the ROUTE_ID and ACTION_ID constants). If these headers are not set, we use the defaults set directly on the bean. Finally, depending on the action, we simply call context.startRoute/stopRoute/resumeRoute/suspendRoute(routeId).
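
The "header first, bean default second" lookup can be sketched in plain Java as follows. This is only an illustration: ControlRequest, its Action enum and the resolve method are hypothetical and not part of the RouteController, the headers are modelled as a simple Map, and the header names assume the prefix resolves to "IntegrationCocktail.RouteController". The sketch also rejects a missing or unknown value up front, which is a design choice of the sketch rather than something the bean does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the RouteController shown above.
class ControlRequest {

    enum Action { START, STOP, SUSPEND, RESUME }

    // Assumed header names, based on the description in the post.
    static final String ROUTE_ID = "IntegrationCocktail.RouteController.RouteId";
    static final String ACTION_ID = "IntegrationCocktail.RouteController.ActionId";

    final String routeId;
    final Action action;

    private ControlRequest(String routeId, Action action) {
        this.routeId = routeId;
        this.action = action;
    }

    // A header wins when present; otherwise the supplied default is used.
    static ControlRequest resolve(Map<String, Object> headers,
                                  String defaultRouteId, String defaultAction) {
        Object routeId = headers.getOrDefault(ROUTE_ID, defaultRouteId);
        Object action = headers.getOrDefault(ACTION_ID, defaultAction);
        if (routeId == null || action == null) {
            throw new IllegalArgumentException(
                    "Route id and action must come from a header or a default");
        }
        // Action.valueOf throws IllegalArgumentException for unknown names.
        return new ControlRequest(routeId.toString(), Action.valueOf(action.toString()));
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(ACTION_ID, "SUSPEND");
        ControlRequest request = resolve(headers, "ordersRoute", "STOP");
        System.out.println(request.action + " " + request.routeId);
    }
}
```

Turning the action into an enum once, at the edge, means the rest of the code can switch on a known set of values instead of comparing strings.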

To use this bean, you can choose to either give it static behaviour by setting the attributes on the bean, or drive it dynamically by setting the appropriate message headers. The source code is available on the integration-cocktail GitHub repository, and I created a camel-tools project where I will be putting such titbits. For examples of how to use this bean, you can look at the tests.

Finally, a question that inquisitive readers might ask is: will it be possible to stop or suspend the current route? Can you use this bean to, for example, stop the current route when an exception is thrown? Technically it will work, but not elegantly, since by default the DefaultShutdownStrategy kicks in to enforce a graceful shutdown which waits for all in-flight exchanges to finish. However, the current exchange which initiated the stop request is still in flight, so you will end up waiting until the timeout. We will polish the behaviour of the RouteController for such situations in another post.

Until then, feel free to suggest other possible "tools" for both Camel and Mule, either by commenting on the post or at integrationcocktail@google.com.

Tuesday, 3 January 2012

Testing that Service methods are Secured

Recently I had to quickly secure the service layer of a web application I was working on. I resorted to using AspectJ so that the security-related code would not be scattered across the service layer, and so that I would not have to touch the service layer code, since it was being used in multiple places. Furthermore, Spring was not being used; otherwise I would probably have used Spring Security and its annotations. Another benefit of AspectJ was that we could weave in the security aspects whenever needed. However, this is not the topic of this post.

Since I was the person who wrote the point cuts and the aspects, but was not the only one working on the service layer, I wanted to make sure that nobody would add a new method which did not match any of the point cuts and was therefore not advised by the security aspects. To make things slightly more complicated, not every method needed to be secured. This approach also helped me while I was defining the point cuts, since it automatically identified entry points that had not yet been secured.

What I did was create an abstract test which, given a set of interfaces and the service object, calls all the methods on this service object and fails if any method does not throw a security exception or throws some other exception. I also added the facility to pass a string array of methods that should be ignored. So all I had to do was extend this test for every service. I still needed to test each service separately to make sure that the methods could be invoked with the right credentials, but the abstract test would ensure that all the methods are secured.

If any new method is added which is not captured by any of the security point cuts, then this test will fail and will also identify the method that is not secured.

You can find the code on github here. To illustrate the approach, I created a simple Spring project with a simple service in which I accidentally forgot to secure one method. In this project I am using Spring Security and annotations for ease. The core of the implementation is the AbstractSecurityTest.
package org.ic.service;

import static org.junit.Assert.assertEquals;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;

import org.junit.Test;
import org.springframework.security.authentication.AuthenticationCredentialsNotFoundException;

public abstract class AbstractSecurityTest {

    /**
     * This is the test method which I will be extending and which
     * will check that all the methods in the interfaces are secured
     */
    @Test
    public void testAllMethodsInInterface() throws Throwable {
        // Get all the methods from the interface classes
        ArrayList<Method> methods = new ArrayList<Method>();
        for (Class<?> clazz : getInterfaceClasses()) {
            methods.addAll(Arrays.asList(clazz.getMethods()));
        }
        // Define a list which will contain the method names which are not
        // secured
        ArrayList<String> errorMethodNames = new ArrayList<String>();
        // Loop and invoke all the methods
        for (Method method : methods) {
            // If the method is not one of the ignored methods, invoke
            if (!isMethodIgnored(method.getName())) {
                // Get parameters and create a new Object array which will
                // contain the parameters we are passing
                Class<?>[] parameters = method.getParameterTypes();
                Object[] params = new Object[parameters.length];
                // Loop through the parameters in order to initialise certain
                // types, especially non object types like long and int as you
                // see below. Please note that this is NOT an exhaustive list
                for (int i = 0; i < parameters.length; i++) {
                    if (parameters[i].equals(long.class)) {
                        params[i] = 0L;
                    } else if (parameters[i].equals(int.class)) {
                        params[i] = 0;
                    }
                }
                // Try invoking the method. If the method is successful, it
                // means that it is NOT secured, so add the method to the
                // errorMethodNames.
                try {
                    System.out.println("Invoking: " + method.toGenericString());
                    method.invoke(getService(), params);
                    errorMethodNames.add(method.toGenericString());
                } catch (InvocationTargetException e) {
                    // The more common scenario is that an exception is thrown.
                    // If it is an AuthenticationCredentialsNotFoundException,
                    // that is exactly what we are looking for, so that means
                    // that the method is secured and we are getting the
                    // exception from the security aspect. If we get any other
                    // exception, it means that the method is not secured. In my
                    // case I am re-throwing an exception, however you could
                    // also add the method name to the errorMethodNames. It is
                    // normal for a method that is not secured to throw an
                    // exception, since the parameters that we are passing are
                    // mostly null or invalid.
                    if (!(e.getTargetException() instanceof AuthenticationCredentialsNotFoundException)) {
                        throw new Exception("The method " + method.getName()
                                + " is not secured and a "
                                + e.getTargetException().getClass().getName()
                                + " was thrown!", e.getTargetException());
                    }
                }
            }
        }
        // Logging the error method names for convenience.
        System.out.println("The following public methods are NOT advised: ");
        for (String s : errorMethodNames) {
            System.out.println(" " + s);
        }
        // If there are errorMethodNames, then fail.
        assertEquals(
                "The following public methods are NOT advised: "
                        + Arrays.toString(errorMethodNames.toArray()), 0,
                errorMethodNames.size());
    }

    /**
     * Utility method to check if the method name is contained in the
     * ignored methods
     */
    public boolean isMethodIgnored(String methodName) {
        for (String s : getIgnoredMethods()) {
            if (s.equals(methodName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Method to be overridden if you want certain methods to be ignored
     */
    public String[] getIgnoredMethods() {
        return new String[0];
    }

    /**
     * Abstract method which returns the interfaces that we want to test since a
     * service might implement more than one
     */
    public abstract Class<?>[] getInterfaceClasses();

    /**
     * Abstract method which returns the service object on which we will invoke
     * the methods.
     */
    public abstract Object getService();
}
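
The parameter loop in the test only fills in long and int. If a service method takes any other primitive (a boolean or a double, say), reflection rejects a null argument with an IllegalArgumentException before the method is even reached. A small helper along the following lines could cover all eight primitive types; DefaultArguments and its two methods are hypothetical names and not part of the project.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical helper for building placeholder arguments for reflection calls.
class DefaultArguments {

    // Returns a zero-like value for primitives and null for reference types.
    static Object defaultValueFor(Class<?> type) {
        if (!type.isPrimitive()) {
            return null;
        }
        if (type == boolean.class) return false;
        if (type == char.class) return '\0';
        if (type == byte.class) return (byte) 0;
        if (type == short.class) return (short) 0;
        if (type == int.class) return 0;
        if (type == long.class) return 0L;
        if (type == float.class) return 0f;
        if (type == double.class) return 0d;
        // Only void.class is left, which is never a parameter type.
        throw new IllegalArgumentException("Unexpected primitive type: " + type);
    }

    // Builds one placeholder argument per declared parameter of the method.
    static Object[] defaultArgumentsFor(Method method) {
        Class<?>[] types = method.getParameterTypes();
        Object[] args = new Object[types.length];
        for (int i = 0; i < types.length; i++) {
            args[i] = defaultValueFor(types[i]);
        }
        return args;
    }

    public static void main(String[] args) {
        // Print the placeholder arguments for every public method of Math
        // as a quick demonstration.
        for (Method method : Math.class.getMethods()) {
            System.out.println(method.getName() + " -> "
                    + Arrays.toString(defaultArgumentsFor(method)));
        }
    }
}
```

With such a helper, the inner for loop of the test could be replaced by a single call that builds params from the method.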


Then we have a simple test which extends AbstractSecurityTest and implements the two required methods: getService, which returns the service to be tested, and getInterfaceClasses, which returns an array of the interface classes to check.
package org.ic.service;

import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:/META-INF/spring/applicationContext*.xml")
public class CocktailServiceTest extends AbstractSecurityTest {

    @Autowired
    CocktailService service;

    @Override
    public Class<?>[] getInterfaceClasses() {
        return new Class<?>[]{CocktailService.class};
    }

    @Override
    public Object getService() {
        return service;
    }
}


This might seem like a simple approach, and well, it is, and there are a number of ways we could improve on it. For example, we could try to move away from test extension and somehow do this declaratively instead.

In short, whenever you are developing, think of ways that tests can help you. One should look at tests as tools which empower the developer and aid the developer and not some pesky thing you have to do in order to increase the code coverage report on Jenkins!