A simple task queue
I'm working on a little sample framework - really only to keep my sanity and practice my chosen craft - that allows you to string together tasks in a pipeline for processing. To exercise that framework - to flush out the pros and cons of the implementation - I'm writing a sample application.
The basic idea is that each task is interested in an event. That event could be the arrival of a file, the completion of another task, and so on. To string tasks together, I've created a simple database queue system: as a task completes, it writes to the queue, and any other tasks interested in that event will see it and begin. Tasks complete either successfully (by returning no errors) or unsuccessfully (by returning one or more errors).
An important concept is to be able to add (or remove) tasks from the pipeline programmatically and without changing the database schema. So, quite simply, when a task completes, that event is written to the queue - but how do we find which events another task is interested in? And how do we do this in a way that won't take longer over time?
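To make the "add or remove tasks programmatically" idea concrete, here is a minimal sketch in Python. It is not the framework's code: the `subscriptions` registry and the `subscribe`, `unsubscribe` and `result_of` helpers are names I've made up purely for illustration. The point is only that interest in an event can live in code (or configuration) rather than in the schema.

```python
from collections import defaultdict

# Hypothetical registry: (job, result) event -> names of the tasks interested in it.
subscriptions = defaultdict(list)

def subscribe(task_name, job, result="SUCCEEDED"):
    """Register task_name as interested in `job` finishing with `result`."""
    subscriptions[(job, result)].append(task_name)

def unsubscribe(task_name, job, result="SUCCEEDED"):
    """Remove a previously registered interest."""
    subscriptions[(job, result)].remove(task_name)

def result_of(errors):
    """A task succeeds when it returns no errors, otherwise it fails."""
    return "SUCCEEDED" if not errors else "FAILED"

subscribe("job2", "job1")            # job2 runs after job1 succeeds
subscribe("job3", "job1", "FAILED")  # job3 runs after job1 fails

# job1 finished without errors: who should hear about it?
interested = subscriptions[("job1", result_of([]))]
print(interested)  # ['job2']
```

Nothing here touches the database, which is the property I'm after: the queue table only records what happened, and who cares about it is decided elsewhere.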
So, let's assume Job2 is interested in the successful completion of Job1. We need to find all successful Job1 events which haven't already been processed by Job2 (successfully or otherwise):
[sourcecode lang='sql']
SELECT * FROM QUEUE where job='job1' and result='SUCCEEDED' and task not in (SELECT task FROM QUEUE where job='job2' and result is not null)
[/sourcecode]
I'm no SQL ninja, so this is my first simple solution. It works, but it doesn't scale. With a thousand rows, it completes pretty quickly, but with tens of thousands of rows it takes way too long (nearly 8 minutes with 40000 rows running on my Dell Inspiron laptop).
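To show what that first query selects, here is a small runnable sketch. It uses Python's built-in `sqlite3` with an in-memory database as a stand-in for MySQL (my assumption, just so it runs without a server), and the six sample rows are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE QUEUE (id INTEGER PRIMARY KEY, job TEXT NOT NULL, "
             "task TEXT NOT NULL, result TEXT NOT NULL)")
conn.executemany("INSERT INTO QUEUE VALUES (?,?,?,?)", [
    (1, "job1", "1", "SUCCEEDED"),
    (2, "job2", "1", "SUCCEEDED"),  # job2 processed task 1 successfully
    (3, "job1", "2", "SUCCEEDED"),
    (4, "job2", "2", "FAILED"),     # job2 processed task 2, unsuccessfully
    (5, "job1", "3", "FAILED"),     # job1 failed, so job2 is not interested
    (6, "job1", "4", "SUCCEEDED"),  # job1 succeeded, job2 has not seen it yet
])

# Successful job1 events whose task has no job2 row at all.
pending = [row[0] for row in conn.execute(
    "SELECT task FROM QUEUE WHERE job='job1' AND result='SUCCEEDED' "
    "AND task NOT IN (SELECT task FROM QUEUE WHERE job='job2' AND result IS NOT NULL) "
    "ORDER BY id")]
print(pending)  # ['4']
```

Only task 4 comes back: tasks 1 and 2 already have a job2 row (whatever the outcome), and task 3 never succeeded in job1.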
Pragmatically, we can restrict the query based on time - we don't need to search the whole table, just the more recent events:
[sourcecode lang='sql']
SELECT * FROM QUEUE where tstamp >= ? and job='job1' and result='SUCCEEDED' and task not in (SELECT task FROM QUEUE where tstamp >= ? and job='job2' and result is not null)
[/sourcecode]
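The trade-off of the time window is easier to see with data. The sketch below again uses Python's `sqlite3` in memory instead of MySQL, stores `tstamp` as a plain integer, and uses invented rows. `find_pending` is a hypothetical helper name, not part of the framework.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE QUEUE (id INTEGER PRIMARY KEY, job TEXT NOT NULL, "
             "task TEXT NOT NULL, result TEXT NOT NULL, tstamp INTEGER NOT NULL)")
conn.executemany("INSERT INTO QUEUE VALUES (?,?,?,?,?)", [
    (1, "job1", "1", "SUCCEEDED", 100),   # old event, never picked up by job2
    (2, "job1", "2", "SUCCEEDED", 5000),
    (3, "job2", "2", "SUCCEEDED", 5010),  # job2 already handled task 2
    (4, "job1", "3", "SUCCEEDED", 5020),  # recent and still waiting for job2
])

def find_pending(conn, since):
    """Successful job1 tasks at or after `since` with no job2 row at or after `since`."""
    sql = ("SELECT task FROM QUEUE WHERE tstamp >= ? AND job='job1' AND result='SUCCEEDED' "
           "AND task NOT IN (SELECT task FROM QUEUE WHERE tstamp >= ? AND job='job2' "
           "AND result IS NOT NULL) ORDER BY tstamp ASC")
    return [row[0] for row in conn.execute(sql, (since, since))]

recent = find_pending(conn, 4000)
everything = find_pending(conn, 0)
print(recent)      # ['3']
print(everything)  # ['1', '3']
```

With the window starting at 4000, task 1 is never found even though job2 has not processed it. That is the price of the restriction: anything older than the window is silently skipped, so the window has to be comfortably longer than the time a task can sit unprocessed.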
You could easily just look over the events in the last hour if your tasks are short running and you get fewer than a thousand events an hour - it'd probably perform okay. To give an indication of performance, I've written a little Groovy script to show some basic trends - included below. With the time restriction and the index in place, it reports (times in milliseconds):
[sourcecode lang='text']
paul@paul-laptop:~$ groovy sqltest
2005 rows took: 4
4005 rows took: 5
6005 rows took: 3
8005 rows took: 3
10005 rows took: 2
12005 rows took: 3
14005 rows took: 3
16005 rows took: 4
18005 rows took: 5
20005 rows took: 3
[/sourcecode]
Notice, though, that this is a great example of where indexes really help. Without the index, the query time (again in milliseconds) grows as the row count grows:
[sourcecode lang='text']
paul@paul-laptop:~$ groovy sqltest
2005 rows took: 13
4005 rows took: 34
6005 rows took: 123
8005 rows took: 134
10005 rows took: 229
12005 rows took: 237
14005 rows took: 330
16005 rows took: 342
18005 rows took: 428
20005 rows took: 442
[/sourcecode]
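If you want to poke at the effect of the index without a MySQL server, the sketch below is one way to do it. It is an approximation under my own assumptions: SQLite (via Python's `sqlite3`) instead of MySQL, integer timestamps, and generated rows. SQLite's planner is not MySQL's, so the plans it prints say nothing certain about what MySQL does. The sketch only shows that adding `idx1` leaves the result unchanged while giving the planner a different way to reach it.

```python
import sqlite3

QUERY = ("SELECT task FROM QUEUE WHERE tstamp >= ? AND job='job1' AND result='SUCCEEDED' "
         "AND task NOT IN (SELECT task FROM QUEUE WHERE tstamp >= ? AND job='job2' "
         "AND result IS NOT NULL) ORDER BY tstamp ASC")

def build_queue(n_tasks):
    """Create an in-memory QUEUE with one job1 row per task and a job2 row for most of them."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE QUEUE (id INTEGER PRIMARY KEY, job TEXT NOT NULL, "
                 "task TEXT NOT NULL, result TEXT NOT NULL, tstamp INTEGER NOT NULL)")
    rows = []
    for task in range(1, n_tasks + 1):
        rows.append(("job1", str(task), "SUCCEEDED", task))
        if task % 10:  # every tenth task is left unprocessed by job2
            rows.append(("job2", str(task), "SUCCEEDED", task))
    conn.executemany("INSERT INTO QUEUE (job, task, result, tstamp) VALUES (?,?,?,?)", rows)
    return conn

def pending(conn, since):
    return [row[0] for row in conn.execute(QUERY, (since, since))]

def plan(conn, since):
    # The last column of each EXPLAIN QUERY PLAN row is the human-readable detail.
    return [row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + QUERY, (since, since))]

conn = build_queue(1000)
since = 900
before = pending(conn, since)
plan_before = plan(conn, since)
conn.execute("CREATE INDEX idx1 ON QUEUE (tstamp, job, result)")
after = pending(conn, since)
plan_after = plan(conn, since)

print(before == after)  # the index must not change the answer
print(plan_before)
print(plan_after)
```

The MySQL equivalent is to put `EXPLAIN` in front of the query before and after creating the index and compare the access types it reports.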
It's a pity I have to worry about the time restriction, but it does make the solution workable and, at least in my intended application, appropriate. I was hoping to use the SQL MINUS operator, but it appears MySQL doesn't support it.
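MINUS is Oracle's name for what standard SQL calls EXCEPT. Where neither is available, the same "in A but not in B" question can be asked as an anti-join, written either with `LEFT JOIN ... IS NULL` or with `NOT EXISTS`. The sketch below checks that the three forms agree on a tiny invented data set. It runs on SQLite through Python's `sqlite3` (my stand-in, not the MySQL setup used here), and I haven't timed these variants against MySQL, so treat them as something to try rather than a known fix.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE QUEUE (id INTEGER PRIMARY KEY, job TEXT NOT NULL, "
             "task TEXT NOT NULL, result TEXT NOT NULL)")
conn.executemany("INSERT INTO QUEUE VALUES (?,?,?,?)", [
    (1, "job1", "1", "SUCCEEDED"),
    (2, "job2", "1", "SUCCEEDED"),
    (3, "job1", "2", "SUCCEEDED"),
    (4, "job2", "2", "FAILED"),
    (5, "job1", "3", "FAILED"),
    (6, "job1", "4", "SUCCEEDED"),
])

# The original formulation from this post.
NOT_IN = ("SELECT task FROM QUEUE WHERE job='job1' AND result='SUCCEEDED' "
          "AND task NOT IN (SELECT task FROM QUEUE WHERE job='job2') ORDER BY id")

# Anti-join: keep job1 rows for which the join found no job2 row.
LEFT_JOIN = ("SELECT j1.task FROM QUEUE j1 "
             "LEFT JOIN QUEUE j2 ON j2.task = j1.task AND j2.job='job2' "
             "WHERE j1.job='job1' AND j1.result='SUCCEEDED' AND j2.id IS NULL "
             "ORDER BY j1.id")

# Correlated subquery: keep job1 rows for which no job2 row exists.
NOT_EXISTS = ("SELECT j1.task FROM QUEUE j1 "
              "WHERE j1.job='job1' AND j1.result='SUCCEEDED' "
              "AND NOT EXISTS (SELECT 1 FROM QUEUE j2 "
              "WHERE j2.job='job2' AND j2.task = j1.task) "
              "ORDER BY j1.id")

results = {name: [row[0] for row in conn.execute(sql)]
           for name, sql in (("not_in", NOT_IN),
                             ("left_join", LEFT_JOIN),
                             ("not_exists", NOT_EXISTS))}
print(results)
```

All three return only task 4 here. Which one performs best depends on the database and its version, so it is worth measuring each before settling on one.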
Here's the Groovy script I used to generate the results. The difference made by the time restriction and the index is so dramatic and obvious that it makes a great example.
[sourcecode lang='java']
import groovy.sql.Sql
import groovy.grape.Grape

// Fetch the MySQL JDBC driver at runtime and make it visible to the root class loader
Grape.grab(group:'mysql', module:'mysql-connector-java', version:'5.1.10', classLoader: this.class.classLoader.rootLoader)

enum STATUS { SUCCEEDED, FAILED }

def go() {
    def sql = Sql.newInstance("jdbc:mysql://localhost:3306/spike", "spike", "password", "com.mysql.jdbc.Driver")
    try {
        // start each run from a fresh table
        sql.execute("drop table if exists QUEUE")
        sql.execute("CREATE TABLE QUEUE (id INTEGER NOT NULL,job varchar (20) NOT NULL,task varchar (20) NOT NULL,result VARCHAR (20) NOT NULL, tstamp timestamp, PRIMARY KEY (id)) ENGINE = MyISAM")
        // index on the columns the query filters on
        sql.execute("create index idx1 on QUEUE(tstamp, job, result)")
        // time the query against progressively larger tables
        (1..10).each {
            def d = createHistory(sql, 1000 * it)
            findJobsToProcess(sql, d)
        }
    } finally {
        sql.close()
    }
}

def findJobsToProcess(sql, d) {
    def rowcount = 0
    sql.eachRow("select count(*) from QUEUE") { row ->
        rowcount = row[0]
    }

    long start, end
    start = System.currentTimeMillis()
    // now find the 'job1' items that haven't been processed by 'job2'
    sql.eachRow("SELECT * FROM QUEUE where tstamp >= ? and job='job1' and result=? and task not in (SELECT task FROM QUEUE where tstamp >= ? and job='job2' and result is not null) order by tstamp asc", [d, STATUS.SUCCEEDED.name(), d]) {
        // println "${it.id} ${it.job} ${it.task} ${it.result} ${it.tstamp}"
    }
    end = System.currentTimeMillis()
    // elapsed time is in milliseconds
    println "${rowcount} rows took: ${end-start}"
}

// Fills QUEUE with 2*count rows of history plus 5 unprocessed 'job1' rows.
// Returns the timestamp that marks the start of the unprocessed rows.
def createHistory(sql, count) {
    sql.execute("truncate table QUEUE")
    int id = 1
    int task = 1
    // create a "history" of previous jobs: for each status, count/2 tasks run by both jobs
    STATUS.values().each { status ->
        (1..count.intdiv(2)).each {
            sql.execute("insert into QUEUE (id,job,task,result,tstamp) values (?,?,?,?,?)", [id++, 'job1', task.toString(), status.name(), new Date()])
            sql.execute("insert into QUEUE (id,job,task,result,tstamp) values (?,?,?,?,?)", [id++, 'job2', task.toString(), status.name(), new Date()])
            task++
        }
    }
    // wait a second so the new rows get a later timestamp than the history
    sleep(1000)
    def d = new Date()
    // insert some successful 'job1' items which 'job2' hasn't processed
    (1..5).each {
        sql.execute("insert into QUEUE (id,job,task,result,tstamp) values (?,?,?,?,?)", [id++, 'job1', task.toString(), STATUS.SUCCEEDED.name(), new Date()])
        task++
    }
    return d
}

go()
[/sourcecode]
In running this test, I used:
- Dell Inspiron 1525, Intel(R) Core(TM)2 Duo Processor T5550, 1.83 GHz, 2MB Cache, 667 MHz FSB, 2GB (2 X 1024MB) 667MHz Dual Channel DDR2 SDRAM, 160GB 7200RPM Performance Hard Drive
- Ubuntu 9.10 32 bit desktop edition
- Groovy Version: 1.6.5
- JVM: 1.6.0_03
- mysql Ver 14.14 Distrib 5.1.37, for debian-linux-gnu (i486)