码迷,mamicode.com
首页 > 其他好文 > 详细

sphinx架构设计 -- 高并发rt实时索引

时间:2016-08-17 18:10:57      阅读:362      评论:0      收藏:0      [点我收藏+]

标签:

CleverCode最近在研究sphinx使用rt实时索引,总结了一下php调用的过程,并且总结了一下rt分布式架构设计。

1 安装Sphinx

   安装详解请查看:http://blog.csdn.net/clevercode/article/details/52204124。


2 配置rt索引文件

vim /usr/local/sphinx2/etc/realtime.conf

# Real-time (RT) index "username": the index that the PHP examples in this article write to and search.
index username
{
    # Real-time index type
    type = rt
    
    # Index storage path. Data is normally kept in memory and is saved to files once it outgrows the memory limit.
    # NOTE(review): the original note said the files were put somewhere arbitrary rather than under the data
    # directory, but the path below does point into var/data - confirm which one is intended.
    path =/usr/local/sphinx2/var/data/username
     
    # Character table.
    # NOTE(review): the original comment here was garbled ("utf-8' default value is"); it presumably meant that
    # this is the default table for utf-8 - confirm.
    charset_table = 0..9, A..Z->a..z, _, a..z,U+410..U+42F->U+430..U+44F, U+430..U+44F
    
    # Length used to split non-alphabetic text (by default text is split on letters and digits;
    # 1 means every single character is split out on its own)
    ngram_len = 1
    ngram_chars = U+3000..U+2FA1F
    
    # Full-text field declarations; every full-text field of the RT index is declared here
    rt_field = name
    rt_field = spell
    rt_field = shortspell
    
    # Other attribute fields, which can be used in queries
    rt_attr_uint = isvalid 
    rt_attr_timestamp = ctime
    rt_attr_timestamp = utime    
    
    # In-memory size limit; data beyond this is saved to disk
    rt_mem_limit = 64M
}

# Settings for the indexer tool. The article's step 3 notes that an RT index does not need indexer to be run.
indexer
{
	# NOTE(review): presumably the cap on I/O operations per second - confirm against the Sphinx documentation
	max_iops= 40
	# NOTE(review): presumably the maximum size of a single I/O operation, in bytes (1048576 = 1 MiB) - confirm
	max_iosize= 1048576
}

# Settings for the searchd daemon that serves the "username" RT index.
searchd
{
    # Two listeners: 9312 is the port the SphinxClient example (search.php) connects to,
    # 9306 speaks the MySQL protocol and is the port the SphinxRt class connects to.
    listen          = 9312
    listen          = 9306:mysql41

    log         = /usr/local/sphinx2/var/log/searchd.log
    query_log       = /usr/local/sphinx2/var/log/query.log
    max_children        = 1024
    pid_file        = /usr/local/sphinx2/var/log/searchd.pid
    query_log_format = sphinxql
    read_timeout = 5
    # NOTE(review): presumably seconds (172800 s = 2 days) between flushes of RT data to disk - confirm
    rt_flush_period = 172800
    seamless_rotate     = 1
    # ondisk_dict_default   = 1
    workers = threads
    mva_updates_pool    = 1M
    max_packet_size     = 64M
    max_filters     = 256
    # NOTE(review): the binary log is kept in /tmp; if the system clears /tmp (reboot, tmp cleaner), RT changes
    # that have not been flushed yet could presumably not be replayed - consider a persistent directory.
    binlog_path        = /tmp
    binlog_max_log_size = 1024M
    read_buffer     = 32M
  # read_unhinted     = 32K
    max_batch_queries   = 32
    subtree_docs_cache  = 64M
    subtree_hits_cache  = 64M
    dist_threads        = 24
    thread_stack            = 128K
    client_timeout        = 300
}


3 启动Sphinx(实时索引不需要启动indexer)

# pkill searchd  
# /usr/local/sphinx2/bin/searchd --config /usr/local/sphinx2/etc/realtime.conf

4 查看rt索引结构

技术分享


5 更新rt数据源

