223 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			223 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
<?php
 | 
						|
/* -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
 | 
						|
/*
 | 
						|
# ***** BEGIN LICENSE BLOCK *****
 | 
						|
# This file is part of InDefero, an open source project management application.
 | 
						|
# Copyright (C) 2008-2011 Céondo Ltd and contributors.
 | 
						|
#
 | 
						|
# InDefero is free software; you can redistribute it and/or modify
 | 
						|
# it under the terms of the GNU General Public License as published by
 | 
						|
# the Free Software Foundation; either version 2 of the License, or
 | 
						|
# (at your option) any later version.
 | 
						|
#
 | 
						|
# InDefero is distributed in the hope that it will be useful,
 | 
						|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
n# GNU General Public License for more details.
 | 
						|
#
 | 
						|
# You should have received a copy of the GNU General Public License
 | 
						|
# along with this program; if not, write to the Free Software
 | 
						|
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 | 
						|
#
 | 
						|
# ***** END LICENSE BLOCK ***** */
 | 
						|
 | 
						|
 | 
						|
/**
 | 
						|
 * Queue system for the management of asynchronous operations.
 | 
						|
 *
 | 
						|
 * Anybody can add an item to the queue and any application can
 | 
						|
 * register itself to process an item from the queue.
 | 
						|
 *
 | 
						|
 * An item in the queue is considered as fully processed when all the
 | 
						|
 * handlers have processed it successfully.
 | 
						|
 *
 | 
						|
 * To push a new item in the queue:
 | 
						|
 *
 | 
						|
 * <code>
 | 
						|
 * $item = new IDF_Queue();
 | 
						|
 * $item->type = 'new_commit';
 | 
						|
 * $item->payload = array('what', 'ever', array('data'));
 | 
						|
 * $item->create();
 | 
						|
 * </code>
 | 
						|
 *
 | 
						|
 * To process one item from the queue, you first need to register an
 | 
						|
 * handler, by adding the following in your relations.php file before
 | 
						|
 * the return statement or in your config file.
 | 
						|
 *
 | 
						|
 * <code>
 | 
						|
 * Pluf_Signal::connect('IDF_Queue::processItem', 
 | 
						|
 *                       array('YourApp_Class', 'processItem'));
 | 
						|
 * </code>
 | 
						|
 *
 | 
						|
 * The processItem method will be called with two arguments, the first
 | 
						|
 * is the name of the signal ('IDF_Queue::processItem') and the second
 | 
						|
 * is an array with:
 | 
						|
 *
 | 
						|
 * <code>
 | 
						|
 * array('item' => $item,
 | 
						|
 *       'res' => array('OtherApp_Class::handler' => false,
 | 
						|
 *                      'FooApp_Class::processItem' => true));
 | 
						|
 * </code>
 | 
						|
 *
 | 
						|
 * When you process an item, you need first to check if the type is
 | 
						|
 * corresponding to what you want to work with, then you need to check
 | 
						|
 * in 'res' if you have not already processed successfully the item,
 | 
						|
 * that is the key 'YourApp_Class::processItem' must be set to true,
 | 
						|
 * and then you can process the item. At the end of your processing,
 | 
						|
 * you need to modify by reference the 'res' key to add your status.
 | 
						|
 *
 | 
						|
 * All the data except for the type is in the payload, this makes the
 | 
						|
 * queue flexible to manage many different kind of tasks.
 | 
						|
 *
 | 
						|
 */
 | 
						|
