码迷,mamicode.com
首页 > 其他好文 > 详细

daemon框架

时间:2015-07-26 22:33:41      阅读:395      评论:0      收藏:0      [点我收藏+]

标签:

这个框架主要是开多个子进程来异步处理任务,同时主进程能和子进程通信,了解子进程的status,并且能restart子进程。

管理者可以了解主进程的信息info,并且能shutdown主进程。管理者和主进程之间通过tcp通信。

这个过程涉及主进程Master,子进程Worker,队列Queue,以及消费者Consumer。

 

这几天学习框架,一方面要学会如何组装零件,也要回拆解零件,更加要学会改进零件。

问问自己

(1)       框架的数据流是否梳理清楚

(2)       能否改进框架

(3)       如果框架有bug,能否将其修复

(4)       这个框架有什么优缺点,为什么要如此设计

(5)       如果重写这个框架,你是否能重写好这个框架。

 

有些事情,出现得刚刚好。

刚好最近在研究框架之类的东西,先看了一个MVC框架类的简单东西。很久没有刷微博,偶尔刷微博的时候,突然发现了一个项目框架作者分享的daemon处理框架,跟项目的框架非常类似,但是更加简洁。

技术分享

于是屁颠屁颠将代码git clone下来,并且在机器上部署了ubantu的虚拟机。

--运行daemon程序

--从queue中pop出data,然后输出这个data

---使用phpredis的扩展

---在Redis中写队列的相关扩展

---使用mysqlDB,能够访问db的资源

---为一个工程配置各种资源

---想想如何将工程逻辑单独提取出来,再单独建立一个App的文件夹,然后在App中写好run的相关逻辑,在配置指定好run函数所在脚本。

---主进程和子进程如何通信的,子进程如何循环重启的?---代码中有自动重启的部分,如果运行超过一定时间,子进程会给主进程发送restart的命令,主进程会将子进程的worker从workerclients中去除。主进程在轮询的时候,遍历workerclients,发现worker不存在了,会重新再创建。

 

---管理进程与主进程保持通信,直接redis-cli –h 127.0.0.1 –p 9008就可以看主进程的运行状态。

 

1. 感性认识

配置运行环境

安装xmapp

安装redis

配置phpredis

 

运行代码

https://github.com/pythias/Stark

php src/Stark/run.php -f [ini_config_file]

第一步自己写了一个测试,将数据打印出来,之后逐步修改代码,queue->pop出数据,consumer->run处理数据。

在run.php中get_options返回queue的相关参数,这样可以创建队列实例,并且实现pop函数。这样可以通过queue->pop()从队列中获取到相关数据,并且采用实例化的consumer对象调用run函数来处理data。

