原文出处:http://www.lornajane.net/posts/2014/working-with-php-and-beanstalkd
I have just introduced Beanstalkd into my current PHP project; it was super-easy so I thought I‘d share some examples and my thoughts on how a job queue fits in with a PHP web application.
I have an API backend and a web frontend on this project (there may be apps later. It‘s a startup, there could be anything later). Both front and back ends are PHP Slim Framework applications, and there‘s a sort of JSON-RPC going on in between the two.
The job queue will handle a few things we don‘t want to do in real time on the application, such as:
There are two ends to this process, let‘s start by adding jobs to the queue. Anything you don‘t want to make a user wait for is a good candidate for a job. As I mentioned, some of our jobs get handled periodically with cron creating jobs, but since they are just beanstalkd jobs I can easily give an admin interface to trigger them manually also. In this case, I‘m just making a job to process things we update when a user makes a comment.
A good job is very self-contained; a bit like a stateless web request it should contain anything that is needed to process it and not rely on anything that went before. On a live platform you would typically have many workers all consuming jobs from a single queue so there are no guarantees that one job will be completed before the next one begins to be processed! You can put any data you like into a job; you could send all the data fields to fill in and send an email template for example.
In this example I need to talk to the database anyway so I‘m just storing information about which task should be done and including the comment ID with it.
I‘m using an excellent library called Pheanstalk which is well-documented and available via Composer. The lines I added to my composer.json:
"require": { "pda/pheanstalk": "2.1.0", }
I start by creating an object which connects to the job server and allows me to put jobs on the queue:
new Pheanstalk_Pheanstalk( $config[‘beanstalkd‘][‘host‘] . ":" . $config[‘beanstalkd‘][‘port‘] )
The config settings there will change between platforms but for my development version of this project, beanstalkd is just running on my laptop so my settings are the defaults:
[beanstalkd] host=127.0.0.1 port=11300
Once you have the object created, $queue
in my
example, we can easily add jobs with the put()
command -
but first you specify which "tube" to use. The tubes would be queues in another
tool, just a way of putting jobs into different areas, and it is possible to ask
the workers to listen on specific tubes so you can have specialised workers if
needed. Beanstalkd also supports adding jobs with different priorities.
Here‘s adding the simple job to the queue; the data is just a string so I‘m
using json_encode
to wrap up a couple of fields:
$job = array("action" => "comment_added", "data" => array("comment_id" => $comment_id)); $queue->useTube(‘mytube‘)->put(json_encode($job));
I wrote a bit in a previous post about how to check the current number of jobs on beanstalkd, so you can use those instructions to check that you have jobs stacking up. To use those, we‘ll need to write a worker.
The main application and the worker scripts don‘t need to be in the same technology stack since beanstalkd is very lightweight and technology agnostic. I‘m working with an entirely PHP team though so both the application and the workers are PHP in this instance. The workers are simply command-line PHP scripts that run for a long time, picking up jobs when they become available.
For my workers I have added the Pheanstalk libraries via Composer again and then my basic worker script looks like this:
require("vendor/autoload.php"); $queue = new Pheanstalk_Pheanstalk($config[‘beanstalkd‘][‘host‘] . ":" . $config[‘beanstalkd‘][‘port‘]); $worker = new Worker($config); // Set which queues to bind to $queue->watch("mytube"); // pick a job and process it while($job = $queue->reserve()) { $received = json_decode($job->getData(), true); $action = $received[‘action‘]; if(isset($received[‘data‘])) { $data = $received[‘data‘]; } else { $data = array(); } echo "Received a $action (" . current($data) . ") ..."; if(method_exists($worker, $action)) { $outcome = $worker->$action($data); // how did it go? if($outcome) { echo "done \n"; $queue->delete($job); } else { echo "failed \n"; $queue->bury($job); } } else { echo "action not found\n"; $queue->bury($job); } }
Here you can see the Pheanstalk object again, but this time we use some different commands:
reserve()
picks up a job from the queue and marks it as
reserved so that no other workers will pick it updelete()
removes the job from the queue when it has been
successfully completedbury()
marks the job as terminally failed and no workers
will restart it.The other alternative outcome is to return without a specific status - this will cause the job to be retried again later.
Once one job has been processed, the worker will pick up another, and so on. With multiple workers running, they will all just pick up jobs in turn until the queue is empty again.
The Worker class really doesn‘t have much that is beanstalkd-specific. The constructor connects to MySQL and also instantiates a Guzzle client which is used to hit the backend API of the application for the tasks where all the application framework and config is really needed to perform the task - we create endpoints for those and the worker has an access token so it can make the requests. Here‘s a snippet from the Worker class:
class Worker { protected $config; protected $db; protected $client; public function __construct($config) { $this->config = $config; // connect to mysql $dsn = ‘mysql:host=‘ . $config[‘db‘][‘host‘] . ‘;dbname=‘ . $config[‘db‘][‘database‘]; $username = $config[‘db‘][‘username‘]; $password = $config[‘db‘][‘password‘]; $this->db = new \PDO($dsn, $username, $password, array(PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES utf8")); $this->client = new \Guzzle\Http\Client($config[‘api‘][‘url‘]); } public function comment_added($data) { $comment_sql = "select * from comments where comment_id = :comment_id"; $comment_stmt = $this->db->prepare($comment_sql); $comment_stmt->execute(array("comment_id" => $data[‘comment_id‘])); $comment = $comment_stmt->fetch(PDO::FETCH_ASSOC); if($comment) { // more SQL to update various counts } return true; }
There are various different tasks here that call out to either our own API backend, or to MySQL as shown here, or to something else.
Working with workers leads me to often do either one of these:
Beanstalkd doesn‘t really have access control so you will want to lock down what can talk to your server on the port it listens on. It‘s a deliberately lightweight protocol and I like it, but do double check that it isn‘t open to the internet or something!
Long-running PHP scripts aren‘t the most robust thing in the world. I recommend running then under the tender loving care of supervisord (which I wrote about previously) - this has the added advantage of a really easy way to restart your workers and good logging. You should probably also include a lot more error handling than I have in the scripts here; I abbreviated to keep things readable.
What did I miss? If you‘re working with Beanstalkd and PHP and there‘s something I should have mentioned, please share it in the comments. This was my first beanstalkd implementation but I think it‘s the first of many - it was super-easy to get started!
working-with-php-and-beanstalkd,布布扣,bubuko.com
working-with-php-and-beanstalkd
原文地址:http://www.cnblogs.com/argb/p/3729087.html