Tuesday, 24 January 2012

Controlling Flows in Mule

Having seen how to control routes in Camel, we can now look at how to get the same behaviour in Mule. Mule has flows instead of routes. One thing to note is that a flow can be started or stopped, but there is no notion of suspending and resuming a flow. Apart from flows there are the deprecated Services, which were used before Mule 3. We should handle them as well, and services do actually support pause and resume.

So we will query the registry to get the service or flow. If it is a service, we can perform all four actions (start, stop, pause and resume). If it is a flow, we can only perform stop and start. If we get a pause request for a flow, we will log a warning and perform a stop instead; if we get a resume request, we will log a warning and perform a start.

The code of the FlowController is as follows:

package org.ic.mule.tools.processor.control;

import ...

public class FlowController implements MessageProcessor {
    ...

    @Override
    public MuleEvent process(MuleEvent event) throws MuleException {
        MuleContext muleContext = event.getMuleContext();
        MuleMessage message = event.getMessage();
        // Inbound properties (headers) override the configured flow name and action
        String flowName = message.getInboundProperty(FLOW_NAME, getFlowName());
        String action = message.getInboundProperty(ACTION, getAction());
        FlowConstruct flow = muleContext.getRegistry().lookupFlowConstruct(flowName);
        if (flow == null)
        {
            throw new DefaultMuleException("No flow found with name " + flowName);
        }
        String actionTaken = null;
        logger.debug("Executing " + action + " on " + flowName);
        if (flow instanceof Service)
        {
            // Services support all four actions
            Service service = (Service) flow;
            actionTaken = performAction(service, action);
        } else
        {
            // It must implement Lifecycle!
            Lifecycle lifecycle = (Lifecycle) flow;
            actionTaken = performAction(lifecycle, action);
        }
        // Report the action that was really performed (it may differ from the request)
        message.setOutboundProperty(ACTION_TAKEN, actionTaken);
        return event;
    }

    // Plain Lifecycle objects can only start and stop, so PAUSE and RESUME fall back.
    // Constant-first equals() avoids a NullPointerException when no action is supplied.
    private String performAction(Lifecycle lifecycle, String action) throws MuleException {
        if (START.equals(action))
        {
            lifecycle.start();
            return action;
        } else if (STOP.equals(action))
        {
            lifecycle.stop();
            return action;
        } else if (PAUSE.equals(action))
        {
            logger.warn("Stopping instead of pausing since we don't know how to pause " + lifecycle.getClass());
            lifecycle.stop();
            return STOP;
        } else if (RESUME.equals(action))
        {
            logger.warn("Starting instead of resuming since we don't know how to resume " + lifecycle.getClass());
            lifecycle.start();
            return START;
        } else
        {
            throw new UnsupportedOperationException("Unknown action " + action
                    + ". Action should be STOP, START, PAUSE or RESUME");
        }
    }

    // Services support all four actions directly.
    private String performAction(Service service, String action) throws MuleException
    {
        if (START.equals(action))
        {
            service.start();
            return action;
        } else if (STOP.equals(action))
        {
            service.stop();
            return action;
        } else if (PAUSE.equals(action))
        {
            service.pause();
            return action;
        } else if (RESUME.equals(action))
        {
            service.resume();
            return action;
        } else
        {
            throw new UnsupportedOperationException("Unknown action " + action
                    + ". Action should be STOP, START, PAUSE or RESUME");
        }
    }
    ...
}
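The two performAction methods differ only in how they treat pause and resume, so the fallback rule can also be expressed once. The following is a minimal sketch of that idea, not the FlowController's actual code: SimpleLifecycle and PausableLifecycle are hypothetical stand-ins for Mule's Lifecycle and Service so that the example runs without Mule, ControlAction and the Recording classes are made up for illustration, and the warning log is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Mule's Lifecycle and Service, so the sketch runs without Mule.
interface SimpleLifecycle {
    void start();
    void stop();
}

interface PausableLifecycle extends SimpleLifecycle {
    void pause();
    void resume();
}

enum ControlAction {
    START, STOP, PAUSE, RESUME;

    // Substitute used when the target cannot pause or resume.
    ControlAction fallback() {
        switch (this) {
            case PAUSE:  return STOP;
            case RESUME: return START;
            default:     return this;
        }
    }
}

// Test double that records which lifecycle methods were called.
class RecordingTarget implements SimpleLifecycle {
    final List<String> calls = new ArrayList<>();
    public void start() { calls.add("start"); }
    public void stop()  { calls.add("stop"); }
}

class RecordingPausable extends RecordingTarget implements PausableLifecycle {
    public void pause()  { calls.add("pause"); }
    public void resume() { calls.add("resume"); }
}

class FlowControlSketch {
    // Applies the request and returns the action that was actually taken.
    static ControlAction perform(SimpleLifecycle target, ControlAction requested) {
        boolean pausable = target instanceof PausableLifecycle;
        ControlAction effective = pausable ? requested : requested.fallback();
        switch (effective) {
            case START:  target.start(); break;
            case STOP:   target.stop(); break;
            case PAUSE:  ((PausableLifecycle) target).pause(); break;
            case RESUME: ((PausableLifecycle) target).resume(); break;
        }
        return effective;
    }

    public static void main(String[] args) {
        RecordingTarget flow = new RecordingTarget();
        System.out.println(perform(flow, ControlAction.PAUSE) + " " + flow.calls);

        RecordingPausable service = new RecordingPausable();
        System.out.println(perform(service, ControlAction.PAUSE) + " " + service.calls);
    }
}
```

In this sketch a pause request on the plain target is downgraded to a stop, while the pausable target is really paused. Whether an enum reads better than the string constants is a matter of taste; the point is only that the pause/resume fallback lives in one place.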


One thing to note is that we are using the Lifecycle interface to control the flow. This allows us to generalise our FlowController to start and stop not only flows and services but any object in the registry that implements the Lifecycle interface. This means that you can control Connectors as well, for example. For this we make a minor change as follows:

// Look up any registered object by name, not just flow constructs
Lifecycle lifecycle = muleContext.getRegistry().lookupObject(flowName);
if (lifecycle == null)
{
    throw new DefaultMuleException("No object found with name " + flowName);
}
String actionTaken = null;
logger.debug("Executing " + action + " on " + flowName);
if (lifecycle instanceof Service)
{
    Service service = (Service) lifecycle;
    actionTaken = performAction(service, action);
} else
{
    actionTaken = performAction(lifecycle, action);
}
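One caveat with looking up arbitrary objects: the name may resolve to something that does not implement Lifecycle at all, in which case the assignment would presumably fail with a ClassCastException rather than a helpful message. A minimal sketch of an explicit check follows. It assumes the lookup hands back an untyped object; ManagedLifecycle, NoOpLifecycle and the map-backed registry are hypothetical stand-ins so that the example runs without Mule.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Mule's Lifecycle interface.
interface ManagedLifecycle {
    void start();
    void stop();
}

// Trivial implementation used only to populate the demo registry.
class NoOpLifecycle implements ManagedLifecycle {
    public void start() { }
    public void stop() { }
}

class RegistryLookupSketch {
    // Hypothetical stand-in for the registry: names mapped to arbitrary objects.
    private final Map<String, Object> registry = new HashMap<>();

    void register(String name, Object value) {
        registry.put(name, value);
    }

    // Returns the named object only if it exists and can be started and stopped.
    ManagedLifecycle lookupLifecycle(String name) {
        Object candidate = registry.get(name);
        if (candidate == null) {
            throw new IllegalArgumentException("No object found with name " + name);
        }
        if (!(candidate instanceof ManagedLifecycle)) {
            throw new IllegalArgumentException("Object " + name + " of type "
                    + candidate.getClass().getName() + " cannot be started or stopped");
        }
        return (ManagedLifecycle) candidate;
    }

    public static void main(String[] args) {
        RegistryLookupSketch sketch = new RegistryLookupSketch();
        sketch.register("myConnector", new NoOpLifecycle());
        sketch.register("someBean", "just a string");

        sketch.lookupLifecycle("myConnector").start();
        try {
            sketch.lookupLifecycle("someBean");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The same two checks (null, then instanceof) could be applied to the result of the registry lookup in the FlowController before calling performAction.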


Even though our FlowController can now do more than just control Flows and Services, I will still call it the FlowController since that is its main use. LifecycleController does not really cut it in my opinion ;).

As always, you can browse the FlowController code, and for usage examples you can look at the test implementation and test configuration.
