Instal·lar

Instal·larem RabbitMQ, celery i celery_tryton

Rabbitmq

sudo apt-get install rabbitmq-server

Dins del virtualenv:

pip install celery
pip install celery_tryton --no-dependencies // pip install hg+http://hg.b2ck.com/celery-tryton/ (versió trytond 3.8 > celery_tryton 0.3)
pip install flower

Activarem els pluggins:

$ sudo rabbitmq-plugins enable rabbitmq_management

Ubuntu 12.04:

sudo /usr/lib/rabbitmq/lib/rabbitmq_server-2.7.1/sbin/rabbitmq-plugins enable rabbitmq_management

El resultat activació plugins serà:

    The following plugins have been enabled:
    mochiweb
    webmachine
    rabbitmq_web_dispatch
    amqp_client
    rabbitmq_management_agent
    rabbitmq_management

Crearem el directori si no existeix (ubuntu 12)

sudo mkdir /etc/rabbitmq/rabbitmq.conf.d

Copiarem el fitxer de configuració:

sudo cp /usr/share/doc/rabbitmq-server/rabbitmq.config.example.gz /etc/rabbitmq/
sudo gzip -d rabbitmq.config.example.gz
sudo mv rabbitmq.config.example rabbitmq.config
sudo /etc/init.d/rabbitmq-server restart

Canvi nom màquina

Si canviem el nom de la màquina, caldrà reinstal·lar RabbitMQ de nou:

  1. Stop RabbitMQ: rabbitmqctl stop
  2. Change /etc/hosts
  3. Change /etc/hostname
  4. Uninstall old RabbitMQ: dpkg -P rabbitmq-server
  5. Remove RabbitMQ’s database: rm -rf /var/lib/rabbitmq
  6. Find erlang’s process that is running rabbit: ps ax | grep rabbit
  7. Kill the listed process
  8. Reinstall RabbitMQ: apt-get install rabbitmq-server

Celery + Rabbit

Crearem un usuari a RabbitMQ i un virtual host amb accés aquest usuari:

$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_user_tags myuser mytag
tags: administrator
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

Per veure els valors creats:

$ sudo rabbitmqctl list_users
$ sudo rabbitmqctl list_permissions -p myvhost

Tasks

Crearem un fitxer anomenat tasks.py on agruparem TOTES les nostres tasques. A continuació un exemple:

from celery import Celery
from celery_tryton import TrytonTask
from trytond.pool import Pool

# Triarem una d'aquestes opcions:
celery = Celery('tasks', broker='amqp://guest@localhost//')
celery = Celery('tasks', broker='amqp://guest:guest@localhost:5672//')
celery = Celery('tasks', broker='amqp://guest:guest@vignemalle:5672//')
celery = Celery('tasks', broker='amqp://tryton:tryton@localhost:5672/try32')
celery = Celery('tasks', backend='amqp', broker='amqp://')

celery = Celery()
celery.config_from_object({
    'BROKER_URL': 'amqp://localhost',
    'CELERY_RESULT_BACKEND': 'amqp://',
    'CELERYD_POOL_RESTARTS': True,  # Required for /worker/pool/restart API
})

celery.conf.TRYTON_DATABASE = 'try32bdades'
celery.conf.TRYTON_CONFIG = '/etc/trytond_dev.conf'

@celery.task(base=TrytonTask)
def hello(user_id):
    User = Pool().get('res.user')
    user = User(user_id)
    return 'hello world, %s' % user.name

@celery.task
def add(x, y):
    return x + y

@celery.task(ignore_result=True)
def print_hello():
    print 'hello there'

@celery.task
def gen_prime(x):
    multiples = []
    results = []
    for i in xrange(2, x+1):
        if i not in multiples:
            results.append(i)
            for j in xrange(i*i, x+1, i):
                multiples.append(j)
    return results

Celery Config

Un exemple del fitxer de configuració de celeryconfig.py és:

import os

TRYTON_DATABASE = os.environ.get('TRYTON_DATABASE')
TRYTON_CONFIG = os.environ.get('TRYTON_CONFIG')

# Enable this options to debug. More info in celery page.
CELERY_ALWAYS_EAGER = False
CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
CELERY_TASK_TIME_LIMIT = 7200
CELERY_SOFT_TASK_TIME_LIMIT =  7200
CELERY_MAX_TASKS_PER_CHILD  = 2
CELERY_ACCEPT_CONTENT = ['json']

Arrancar les nostres tasques

celery worker -A tasks &

Podem matar les nostres tasques:

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill

Executar una tasca

Terminal Python

Des de la terminal podem executar les nostres tasques.

El que no farem és cridar directament la funció, doncs no s'executarà en background:

   1 primes = gen_prime(50000)

Per executar-ho en background ho farem amb "delay":

   1 primes = gen_prime.delay(50000)
   2 primes.ready()
   3 print primes.get()

Terminal Shell

$ celery call tasks.hello -b amqp://tryton:tryton@localhost:5672/try32 --config celeryconfig --args=[1]

Control de les tasques

RabbitMQ

Amb rabbitmqctl podem veure el llistat de tasques executades:

sudo rabbitmqctl list_queues -p vhost

Flower

