ElasticSearch(九)e代驾使用Elasticsearch流程设计(Yii1版本)

一、控制器层的更新、添加、删除

class AddKnowledgeAction extends CAction {
    //add and  update
    public function actionPost() { 
      if ($_POST) { //如果是post操作
          $res = array('code'=>0,'message'=>'');
          $kid = Yii::app()->request->getPost('kid');   //这里是知识主键id
          $cid = Yii::app()->request->getPost('cid');
          $title = Yii::app()->request->getPost('title');
          $content = Yii::app()->request->getPost('content');
          $auth_group = Yii::app()->request->getPost('auth_group');
          $end_time = Yii::app()->request->getPost('end_time');
          $keywords = Yii::app()->request->getPost('keywords');
 
          if (empty($kid))  {  //$kid不存在则说明是走add操作,否则是update
             if ($kid = CallCenterService::addKnowledge($cid, $title, $content, $auth_group, $end_time, $keywords)) {
                //这里表示添加成功
             }
          } else {
              if (CallCenterService::updateKnowledge($kid, $cid, $title, $content, $auth_group, $end_time, $keywords)) {
                //这里表示修改成功
              }
          }
      }      
    }
    
    //delete
    public function actionDelete (){
        $kid = Yii::app()->request->getQuery('kid');
        $action = Yii::app()->request->getQuery('action'); //action => ['delete' => '物理删除', 'Invalid' => '逻辑删除']if ($kid && $action) {
            if (CallCenterService::delKnowledge($kid, $action)) {
                //表示删除成功
            }
        } 
    }
}

二、服务层的更新、添加、删除

//服务层
class CallCenterService {

    private static $instance;

    public static $auth_group = null;

    public static function getInstance() {
       if (empty(self::$instance)) {
            self::$instance = new CallCenterService();
        }
        return self::$instance;
    }

     /**
     * 添加知识
     */
    public static function addKnowledge($cid, $title, $content, $auth_group, $end_time, $keywords) {
        $model = new Knowledgenew;
        $operator = Yii::app()->user->id;
        $created = date('Y-m-d H:i:s');
        $model->attributes = array(
            'cid' => $cid,
            'title' => $title,
            'content' => $content,
            'operator' => $operator,
            'created' => $created,
            'auth_group' => $auth_group,
            'end_time' => $end_time,
            'keywords' => $keywords,
            'updated' => $created
        );


        if ($model->save()) {
            $id = $model->id;
            //异步添加索引到es
            Knowledgenew::onKnowledgeChanged('add', array('id' => $id));
            return $id;
        } else {
        }
        return false;
    }

    /**
     * 编辑知识
     */
    public static function updateKnowledge($kid, $cid, $title, $content , $auth_group, $end_time, $keywords) {

        $knowledge = Knowledgenew::getKnowledge($kid);
        if ($knowledge) {
            $model = new Knowledgenew;
            $model->updateByPk($kid,
                array(
                    'cid' => $cid,
                    'title' => $title,
                    'content' => $content,
                    'auth_group' => $auth_group,
                    'end_time' => isset($end_time) && !empty($end_time) ? $end_time : null,
                    'keywords' => $keywords,
                    'updated' => date('Y-m-d H:i:s')
                )
            );
            Knowledgenew::onKnowledgeChanged('update', array('id' => $kid));
            return true;
        }
        return false;
    }


    /**删除一条知识
     * @param $kid
     * @param string $action  Invalid => 逻辑删除 ,delete =>物理删除
     * @return bool
     */
    public static function delKnowledge($kid, $action = 'invalid') {
        $knowledge = Knowledgenew::getKnowledge($kid);

        if ($knowledge) {
            $model = new Knowledgenew;
            if ($action == 'delete') {
                $model->deleteByPk($kid);
            } else {
                $model->updateByPk($kid,array('status'=>Knowledgenew::STATUS_DEL));
            }
            //更新es
            Knowledgenew::onKnowledgeChanged('delete', array('id' => $kid));
            //删除收藏夹中的相关知识
            KnowledgenewCollection::model()->deleteAll("kid = $kid");
            return true;
        }
        return false;
    }

}

三、Model层的更新点击浏览次数场景及异步任务更新ES信息

//model层
class Knowledgenew extends CActiveRecord
{
    const STATUS_NORMAL = 1;
    const STATUS_DEL = 2; //Invalid


  public function tableName()
  {
    return '{{knowledgenew}}';
  }


