Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test case for new on_commit hook [see #187] #190

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
12 changes: 12 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,18 @@ with the path to your queue class::

python manage.py rqworker high default low --queue-class 'path.to.CustomQueue'

Queues cleanup
---------------
django_rq provides a management command that flushes the queue specified as argument::

python manage.py rqflush --queue high

If you don't specify any queue it will flush the default

You can suppress confirmation message if you use the option --noinput

python manage.py rqflush --queue high --noinput

Support for RQ Scheduler
------------------------

Expand Down
25 changes: 25 additions & 0 deletions django_rq/management/commands/rqclean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from django.core.management.base import BaseCommand

from django_rq import get_queue


class Command(BaseCommand):
"""
Removes all queue jobs
"""
help = __doc__

def add_arguments(self, parser):
parser.add_argument('--queue', '-q', dest='queue', default='default',
help='Specify the queue [default]')

def handle(self, *args, **options):
"""
Queues the function given with the first argument with the
parameters given with the rest of the argument list.
"""
verbosity = int(options.get('verbosity', 1))
queue = get_queue(options.get('queue'))
queue.empty()
if verbosity:
print('Queue "%s" cleaned' % queue.name)
42 changes: 42 additions & 0 deletions django_rq/management/commands/rqflush.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from django.core.management.base import BaseCommand

from django.utils.six.moves import input

from django_rq import get_queue


class Command(BaseCommand):
"""
Flushes the queue specified as argument
"""
help = __doc__

def add_arguments(self, parser):
parser.add_argument(
'--noinput', '--no-input', action='store_false',
dest='interactive', default=True,
help='Tells Django to NOT prompt the user for input of any kind.',
)
parser.add_argument('--queue', '-q', dest='queue', default='default',
help='Specify the queue [default]')

def handle(self, *args, **options):
verbosity = int(options.get('verbosity', 1))
interactive = options['interactive']
queue = get_queue(options.get('queue'))

if interactive:
confirm = input("""You have requested a flush the "%s" queue.
Are you sure you want to do this?

Type 'yes' to continue, or 'no' to cancel: """ % queue.name)
else:
confirm = 'yes'

if confirm == 'yes':
queue.empty()
if verbosity:
print('Queue "%s" flushed.' % queue.name)
else:
if verbosity:
print("Flush cancelled.")
8 changes: 8 additions & 0 deletions django_rq/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys

from django.core.management.base import BaseCommand
from django.db import connections
from django.utils.version import get_version

from django_rq.queues import get_queues
Expand Down Expand Up @@ -34,6 +35,11 @@ def import_attribute(name):
return getattr(module, attribute)


def reset_db_connections():
for c in connections.all():
c.close()


class Command(BaseCommand):
"""
Runs RQ workers on specified queues. Note that all queues passed into a
Expand Down Expand Up @@ -84,6 +90,8 @@ def handle(self, *args, **options):
# Call use_connection to push the redis connection into LocalStack
# without this, jobs using RQ's get_current_job() will fail
use_connection(w.connection)
# Close any opened DB connection before any fork
reset_db_connections()
w.work(burst=options.get('burst', False))
except ConnectionError as e:
print(e)
Expand Down
107 changes: 107 additions & 0 deletions django_rq/tests/tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import sys

from django.contrib.auth.models import User
from django.core.management import call_command
from django.core.urlresolvers import reverse
from django.db import transaction
from django.test import TestCase
try:
from unittest import skipIf
Expand All @@ -11,6 +14,7 @@
from django.test import override_settings
except ImportError:
from django.test.utils import override_settings
from django.utils.six import StringIO
from django.conf import settings

from rq import get_current_job, Queue
Expand Down Expand Up @@ -78,6 +82,16 @@ def get_queue_index(name='default'):
return queue_index


def stub_stdin(testcase_inst, inputs):
stdin = sys.stdin

def cleanup():
sys.stdin = stdin

testcase_inst.addCleanup(cleanup)
sys.stdin = StringIO(inputs)


@override_settings(RQ={'AUTOCOMMIT': True})
class QueuesTest(TestCase):

Expand Down Expand Up @@ -305,6 +319,32 @@ def test_default_timeout(self):
queue = get_queue('test1')
self.assertEqual(queue._default_timeout, 400)

def test_rqflush_default(self):
queue = get_queue()
queue.enqueue(divide, 42, 1)
call_command("rqflush", "--verbosity", "0", "--noinput")
self.assertFalse(queue.jobs)

def test_rqflush_test3(self):
queue = get_queue("test3")
queue.enqueue(divide, 42, 1)
call_command("rqflush", "--queue", "test3", "--verbosity", "0", "--noinput")
self.assertFalse(queue.jobs)

def test_rqflush_test3_interactive_yes(self):
queue = get_queue("test3")
queue.enqueue(divide, 42, 1)
stub_stdin(self, "yes")
call_command("rqflush", "--queue", "test3", "--verbosity", "0")
self.assertFalse(queue.jobs)

def test_rqflush_test3_interactive_no(self):
queue = get_queue("test3")
queue.enqueue(divide, 42, 1)
stub_stdin(self, "no")
call_command("rqflush", "--queue", "test3", "--verbosity", "0")
self.assertTrue(queue.jobs)


@override_settings(RQ={'AUTOCOMMIT': True})
class DecoratorTest(TestCase):
Expand Down Expand Up @@ -580,6 +620,73 @@ def test_error(self):
self.assertEqual(queue.count, 0)


class ThreadQueueWithTransactionAtomicTest(TestCase):

@override_settings(RQ={'AUTOCOMMIT': True})
def test_enqueue_autocommit_on(self):
"""
Running ``enqueue`` when AUTOCOMMIT is on should
immediately persist job into Redis.
"""
queue = get_queue()
queue.empty()
with transaction.atomic():
job = queue.enqueue(divide, 1, 1)
self.assertTrue(job.id in queue.job_ids)
job.delete()

@override_settings(RQ={'AUTOCOMMIT': False})
def test_enqueue_autocommit_off(self):
"""
Running ``enqueue`` when AUTOCOMMIT is off should
puts the job in the delayed queue but ...
"""
thread_queue.clear()
queue = get_queue()
queue.empty()
with transaction.atomic():
queue.enqueue(divide, 1, 1)

# the following call SHOULDN'T BE necessary
# it should be called by an on_commit hook
# https://docs.djangoproject.com/en/1.10/topics/db/transactions/#django.db.transaction.on_commit
thread_queue.commit()

job = queue.dequeue()
self.assertTrue(job)
self.assertTrue(job.func.func_name, "divide")
job.delete()
self.assertFalse(queue.dequeue())

@override_settings(RQ={'AUTOCOMMIT': False})
def test_enqueue_autocommit_offand_db_error(self):
"""
Running ``enqueue`` when AUTOCOMMIT is off should
puts the job in the delayed queue only if dba transaction succedes
"""
thread_queue.clear()
queue = get_queue()
queue.empty()

try:
with transaction.atomic():
queue.enqueue(divide, 1, 1)
# something went wrong on DB
assert False
except AssertionError:
# the following call SHOULDN'T BE necessary
# but if you don't make it, the final situation would be inconsistent:
# DB transaction has failed but job is enqueued
thread_queue.clear()

# the following call SHOULDN'T BE necessary
thread_queue.commit()

job = queue.dequeue()
self.assertFalse(job)
self.assertFalse(queue.dequeue())


class SchedulerTest(TestCase):

@skipIf(RQ_SCHEDULER_INSTALLED is False, 'RQ Scheduler not installed')
Expand Down
29 changes: 29 additions & 0 deletions integration_test/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
A sample project to test rqworker and site interraction

## Prerequisites

Install PostgreSQL

sudo apt-get install postgresql

Create user and database

sudo -u postgres psql
# drop database djangorqdb;
# drop user djangorqusr;
# create user djangorqusr with createrole superuser password 'djangorqusr';
# create database djangorqdb owner djangorqusr;

Init database schema

./manage.py migrate

Install required packages:

pip install -r requirements.txt

## Test

To run tests:

python _test.py
Loading