5.1 SphinxRt类的封装。这个类是根据 http://www.sphinxsearch.org/sphinx-realtime-api 提供的代码做的简单改版。

<?php

/**
 * Thin wrapper around a Sphinx real-time (RT) index, talking SphinxQL to searchd
 * through the legacy mysql_* extension.
 *
 * Statements are assembled with fluent setters (rt(), where(), field(), order(),
 * group(), limit(), option()) and executed by search(); insert(), update() and the
 * delete helpers build their own statements. Clauses set through the setters are kept
 * in $_sql until they are overwritten - nothing in this class resets them after a query.
 *
 * Failures are reported by returning false and storing a message in $error.
 */
class SphinxRt
{
    /** @var resource|false Connection returned by mysql_connect(); false when connecting failed. */
    private $_link;
    /** @var array Field list of the current index (declared for subclasses; no method here uses it). */
    protected $_field = array();
    /** @var array Statement clauses keyed by rt, where, limit, option, field, order, group, withGroup. */
    protected $_sql = array();
    /** @var string Last statement assembled by this object. */
    protected $queryStr = '';

    /** @var string Name of the current RT index. */
    public $rt = '';
    /** @var string Last error message. */
    public $error = '';

    /** @var bool Debug switch: when true, query() prints the statement and dies on error. */
    public $debug = false;

    /**
     * Connects to searchd and optionally selects the index to work on.
     *
     * A failed connection does not throw: $error is set and every later query()
     * returns false.
     *
     * @param string $rt   name of the RT index ('' to choose it later with rt())
     * @param string $host host:port of searchd's MySQL-protocol listener
     */
    public function __construct($rt = '', $host = '127.0.0.1:9306')
    {
        $this->_link = mysql_connect($host);
        if (!$this->_link) {
            $this->error = 'sphinx 实时索引服务器连接失败!';
            return;
        }
        if ($rt != '') {
            $this->rt = $this->_sql['rt'] = $rt;
        }
    }

    /**
     * Selects the RT index that later statements run against.
     *
     * @param string $rt index name
     * @return SphinxRt $this, for chaining
     */
    public function rt($rt)
    {
        $this->_sql['rt'] = $this->rt = $rt;
        return $this;
    }

    /**
     * Sets the condition clause. The text is used verbatim, so it must include the
     * WHERE keyword itself, e.g. "where match('keyword')".
     *
     * @param string $where complete WHERE clause
     * @return SphinxRt $this, for chaining
     */
    public function where($where)
    {
        $this->_sql['where'] = $where;
        return $this;
    }

    /**
     * Sets the LIMIT clause body, e.g. "0,10".
     *
     * @param string|int $limit
     * @return SphinxRt $this, for chaining
     */
    public function limit($limit)
    {
        $this->_sql['limit'] = $limit;
        return $this;
    }

    /**
     * Sets the OPTION clause body (ranker, weights, ...),
     * e.g. "ranker=bm25,max_matches=3".
     *
     * @param string $option
     * @return SphinxRt $this, for chaining
     */
    public function option($option)
    {
        $this->_sql['option'] = $option;
        // Return the object like every other setter; returning $option broke call chains.
        return $this;
    }

    /**
     * Sets the select list; search() falls back to "*" when this is empty.
     *
     * @param string $field
     * @return SphinxRt $this, for chaining
     */
    public function field($field)
    {
        $this->_sql['field'] = $field;
        return $this;
    }

    /**
     * Sets the ORDER BY clause body, e.g. "gid desc".
     *
     * @param string $order
     * @return SphinxRt $this, for chaining
     */
    public function order($order)
    {
        $this->_sql['order'] = $order;
        return $this;
    }

    /**
     * Sets the GROUP BY clause body and, optionally, the in-group ordering.
     *
     * @param string $group     GROUP BY expression
     * @param string $withGroup WITHIN GROUP ORDER BY expression; only stored when $group is non-empty
     * @return SphinxRt $this, for chaining
     */
    public function group($group, $withGroup = '')
    {
        $this->_sql['group'] = $group;
        if ($group) {
            $this->_sql['withGroup'] = $withGroup;
        }
        return $this;
    }