  public static function model($className=__CLASS__)
  {
    return parent::model($className);
  }


    /**
     * 增加浏览数
     */
    public static function addClickNum($kid) {
        $model = self::model();
        $model->updateCounters(array('click_num'=>1),'id=:id',array(':id'=>$kid));
        Knowledgenew::onKnowledgeChanged('update', array('id' => $kid));
        return true;
    }


    //更新es信息
    public static function onKnowledgeChanged($action, $param){
       //echo '更新知识库索引action='.$action.PHP_EOL;
        EdjLog::info('更新知识库索引action='.$action);
        $base_param = array('es_source' => 'Knowledgenew', 'es_action' => $action);
        Queue::model()->putin( //异步
            array(
                'method'=>'synchronize_elasticsearch',
                'params'=>array_merge($base_param, $param)
            ),
            'synchronize_elasticsearch'
        );
    }
}

四、异步Job队列生产

<?php
/**
 * 基于redis的queue队列
 */
class Queue {
    private static $_models;

    public $queue_max_length = array(
    );

    public static function model($className=__CLASS__) {
        $model=null;
        if (isset(self::$_models[$className]))
            $model=self::$_models[$className];
        else {
            $model=self::$_models[$className]=new $className(null);
        }
        return $model;
    }

    //确定redis
    private function select_redis($type) {
        return QueuePool::model()->get_zone($type);
    }

    private function trim($queue_name) {

        $type = str_replace("queue_", "", $queue_name);
        $max = 0;
        if (isset($this->queue_max_length[$type])) {
            $max = intval($this->queue_max_length[$type]);
        }
        if ($max>0) {
            $zone = $this->select_redis($type);
            if($zone) {
                $zone['redis']->lTrim($queue_name, 0, $max-1);
            }
            else {
                EdjLog::error("can not find zone, queue name: " . $type);
                return;
            }
        }
    }

    /**
     * 放入队列,统一队列对外暴露方法,增加类型默认放task队列,指定了就放对应的队列,同时如果不在指定类型内的,也放默认队列
     *
     * @param unknown_type $params
     * @param unknown_type $type
     * @return mixed
     */
    public function putin($params=null, $type){
        $type = empty($type) ? 'error' : strtolower($type);

                $base_qname = QNameManagerService::model()->get_base_qname($type);

        if(!empty($base_qname)) {
            $this->queue_name = 'queue_'.$base_qname;
        }else{
            $this->queue_name = 'queue_error';
        }

        if ($params===null) {
            return $this->get();
        } else {
            return $this->add($params);  //如果add替换为processTask方法,则同步
        }
    }

    /**
     * 取一条队列数据,封装多个队列,统一调用方法
     * @param string $type
     * @return array
     */
    public function getit($type='default')
    {
        $base_qname = QNameManagerService::model()->get_base_qname($type);

        if(!empty($base_qname)) {
            $this->queue_name = 'queue_'.$base_qname;
        }else{
            return array();
        }

        $zone = $this->select_redis($type);
        if($zone) {
            if($zone['brpop']) {
                $json = '';
                $result = $zone['redis']->brPop($this->queue_name, $zone['brpop']);
                if(!empty($result) && isset($result[1])) {
                    $json = $result[1];
                }
            }
            else {
                $json = $zone['redis']->rPop($this->queue_name);
            }
        }
        else {
            EdjLog::error("can not find zone, queue name: " . $type);
            return array();
        }

        return json_decode($json, true);
    }

    /**
     * 返回队列接收的类型列表
     * @return array
     */
    public function getQueueTypeList()
    {
        $list = QNameManager::model()->findall();
        if($list) {
            return $list;
        }

        EdjLog::error("Error: get queue list from database");
        return array();
    }

    /**
     * 设置或者读取位置队列
     * @param array $params
     * @return mixed
     */
    public function position($params=null) {
        $this->queue_name='queue_position';

        if ($params===null) {
            return $this->get();
        } else {
            return $this->add($params);
        }
    }

    /**
     * 心跳队列
     * @param string $params
     * @return mixed
     */
    public function heartbeat($params=null) {
        $this->queue_name='queue_heartbeat';

        if ($params===null) {
            return $this->get();
        } else {
            return $this->add($params);
        }
    }

    /**
     * 最高优先级队列
     * @param string $params
     * @return mixed
     */
    public function task($params=null) {
        $this->queue_name='queue_task';

        if ($params===null) {
            return $this->get();
        } else {
            return $this->add($params);
        }
    }

