Issue
I am getting data from a queue server and I need to process it and send an acknowledgement. Something like this:
while (true) {
    queueserver.get.data            // pull the next item
    ThreadPoolExecutor              // send data to thread
    queueserver.acknowledgement     // acknowledge the item
}
I don't fully understand what happens in the threads, but I think this program gets the data, sends it to the thread pool, and then immediately acknowledges it. So even though each queue is limited to 200 unacknowledged items, the program will just pull items as fast as it can receive them. This is fine when I run the program on a single server, but with multiple workers it becomes an issue, because the number of items in the thread queue reflects not the work a worker has done but how fast it can get items from the queue server.
Is there anything I can do to somehow make the program wait if the thread queue is full of work?
Solution
How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?
Instead of an open-ended queue, you can use a BlockingQueue with a limit on it:
BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);
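To see what the limit does in practice, here is a minimal sketch of a full bounded queue turning away one more element. The class and method names (BoundedQueueDemo, offerWhenFull) are made up for illustration; only the java.util.concurrent calls are real.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {
    // Fills a queue of the given capacity, then tries to add one more element.
    // Returns true only if the extra element was accepted.
    static boolean offerWhenFull(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add(i); // succeeds: there is still room
        }
        try {
            // offer() with a timeout waits up to 100 ms for space and then gives up;
            // put() would instead wait until a consumer removes an element
            return queue.offer(-1, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted when full: " + offerWhenFull(200));
    }
}
```

Nothing consumes from the queue here, so the extra offer times out and the method returns false.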
In terms of jobs submitted to an ExecutorService, instead of using the default ExecutorServices created using Executors, which use an unbounded queue, you can create your own:
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(200));
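As a rough illustration of how such a bounded executor behaves with its default handler, the sketch below submits tasks that never finish until released and counts how many are accepted before the first rejection. The names DefaultRejectionDemo and acceptedBeforeRejection are hypothetical, and the small sizes are chosen only to keep the demo quick.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DefaultRejectionDemo {
    // Returns how many tasks were accepted before the first rejection.
    static int acceptedBeforeRejection(int nThreads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        final CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                pool.execute(new Runnable() {
                    public void run() {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException expected) {
            // default AbortPolicy: every thread is busy and the queue is full
        } finally {
            release.countDown(); // let the held tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedBeforeRejection(2, 5));
    }
}
```

With 2 threads and a queue of 5, the executor accepts 7 tasks (one per thread plus the queued ones) and rejects the eighth.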
Once the queue fills up, the executor will reject any new tasks that are submitted. To make the caller wait instead, you will need to set a RejectedExecutionHandler that puts the rejected task onto the queue itself. Something like:
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// once all threads are busy and the queue already holds 200 jobs;
// to have it block instead you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // this will block if the queue is full
            executor.getQueue().put(r);
        } catch (InterruptedException ie) {
            // restore the interrupt flag and report the task as rejected
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(
                    "Interrupted while queueing task " + r, ie);
        }
        // check afterwards and throw if pool shutdown
        if (executor.isShutdown()) {
            throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor);
        }
    }
});
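Putting the handler together with the loop from the question might look like the sketch below. It assumes a stand-in for the queue server: the loop counter plays the role of queueserver.get.data, a counter increment plays the role of the acknowledgement, and the work is a 1 ms sleep. BlockingSubmitDemo, newBlockingPool and run are hypothetical names, not part of any library.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingSubmitDemo {
    static final AtomicInteger processed = new AtomicInteger();
    static final AtomicInteger acknowledged = new AtomicInteger();

    // Fixed-size pool with a bounded queue whose handler blocks the submitter.
    static ThreadPoolExecutor newBlockingPool(int nThreads, int capacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(capacity));
        pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    executor.getQueue().put(r); // blocks while the queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("Interrupted while queueing " + r, e);
                }
                if (executor.isShutdown()) {
                    throw new RejectedExecutionException(
                            "Task " + r + " rejected from " + executor);
                }
            }
        });
        return pool;
    }

    // Runs the pull/submit/acknowledge loop and returns the largest number of
    // items that were submitted but not yet finished at any one time.
    static int run(int items, int nThreads, int capacity) {
        processed.set(0);
        acknowledged.set(0);
        final AtomicInteger inFlight = new AtomicInteger();
        int maxInFlight = 0;
        ThreadPoolExecutor pool = newBlockingPool(nThreads, capacity);
        for (int i = 0; i < items; i++) {        // stand-in for queueserver.get.data
            maxInFlight = Math.max(maxInFlight, inFlight.incrementAndGet());
            pool.execute(new Runnable() {        // waits here when the pool is saturated
                public void run() {
                    try {
                        Thread.sleep(1);         // placeholder for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    processed.incrementAndGet();
                    inFlight.decrementAndGet();
                }
            });
            acknowledged.incrementAndGet();      // stand-in for queueserver.acknowledgement
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxInFlight;
    }

    public static void main(String[] args) {
        int max = run(200, 2, 5);
        System.out.println("processed=" + processed.get() + " maxInFlight=" + max);
    }
}
```

In this sketch the loop can never get more than nThreads + capacity + 1 items ahead of the workers, so the pull rate follows the processing rate. Note that the acknowledgement is still sent once a task is accepted, not once it has finished; moving it into the Runnable would be one way to acknowledge only completed work.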
I think it's a major miss that Java doesn't have a ThreadPoolExecutor.CallerBlocksPolicy.
Answered By - Gray