    /**
     * Runs a SELECT assembled from the clauses set so far.
     *
     * Example of a resulting statement:
     *   select * from rt where match('keyword') group by gid WITHIN GROUP ORDER BY @weight DESC
     *   order by gid desc limit 0,1 option ranker=bm25,max_matches=3,field_weights=(title=10,content=3);
     *
     * @return array|false matched rows (numeric keys) plus a 'meta' entry holding the
     *                     SHOW META values; false when the query failed
     */
    public function search()
    {
        // Every optional clause defaults to '' so the format string below always
        // receives defined values.
        $orderSql = '';
        if ($this->_sqlPart('order') != '') {
            $orderSql = ' ORDER BY ' . $this->_sqlPart('order');
        }

        $groupSql = '';
        if ($this->_sqlPart('group') != '') {
            $groupSql = ' GROUP BY ' . $this->_sqlPart('group');
            if ($this->_sqlPart('withGroup') != '') {
                $groupSql .= ' WITHIN GROUP ORDER BY ' . $this->_sqlPart('withGroup');
            }
        }

        $optionSql = '';
        if ($this->_sqlPart('option') != '') {
            $optionSql = ' OPTION ' . $this->_sqlPart('option');
        }

        $limitSql = '';
        if ($this->_sqlPart('limit') != '') {
            $limitSql = 'limit ' . $this->_sqlPart('limit');
        }

        $field = $this->_sqlPart('field');
        if ($field == '') {
            $field = '*';
        }

        $where = $this->_sqlPart('where');
        if ($where == '') {
            $where = '';
        }

        $this->queryStr = sprintf(
            "SELECT %s FROM %s %s %s %s %s %s",
            $field,
            $this->_sqlPart('rt'),
            $where,
            $groupSql,
            $orderSql,
            $limitSql,
            $optionSql
        );

        $rs = $this->query();
        if (!$rs) {
            return false;
        }

        $resArr = array();
        while ($row = mysql_fetch_assoc($rs)) {
            $resArr[] = $row;
        }
        $resArr['meta'] = $this->getMeta();
        return $resArr;
    }

    /**
     * Inserts one document.
     *
     * When no id is given the next id is taken from getLastId(); this is not safe
     * under concurrent writers, which may pick the same id.
     * Values are escaped and quoted here, so pass them raw.
     *
     * @param array $data   column name => value
     * @param int   $lastId document id to use; 0 means "take the next free id"
     * @return bool|resource result of query(); false when $data is empty or the query failed
     */
    public function insert($data, $lastId = 0)
    {
        if (empty($data)) {
            $this->error = '插入数据不能为空';
            return false;
        }

        if ($lastId === 0) {
            $lastId = $this->getLastId();
        }

        $fields = $values = '';
        foreach ($data as $k => $v) {
            $fields .= ',' . $k;
            $values .= ',' . $this->_quote($v);
        }

        $this->queryStr = "insert into " . $this->_sqlPart('rt') . "(id" . $fields . ") values ($lastId {$values})";
        return $this->query();
    }

    /**
     * Inserts several documents, numbering them consecutively from getLastId().
     *
     * @param array   $datas list of rows, each column name => value; with $asStr the
     *                       column list is taken from the first row
     * @param boolean $asStr true: one multi-row INSERT; false: one INSERT per row
     * @return bool|resource with $asStr the result of query(); otherwise true only when
     *                       every single insert succeeded; false for empty input
     */
    public function insertAll($datas, $asStr = true)
    {
        if (empty($datas)) {
            $this->error = '无效数据!';
            return false;
        }

        $lastId = $this->getLastId();

        if (!$asStr) {
            // Each row needs its own id; reusing $lastId for all rows made them collide.
            $allOk = true;
            $offset = 0;
            foreach ($datas as $row) {
                if (!$this->insert($row, $lastId + $offset)) {
                    $allOk = false;
                }
                $offset++;
            }
            return $allOk;
        }

        $fields = 'id';
        $tuples = array();
        $offset = 0;
        foreach ($datas as $row) {
            $tuple = '(' . ($lastId + $offset);
            foreach ($row as $name => $value) {
                // The column list is built once, from the first row.
                if ($offset == 0) {
                    $fields .= ',' . $name;
                }
                $tuple .= ',' . $this->_quote($value);
            }
            $tuples[] = $tuple . ')';
            $offset++;
        }

        $this->queryStr = sprintf(
            "insert into %s(%s) values %s",
            $this->_sqlPart('rt'),
            $fields,
            implode(',', $tuples)
        );
        return $this->query();
    }