    /**
     * 保存日志到数据库
     * @param string $params
     * @return mixed
     */
    public function dumplog($params=null) {
        $this->queue_name='queue_dumplog';

        if ($params===null) {
            return $this->get();
        } else {
            return $this->add($params);
        }
    }

    /**
     * 返回各个队列中的任务总数
     */
    public function length() {

        $queue = $this->getQueueTypeList();

        $queue_length=array();
        $reg = "/P[0-9]+$/";
        foreach($queue as $item) {
            $base_qname = $item->base_qname;
            $zone = $this->select_redis($base_qname);
            $key = 'queue_'.$base_qname;
            if($zone) {
                $len = $zone['redis']->lLen($key);
                if(isset($item->max) && $len > $item->max) {
                    $key = '!'.$key;
                }

                $pkey = '';
                if(preg_match($reg, $zone['name'])) {
                    $pkey = $key.'@'.$zone['name'];
                }
                else {
                    $pkey = $key.'@'.$zone['name']."_P".$item->level;
                }

                $queue_length[$pkey] = $len;
            }
            else {
                EdjLog::error("can not find zone, queue name: " . $key);
            }
        }

        return $queue_length;
    }

    private function get() {
        $type = str_replace("queue_", "", $this->queue_name);
        $zone = $this->select_redis($type);
        if($zone) {
            if($zone['brpop']) {
                $json = '';
                $result = $zone['redis']->brPop($this->queue_name, $zone['brpop']);
                if(!empty($result) && isset($result[1])) {
                    $json = $result[1];
                }
            }
            else {
                $json = $zone['redis']->rPop($this->queue_name);
            }
        }
        else {
            EdjLog::error("can not find zone, queue name: " . $type);
            return array();
        }
        return json_decode($json, true);
    }

    private function add($params) {
        $json=json_encode($params);
        $type = str_replace("queue_", "", $this->queue_name);
        $zone = $this->select_redis($type);
        $return = 0;
        if($zone) {
            try {
                $return = $zone['redis']->lPush($this->queue_name, $json);
            } catch (Exception $e) {
                EdjLog::error("write redis error,msg:".$e->getMessage());
                //echo $e->getMessage();
            }
        }
        else {
            EdjLog::error("can not find zone, queue name: " . $type);
        }

        return $return;
    }
 
    //如果add 替换为此方法,则同步
    public function processTask($task) {
        if(!isset($task['method'], $task['params'])) {
            $task_content = json_encode($task);
            EdjLog::error("can not run task due to no 'method' or 'params' specified, task is $task_content");
            return;
        }

        $method=$task['method'];
        $params=$task['params'];
        $class = isset($task['class']) ? $task['class'] : "QueueProcess";
        EdjLog::info("REDIS_QUEUE_OUT CLASS:$class METHOD:$method PARAMS:".json_encode($params));
        try {
            //throw new Exception("Value must be 1 or below");
            $queue_process=new $class();
            // check this method is exist, if not throw ReflectionException
            new ReflectionMethod($queue_process, $method);
            call_user_func_array(array($queue_process, $method), array($params));
        } catch(Exception $e) {
            $errmsg = $e->getMessage();
            EdjLog::error("execption queue_run method:$method err: $errmsg");
        }
    }

    public function getLengthByType($type){
        $type = empty($type) ? 'error' : strtolower($type);
        $base_qname = QNameManagerService::model()->get_base_qname($type);
        $zone = $this->select_redis($base_qname);
        $key = 'queue_'.$base_qname;
        $len = 0;
        if($zone) {
            $len = $zone['redis']->lLen($key);
        } else {
            EdjLog::error("can not find zone, queue name: " . $base_qname);
        }
        return $len;
    }
}

五、异步Job队列消费

<?php
/**
 * 队列处理
 */
Yii::import("application.ecenter.service.HttpUtils");
class QueueProcess {
    private static $_models;
    private $message;

    public static function model($className=__CLASS__) {
        $model=null;
        if (isset(self::$_models[$className]))
            $model=self::$_models[$className];
        else {
            $model=self::$_models[$className]=new $className(null);
        }
        return $model;
    }


