In such situations, things get slightly more complicated. The main problem is the DefaultShutdownStrategy which when configured for graceful shut-down (this is the default) it will wait for all in-flight messages to finish before shutting down. Why would this be a problem? Well, if a route is trying to stop itself using the Route Controller we defined earlier, the Route Controller will call Stop, however there are still in-flight exchanges (the same exchange that is currently being processed by the Route Controller) which will cause the DefaultShutdownStrategy to wait until the time-out before shutting down the route. Don't get me wrong, the DefaultShutdownStrategy is doing its work correctly, it is the Route Controller which needs to handle itself better.
The Camel documentation suggests that in such situations, you should remove the current exchange from the list of current in-flight messages. This might seem at first glance to have solved the problem, but it will not always work well with all transports. Polling transports like the file transport will still cause the DefaultShutdownStrategy to wait because there is still a polling thread working (The same thread that is trying to do the stopping). Also, even if you wait for the time-out, the file which cause the time-out is not removed since technically the file has not yet been completely processed since the processing was stopped in mid way. In short, I could not see a clean way to do it from within the same thread.
The simplest solution that I found was to call the stop from a different thread. This will allow the ”current” flow to finish normally (the exchange processing is completed) while the stopping is done in a different thread. The simplest way would be to spawn off a new thread to do the stopping, and that is what I have done here.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void performAction(Exchange exchange) throws Exception { | |
//Skipping some lines here... | |
if (routeId.equals(exchange.getFromRouteId())) { | |
// Run the controlling in a separate thread in order for the current | |
// flow to finish | |
final String finalRouteId = routeId; | |
new Thread(new Runnable() { | |
@Override | |
public void run() { | |
try { | |
performAction(finalRouteId, action, context); | |
} catch (Exception e) { | |
// Cannot do much with this exception except log it | |
logger.warn("Exception thrown while attempting to " | |
+ action + " route " + finalRouteId | |
+ " asynchronously.", e); | |
} | |
} | |
}, "RouteControllerSelf").start(); | |
} else { | |
performAction(routeId, action, context); | |
} | |
} |
Spawning off threads like this is not ideal since you could run in situations where you end up spawning threads out of control. A further improvement is to use the ExecutorService to do this for you (Similar to how the DefaultShutdownStrategy gives the task of shutting down to the ExecutorService in order to be able to support time-out).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void performAction(Exchange exchange) throws Exception { | |
//Skipping some lines here... | |
if (routeId.equals(exchange.getFromRouteId())) { | |
// Run the controlling in a separate thread in order for the current | |
// flow to finish | |
final String finalRouteId = routeId; | |
context.getExecutorServiceManager().newSingleThreadExecutor(this, "RouteControllerSelf").execute(new Runnable() { | |
@Override | |
public void run() { | |
try { | |
performAction(finalRouteId, action, context); | |
} catch (Exception e) { | |
// Cannot do much with this exception except log it | |
logger.warn("Exception thrown while attempting to " | |
+ action + " route " + finalRouteId | |
+ " asynchronously.", e); | |
} | |
} | |
}); | |
} else { | |
performAction(routeId, action, context); | |
} | |
} |
After this change, a Route should be able to gracefully stop itself using the Route Controller
No comments:
Post a Comment