    /**
     * Writes a document with REPLACE INTO, i.e. the row with this id is replaced by
     * exactly the columns given in $data (or created if it does not exist).
     * Values are escaped and quoted here, so pass them raw.
     *
     * @param array $data   column name => value
     * @param int   $id     id of the document to write; must be greater than 0
     * @param bool  $insert false: do nothing (and return true) when the document does not exist yet
     * @return bool|resource result of query(); false for empty $data, a non-positive $id
     *                       or a failed query
     */
    public function update($data, $id, $insert = true)
    {
        // Both parts are required: empty data would replace the document with an
        // empty one, and a non-positive id cannot address a document.
        if (empty($data) || !($id > 0)) {
            $this->error = '无效更新数据!';
            return false;
        }

        // Caller does not want missing documents to be created.
        if ($insert === false && $this->getById($id) === false) {
            return true;
        }

        $fields = $values = '';
        foreach ($data as $k => $v) {
            $fields .= ',' . $k;
            $values .= ',' . $this->_quote($v);
        }

        $this->queryStr = "replace into " . $this->_sqlPart('rt') . "(id" . $fields . ") values ($id{$values})";
        return $this->query();
    }

    /**
     * Deletes the documents matched by a condition (for example an external id).
     *
     * The condition is stored with where() and stays set afterwards; the other clauses
     * currently set on the object (limit, order, ...) take part in the lookup as well.
     * NOTE(review): when no limit is set, SphinxQL presumably applies its default
     * LIMIT, so only the first page of matches would be deleted per call - confirm.
     *
     * @param string $condition complete WHERE clause, including the WHERE keyword
     * @return bool false when the lookup or a delete failed, true otherwise
     *              (including "nothing matched")
     */
    public function delBy($condition)
    {
        $rs = $this->where($condition)->search();
        if (!$rs) {
            return false;
        }

        $idArr = array();
        foreach ($rs as $key => $row) {
            // search() stores the SHOW META values under 'meta'; that entry is not a row.
            if ($key === 'meta') {
                continue;
            }
            if (!empty($row['id'])) {
                $idArr[] = $row['id'];
            }
        }

        if (empty($idArr)) {
            return true;
        }
        return $this->delete($idArr);
    }

    /**
     * Deletes one document or a list of documents by id, one DELETE per id.
     *
     * @param int|array $id a single id or an array of ids
     * @return bool true when every DELETE succeeded (also for an empty array)
     */
    public function delete($id)
    {
        $ids = is_array($id) ? $id : array($id);

        $allOk = true;
        foreach ($ids as $v) {
            $this->queryStr = sprintf("delete from %s where id=%d", $this->_sqlPart('rt'), $v);
            if (!$this->query()) {
                $allOk = false;
            }
        }
        return $allOk;
    }

    /**
     * Empties the index by deleting every id from 1 up to the current maximum,
     * one DELETE per id.
     * NOTE(review): this issues one query per possible id; newer Sphinx versions
     * presumably offer TRUNCATE RTINDEX for this - confirm for the deployed version.
     *
     * @return bool always true
     */
    public function truncate()
    {
        // getLastId() is the next free id, i.e. one past the highest existing id.
        $nextId = $this->getLastId();
        for ($i = 1; $i < $nextId; $i++) {
            $this->delete($i);
        }
        return true;
    }