    public function synchronize_elasticsearch($param)
    {
        if (empty($param) || !isset($param['es_source'], $param['es_action'])) {
            return false;
        }

        $class_name = $param['es_source'].'Synchronizer';
        $method_name = $param['es_action'];
        if (class_exists($class_name) && method_exists($class_name, $method_name)) {
            unset($param['es_source']);
            unset($param['es_action']);
            call_user_func(array($class_name, $method_name), $param);
        } else {
            EdjLog::error('synchronize method does not exist. class name '.$class_name.' method name '.$method_name);
        }
    }

}

六、ES信息处理操作服务层

<?php
/**
 * Created by PhpStorm.
 */

class KnowledgenewSynchronizer {

    static public $index = 'knowledge_index';
    static public $type = 'knowledge';
    static public $filed = ' id, keywords, title, content, auth_group, cid, operator, click_num, status, created, updated ';


    static public function add($param)
    {
        if (empty($param) || !isset($param['id'])) {
            return false;
        }
        $id = $param['id'];
        $sql = "select".self::$filed."from t_knowledgenew where id=:id";
        $doc = Yii::app()->db->CreateCommand($sql)->queryRow(true,array('id'=>$id));

        if (empty($doc)) {
            EdjLog::error('cannot find knowledge with id: '.$id);
            return false;
        }

        return ElasticsearchSynchronizer::addDocument(self::$index, self::$type, $id, $doc);
    }

    static public function delete($param)
    {
        if (empty($param) || !isset($param['id'])) {
            return false;
        }
        $id = $param['id'];
        return ElasticsearchSynchronizer::deleteDocument(self::$index, self::$type, $id);
    }

    static public function update($param)
    {
        if (empty($param) || !isset($param['id'])) {
            return false;
        }
        $id = $param['id'];

        $sql = "select".self::$filed."from t_knowledgenew where id=:id";
        $doc = Yii::app()->db->CreateCommand($sql)->queryRow(true,array('id'=>$id));

        if (empty($doc)) {
            EdjLog::error('cannot find knowledge with id: '.$id);
            return false;
        }

        return ElasticsearchSynchronizer::updateDocument(self::$index, self::$type, $id, $doc);
    }

}

七、ES信息处理操作Model层

<?php

use Elastica\Client;
use Elastica\Query\QueryString;
use Elastica\Query;
use Elastica\Document;