<?php
...
function get_options($file) { .... ‘queue‘ => array( ‘class‘ => ‘\\Stark\\Daemon\\Queue\\RedisQueue‘, ‘options‘ => array( ‘host‘ => get_option_value($config, ‘queue.host‘, ‘127.0.0.1‘), ‘port‘ => get_option_value($config, ‘queue.port‘, 6379), ‘queueKey‘ => get_option_value($config, ‘queue.queueKey‘, ""), ), ), ....

RedisQueue其实是使用redis来作为队列,进行数据的pop。

<?php
namespace Stark\Daemon\Queue;

class RedisQueue extends Base {
    protected $_serverConfigs = false;
    protected $_queueKey = false;
    protected $_host = false;
    protected $_port = false;
    private $_redis = null;

    public function init(\Stark\Daemon\Worker $worker) {
        //连接服务器
        $this->_redis = new \Redis();
        $this->_redis->connect($this->_host, $this->_port);
        $this->_redis->rpush($this->_queueKey, "you are a beautiful girl!");
    }

    public function pop(\Stark\Daemon\Worker $worker) {
          $data = $this->_redis->lpop($this->_queueKey);
          return $data;
    }

    public function push(\Stark\Daemon\Worker $worker, $data) {

          echo "push data - {$this->_queueKey} - {$data}\r\n";
          return $this->_redis->rpush($this->_queueKey, $data);
    }


    public function complete(\Stark\Daemon\Worker $worker) {
        //关闭链接
        $this->_redis->close();
    }

}

2. 入口文件run.php

$options = get_options($config_file);

$daemon = new \Stark\Daemon\Master($options);

$daemon->start();

(1)getOptions解析配置文件,获取Consumer,Queue,Master的参数。

配置文件config.ini如下

[main]
name = "config"
host = "127.0.0.1"
port = 9008
working_dir = "/tmp"

[run]
script_file = "run.php"
memory_limit = "1024M"

[worker]
count = 4
max_run_count = 10000
max_run_seconds = 3600
max_idle_seconds = 60

[queue]
host = "127.0.0.1"
port = "6380"
queueKey = "test:redis"

解析出来的参数如下:

Options = array(
        ‘consumer‘ => array(
            ‘class‘ => ‘\\Stark\\Daemon\\Consumer\\Callback‘,
            ‘options‘ => array(
                ‘init‘ => ‘init‘,
                ‘run‘ => ‘run‘,
                ‘complete‘ => ‘complete‘,
            ),
        ),

        ‘queue‘ => array(
            ‘class‘ => ‘\\Stark\\Daemon\\Queue\\RedisQueue‘,
            ‘options‘ => array(
                ‘host‘ => get_option_value($config, ‘queue.host‘, ‘127.0.0.1‘),
                ‘port‘ => get_option_value($config, ‘queue.port‘, 6379),
                ‘queueKey‘ => get_option_value($config, ‘queue.queueKey‘, ""),

            ),
        ),
        ‘master‘ => array(
            ‘name‘ => get_option_value($config, ‘main.name‘, ‘Stark_‘ . time()),
            ‘host‘ => get_option_value($config, ‘main.host‘, ‘127.0.0.1‘),
            ‘port‘ => get_option_value($config, ‘main.port‘, 9003),
            ‘maxWorkerCount‘ => get_option_value($config, ‘worker.count‘, 1),
            ‘maxRunCount‘ => get_option_value($config, ‘worker.max_run_count‘, 10000),
            ‘maxRunSeconds‘ => get_option_value($config, ‘worker.max_run_seconds‘, 3600),
            ‘maxIdleSeconds‘ => get_option_value($config, ‘worker.max_idle_seconds‘, 60),
            ‘memoryLimit‘ => get_option_value($config, ‘run.memory_limit‘, ‘1024M‘),
        ),
);

(2)创建Master实例,并根据返回options创建consumer,worker,queue实例,并且初始化master的相应选项。

Master继承Options类,所以创建Master实例的时候,会执行Options的构造函数。

<?php
namespace Stark\Core;

class Options {
    public function __construct($options = array()) {
        $this->setOptions($options);
    }

    protected function setOptions($options) {
        if (is_array($options) == false && empty($options)) {
            return;
        }

        foreach ($options as $key => $value) {
            $methodName = ‘_set‘ . ucfirst($key) . ‘Options‘;
            if (method_exists($this, $methodName)) {
                if (call_user_func_array(array($this, $methodName), array($value)) == false) {
                    throw new \Stark\Daemon\Exception\Options("Set option failed, option:{$key}");
                } else {
                    continue;
                }
            }

            $property = "_{$key}";
            if (isset($this->$property) == false) throw new \Stark\Daemon\Exception\Options("Set option failed, option:{$key}");
            $this->$property = $value; //TODO:value type
        }

        return true;
    }
}

?>

Options相当于一个基类。可以用来创建类实例,同时初始化实例成员。依据上面options数组的KEY(master,consumer,queue等),如果存在_set[KEY]Options 这个函数,就执行该函数,如果成员变量存在相应的key,就初始化该成员。

 protected function _setMasterOptions($options) {
        return $this->setOptions($options);
    }

    protected function _setQueueOptions($options) {
        return $this->_setClassOptionsByType($options, ‘Queue‘);
    }

    protected function _setConsumerOptions($options) {
        return $this->_setClassOptionsByType($options, ‘Consumer‘);
    }

    private function _setClassOptionsByType($options, $type) {
        if (empty($options[‘class‘])) {
            return false;
        }

        $className = $options[‘class‘];        
        if ($className[0] != ‘\\‘) {
            $className = "\\Stark\\Daemon\\{$type}\\{$className}";
        }

        if (class_exists($className) == false) {
            return false;
        }
        
        $property = ‘_‘ . lcfirst($type);
        $this->$property = new $className(isset($options[‘options‘]) ? $options[‘options‘] : array());

        return true;
    }

上述的代码的功能主要是创建Master的_consumer(Consumer类型),_queue(Queue类型)实例,然后初始化Master的name,host,port以及maxWorkerCount,maxRunCount,maxRunSeconds,maxIdelSeconds,memoryLimit成员变量。

(3)启动主进程

主进程启动函数,主要负责检查运行环境(php版本)是否满足要求checkEnvironments;并初始化工程目录initialize;判断daemon程序是否一直在运行(若在运行,则终止);在后台运行主进程runInBackground,并且设置其跑满所有的核setAffinity;然后是创建主进程和子进程的本地通信socket连接(createDaemonSocket),管理进程和主进程的网络通信socket连接(createAdminSocket)。之后创建多个workers,并且启动多个worker。

startLoop轮询去检查多个worker的状态,接受管理进程的请求(info,shutdown),接受子进程的请求(restart,status)。

以下是Master::start的函数:

    public function start() {
        ini_set(‘memory_limit‘, $this->_memoryLimit);
        $this->_checkEnviroments();
        $this->_initialize();

        $this->_daemonIsRunning();

        \Stark\Core\System::runInBackground();
        $this->_pid = posix_getpid();
        $this->_createPidFile();
        \Stark\Core\System::setAffinity($this->_pid);
        \Stark\Core\System::setProcTitle($this->_pid, "daemon ‘{$this->_name}‘");

        //sample worker
        $this->_createSampleWorker();

        //start daemon
        $this->_startTime = microtime(true);
        $this->_setupSignal();
        $this->_createDaemonSocket();
        $this->_createAdminSocket();
        $this->_startWorkers();
        $this->_startLoop();
    }

创建多个wokers子进程,并且启动各个子进程,各个子进程进行相应的初始化,各项轮询,最后进行关闭整个daemon。至于子进程是如何启动的,后续再进行讲述。

private function _createWorker($index) {
        echo "Start worker {$index}\r\n";
        $currentMicroTime = microtime(true);

        if (isset($this->workerStatuses[$index]) == false) {
            $this->workerStatuses[$index] = new \Stark\Daemon\Status(); 
        }

        if (($currentMicroTime - $this->workerStatuses[$index]->startTime) < self::WORKER_START_INTERVAL_SECONDES) {
            $this->_log->log("Worker {$index} cannt start right now", \Stark\Core\Log\Level::ERROR);
            return false;
        }
        
        $this->workerStatuses[$index]->startTime = $currentMicroTime;
        $this->workerStatuses[$index]->lastActiveTime = $currentMicroTime;
        
        $forkPid = pcntl_fork();

        if ($forkPid == -1) {
            $this->_exit("Unable to fork worker {$index}");
        }
        
        if ($forkPid) {
            $this->workerStatuses[$index]->pid = $forkPid;
        } else {
            socket_close($this->_daemonSocket);
            socket_close($this->_adminSocket);

            $pid = posix_getpid();
            \Stark\Core\System::setAffinity($pid, ‘2-32‘);
            \Stark\Core\System::setProcTitle($pid, "daemon ‘{$this->_name}‘ worker {$index}");

            $this->workerStatuses[$index]->totalCpuU += $this->workerStatuses[$index]->cpuU;
            $this->workerStatuses[$index]->totalCpuS += $this->workerStatuses[$index]->cpuS;

            $this->_worker->pid = $pid; 
            $this->_worker->index = $index;
            $this->_worker->start();
        }
    }

Master是主进程,Worker是子进程,Master和Worker之间是主进程和子进程间的本地通信,而Admin和Master之间是两台机器之间的网络TCP通信。

这个是管理进程和主进程进行通信。

 private function _startLoop() {
        $this->_log->log("Starting daemon: {$this->_name}", \Stark\Core\Log\Level::INFO);

        $time = microtime(true);
        $lastWorkerTime = $time;
        $lastAdminTime = $time;
        
        while (true) {
            $now = microtime(true);

            if (($now - $lastAdminTime) > self::ADMIN_DATA_INTERVAL_SECONDES) {
                $this->_acceptConnection($this->_adminSocket, $this->_adminClients);
                $this->_checkAdminCommands();

                $lastAdminTime = $now;
            }

            $this->_acceptConnection($this->_daemonSocket, $this->_workerClients);

            if (($now - $lastWorkerTime) > $this->_heartbeat) {
                $this->_sendHeartbeatToWorkers();

                $missingWorkerList = $this->_checkWorkerStatus();

                if (empty($missingWorkerList) === false) {
                    foreach ($missingWorkerList as $index) {
                        $this->_createWorker($index);    
                    }
                }

                $lastWorkerTime = $now;
            }

            usleep(1000);
        }
    }

主进程和子进程如何通信的,子进程如何循环重启的?

    private function _acceptConnection($socket, &$clients) {
        $newClient = @socket_accept($socket);

        if ($newClient) {
            socket_set_nonblock($newClient);
            socket_getsockname($newClient, $ip, $port);
            $clients[] = array(
                ‘client‘ => $newClient,
                ‘ip‘ => $ip,
                ‘port‘ => $port,
                ‘index‘ => -1,
            );
        }
    }

 

    private function _checkAdminCommands() {
        $offlineClients = array();

        foreach ($this->_adminClients as $key => $clientInfo) {
            $client = $clientInfo[‘client‘];
            $responseValue = Protocol::read($client); // 从客户端client读取到命令

            if (empty($responseValue)) {
                if (is_resource($client)) {
                    $errorCode = socket_last_error($client);
                    
                    if ($errorCode != SOCKET_EAGAIN) {
                        $offlineClients[] = $key;
                    }
                } else {
                    $offlineClients[] = $key;
                }
            } else {
                $this->_responseCommand($client, $responseValue, self::COMMAND_FROM_ADMIN); // 解析管理进程发出的命令
            }
        }
        
        foreach ($offlineClients as $key) {
            if (is_resource($this->_adminClients[$key])) {
                socket_close($this->_adminClients[$key]);
            }

            unset($this->_adminClients[$key]);
        }

        return true;
    }

 info命令,应该是主进程从管理进程所在客户端读取,然后解析info命令,第一个参数为command,后续参数为命令参数,,调用infoAdminHandler进行相应处理。

 shutdown命令类似。

 
// 给子进程发送心跳,检查其状态,如果无法获取到子进程的状态,说明该子进程已经死掉,所以要从workerClients列表中清除它。
private function _sendHeartbeatToWorkers() { foreach ($this->_workerClients as $key => $clientInfo) { $client = $clientInfo[‘client‘]; if ($this->_sendCommandToWorker($client, ‘status‘) === false) { unset($this->_workerClients[$key]); continue; } } } private function _sendCommandToWorker($client, $command, $arguments = array()) { $write = Protocol::write($client, $command, $arguments); // 往子进程写命令请求 if ($write === false) return false; $responseValue = Protocol::read($client, false); // 读取子进程返回的结果 if (empty($responseValue) === false) { $this->_responseCommand($client, $responseValue, self::COMMAND_FROM_WORKER); // 解析子进程的返回的结果 } return true; }

主进程往子进程写status命令请求,子进程读取status命令,并将处理结果返回给主进程,即主进程从子进程客户端读取status命令的处理结果。

status是子进程处理命令请求,返回结果。但是这个status命令请求是主进程发出来的。

后续的restart命令是子进程传给主进程,主进程处理restart命令请求,将该子进程杀掉,之后在检查该子进程不存在的时候,重新创建。

(4)启动子进程,并且轮询消耗队列,进行相关处理,同时

    public function start() {
        $this->_initialize();
        $this->_startLoop();
        $this->_finalize();

        exit;
    }

Worker的循环,不断的消耗队列。DoQueue由消费者去消耗queue->pop出的data。ReceiveCommands连接主进程,接收主进程的命令,并返回相应的结果。

private function _startLoop() {
        while ($this->_run) {
            $this->_queueStartTime = microtime(true);

            $this->_checkStatus();     // 检查相关的状态
            $this->_receiveCommands(); // 接收主进程的相关命令(比如status命令)

            if ($this->_pause) {
                usleep(1000);
                continue;
            }

            if ($this->_doQueue() === false) {
                usleep(1000);
            }
        }
    }

该框架的难点在于:进程间通信(包括主进程和子进程之间的通信,也包括管理进程和主进程之间的TCP通信)

打日志----标记时间,文件名,类名,函数名,功能,来处理相关函数。

还非常需要流程图!!!!

 

 

技术分享

daemon框架

标签:

原文地址:http://www.cnblogs.com/TsingLo/p/4678565.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!