    /**
     * Counts the documents in the index, read from the total_found value of SHOW META
     * after an unfiltered SELECT.
     *
     * @return string|false total_found as returned by the server, false on failure
     */
    public function countAll()
    {
        // Braces are required here: "$this->_sql['rt']" without them interpolates the
        // array itself and leaves ['rt'] as literal text.
        $this->queryStr = "SELECT * FROM {$this->_sqlPart('rt')} ";
        if (!$this->query()) {
            return false;
        }

        $meta = $this->getMeta();
        if ($meta && isset($meta['total_found'])) {
            return $meta['total_found'];
        }
        return false;
    }

    /**
     * Returns the next free document id (highest existing id + 1), emulating MySQL's
     * auto_increment. Returns 1 when the index is empty or the lookup failed.
     *
     * @return int
     */
    public function getLastId()
    {
        $this->queryStr = "select * from {$this->_sqlPart('rt')} order by id desc limit 1";
        $rs = $this->query();

        $lastId = 1;
        if ($rs) {
            $row = mysql_fetch_assoc($rs);
            if ($row) {
                $lastId = $row['id'] + 1;
            }
        }
        return $lastId ? $lastId : 1;
    }

    /**
     * Reads the status values of the previous query with SHOW META.
     *
     * @return array Variable_name => Value; empty when nothing could be read
     */
    protected function getMeta()
    {
        $metaArr = array();
        if (!$this->_link) {
            return $metaArr;
        }

        $meta = mysql_query("show meta", $this->_link);
        if (!$meta) {
            return $metaArr;
        }

        while ($row = mysql_fetch_assoc($meta)) {
            $metaArr[$row['Variable_name']] = $row['Value'];
        }
        return $metaArr;
    }

    /**
     * Fetches one document by id from the current index ($this->rt).
     *
     * @param int $id document id; must be greater than 0
     * @return array|false the row, or false when it does not exist, the id is invalid
     *                     or the query failed
     */
    public function getById($id)
    {
        if (!($id > 0) || !$this->_link) {
            return false;
        }

        $sql = sprintf("select * from %s where id=%d", $this->rt, $id);
        $rs = mysql_query($sql, $this->_link);
        if (!$rs) {
            return false;
        }
        return mysql_fetch_assoc($rs);
    }

    /**
     * Lists the column names of an index by reading one row from it. This only works
     * when the index holds at least one document.
     *
     * @param string $rt index name; '' uses the current index
     * @return array|false column names without the 'weight' column (original positions
     *                     are kept as keys), or false when no row could be read
     */
    public function _getField($rt = '')
    {
        $rt = $rt ? $rt : $this->rt;
        $this->queryStr = "select * from {$rt} limit 1";
        $res = $this->query();

        $row = $res ? mysql_fetch_assoc($res) : false;
        if (!$row) {
            $this->error = '实时索引'.$rt.'没有任何记录,无法获取索引字段';
            return false;
        }

        // Drop Sphinx's relevance column by name rather than by assuming its position.
        return array_diff(array_keys($row), array('weight'));
    }

    /**
     * Executes a statement on the searchd connection.
     *
     * @param string $sql statement to run; '' runs the last assembled statement
     * @return resource|bool result of mysql_query(); false on failure or when there is
     *                       no connection ($error then holds the reason)
     */
    public function query($sql = '')
    {
        if ($sql == '') {
            $sql = $this->queryStr;
        }

        // Without a connection there is nothing to send; the constructor has already
        // recorded the connection error.
        if (!$this->_link) {
            $this->triggerDebug($this->debug);
            return false;
        }

        $rs = mysql_query($sql, $this->_link);
        if (!$rs) {
            $this->error = mysql_error($this->_link);
        }
        $this->triggerDebug($this->debug);
        return $rs;
    }

    /**
     * Returns the last error message.
     *
     * @return string
     */
    public function getError()
    {
        return $this->error;
    }

    /**
     * Returns the last statement assembled by this object.
     *
     * @return string
     */
    public function getLastSql()
    {
        return $this->queryStr;
    }