Flower és un client web que ens llistarà totes els nostres workes i tasques que es van executant. També ens ofereix la possibilitat mitjançant API que terceres aplicacions donar resultats:

Instal·lació

pip install flower

Execució

$ flower --port=5555
$ flower --broker=amqp://tryton:tryton@localhost:5672/try32
$ celery -A proj flower
$ celery -A proj flower --port=5555
$ flower -A proj --broker_api=http://username:password@rabbitmq-server-name:15672/api/
$ celery flower --basic_auth=user1:password1,user2:password2

Supervisord

A continuació un exemple de fitxer de supervisor per arrancar un servei Flower:

[program:flower_localhost]
command=/home/resteve/virtualenv/try32galatea/bin/python /home/resteve/virtualenv/try32galatea/bin/flower --broker=amqp://tryton:tryton@localhost:5672/try32
directory=/home/resteve/virtualenv/try32galatea
autostart=true
autorestart=true
stdout_logfile=/var/log/flower/localhost_supervisord.log
redirect_stderr=true

Tryton

Encara que cada mòdul que fa ús de Celery conté un arranc de Celery amb la variable "celery_start", NO ho farem, doncs varis mòduls poden fer ús de Celery i centralitzarem TOTES les tasques en un sol fitxer.

Cada INSTANCIA estarà format per:

En el fitxer de configuració de trytond afegirem els següents valors:

   1 celery_start = False # not start celery by module. Start Celery custom module
   2 celery_user = tryton
   3 celery_password = tryton
   4 celery_host = localhost
   5 celery_port = 5672
   6 celery_vhost = try32
   7 celery_database = nombasedades

Un exemple d'ús de tryton_celery:

   1 from celery import Celery, group
   2 from celery_tryton import TrytonTask
   3 from trytond.pool import Pool
   4 from trytond.transaction import Transaction
   5 
   6 celery = Celery('purchase_requests')
   7 
   8 @celery.task(base=TrytonTask)
   9 def generate(products):
  10     pool = Pool()
  11     User = pool.get('res.user')
  12     PurchaseRequest = pool.get('purchase.request')
  13     Product = pool.get('product.product')
  14     admin, = User.search([
  15             ('login', '=', 'admin'),
  16             ])
  17     with Transaction().set_user(admin.id), \
  18         Transaction().set_context(
  19             User.get_preferences(context_only=True)):
  20         PurchaseRequest.generate_requests(Product.browse(products))

Supervisor

[program:celery_INSTANCIA]
command=/home/INSTANCIA/tryton/bin/celery worker --app=INSTANCIA_tasks --loglevel=info --workdir=/home/INSTANCIA/tryton/src/INSTANCIA --queues=INSTANCIA --time-limit=7400 --concurrency=1 --hostname=INSTANCIA.localhost --pidfile=/var/run/trytond_celery_INSTANCIA.pid
directory=/home/INSTANCIA/tryton/
autostart=true
autorestart=true
stdout_logfile=/var/log/trytond/INSTANCIA_supervisord.log
redirect_stderr=true
environment=TRYTON_DATABASE='INSTANCIA',TRYTON_CONFIG='/etc/trytond/.INSTANCIA.conf',CELERY_USER='INSTANCIA',CELERY_PASSWORD='INSTANCIAc3po',CELERY_HOST='localhost',CELERY_PORT=5672,CELERY_VHOST='INSTANCIA'

Opcions rabbit

    stop [<pid_file>]
    stop_app
    start_app
    wait <pid_file>
    reset
    force_reset
    rotate_logs <suffix>

    join_cluster <clusternode> [--ram]
    cluster_status
    change_cluster_node_type disc | ram
    forget_cluster_node [--offline]
    update_cluster_nodes clusternode
    sync_queue queue
    cancel_sync_queue queue

    add_user <username> <password>
    delete_user <username>
    change_password <username> <newpassword>
    clear_password <username>
    set_user_tags <username> <tag> ...
    list_users

    add_vhost <vhostpath>
    delete_vhost <vhostpath>
    list_vhosts [<vhostinfoitem> ...]
    set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
    clear_permissions [-p <vhostpath>] <username>
    list_permissions [-p <vhostpath>]
    list_user_permissions <username>

    set_parameter [-p <vhostpath>] <component_name> <name> <value>
    clear_parameter [-p <vhostpath>] <component_name> <key>
    list_parameters [-p <vhostpath>]

    set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>] 
    <name> <pattern>  <definition>
    clear_policy [-p <vhostpath>] <name>
    list_policies [-p <vhostpath>]

    list_queues [-p <vhostpath>] [<queueinfoitem> ...]
    list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
    list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
    list_connections [<connectioninfoitem> ...]
    list_channels [<channelinfoitem> ...]
    list_consumers [-p <vhostpath>]
    status
    environment
    report
    eval <expr>

    close_connection <connectionpid> <explanation>
    trace_on [-p <vhost>]
    trace_off [-p <vhost>]
    set_vm_memory_high_watermark <fraction>

Celery (last edited 2016-01-12 08:42:04 by resteve)

Contenidos creados por el equipo de Zikzakmedia. Creative Commons By-NC-SA

PythonZikzakmedia