From 2f22d48dd0a046956b20763722c3bb0ea3dcf85d Mon Sep 17 00:00:00 2001 From: Loic d'Anterroches Date: Thu, 6 May 2010 10:27:08 +0200 Subject: [PATCH] Added the queue system to handle the webhooks and asynchronous events. --- src/IDF/Migrations/14Queue.php | 53 ++++++++++ src/IDF/Migrations/Backup.php | 2 + src/IDF/Migrations/Install.php | 2 + src/IDF/Queue.php | 186 +++++++++++++++++++++++++++++++++ 4 files changed, 243 insertions(+) create mode 100644 src/IDF/Migrations/14Queue.php create mode 100644 src/IDF/Queue.php diff --git a/src/IDF/Migrations/14Queue.php b/src/IDF/Migrations/14Queue.php new file mode 100644 index 0000000..24904a9 --- /dev/null +++ b/src/IDF/Migrations/14Queue.php @@ -0,0 +1,53 @@ +model = new $model(); + $schema->createTables(); + } +} + +function IDF_Migrations_14Queue_down($params=null) +{ + $models = array( + 'IDF_Queue', + ); + $db = Pluf::db(); + $schema = new Pluf_DB_Schema($db); + foreach ($models as $model) { + $schema->model = new $model(); + $schema->dropTables(); + } +} \ No newline at end of file diff --git a/src/IDF/Migrations/Backup.php b/src/IDF/Migrations/Backup.php index 7b6af71..a90d433 100644 --- a/src/IDF/Migrations/Backup.php +++ b/src/IDF/Migrations/Backup.php @@ -51,6 +51,7 @@ function IDF_Migrations_Backup_run($folder, $name=null) 'IDF_Review_FileComment', 'IDF_Key', 'IDF_Scm_Cache_Git', + 'IDF_Queue', ); $db = Pluf::db(); // Now, for each table, we dump the content in json, this is a @@ -94,6 +95,7 @@ function IDF_Migrations_Backup_restore($folder, $name) 'IDF_Review_FileComment', 'IDF_Key', 'IDF_Scm_Cache_Git', + 'IDF_Queue', ); $db = Pluf::db(); $schema = new Pluf_DB_Schema($db); diff --git a/src/IDF/Migrations/Install.php b/src/IDF/Migrations/Install.php index 28dd004..a3d42dd 100644 --- a/src/IDF/Migrations/Install.php +++ b/src/IDF/Migrations/Install.php @@ -48,6 +48,7 @@ function IDF_Migrations_Install_setup($params=null) 'IDF_Review_FileComment', 'IDF_Key', 'IDF_Scm_Cache_Git', + 'IDF_Queue', ); $db = Pluf::db(); $schema = new Pluf_DB_Schema($db); @@ -85,6 +86,7 @@ function IDF_Migrations_Install_teardown($params=null) $perm = Pluf_Permission::getFromString('IDF.project-authorized-user'); if ($perm) $perm->delete(); $models = array( + 'IDF_Queue', 'IDF_Scm_Cache_Git', 'IDF_Key', 'IDF_Review_FileComment', diff --git a/src/IDF/Queue.php b/src/IDF/Queue.php new file mode 100644 index 0000000..19242dc --- /dev/null +++ b/src/IDF/Queue.php @@ -0,0 +1,186 @@ + + * $item = new IDF_Queue(); + * $item->type = 'new_commit'; + * $item->payload = array('what', 'ever', array('data')); + * $item->create(); + * + * + * To process one item from the queue, you first need to register an + * handler, by adding the following in your relations.php file before + * the return statement or in your config file. + * + * + * Pluf_Signal::connect('IDF_Queue::processItem', + * array('YourApp_Class', 'processItem')); + * + * + * The processItem method will be called with two arguments, the first + * is the name of the signal ('IDF_Queue::processItem') and the second + * is an array with: + * + * + * array('item' => $item, + * 'res' => array('OtherApp_Class::handler' => false, + * 'FooApp_Class::processItem' => true)); + * + * + * When you process an item, you need first to check if the type is + * corresponding to what you want to work with, then you need to check + * in 'res' if you have not already processed successfully the item, + * that is the key 'YourApp_Class::processItem' must be set to true, + * and then you can process the item. At the end of your processing, + * you need to modify by reference the 'res' key to add your status. + * + * All the data except for the type is in the payload, this makes the + * queue flexible to manage many different kind of tasks. + * + */ +class IDF_Queue extends Pluf_Model +{ + public $_model = __CLASS__; + + function init() + { + $this->_a['table'] = 'idf_queue'; + $this->_a['model'] = __CLASS__; + $this->_a['cols'] = array( + // It is mandatory to have an "id" column. + 'id' => + array( + 'type' => 'Pluf_DB_Field_Sequence', + 'blank' => true, + ), + 'status' => + array( + 'type' => 'Pluf_DB_Field_Integer', + 'blank' => false, + 'choices' => array( + 'pending' => 0, + 'in_progress' => 1, + 'need_retry' => 2, + 'done' => 3, + 'error' => 4, + ), + 'default' => 0, + ), + 'trials' => + array( + 'type' => 'Pluf_DB_Field_Integer', + 'default' => 0, + ), + 'type' => + array( + 'type' => 'Pluf_DB_Field_Varchar', + 'blank' => false, + 'size' => 50, + ), + 'payload' => + array( + 'type' => 'Pluf_DB_Field_Serialized', + 'blank' => false, + ), + 'results' => + array( + 'type' => 'Pluf_DB_Field_Serialized', + 'blank' => false, + ), + 'lasttry_dtime' => + array( + 'type' => 'Pluf_DB_Field_Datetime', + 'blank' => true, + ), + 'creation_dtime' => + array( + 'type' => 'Pluf_DB_Field_Datetime', + 'blank' => true, + ), + ); + } + + function preSave($create=false) + { + if ($create) { + $this->creation_dtime = gmdate('Y-m-d H:i:s'); + $this->lasttry_dtime = gmdate('Y-m-d H:i:s'); + $this->results = array(); + } + } + + /** + * The current item is going to be processed. + */ + function processItem() + { + /** + * [signal] + * + * IDF_Queue::processItem + * + * [sender] + * + * IDF_Queue + * + * [description] + * + * This signal allows an application to run an asynchronous + * job. The handler gets the queue item and the results from + * the previous run. If the handler key is not set, then the + * job was not run. If set it can be either true (already done) + * or false (error at last run). + * + * [parameters] + * + * array('item' => $item, 'res' => $res) + * + */ + $params = array('item' => $this, 'res' => $this->results); + Pluf_Signal::send('IDF_Queue::processItem', + 'IDF_Queue', $params); + $this->status = 3; // Success + foreach ($params['res'] as $handler=>$ok) { + if (!$ok) { + $this->status = 2; // Set to need retry + $this->trials += 1; + break; + } + } + $this->results = $params['res']; + $this->lasttry_dtime = gmdate('Y-m-d H:i:s'); + $this->update(); + } +}