    /**
     * Prints debug information (calling file and line, last statement, last error) as
     * an HTML fragment. When an error is recorded the script is terminated with that
     * output.
     *
     * @param bool $debugMode nothing happens unless this is true
     * @return void
     */
    public function triggerDebug($debugMode = false)
    {
        if (!$debugMode) {
            return;
        }

        // Frame 0 of the backtrace is the call to this method, i.e. the place that
        // triggered the debug output.
        $trace = debug_backtrace();
        $frame = $trace[0];

        $errorStr = 'file:' . $frame['file'];
        $errorStr .= '<br />line:' . $frame['line'];
        $errorStr .= '<br />sql:' . $this->queryStr;
        $errorStr .= '<br />error:<font color="red">' . $this->error . '</font>';

        if ($this->error != '') {
            die($errorStr);
        }
        echo ($errorStr);
    }

    /**
     * Returns one stored statement clause, or '' when it has not been set.
     *
     * @param string $key clause name (rt, where, limit, ...)
     * @return mixed
     */
    private function _sqlPart($key)
    {
        return isset($this->_sql[$key]) ? $this->_sql[$key] : '';
    }

    /**
     * Turns a value into a quoted SphinxQL string literal, escaping backslashes and
     * single quotes so that the value cannot terminate the literal.
     *
     * @param mixed $value scalar value
     * @return string the literal, including the surrounding single quotes
     */
    private function _quote($value)
    {
        $escaped = strtr((string)$value, array('\\' => '\\\\', "'" => "\\'"));
        return "'" . $escaped . "'";
    }

}

5.2 更新数据源

vim modifySource.php

<?php
require_once "SphinxRt.php";

/**
 * Inserts one document into the "username" RT index on the local searchd.
 *
 * @param array $data column name => value
 * @return mixed whatever SphinxRt::insert() returns (false when the insert failed)
 */
function insert($data)
{
	$sphinx = new SphinxRt('username','127.0.0.1:9306');
	// Hand the result back: the caller prints it, and without this it always got null.
	return $sphinx->insert($data);
}

/**
 * Builds one sample document and inserts it into the index, printing the result.
 */
function start()
{
	$data = array();
	$name = '张三';
	// NOTE(review): this conversion assumes the script file itself is saved as GBK - confirm.
	$utf8Name = iconv("GBK","UTF-8//IGNORE",$name);
	$data['name'] = $utf8Name; 
	$data['spell'] = 'zhangsan'; 
	$data['shortspell'] = 'zs'; 
	$data['isvalid'] = 1; 
	// ctime/utime are declared as rt_attr_timestamp, so they need Unix timestamps.
	// Passing the date string stored just its leading number (the sample search
	// output in this article shows ctime/utime as 2016).
	$data['ctime'] = strtotime('2016-08-17 12:00:00'); 
	$data['utime'] = strtotime('2016-08-17 12:00:00'); 
	$ret = insert($data);
	print_r($ret);
}

start();

?>


查看数据

技术分享

5.3 查询数据

vim search.php

<?php

//分词
/**
 * Splits a search phrase into words with SCWS.
 *
 * @param string $word phrase to segment; converted from GBK to UTF-8 before segmenting
 * @return array|false list of word entries as produced by SCWS (each entry carries a
 *                     'word' element), or false when no word was produced
 */
function parseWord($word)
{
	$so = scws_new();
	$so->set_charset('utf-8');
	// Default dictionary
	$so->add_dict(ini_get('scws.default.fpath') . '/dict.utf8.xdb');
	// Custom dictionary
	// $so->add_dict('./dd.txt',SCWS_XDICT_TXT);
	// Default rule set
	$so->set_rule(ini_get('scws.default.fpath') . '/rules.utf8.ini');

	// Whether punctuation and similar symbols are dropped from the result
	$so->set_ignore(true);

	// Whether compound words are additionally split, e.g. "中国人" giving "中国" + "人" + "中国人".
	// The value is a bitwise OR of 1 | 2 | 4 | 8, meaning: short words | duality | main single characters | all single characters,
	// which correspond to the constants SCWS_MULTI_SHORT SCWS_MULTI_DUALITY SCWS_MULTI_ZMAIN SCWS_MULTI_ZALL
	$so->set_multi(false);

	// Whether loose single characters are automatically joined two by two
	$so->set_duality(false);

	// Text to segment
	$utf8Key = iconv("GBK","UTF-8//IGNORE",$word);
	$so->send_text($utf8Key);

	// get_result() hands the words back in batches and returns false once the text is
	// exhausted, so it has to be called until then; a single call may return only the
	// first part of a longer text.
	$words_array = array();
	while ($batch = $so->get_result()) {
		foreach ($batch as $entry) {
			$words_array[] = $entry;
		}
	}
	$so->close();

	// Keep the previous contract: false when nothing was segmented.
	return $words_array ? $words_array : false;
}

