In part1 I explained there are two options to have a scheduler in Java: SchedulerExecutorService
and DelayQueue
. I decided to go with the second one even though it is a bit lower level.
The reason is, in SolidInbox the jobs do not always have a fixed interval. The interval might change dynamically during program execution because it sometime can depend on Twitter API's rate limit status.
So, for example when SolidInbox checks user DMs, it needs to check them periodically. Ideally this should be done once every 60-90 seconds, but some times Twitter API may tell the app that "don't query for DMs for this user again for the next 2 minutes". This means that, if using SchedulerExecutorService
, I had to kill a task and re-create it with a new interval setting which was not efficient and flexible. Which is why I decided to go with DelayQueue.
When using DelayQueue
you need a base class to encapsulate tasks which implements Delayed
interface. This is how my implementation looks like for SolidInbox:
public class DelayedTask implements Delayed {
private final long availableAfter;
private final Runnable runnable;
private final int fingerprint;
private final List<Object> tags;
public DelayedTask(Runnable runnable, List<Object> tags, long delayInMilliseconds) {
this.availableAfter = System.currentTimeMillis() + delayInMilliseconds;
this.runnable = runnable;
this.fingerprint = tags.hashCode();
this.tags = tags;
}
private int saturatedCast(long value) {
if ( value > Integer.MAX_VALUE ) {
return Integer.MAX_VALUE;
}
if ( value < Integer.MIN_VALUE ) {
return Integer.MIN_VALUE;
}
return ( int ) value;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = availableAfter - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return saturatedCast(this.availableAfter - (( DelayedTask ) o).availableAfter);
}
//TODO: compare the whole fingerprint?
@Override
public boolean equals(Object o) {
if ( this == o ) return true;
if ( o == null || getClass() != o.getClass() ) return false;
DelayedTask that = ( DelayedTask ) o;
return fingerprint == that.fingerprint;
}
@Override
public int hashCode() {
return fingerprint;
}
public void execute() {
runnable.run();
}
}
Subsequently, I have this field in my TaskQueue
class which stores jobs to be executed in future:
private static final DelayQueue<DelayedTask> internalQueue = new DelayQueue<>();
So, whenever a new job is created, I just push it into the internalQueue
. And by job, I mean an instance of DelayedTask
which has a Runnable
inside. Pushing a new job into the queue is done via offer
method on internalQueue
.
In the next part I will go through the other side of the queue: Polling jobs out from the queue and running them.