class IDF_Queue extends Pluf_Model
 | 
						|
{
 | 
						|
    public $_model = __CLASS__;
 | 
						|
 | 
						|
    function init()
 | 
						|
    {
 | 
						|
        $this->_a['table'] = 'idf_queue';
 | 
						|
        $this->_a['model'] = __CLASS__;
 | 
						|
        $this->_a['cols'] = array(
 | 
						|
                             // It is mandatory to have an "id" column.
 | 
						|
                            'id' =>
 | 
						|
                            array(
 | 
						|
                                  'type' => 'Pluf_DB_Field_Sequence',
 | 
						|
                                  'blank' => true, 
 | 
						|
                                  ),
 | 
						|
                            'status' => 
 | 
						|
                            array(
 | 
						|
                                  'type' => 'Pluf_DB_Field_Integer',
 | 
						|
                                  'blank' => false,
 | 
						|
                                  'choices' => array(
 | 
						|
                                                     'pending' => 0,
 | 
						|
                                                     'in_progress' => 1,
 | 
						|
                                                     'need_retry' => 2,
 | 
						|
                                                     'done' => 3,
 | 
						|
                                                     'error' => 4,
 | 
						|
                                                     ),
 | 
						|
                                  'default' => 0,
 | 
						|
                                  ),
 | 
						|
                            'trials' =>
 | 
						|
                            array(
 | 
						|
                                  'type' => 'Pluf_DB_Field_Integer',
 | 
						|
                                  'default' => 0,
 | 
						|
                                  ),
 | 
						|
                            'type' =>
 | 
						|
                            array(
 | 
						|
                                  'type' => 'Pluf_DB_Field_Varchar',
 | 
						|
                                  'blank' => false,
 | 
						|
                                  'size' => 50,
 | 
						|
                                  ),
 | 
						|
                            'payload' =>
 | 
						|
                            array(
 | 
						|
                                  'type' => 'Pluf_DB_Field_Serialized',
 | 
						|
                                  'blank' => false,
 | 
						|
                                  ),
 | 
						|
                            'results' =>
 | 
						|
                            array(
 | 
						|
                                  'type' => 'Pluf_DB_Field_Serialized',
 | 
						|
                                  'blank' => false,
 | 
						|
                                  ),
 | 
						|
                            'lasttry_dtime' =>
 | 
						|
                            array(
 | 
						|
                                  'type' => 'Pluf_DB_Field_Datetime',
 | 
						|
                                  'blank' => true,
 | 
						|
                                  ),
 | 
						|
                            'creation_dtime' =>
 | 
						|
                            array(
 | 
						|
                                  'type' => 'Pluf_DB_Field_Datetime',
 | 
						|
                                  'blank' => true,
 | 
						|
                                  ),
 | 
						|
                            );
 | 
						|
    }
 | 
						|
 | 
						|
    function preSave($create=false)
 | 
						|
    {
 | 
						|
        if ($create) {
 | 
						|
            $this->creation_dtime = gmdate('Y-m-d H:i:s');
 | 
						|
            $this->lasttry_dtime = gmdate('Y-m-d H:i:s');
 | 
						|
            $this->results = array();
 | 
						|
            $this->trials = 0;
 | 
						|
            $this->status = 0;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * The current item is going to be processed.
 | 
						|
     */
 | 
						|
    function processItem()
 | 
						|
    {
 | 
						|
        /**
 | 
						|
         * [signal]
 | 
						|
         *
 | 
						|
         * IDF_Queue::processItem
 | 
						|
         *
 | 
						|
         * [sender]
 | 
						|
         *
 | 
						|
         * IDF_Queue
 | 
						|
         *
 | 
						|
         * [description]
 | 
						|
         *
 | 
						|
         * This signal allows an application to run an asynchronous
 | 
						|
         * job. The handler gets the queue item and the results from
 | 
						|
         * the previous run. If the handler key is not set, then the
 | 
						|
         * job was not run. If set it can be either true (already done)
 | 
						|
         * or false (error at last run).
 | 
						|
         *
 | 
						|
         * [parameters]
 | 
						|
         *
 | 
						|
         * array('item' => $item, 'res' => $res)
 | 
						|
         *
 | 
						|
         */
 | 
						|
        $params = array('item' => $this, 'res' => $this->results);
 | 
						|
        Pluf_Signal::send('IDF_Queue::processItem',
 | 
						|
                          'IDF_Queue', $params);
 | 
						|
        $this->status = 3; // Success
 | 
						|
        foreach ($params['res'] as $handler=>$ok) {
 | 
						|
            if (!$ok) {
 | 
						|
                $this->status = 2; // Set to need retry
 | 
						|
                $this->trials += 1;
 | 
						|
                break;
 | 
						|
            }
 | 
						|
        }
 | 
						|
        $this->results = $params['res'];
 | 
						|
        $this->lasttry_dtime = gmdate('Y-m-d H:i:s');
 | 
						|
        $this->update();
 | 
						|
    }
 | 
						|
 | 
						|
    /** 
 | 
						|
     * Parse the queue.
 | 
						|
     *
 | 
						|
     * It is a signal handler to just hook itself at the right time in
 | 
						|
     * the cron job performing the maintainance work.
 | 
						|
     *
 | 
						|
     * The processing relies on the fact that no other processing jobs
 | 
						|
     * must run at the same time. That is, your cron job must use a
 | 
						|
     * lock file or something like to not run in parallel.
 | 
						|
     *
 | 
						|
     * The processing is simple, first get 500 queue items, mark them
 | 
						|
     * as being processed and for each of them call the processItem()
 | 
						|
     * method which will trigger another event for processing.
 | 
						|
     *
 | 
						|
     * If you are processing more than 500 items per batch, you need
 | 
						|
     * to switch to a different solution.
 | 
						|
     *
 | 
						|
     */
 | 
						|
    public static function process($sender, &$params)
 | 
						|
    {
 | 
						|
        $where = 'status=0 OR status=2';
 | 
						|
        $items = Pluf::factory('IDF_Queue')->getList(array('filter'=>$where,
 | 
						|
                                                           'nb'=> 500));
 | 
						|
        Pluf_Log::event(array('IDF_Queue::process', $items->count()));
 | 
						|
        foreach ($items as $item) {
 | 
						|
            $item->status = 1;
 | 
						|
            $item->update();
 | 
						|
        }
 | 
						|
        foreach ($items as $item) {
 | 
						|
            $item->status = 1;
 | 
						|
            $item->processItem();
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 |