标签:des blog http io ar os sp for 数据
主进程
$module = basename($_SERVER[‘SCRIPT_FILENAME‘], ‘.php‘);
$__doc__ = "Usage:{$module}.php start|stop|restart \n";
$cmd = isset($_SERVER[‘argv‘][1]) ? strtolower($_SERVER[‘argv‘][1]) : ‘start‘;
$procNum = isset($_SERVER[‘argv‘][2]) ? intval($_SERVER[‘argv‘][2]) : 1;
$daemon = new ForkPool(array(
‘module‘ => "rsyncdata",
‘pidDir‘ => "/home/services/daemon/"
));
$msg= new MsgTask(IPC_KEY_FILE_EXPORT,IPC_MSGQUEUE_SIZE,‘msgHandle‘);
$log = ResourceManager::get_logger();
$invoice = ResourceManager::get_logger(‘rsync_invoice‘);
$db = ResourceManager::getDatabase(‘chaohaowan‘);
$daemon->setTask($msg, $procNum);
switch ( $cmd ) {
case ‘start‘:
$daemon->start();
break;
case ‘stop‘:
$daemon->stop();
break;
case ‘restart‘:
$daemon->restart();
break;
default:
echo $__doc__;
echo "please input [start | stop]\n";
break;
}
function msgHandle($data)
{
global $invoice;
$invoice->err(‘data:‘ . var_export($data, true));
if (empty($data[‘cmd‘]) || empty($data[‘data‘]))
return;
$cmd = abs(intval($data[‘cmd‘]));
switch ($cmd)
{
case CMD_RSYNC_ADD_POST:rsync_add_post($data[‘data‘]);break;
case CMD_RSYNC_DELETE_POST:rsync_delete_post($data[‘data‘]);break;
case CMD_RSYNC_MOD_POST:rsync_modify_post($data[‘data‘]);break;
case CMD_RSYNC_ADD_TAG:rsync_add_term($data[‘data‘], TERMTYPE_TAG);break;
case CMD_RSYNC_MOD_TAG:rsync_modify_term($data[‘data‘]);break;
case CMD_RSYNC_ADD_CATEGORY:rsync_add_term($data[‘data‘], TERMTYPE_CATEGORY);break;
case CMD_RSYNC_MOD_CATEGORY:rsync_modify_term($data[‘data‘]);break;
case CMD_RSYNC_ADD_USER:rsync_add_user($data[‘data‘]);break;
case CMD_RSYNC_MOD_USER:rsync_modify_user($data[‘data‘]);break;
case CMD_RSYNC_MOD_POSTMETA:rsync_modify_postmeta($data[‘data‘]);break;
case CMD_RSYNC_BBS : rsync_bbs($data[‘data‘]);break;
case CMB_RSYNC_BBS_GAMESTORE : rsync_gamestore($data[‘data‘]);break;
default:
break;
}
return;
}
线程池
class ForkPool {
/**
* 任务中实现修饰ForkPool的装饰方法名
*
*/
const FUCTIONNAME_DRESS =‘dressFork‘;
public $_options = array(
‘pidDir‘ => ‘./‘,
‘module‘ => ‘‘,
‘maxPerChild‘ => 36000,
‘startNum‘ => 1
);
protected $_running = true;
protected $_procList = array();
/**
* Fork_Task
*
* @var Frok_Task
*/
protected $_task = array();
protected $_procNum = 1;
protected $_pidFile;
public function __construct(array $cfg = array()) {
$this->_options = array_merge($this->_options, $cfg);
if ( empty($this->_options[‘module‘]) ) {
$this->_options[‘module‘] = basename($_SERVER[‘SCRIPT_FILENAME‘], ‘.php‘);
}
}
public function setTask(Fork_Task $task, $procNum = ‘‘) {
$procNum = ( empty($procNum) ) ? $this->_options[‘startNum‘] : intval($procNum);
if ( $procNum > 1 ) {
require_once BASE_PATH . ‘/baselib/Fork/Multi.php‘;
$this->_task = new Fork_Multi($task);;
} else {
$this->_task = $task;
}
/* 调用任务的装饰方法 */
$func_name_dress=self::FUCTIONNAME_DRESS;
if(method_exists($task,$func_name_dress)){
$task->$func_name_dress($this);
}
/**/
$this->_pidFile = rtrim($this->_options[‘pidDir‘], "\\/") . ‘/‘ . $this->_options[‘module‘] . ‘.pid‘;
/**/
$this->_procNum = $procNum;
}
public function start() {
if ( $this->isRunning() ) {
echo "daemon is running\n";
exit;
}
$pid = pcntl_fork();
if ( $pid == -1 ) {
die("fork(1) failed!\n");
} elseif ( $pid > 0 ) {
//让由用户启动的进程退出
exit;
}
//建立一个有别于终端的新session以脱离终端
posix_setsid();
$pid = pcntl_fork();
if ( $pid == -1 ) {
die("fork(2) failed!\n");
} elseif ( $pid > 0 ) {
//父进程退出, 剩下子进程成为最终的独立进程
exit;
}
//设置进程信号过滤器
pcntl_signal(SIGHUP, SIG_IGN);
pcntl_signal(SIGTTIN, SIG_IGN);
pcntl_signal(SIGTTOU, SIG_IGN);
pcntl_signal(SIGQUIT, SIG_IGN);
pcntl_signal(SIGINT, SIG_IGN);
pcntl_signal(SIGTERM, SIG_IGN);
pcntl_signal(SIGUSR1, array($this, ‘interrupt‘));
file_put_contents($this->_pidFile, posix_getpid());
$log = Logger::getLogger();
$log->server(‘daemon started.‘);
if ( $this->_procNum > 1 ) {
$space = ceil($this->_options[‘maxPerChild‘] / $this->_procNum );
for ( $i = 0; $i < $this->_procNum; ++$i ) {
$pid = $this->_task->run($space * ($i + 1));
$this->_procList[$pid] = time();
usleep(100000);
}
unset($space);
$count = 0;
$status = 0;
while ( true ) {
$pid = pcntl_wait($status, WNOHANG);
if ( $pid > 0 ) {
unset($this->_procList[$pid]);
}
if ( $this->_running ) {
if ( $pid > 0 ) {
//$pid = $this->_task->start($this->_options[‘maxPerChild‘]);
$this->_procList[$pid] = time();
$log->server(‘fork new process pid: ‘ . $pid);
}
} else {
if ( $pid > 0 ) {
$count = 0;
}
++$count;
if ( ($count > 500) || (count($this->_procList) === 0) ) {
break;
}
usleep(1000 * (count($this->_procList) + 1));
}
usleep(10000);
}
} else {
while ( $this->_running ) {
$this->_task->run();
}
}
echo "daemon stopped.\n";
$log->server(‘daemon stoped.‘);
}
public function isRunning() {
clearstatcache();
if ( is_file($this->_pidFile) ) {
$pid = intval(file_get_contents($this->_pidFile));
if ( is_link(‘/proc/‘ . $pid . ‘/exe‘) ) {
$execLink = readlink(‘/proc/‘ . $pid . ‘/exe‘);
if ( $execLink == PHP_BINDIR . ‘/php‘ ) {
$cmdline = file_get_contents(‘/proc/‘ . $pid . ‘/cmdline‘);
$execName = basename($_SERVER[‘SCRIPT_FILENAME‘]);
if ( strpos($cmdline, $execName) === false ) {
} else {
//running
return true;
}
}
}
}
return false;
}
public function stop() {
if ( is_file($this->_pidFile) ) {
$pid = intval(file_get_contents($this->_pidFile));
if ( $pid > 0 ) {
if ( is_link(‘/proc/‘ . $pid . ‘/exe‘) ) {
$execLink = readlink(‘/proc/‘ . $pid . ‘/exe‘);
if ( $execLink == PHP_BINDIR . ‘/php‘ ) {
$cmdline = file_get_contents(‘/proc/‘ . $pid . ‘/cmdline‘);
$execName = basename($_SERVER[‘SCRIPT_FILENAME‘]);
if ( (strpos($cmdline, $execName) === false) ) {
//非自己启动的进程
echo "process check error!\n";
} else {
$ret = posix_kill($pid, SIGUSR1);
if ( $ret === false ) {
echo "stop daemon error!\n";
} else {
echo "stop daemon....\n";
//unlink pid file
unlink($this->_pidFile);
}
}
} else {
echo "process check error!\n";
}
} else {
echo "process has exited!\n";
}
}
} else {
echo "pid file not find!\n";
}
}
public function restart() {
$this->stop();
echo "restarting.......\n";
sleep(1);
$this->start();
echo "restart finished\n";
}
public function interrupt() {
$log = Logger::getLogger();
$this->_running = false;
if ( $this->_procNum > 1 ) {
foreach ( $this->_procList as $pid => $info ) {
$ret = posix_kill($pid, SIGUSR2);
if ( $ret === false ) {
$log->err("stopped process $pid error!");
} else {
$log->server("process $pid stopped!");
}
}
}
}
}
进程池派生进程
class Fork_Multi {
protected $_taskRuns = 0;
protected $_running = true;
/**
* wrap task
*
* @var Fork_Task
*/
protected $_task = null;
protected $_pid = 0;
public function __construct(Fork_Task $task) {
$this->_task = $task;
}
public function run($taskRuns = 3600) {
$pid = pcntl_fork();
if ( $pid === -1 ) {
exit;
} elseif ( $pid === 0 ) {
pcntl_signal(SIGUSR2, array($this, ‘stop‘));
$this->_taskRuns = $taskRuns;
$log = Logger::getLogger();
$this->_pid = posix_getpid();
while ( $this->_running ) {
if ( $this->_taskRuns < 1 ) {
//stop
exit(0);
}
$this->_task->run();
--$this->_taskRuns;
}
} elseif ( $pid > 0 ) {
return $pid;
}
//must exit
exit;
}
public function stop() {
$this->_running = false;
}
}
实际工作进程
class MsgTask implements Fork_Task {
protected $_date,$log;
public $code = 0;
public $msg = ‘‘;
private $delay = 5000;
protected $callback,$key,$size;
protected $msg_queue;
public function __construct($key=null,$size = null,$callback=null) {
$this -> _date = date(‘Y-m-d‘);
$this -> log = Logger::getLogger();
$this->key = $key;
$this->size = $size;
$this->msg_queue = MsgQueue::init($this->key)->open($this->size);
$this->callback = $callback;
}
public function run() {
$this->main();
usleep($this->delay);
}
public function dressFork(ForkPool &$fork_pool){
$fork_pool->_options[‘module‘].=‘_‘.$this->key;
}
public function main(){
if(is_callable($this->callback)){
$this->getAndDo();
}else{
$this->log->fatal("this handle have not method: msgHandle!");
exit(3);
}
}
public function setKey($key){
$this->key=$key;
return $this;
}
public function setDelay($microseconds)
{
$this->delay = $microseconds;
return $this;
}
public function setCallback($callback){
$this->callback=$callback;
return $this;
}
public function getAndDo($callback=null){
if(is_null($callback))
$callback=$this->callback;
$content = $this->msg_queue->get();
if (empty($content))
return false;
if(is_callable($callback) && !empty($content)){
$ret=call_user_func_array($callback,array($content));
}
}
protected function _clearErr(){
$this->code=0;
$this->msg=‘‘;
}
public function __destruct() {
}
}
IPC通信
/**
* 队列
*
*/
interface Queue {
public function open($size=0xffff, $seq_key = 1);
/**
* 从队列中获取一条记录
*
* @return mix
*/
public function get();
/**
* 添加到队列
*
* @param mix $v
* @return bool
*/
public function add($v);
}
/**
* 通过消息队列实现的队列
*
*/
class MsgQueue implements Queue {
protected $key=‘‘;
protected $msg_queue=null;
protected $msgtype=0;
function __construct($key){
$this->key=$key;
return $this;
}
/**
* 建立一个基于消息队列的队列
*
* @param string $key KEY,可采用ftok(__FILE__,‘Q‘)
* @return MsgQueue
*/
static function init($key) {
return new self ( $key );
}
/**
* 获取队列
*
* @param int $size
* @param int $msgtype
* @return MsgQueue
*/
function open($size=0xffff,$msgtype=1){
$key=$this->key;
$this->size=$size;
$this->msgtype=$msgtype;
//权限要够, 不然httpd进程写不进来
$this->msg_queue = msg_get_queue($key, 0777);
if (!$this->msg_queue) {
$this->msg_queue = msg_get_queue($key, 0777);
if (!$this->msg_queue) {
//$this->log->fatal("fail get queue,key:{$key}.");
return FALSE;
}
}
return $this;
}
function get(){
$content = null;
$code = 0;
$dummy = null;
//echo microtime(TRUE)."\n\n";
if(!is_resource( $this->msg_queue)){
return FALSE;
}
//-----------------队列, 期望类型,真实类型,最大字节数,内容回传, 反序列化,阻塞,错误码
//最大字节会过大会导致内存不够
$ret = msg_receive ( $this->msg_queue, $this->msgtype, $dummy, $this->size, $content, TRUE, MSG_IPC_NOWAIT, $code );
//echo microtime(TRUE)."\n\n++";
//收包出错
if (!$ret) {
//$this->log->fatal("fail receive queue, retrying",$code);
//收包出错, 重新获取消息队列再来过
return FALSE;
}
return $content;
}
function add($content){
if(!is_resource( $this->msg_queue)){
return FALSE;
}
//向消息队列中写
return msg_send( $this->msg_queue, 1,$content,TRUE);
}
function destory(){
return msg_remove_queue($this->msg_queue);
}
}
/**
* 基于共享内存的队列
*
*/
class ShmQueue implements Queue {
protected $key = ‘‘;
protected $sem_id, $shm_id, $seq_key;
public function __construct($key) {
$this->key = $key;
return $this;
}
/**
* 建立一个基于共享内存的队列
*
* @param string $key 内存地址KEY,可采用ftok(__FILE__,‘Q‘)
* @return ShmQueue
*/
static function init($key) {
return new self ( $key );
}
/**
* 开辟内存空间
*
* @param int $size 空间大小 (bytes)
* @param mix $seq_key 内存key
* @return unknown
*/
public function open($size=0xffff, $seq_key = 1) {
$IPC_KEY = $this->key;
if (! $this->sem_id) {
$sem_id = sem_get ( $IPC_KEY ); //创建或获得一个现有的,以$IPC_KEY为KEY的信号量
$this->sem_id = $sem_id;
$this->seq_key = $seq_key;
} else {
$sem_id = $this->sem_id;
$seq_key = $this->seq_key;
}
$this->shm_id = $shm_id = shm_attach ( $IPC_KEY, $size, 0666 ); //创建或关联一个现有的,以$IPC_KEY为KEY的共享内存
sem_acquire ( $sem_id ); //占有信号量,相当于上锁,同一时间内只有一个流程运行此段代码
$val = @shm_get_var ( $shm_id, $seq_key ); //从共享内存中获得值
if ($val && is_array ( $val )) {
} else {
$val = array ();
shm_put_var ( $shm_id, $seq_key, $val );
}
sem_release ( $sem_id ); //释放信号量,相当于解锁
return $val;
}
/**
* 关闭共享内存关联
*
*/
public function close() {
$shm_id = $this->shm_id; //创建或关联一个现有的,以$IPC_KEY为KEY的共享内存
shm_detach ( $shm_id ); //关闭共享内存关联
}
/**
* 销毁共享内存中的所有数据
*
*/
public function destory() {
$sem_id = &$this->sem_id;
if($this->shm_id){
shm_remove ( $this->shm_id );
shm_detach ( $this->shm_id ); //关闭共享内存关联
}
if ($sem_id) {
sem_remove ($sem_id);
}
}
/**
* 从队列中获取一条记录
*
* @return mix
*/
public function get() {
$shm_id = $this->shm_id;
$sem_id = $this->sem_id;
if (! ($sem_id && $shm_id))
return FALSE;
sem_acquire ( $sem_id ); //占有信号量,相当于上锁,同一时间内只有一个流程运行此段代码
$val = @shm_get_var ( $shm_id, $this->seq_key ); //从共享内存中获得值
if ($val && is_array ( $val )) {
$v = array_shift ( $val );
shm_put_var ( $shm_id, $this->seq_key, $val ); //将修改后的值写入共享内存
} else {
$v = NULL;
}
sem_release ( $sem_id ); //释放信号量,相当于解锁
return $v;
}
/**
* 添加一条记录到队列中
*
* @param mix $v
* @return bool
*/
public function add($v) {
$shm_id = $this->shm_id;
$sem_id = $this->sem_id;
if (! ($sem_id && $shm_id))
return FALSE;
sem_acquire ( $sem_id ); //占有信号量,相当于上锁,同一时间内只有一个流程运行此段代码
//$this->shm_id=$shm_id = shm_attach($this->key);
$val = shm_get_var ( $shm_id, $this->seq_key ); //从共享内存中获得值
if ($val && is_array ( $val )) {
array_push ( $val, $v );
} else {
$val = array ($v );
}
$status = shm_put_var ( $shm_id, $this->seq_key, $val ); //将修改后的值写入共享内存
sem_release ( $sem_id ); //释放信号量,相当于解锁
//shm_detach($shm_id);//关闭共享内存关联
return $status;
}
/**
* 一次添加多条记录到队列中
*
* @param mix $v
* @return bool
*/
public function addAll($v) {
$shm_id = $this->shm_id;
$sem_id = $this->sem_id;
if (! ($sem_id && $shm_id))
return FALSE;
sem_acquire ( $sem_id ); //占有信号量,相当于上锁,同一时间内只有一个流程运行此段代码
$val = shm_get_var ( $shm_id, $this->seq_key ); //从共享内存中获得值
if ($val && is_array ( $val )) {
$val=array_merge ( $val, $v );
} else {
$val = $v;
}
$status = shm_put_var ( $shm_id, $this->seq_key, $val ); //将修改后的值写入共享内存
sem_release ( $sem_id ); //释放信号量,相当于解锁
return $status;
}
}
/**
* 同步数据到旧的数据库
*
* @param int $cmd 命令
* @param int $type 要同步的类型
* @param array $data 同步的数据
*/
function rsync_data($cmd, $type ,$data, &$code, &$msg)
{
$code = 0;
$msg = ‘‘;
if ($cmd < CMD_RSYNC_ADD || $cmd > CMD_RSYNC_UPDATE)
{
$code = CODE::PARAM_SCALE_ERR;
$msg = "cmd {$cmd} is valid";
return false;
}
if ($type < TYPE_RSYNC_POST || $type > TYPE_RSYNC_USER)
{
$code = CODE::PARAM_SCALE_ERR;
$msg = "type {$type} is valid";
return false;
}
if (empty($data))
{
$code = CODE::PARAM_IS_NULL ;
$msg = "data is empty";
return false;
}
static $queue;
if (empty($queue))
$queue = MsgQueue::init(IPC_KEY_FILE_EXPORT)->open(IPC_MSGQUEUE_SIZE);
$queue->add(array(‘cmd‘=>$cmd, ‘type‘=>$type,‘data‘=>$data));
return true;
}
//end of script
标签:des blog http io ar os sp for 数据
原文地址:http://www.cnblogs.com/kudosharry/p/4123591.html