Class ElasticsearchSynchronizer
{//测试
    //const ES_HOST='search.n.edaijia.cn';
    //const ES_PORT=9200;

    static public function addDocument($index, $type, $id, $doc)
    {
        $client = new Client(array('host' => self::ES_HOST, self::ES_PORT));
        $type = $client->getIndex($index)->getType($type);
        try {
            $response = $type->addDocument(new Document($id, $doc));
            if ($response->isOk()) {
                EdjLog::info("add document $id succeeded");
                return true;
            } else {
                EdjLog::info("add document $id failed");
                return false;
            }
        } catch (Exception $e) {
            print_r($e);
            EdjLog::error("add document $id failed with exception ".$e->getMessage());
            return false;
        }
    }

    static public function updateDocument($index, $type, $id, $doc)
    {
        $client = new Client(array('host' => self::ES_HOST, 'port' => self::ES_PORT));
        try {
            $response = $client->getIndex($index)
                ->getType($type)
                ->updateDocument(new Document($id, $doc));

            if ($response->isOk()) {
                EdjLog::info("update document $id succeeded");
                return true;
            } else {
                EdjLog::info("update document $id failed");
                return false;
            }
        } catch (Exception $e) {
            EdjLog::error("update document $id failed with exception ".$e->getMessage());
            return false;
        }
    }

    static public function deleteDocument($index, $type, $id)
    {
        $client = new Client(array('host' => self::ES_HOST, 'port' => self::ES_PORT));
        try {
            $response = $client->getIndex($index)->getType($type)->deleteById($id);
            if ($response->isOk()) {
                EdjLog::info("delete document $id succeeded");
                return true;
            } else {
                EdjLog::info("delete document $id failed");
                return false;
            }
        } catch (Exception $e) {
            EdjLog::error("delete document $id failed with exception ".$e->getMessage());
            return false;
        }
    }

}

 八、查询

 /**
     * @param $keyword
     * @param int $page
     * @param int $size
     * @param str $search_type
     * 搜索知识
     * 搜索标题和内容,多个关键词是并且关系,空格分隔
     */
    public static function searchKnowledge($keyword, $page = 0, $size = 50, $search_type = 'default') {
        //对搜索关键词空格隔开
//        $keywords = explode(' ',trim($keyword));
        $start = $page * $size;
        $client = new \Elastica\Client(array('host' => ElasticsearchSynchronizer::ES_HOST, 'port' => ElasticsearchSynchronizer::ES_PORT));//更改成线上主机和端口
        $search = new \Elastica\Search($client);
        $search ->addIndex(KnowledgenewSynchronizer::$index)->addType(KnowledgenewSynchronizer::$type);
//        $query = new \Elastica\Query\Bool();
        $query = new \Elastica\Query();
        //设置必要查询
//        foreach($keywords as $word) {
//            if($word) {
//                $knowledge_query = new \Elastica\Query\QueryString();
//                $knowledge_query->setFields(array('title', 'content'));
//                $knowledge_query->setQuery('"' . $word . '"');
//                $query->addMust($knowledge_query);
//            }
//        }
        $MultiMatch_obj = new \Elastica\Query\MultiMatch();
        $MultiMatch_obj->setQuery($keyword);
        if ($search_type == 'default') {
            $MultiMatch_obj->setFields(array('keywords'));
        } else {
            $MultiMatch_obj->setTieBreaker(0.3);
            $MultiMatch_obj->setType('best_fields');
            $MultiMatch_obj->setFields(array('keywords^901209','content'));
            $MultiMatch_obj->setOperator('or');
            //这里是字符串,在es的扩展目录下 setMinimumShouldMatch方法把转int去掉//
            //$this->setParam('minimum_should_match', (int)$minimumShouldMatch);
            $MultiMatch_obj->setMinimumShouldMatch('30%');

        }
        $query->setQuery($MultiMatch_obj); //命中全部纪录
        $query = \Elastica\Query::create($query);
        $query->setSource(["id","cid","updated", "title", 'keywords',"content",'auth_group','status']);
//        $query->setSort([
//            'click_num' => ['order' => 'desc']
//        ]);


        //设置起始页
        $query->setFrom($start);
        $query->setSize($size);
        //设置高亮显示
        $query->setHighlight(array(
            'pre_tags' => array('<span style="color: red">'),
            'post_tags' => array('</span>'),
            'fields' => array(
                'title' => array(
                    'fragment_size' => 5,//包含高亮词语,最多展示多少个
                    'number_of_fragments' => 1,//分几段展示,这里一段就可以
                ),
                'keywords' => array(
                    'fragment_size' => 10,//包含高亮词语,最多展示多少个
                    'number_of_fragments' => 1,//分几段展示,这里一段就可以
                ),
                'content' => array(
                    'fragment_size' => 10,//包含高亮词语,最多展示多少个
                    'number_of_fragments' => 1,//分几段展示,这里一段就可以
                ),
            ),
        ));

        $search->setQuery($query);

        $results = array();
        $totalResults = 0;
        try {
            $resultSet = $search->search();
            $results = $resultSet->getResults();
            $totalResults = $resultSet->getTotalHits();
        } catch (Exception $e) {
            EdjLog::error("query elasticsearch failed");
        }

        if ($totalResults <= 0) {

            return;
        }

        $poi_result = array();
        foreach ($results as $result) {
            $highlight = $result->getHighlights();
            $title_hightlight = isset($highlight['title'][0])?$highlight['title'][0]:'';
            $content_hightlight = isset($highlight['content'][0])?$highlight['content'][0]:'';
            $keywords_highlight = isset($highlight['keywords'][0])?$highlight['keywords'][0]:'';

            $poi_result[] = array(
                'id' => $result->id,
                'cid' => $result->cid,
                'title' => $result->title,
                'keywords' => $result->keywords,
                'content' => $result->content,
                'auth_group' => $result->auth_group,
                'title_highlight'=>$title_hightlight, //高亮展示标题搜索关键词
                'keywords_highlight'=>$keywords_highlight, //高亮展示标题搜索关键词
                'content_highlight'=>$content_hightlight,//高亮展示内容搜索关键词
                'updated'=>$result->updated,
                'status' => $result->status,
            );
        }

        //这里过滤了用户非权限列表
        $poi_result = self::filterNoAuthKnowledge($poi_result);
        $totalResults = count($poi_result) ;
        $info = array('totalNum'=>$totalResults,'data'=>$poi_result);
        return json_encode($info);
    }

 九、源码包

链接:https://pan.baidu.com/s/1lVcrb50HSLrJh3zvBOdH5g 
提取码:9c9c 

 

上一篇:bili之前端超神之路jquery1003


下一篇:Python3监控IP丢包率