//查询结果
/**
 * Sends a query to the local searchd through the SphinxClient API.
 *
 * @param string $words query text
 * @return mixed whatever SphinxClient::Query() returns
 */
function search($words)
{
	$client = new SphinxClient();
	$client->SetServer('127.0.0.1', 9312);

	// Match mode "all". SPH_MATCH_EXTENDED was tried here as well and left disabled:
	//$client->SetMatchMode(SPH_MATCH_EXTENDED);
	$client->SetMatchMode(SPH_MATCH_ALL);

	// Return matches as a plain list instead of a map keyed by document id.
	$client->SetArrayResult(TRUE);

	return $client->Query($words);
}

/**
 * Demo entry point: segments a fixed keyword, searches for the resulting words and
 * prints the keyword, the query string and the raw search result.
 */
function start()
{
	$key = '张三';

	// Segment the keyword; stop when segmentation produced nothing usable.
	$segments = parseWord($key);
	if (!is_array($segments) || count($segments) < 1)
	{
		echo "words_array is empty!";
		return;
	}

	// Wrap every word in parentheses and join the groups with "|".
	$groups = array();
	foreach ($segments as $segment)
	{
		$groups[] = '('.$segment['word'].')';
	}
	$words = implode('|', $groups);

	// Search and dump the result as text.
	$res = search($words);
	$dump = print_r($res, true);

	// The output is converted back to GBK before it is printed.
	echo '<p>输入:'.$key.'</p>'."\r\n";
	echo '<p>分词:'.iconv("UTF-8","GBK//IGNORE",$words).'</p>'."\r\n";
	echo iconv("UTF-8","GBK//IGNORE",$dump);
}

start();

?>

打印结果

<p>输入:张三</p>
<p>分词:(张三)</p>
Array
(
    [error] => 
    [warning] => 
    [status] => 0
    [fields] => Array
        (
            [0] => name
            [1] => spell
            [2] => shortspell
        )

    [attrs] => Array
        (
            [isvalid] => 1
            [ctime] => 2
            [utime] => 2
        )

    [matches] => Array
        (
            [0] => Array
                (
                    [id] => 1
                    [weight] => 2
                    [attrs] => Array
                        (
                            [isvalid] => 1
                            [ctime] => 2016
                            [utime] => 2016
                        )

                )

        )

    [total] => 1
    [total_found] => 1
    [time] => 0.001
    [words] => Array
        (
            [张] => Array
                (
                    [docs] => 1
                    [hits] => 1
                )

            [三] => Array
                (
                    [docs] => 1
                    [hits] => 1
                )

        )

)


6 rt分布式架构与负载均衡设计

当username的索引足够大的时候,以及并发量特别高的时候,可以考虑以下架构设计。

6.1 更新数据源

     当需要更新数据源(modifySource)的时候,把需要更新的机器都更新一遍,即192.168.100,192.168.101,192.168.102。


6.2 查询

     当需要查询的时候(search),可以根据192.168.100,192.168.101,192.168.102处理能力的权重选择一台机器进行检索。

技术分享

7 源码下载

http://download.csdn.net/download/clevercode/9605832。




sphinx架构设计 -- 高并发rt实时索引

标签:

原文地址:http://blog.csdn.net/clevercode/article/details/52231015

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!