import contextlib
import time

import psycopg2
import psycopg2.extensions

import db


@contextlib.contextmanager
def _transaction(connection, begin_sql="BEGIN TRANSACTION"):
    """
    _transaction(connection, begin_sql) -> context manager yielding a cursor

    Opens a cursor, issues begin_sql, and yields the cursor. On a clean exit
    the transaction is committed; if the body raises, the transaction is
    rolled back and the exception is re-raised. The cursor is always closed.

    NOTE(review): the explicit BEGIN/COMMIT statements suggest that
    db.connect_db() returns a connection in autocommit mode -- confirm.
    """
    cursor = connection.cursor()
    try:
        cursor.execute(begin_sql)
        try:
            yield cursor
        except Exception:
            # Do not leave the connection stuck in a failed transaction;
            # later statements on it would be rejected until it is ended.
            cursor.execute("ROLLBACK")
            raise
        cursor.execute("COMMIT")
    finally:
        cursor.close()


def get_new_job_count(connection):
    """
    get_new_job_count(connection) -> int

    Returns the number of new jobs found in the jobs table.
    New jobs have a status of 'REQUESTED'.
    """
    with _transaction(connection) as cursor:
        cursor.execute("""
            SELECT count(status)
            FROM jobs
            WHERE status = 'REQUESTED'
            """)
        # count() always yields exactly one row.
        return cursor.fetchone()[0]


def get_new_jobs(connection):
    """
    get_new_jobs(connection) -> [row, ....., row_n]

    Claims new jobs by flipping their status from 'REQUESTED' to
    'PROCESSING' and returns the claimed rows (RETURNING *). This will
    return a max of 10 jobs at a time.

    Returns an empty list when nothing was claimed, including when the
    SERIALIZABLE transaction was rolled back because of a conflict with
    another client. In that case the caller should simply retry later.
    """
    try:
        with _transaction(
                connection,
                "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE") as cursor:
            cursor.execute("""
                UPDATE jobs
                SET status = 'PROCESSING'
                WHERE id IN(
                    SELECT id FROM jobs
                    WHERE status = 'REQUESTED'
                    LIMIT 10
                )
                RETURNING *
                """)
            jobs = cursor.fetchall()
    except psycopg2.extensions.TransactionRollbackError:
        # Another client won the race. The failure may surface on the UPDATE
        # or only at COMMIT time; either way nothing was claimed, so any rows
        # already fetched must NOT be handed back to the caller.
        return []
    return jobs


def mark_complete(connection, jobs):
    """
    mark_complete(connection, jobs) -> None

    Sets the status flag for each passed in job to COMPLETE. The first
    element of each job row is used as the job id. All updates are applied
    in one transaction; if any of them fails the transaction is rolled back
    and the error is re-raised.
    """
    sql = """UPDATE jobs SET status='COMPLETE' WHERE id=%(id)s"""
    update_params = [{'id': job[0]} for job in jobs]
    if not update_params:
        # Nothing to update; avoid opening an empty transaction.
        return

    with _transaction(connection) as cursor:
        cursor.executemany(sql, update_params)


# ---------------------------------------------------------------------------
# Worker loop (runs at import/script start, forever).
#
# print is called with a single parenthesised argument so that the output is
# identical whether the interpreter treats print as a statement or a function.
# ---------------------------------------------------------------------------
connection = db.connect_db()
my_job_ids = []
collisions = 0

while True:
    # First see if there are any new jobs entered into the queue
    count = get_new_job_count(connection)

    if not count:
        if my_job_ids:
            # Queue drained: report what this worker did, then reset the
            # counters for the next burst of jobs.
            print("Job queue empty. I processed these jobs:")
            my_job_ids.sort()
            print(my_job_ids)
            print("I processed %d jobs" % len(my_job_ids))
            print("I had %d collisions" % collisions)
            my_job_ids = []
            collisions = 0
        time.sleep(5)
        continue

    print("%d jobs remaining in queue" % count)

    jobs = get_new_jobs(connection)
    if not jobs:
        print("None found, or my jobs were marked by another client")
        collisions += 1
    else:
        # You would do any processing needed for these jobs here.
        # (To pretend processing takes about 3 secs, add time.sleep(3).)
        my_job_ids.extend([j[0] for j in jobs])
        mark_complete(connection, jobs)

    time.sleep(1)