Prskavčí blog

Apr 10, 2011 - Comments

PHP a RabbitMQ

V poslední době se objevilo hodně článků o RabbitMQ a připravuje se kniha kde většina příkladů je v PHP. Připravil jsem malou demonstraci jak se message queue dobře využit. RabbitMQ je napsaný v Erlangu podobně jako CouchDB a hodí ke zpracování dávkových úloh. V demonstraci využívám knihovnu wkhtmltopdf která umí zpracovat html stránku na PDF, používá k tomu webkit jádro.

Design

Malý design aplikace jsem zvolil takto:

Kód

Základ aplikace jsou dva úlohy producer a consumer. Nejdříve je potřeba pustit consumer.
error_reporting(E_ALL);
/**
 * Application path
 */
define(‘APPLICATION_PATH’, realpath(dirname(FILE) . ‘/../application’));
/**
 * Application enviroment
 */
define(‘APPLICATION_ENV’, ‘development’);

require_once APPLICATION_PATH . ‘/models/Rabbit.php’; /** * /Ensure library/ is on include_path */ set_include_path(implode(PATH_SEPARATOR, array( realpath(APPLICATION_PATH . ‘/../library’), get_include_path() )));

/** Zend_Application */ require_once ‘Zend/Application.php’;

// Create application, bootstrap, and run $application = new Zend_Application( APPLICATION_ENV, APPLICATION_PATH . ‘/configs/application.ini’ ); $application->getBootstrap()->bootstrap();

$config = new Zend_Config_Ini(APPLICATION_PATH . ‘/configs/application.ini’, APPLICATION_ENV);

$r = new Application_Model_Rabbit($config->rabbitmq); $r->consumer();

Tento script musí běžet na serveru, kde se zpracovává vlastní požadavek. Model potom volá metodu, která se vykoná.
        $conn = new AMQPConnection($this->options[‘host’],
                                   $this->options[‘port’],
                                   $this->options[‘user’],
                                   $this->options[‘pass’],
                                   $this->options[‘vhost’]
        );

    $channel = $conn->channel();
    /**
     * $exchange, $type,$passive=false,$durable=false,$auto_delete=true,
     */
    $channel->exchange_declare(self::EXCHANGE, 'direct', false, true, false);
    $channel->queue_declare(self::QUEUE);
    $channel->queue_bind(self::QUEUE, self::EXCHANGE);

    $consumer = function($msg) {
      $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['delivery_tag']);
      if ($msg->body == 'quit') {
          $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);
      } else {
          if (!empty($msg->body))  {
              // make PDF
              Application_Model_Wkhtmltopdf::proceed($msg->body, APPLICATION_PATH . '/../output/');
              // notify user
              system("growlnotify -n \"Rabbit demo\" -m \"PDF CREATED\"");
          }
      }
    };

    $channel->basic_consume(self::QUEUE,
                            self::CONSUMER_TAG,
                            false,
                            false,
                            false,
                            false,
                            $consumer);
    while (count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $conn-&gt;close();</pre>

Vlastní aplikace je potom jednoduchá

    public function indexAction()
    {
        $r = new Application_Model_Rabbit($this->_config->rabbitmq);

    $this-&gt;view-&gt;form = $form = new Application_Form_AddUrl();
    // process form
    if ($this-&gt;getRequest()-&gt;isPost()) {
        $formData = $this-&gt;getRequest()-&gt;getParams();
        if ($form-&gt;isValid($formData)) {
            $r-&gt;setUrl($form-&gt;url-&gt;getValue());
            $r-&gt;run();
        } else {
            $form-&gt;populate($formData);
        }
    }

    $this-&gt;view-&gt;url = $r-&gt;getUrl();
}</pre>

a volá se producer, který invokuje RabbitMQ

        $conn = new AMQPConnection($this->options[‘host’],
                                   $this->options[‘port’],
                                   $this->options[‘user’],
                                   $this->options[‘pass’],
                                   $this->options[‘vhost’]
        );
        $channel = $conn->channel();
        $channel->exchange_declare(self::EXCHANGE,
                                   ‘direct’,
                                   false,
                                   true,
                                   false);
        $msg = new AMQPMessage($string, array(‘content-type’ => ‘text/plain’));
        $channel->basic_publish($msg, self::EXCHANGE);
        $channel->close();
        $conn->close();
Demo u mne funguje velmi dobře, nevím jak na jiných platformách, zkoušel jsem to jen na Macu. Na linuxu by to mělo fungovat obdobně. Jen pro webovou aplikaci by bylo vhodné použít jiný systém notifikace pro webovou aplikaci. Nenašel jsem jak například předávat notifikace přes RabbitMQ, ale pokud někdo víte jak to elegatně udělat přidejte to do komentářů.

Veškerý zdrojový kód je dostupný na githubu.

Zvolil jsem jednoduché jednosměrné řešení bez implementace RPC, kde by se dala pro notifikaci použít reply fronta. Ale myslím, že to celkem stačí pro vetšinu dávkových aplikací jako jsou logovaní, upload souborů i generovaní PDF. Nic nebrání nahradit moji notifikaci například posláním linku ke stažení výsledného PDF apod. Příklad by šel pomocí RPC vylepšit na notifikaci přímo v aplikaci. Pro implementaci RPC lze například použít Thumper.