@aronwc
Created July 29, 2013 22:12
Celery example
'''An example of how to use Celery to manage a mix of serial and parallel
tasks. This depends on a running RabbitMQ server, which acts as the message
broker and (via the amqp backend) keeps track of task results. It can be
launched on our ec2 instance with:

    ~/rabbitmq/rabbitmq_server-3.1.3/sbin/rabbitmq-server

For this script to work, you first need to run a celery worker process to
await orders:

    $ celery -A tasks worker --loglevel=info

Then, you can call any of the functions below (see main for an example).
The worker process will print all the log messages, which is useful for
seeing the order in which tasks are actually executed.
'''
from celery import Celery
from celery import chain, group

# This is the extent of the celery configuration!
celery = Celery('tasks',
                backend='amqp',
                broker='amqp://guest@localhost//')
@celery.task
def guidestar(*args, **kwargs):
    print('guidestar', args, kwargs)
    return True


@celery.task
def lookup_twitter_handle(*args, **kwargs):
    print('lookup_twitter_handle', args, kwargs)
    return True


@celery.task
def get_twitter_followers(*args, **kwargs):
    print('get_twitter_followers', args, kwargs)
    return True


@celery.task
def get_tweets(*args, **kwargs):
    print('get_tweets', args, kwargs)
    return True


@celery.task
def download_charity_homepage(*args, **kwargs):
    print('download_charity_homepage', args, kwargs)
    return True


@celery.task
def download_charity_news(*args, **kwargs):
    print('download_charity_news', args, kwargs)
    return True
@celery.task
def process_ein(ein):
    '''Add a new charity with the given ein. Launches asynchronous
    tasks. Returns a result that can be used to monitor the success of
    each task if needed.'''
    # Look up the twitter handle, then get followers and tweets in parallel.
    twitter_chain = chain(lookup_twitter_handle.s(),
                          group([get_twitter_followers.s(),
                                 get_tweets.s()]))
    # Do these three in parallel.
    tasks = group([twitter_chain,
                   download_charity_news.s(),
                   download_charity_homepage.s()])
    # Look up guidestar info before doing anything else.
    # Note: apply_async expects the positional args as a tuple/list.
    return chain(guidestar.s(),
                 tasks).apply_async((ein,))
if __name__ == '__main__':
    print(process_ein('123'))