商品中心—11.商品B端搜索系统的实现文档

大纲

1.商品数据管理以及binlog监听

2.基于ES的商品B端搜索系统架构设计

3.商品B端搜索系统实现步骤介绍

4.步骤一:ES生产集群部署

5.步骤二:IK分词器改造和部署

6.步骤三:为商品数据设计和创建索引

7.步骤四:为商品数据生成索引

8.步骤四:基于索引实现搜索功能

9.步骤五:大数据量写入ES和搜索性能的调优

10.elasticsearch-analysis-ik⼯程的表结构

11.elasticsearch-analysis-ik⼯程的执行步骤

12.elasticsearch-analysis-ik⼯程的代码

13.demo-product-es⼯程的介绍

14.demo-product-es⼯程的商品索引

15.demo-product-es⼯程的suggest索引

16.demo-product-es⼯程的搜索相关接⼝

1.商品数据管理以及binlog监听

对单库单表的MySQL的数据进行搜索、全文检索、模糊匹配效率和性能都很差,对多库多表的MySQL的数据进行搜索就更麻烦了,所以一般会通过Canal监听MySQL的binlog + 去ElasticSearch建立索引来实现搜索。

中小型项目的ES需求,基本由"自动补全、拼音纠错、全文检索、结构化搜索"即可满足。

2.基于ES的商品B端搜索系统架构设计

当MySQL进行了插入操作时,就会产生对应的insert类型的binlog日志。这些insert类型的binlog日志会被Canal系统监听到,然后Canal系统会拿着这些binlog日志以HTTP方式向商品B端搜索系统的指定接口发起调用。接着商品B端搜索系统便能根据这条binlog日志对应的数据提取出要进行搜索的字段。

商品数据中要进行搜索的字段可能有:title、description、category等,商品B端搜索系统便会把这些用来进行搜索的字段以及商品ID写入到ElasticSearch,ElasticSearch便会根据这些内容建立倒排索引。

后续MySQL对这些商品数据进行修改时,会产生对应的update类型的binlog日志,这些update类型的binlog日志也会被Canal系统监听到。然后Canal又会通过商品B端搜索系统,根据这些商品数据修改后的内容更新ElasticSerch索引。

3.商品B端搜索系统实现步骤介绍

(1)分词器对ES来说是非常重要的

(2)实现商品B端搜索系统的关键步骤

(1)分词器对ES来说是非常重要的

在ES里建立索引时,都是根据文本内容类型的字段来建立索引的。而这些字段如果是中文,就要使用中文分词器来进行分词。所以在落地搜索系统时,中文分词器的实现是至关重要的,这个是必须要做的。

(2)实现商品B端搜索系统的关键步骤

步骤一:需要搭建和部署一套ES生产集群。

步骤二:需要给ES集群各个节点去安装部署我们自己开发好的一套中文分词器。

步骤三:需要分析商品数据模型,针对商品数据模型去设计ES里的索引数据结构。所以需要创建好商品索引、设计好索引结构、索引里指定的字段必须用我们的分词器。除了商品核心索引外,还有用来辅助用户实现搜索提示和自动纠错的suggest索引。

步骤四:需要实现数据写入ES建立索引的接口、实现基于ES索引进行搜索的接口。在搜索框里进行输入时,实现自动补全、自动纠错、自动推荐。搜索接口分为全文检索和结构化搜索:全文检索就是根据关键词在索引里进行搜索,结构化搜索就是根据一些固定的条件去搜索商品(比如商品品牌、销售属性、颜色等固定字段)。

步骤5:需要测试ES写入性能和ES优化、搜索接口的性能测试和优化。大量数据瞬时高并发写入的性能测试和优化,海量数据搜索时的性能测试和优化。

4.步骤一:ES生产集群部署

(1)OS内核参数优化

(2)ES生产集群部署之三节点配置

(3)ES生产集群部署之Kibana监控

经典的ES生产集群配置:ES的3节点都用8核16G + 一台2核4G的用于可视化监控的Kibana。

ES生产集群部署不会使用低配置的机器,而是使用配置高一点的机器。因为需要对这些机器进行亿级数据量下的高并发写入测试和搜索测试。这里则使用了3台阿里云上标准的8核16G的机器,来部署ES生产集群。测试时选用的硬盘容量几十G也可以,毕竟1亿的数据量使用40G磁盘也可满足了。

一般部署MySQL、RocketMQ、Redis、ES等中间件和基础系统,会使用8核16G+的机器。一般部署普通业务系统,则使用4核8G甚至2核4G的机器。

(1)OS内核参数优化

nproc表示单个用户可以打开的进程数量
nofile表示单个进程可以打开的文件句柄的数量
max_map_count表示单个进程可以拥有的虚拟内存数量
$ vi /etc/security/limits.conf (退出当前⽤户重新登录后⽣效)
* soft nofile 65535
* hard nofile 65535
* soft nproc 4096
* hard nproc 4096

$ vi /etc/sysctl.conf (然后执⾏sysctl -p这条命令让配置)
vm.max_map_count=262144

(2)ES生产集群部署之三节点配置

一.配置elasticsearch.yml文件

需要注意配置打开对ES节点的监控,以便后续ES节点的监控数据可以传输给ELK的Kibana。通过Kibana把监控数据做一个展示,这样就可以看到ES集群各个节点完整的监控数据。

$ cd /app/elasticsearch/elasticsearch-7.9.3/config
$ echo'' > elasticsearch.yml (默认的配置⽂件⾥⾯的内容全都被注释了,就把它清空了,重新写配置⽂件)
$ vi elasticsearch.yml
# 集群名称
cluster.name: escluster

# 节点名称
node.name: esnode1

# 节点⻆⾊
node.master: true
node.data: true

# 最⼤的节点数
node.max_local_storage_nodes: 3

# 绑定的ip地址
network.host: 0.0.0.0

# 对外的端⼝
http.port: 9300

# 节点之间通信的端⼝
transport.tcp.port: 9800

# 节点发现和集群选举
discovery.seed_hosts: ["172.19.16.132:9800","172.19.16.133:9800","172.19.16.134:9800"]
cluster.initial_master_nodes: ["esnode1", "esnode2","esnode3"]

# 数据⽬录和⽇志⽬录 
path.data: /app/elasticsearch/data
path.logs: /app/elasticsearch/log

# 配置了之后到时候kibana上可以看堆栈监控数据
xpack.monitoring.enabled: true
xpack.monitoring.collection.enabled: true

二.配置jvm.options文件

还有至关重要的配置是JVM配置,ES节点会以JVM进程的形式运行。一般会把机器的一半内存分配给JVM,另一半内存留给OS Cache。这样可以让ES中索引文件的数据尽可能多地驻留在OS Cache内存里,搜索时尽量从OS Cache里搜索,从而提升搜索的性能。现在机器是16G的,也就是8G内存给JVM,8G内存给OS Cache。

$ cd /app/elasticsearch/elasticsearch-7.9.3/config 
$ vi jvm.options
-Xms8g
-Xmx8g

(3)ES生产集群部署之Kibana监控

部署Kibana的机器配置只需要低配的2核4G即可,因为只用来做展示而已,Kibana也会以JVM进程的方式来运行。修改kibana.yml配置文件:

$ echo '' > kibana.yml 
$ vi kibana.yml
# kibana的端⼝
server.port:9000

# 绑定的地址 
server.host:"0.0.0.0"

# es的地址 
elasticsearch.hosts: ["http://106.14.80.207:9300", "http://139.196.198.62:9300", "http://139.196.231.156:9300"]

# 显示的语⾔
i18n.locale: "zh-CN"

5.步骤二:IK分词器改造和部署

(1)IK分词器词库热刷新机制介绍

(2)IK分词器源码改造流程步骤

(3)IK分词器定时加载词库实现

(4)在ES生产集群中安装IK分词器

(5)在ES生产集群中安装拼音分词器

(1)IK分词器词库热刷新机制介绍

原生开源的IK分词器不太好用,一般都要对IK分词器进行改造,基于IK分词器的源码进行二次开发。

要进行中文分词,首先就需要有中文词库。比如对"我特别喜欢在床上看书"这句话进行分词,中文词库中有"特别"、"喜欢"、"看书"三个词。那么就会把上面这句话分成:我、特别、喜欢、在、床、上、看书。也就是句子里的词语如果在词库中找不到,那就会拆成单独的字。如果在中文词库加上"床上"这个词语,那么就可以把"床上"分出来。

因此,使用IK分词器时,一般要从数据库加载定义好的中文词库来进行初始化。之后会开启一个后台线程定时从数据库里加载最新的词库。这样当我们想要更新词库时,就可以在web界面里手工录入最新的词语到数据库。当然也可以从专门的词汇系统通过爬虫从外部的网络环境来对中文词库进行自动录入。

然后,在ES的JVM进程里会运行这个IK中文分词器的代码。此时IK中文分词器的代码就可以不断去热加载数据库里最新的词汇,这样线上运行的ES搜索集群就能随时热加载热刷新最新的中文词库了。

所以,IK分词器其实是一个Java代码包,它会嵌入到ES的JVM进程里去跑的。我们需要修改IK分词器的源码,让它在跑的时候可以从外部的MySQL里进行词库热刷新。当然,IK中文分词器在启动时也会内嵌一个自己的基础中文词库。

此外,还有停用词词库(stop words),也就是在分词的时候可以忽略掉的没意义的词汇。

(2)IK分词器源码改造流程步骤

步骤一:在IK分词器源码的pom文件中添加MySQL依赖
步骤二:添加⼀个DictLoader类,用来加载MySQL中的词库内容
步骤三:将Dictionary的私有方法getDictRoot()改成public以便能在DictLoader中调用
步骤四:Dictionary类中添加⼀个addStopWords⽅法
步骤五:在Dictionary的initial()⽅法中开启⼀个加载词库的线程
步骤六:重新打包IK分词器源码

(3)IK分词器定时加载词库实现

读取配置文件内容的最佳实践:先获取配置文件的全路径名,然后根据全路径名构建输入流,接着加载输入流到Java的配置数据对象Properties中,最后从该对象就可以获取配置值了。

当IK分词器打包后,就可以部署安装到线上各个ES的节点里。然后这个IK分词器的代码便能随着ES的JVM进程的启动来运行,于是就能够定时地去MySQL里热刷新词库和停用词。

//加载MySQL中的词库内容,单例
public class DictLoader {
    private static final Logger LOGGER = ESPluginLoggerFactory.getLogger(DictLoader.class.getName());
    private static final DictLoader INSTANCE = new DictLoader();
    private final String url;
    private final String username;
    private final String password;
    private final AtomicBoolean extensionWordFistLoad = new AtomicBoolean(false);
    private final AtomicReference<String> extensionWordLastLoadTimeRef = new AtomicReference<>(null);
    private final AtomicBoolean stopWordFistLoad = new AtomicBoolean(false);
    private final AtomicReference<String> stopWordLastLoadTimeRef = new AtomicReference<>(null);

    //单例类,构造函数是私有的
    private DictLoader() {
        //创建一个Properties配置数据对象,用来获取MySQL JDBC连接的配置
        Properties mysqlConfig = new Properties();

        //PathUtils会从指定目录下,对指定的文件名进行拼接,然后返回全路径名
        //所以这里会把"IK分词器配置目录 + jdbc.properties"拼接成"jdbc.properties的成全路径名"
        Path configPath = PathUtils.get(Dictionary.getSingleton().getDictRoot(), "jdbc.properties");

        try {
            //根据全路径名构建输入流,然后加载到mysqlConfig对象中,这样就可以从mysqlConfig对象读取配置值了
            mysqlConfig.load(new FileInputStream(configPath.toFile()));
            this.url = mysqlConfig.getProperty("jdbc.url");
            this.username = mysqlConfig.getProperty("jdbc.username");
            this.password = mysqlConfig.getProperty("jdbc.password");
        } catch (IOException e) {
            throw new IllegalStateException("加载jdbc.properties配置文件发生异常");
        }

        try {
            //加载MySQL驱动的类
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("加载数据库驱动时发生异常");
        }
    }

    public static DictLoader getInstance() {
        return INSTANCE;
    }

    public void loadMysqlExtensionWords() {
        //每次从MySQL里加载词库时会执行一条SQL语句
        //这时就必须要有一个和MySQL之间建立的网络连接,才能发送SQL语句出去

        //由于这里会每分钟执行一次SQL语句
        //所以每次执行SQL语句的时候就创建一个数据库的网络连接Connection,执行完SQL后再把该Connection释放即可

        Connection connection = null;
        Statement statement = null;
        ResultSet resultSet = null;

        String sql;

        //第一次执行时会通过CAS操作把extensionWordFistLoad变量由false改成true,并且查全量词汇
        //之后的执行,extensionWordFistLoad变量已经变为true,所以CAS操作会不成功,于是只查增量词汇
        if (extensionWordFistLoad.compareAndSet(false, true)) {
            //首次加载会从数据库查全量的词汇
            sql = "SELECT word FROM extension_word";
        } else {
            //后面按照最近的修改时间来加载增量的词
            sql = "SELECT word FROM extension_word WHERE update_time >= '" + extensionWordLastLoadTimeRef.get() + "'";
        }

        //每次生成了加载词库的SQL后,都会去设置一个本次加载的时间
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String nowString = dateFormat.format(new Date());
        //设置最近一次加载词库的时间,extensionWordLastLoadTimeRef也是Atomic变量,线程安全的
        extensionWordLastLoadTimeRef.set(nowString);

        //加载扩展词词库内容
        try {
            //使用传统的JDBC编程获取连接
            connection = DriverManager.getConnection(url, username, password);
            //创建statement
            statement = connection.createStatement();
            //执行SQL语句获取结果集
            resultSet = statement.executeQuery(sql);
            LOGGER.info("从MySQL加载extensionWord, sql={}", sql);

            Set<String> extensionWords = new HashSet<>();
            while (resultSet.next()) {
                String word = resultSet.getString("word");
                if (word != null) {
                    extensionWords.add(word);
                    //为了方便看日志,可以把加载到的扩展词全都打印出来了
                    LOGGER.info("从MySQL加载extensionWord, word={}", word);
                }
            }

            //放到字典里
            Dictionary.getSingleton().addWords(extensionWords);
        } catch (Exception e) {
            LOGGER.error("从MySQL加载extensionWord发生异常", e);
        } finally {
            //把结果集resultSet、statement、连接connection都进行释放
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }

            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }
        }
    }

    public void loadMysqlStopWords() {
        //和loadMysqlExtensionWords逻辑基本一样
    }
}

(4)在ES生产集群中安装IK分词器

步骤一:首先下载elasticsearch-analysis-ik的发⾏版,比如7.9.3版本
步骤二:然后在ES节点机器上的elasticserach的plugins⽬录下创建⼀个ik⽬录
步骤三:接着把elasticsearch-analysis-ik-7.9.3.zip包上传到ik⽬录,然后解压
步骤四:然后⽤打好的elasticsearch-analysis-ik-7.9.3.jar包替换掉解压得到的同名jar包
步骤五:接着把mysql-connector-java-8.0.20.jar包也上传到ik⽬录
步骤六:最后在ik/config⽬录⾥⾯添加⼀个jdbc.properties⽂件

这样就完成了在一个ES节点上安装IK分词器了。

之后,重启ES集群的三个节点:

$ ps -ef | grep elasticsearch
$ kill 2989
$ echo '' > /app/elasticsearch/log/escluster.log
$ /app/elasticsearch/elasticsearch-7.9.3/bin/elasticsearch -d
$ tail -f /app/elasticsearch/log/escluster.log

(5)在ES生产集群中安装拼音分词器

拼音分词器是专门和IK分词器配合使用来更好实现中文搜索的。如果用户在搜索框输入一些拼音,那么拼音分词器就可以帮助ES返回拼音对应的结果。

步骤一:首先下载elasticsearch-analysis-pinyin的发⾏版,比如版本为7.9.3
步骤二:在ES节点机器上的elasticserach的plugins⽬录下创建⼀个pinyin⽬录
步骤三:把elasticsearch-analysis-pinyin-7.9.3.zip包上传到ik⽬录,然后解压
步骤四:最后重启ES集群的各个节点,这样拼音分词器便安装好了

6.步骤三:为商品数据设计和创建索引

(1)对商品的核心数据模型进行分析

(2)生产项目中的搜索分词器方案

(3)商品和suggest索引的设计与创建

索引会包括:商品核心索引 + suggest索引

(1)对商品的核心数据模型进行分析

商品数据中用于搜索的核心字段如下:

一.skuName商品名称
商品名称是⼀个字符串,我们要对商品名称进⾏全⽂检索。

二.skuId商品id
商品id⼀般是⼀个long类型的数字。

三.category商品分类
商品分类是⼀个字符串;我们不会对商品分类做全⽂检索,⽽是对商品分类做精准匹配。

四.basePrice商品价格 | vipPrice会员价格 | saleCount销量 | commentCount评论数
这⼏个字段都是数字。

五.skuImgUrl商品图⽚
商品图⽚⼀个图⽚的url地址,我们不会对这个字段做任何搜索操作,也不需要索引这个字段。

六.createTime创建时间 | updateTime修改时间

(2)生产项目中的搜索分词器方案

一.IK分词器的算法类型

创建索引时,要设置具体的分片数量,分片数量可以参考ES集群有几个节点。比如每个节点可以有一个shard数据分片,每个shard数据分片可以有1个副本。下面对IK分词器的两个分词算法类型ik_max_word和ik_smart进行简单介绍。

ik_max_word:会对一个词尽可能多地进行拆分,让该词汇可以匹配的搜索词尽可能多。

ik_smart:会对一个词尽可能精准地进行拆分,也就是让该词汇可以匹配的搜索词尽可能少但精准。

可以理解前者细粒度、后者粗粒度。

二.IK分词器的算法使用方案

在生产中使用IK中文分词器时,会分不同的场景来使用这两种分词算法类型。

场景一:在写入数据场景,会使用ik_max_word去建立索引。从而产生非常精细化的小词汇,这样就可以对后续的搜索词匹配,提供更多的选择。

场景二:在对字段进行搜索的场景,传入的搜索词就会使用ik_smart来进行分词。对搜索词进行精准拆分,那么按这些拆分后的词去匹配,搜出来的结果才能更准确。

这种方案具体的实现就是如下所示:

"skuName":{
    "type":"text",
    "analyzer":"ik_max_word",
    "search_analyzer":"ik_smart"
}

(3)商品和suggest索引的设计与创建

一.elasticsearch数据类型说明

数据类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/mapping-types.html

text数据类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/text.html

keyword数据类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/keyword.html

数字类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html

时间类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html

二.数据类型简单说明

type为text时,会进行分词
并且通常指定创建索引时用ik_max_word,搜索时搜索词用ik_smart

type为keyword时,为keyword类型,不会进行分词

type为integer时,为数字类型

index为false时,表示指明不需要建立索引

三.创建商品索引的mappings

商品的索引名为:career_plan_sku_index_序号。因为需要做多次不同的测试,有的测试是使⽤不同的索引,⽽且在实现接⼝时并没有把接⼝写死,可以指定操作那个索引,所以索引后⾯就加了⼀个序号。

PUT /demo_plan_sku_index_15
{
    "settings": {
        "number_of_shards": 3,//数据分片
        "number_of_replicas": 1//分片副本
    },
    "mappings": {
        "properties": { 
            "skuId": { 
                "type": "keyword" 
            },
            "skuName": { 
                "type": "text", 
                "analyzer": "ik_max_word", 
                "search_analyzer": "ik_smart" 
            },
            "category": { 
                "type": "keyword" 
            },
            "basePrice": { 
                "type": "integer" 
            },
            "vipPrice": { 
                "type": "integer" 
            },
            "saleCount": { 
                "type": "integer" 
            },
            "commentCount": { 
                "type": "integer" 
            },
            "skuImgUrl": { 
                "type": "keyword", 
                "index": false 
            },
            "createTime": { 
                "type": "date", 
                "format": "yyyy-MM-dd HH:mm:ss" 
            },
            "updateTime": { 
                "type": "date", 
                "format": "yyyy-MM-dd HH:mm:ss" 
            } 
        } 
    } 
}

对skuName字段进行分词建立索引,是用来进行全文检索的。对分类、价格、销售数、评论数、时间这些字段建立索引,是用来进行结构化搜索的。结构化搜索指的是,可以去查询指定分类、指定价格范围、指定销售数范围等条件进行搜索。

四.创建suggest索引

suggest索引是用来实现用户输入自动补全、拼写纠错、搜索推荐(搜不出结果时推荐其他数据)的。suggest索引一共有两个字段:word1字段是用来做自动补全,word2字段是用来做拼写纠错和搜索推荐的。word1字段的类型会使用completion数据类型,该数据类型具体信息可以参考如下文档:

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/mapping-types.html

下面在创建suggest索引时自定义了一个analyzer:ik_and_pinyin_analyzer。该analyzer同时使用了IK分词器和拼音分词器,这样用户输入汉字和拼音时都能做自动补全。

PUT /career_plan_sku_suggest_15 
{ 
    "settings": { 
        "number_of_shards": 3, 
        "number_of_replicas": 1, 
        "analysis": { 
            "analyzer": { 
                "ik_and_pinyin_analyzer": { 
                    "type": "custom", 
                    "tokenizer": "ik_smart", 
                    "filter": "my_pinyin" 
                } 
            },
            "filter": { 
                "my_pinyin": { 
                    "type": "pinyin", 
                    "keep_first_letter": true, 
                    "keep_full_pinyin": true, 
                    "keep_original": true, 
                    "remove_duplicated_term": true 
                } 
            } 
        } 
    },
    "mappings": { 
        "properties": { 
            "word1": { 
                "type": "completion", 
                "analyzer": "ik_and_pinyin_analyzer" 
            },
            "word2": { 
                "type": "text" 
            } 
        } 
    } 
}

7.步骤四:为商品数据生成索引

(1)往ES索引中写入模拟数据简介

(2)单线程模式bulk批量写入商品数据

(3)多线程并发大批量写入商品数据实现

(4)多批次有限线程池导入ES代码分析

(5)全量与增量商品数据导入ES生产方案

(6)suggest索引同步写入实现

(7)全量数据双索引写入的生产代码实现

(8)商品数据写入双索引后的效果分析

(1)往ES索引中写入模拟数据简介

会有一个MockDataController往ES索引中写入模拟数据,也就是MockDataController会模拟触发商品和suggest索引数据的写入。

当触发了MockDataController的接口后,会先从文件中加载出10万条数据。之后会对内存中的这10万条数据,基于ES的API进行批量写入。

(2)单线程模式bulk批量写入商品数据

从文件中加载出10万条数据到内存后,首先会看看需要分成多少个批次来进行批量写入,以及每个批次需要写入多少条数据。然后会按照批次数batchTimes进行遍历,接着会对每个批次构建一个ES的BulkRequest对象,最后会调用RestHighLevelClient的bulk()方法将BulkRequest对象写入到ES里去。

注意:在真正在生产环境下,不可能使用单个线程处理一个一个batch写入来实现大批量数据的写入。实现这个单线程批量写入的方法主要是用来和接下来的多线程批量写入的方法进行性能对比。

@Configuration
public class ElasticSearchConfig {
    @Value("${elasticsearch.addr}")
    private String addr;

    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        String[] segments = addr.split(",");
        HttpHost[] esNodes = new HttpHost[segments.length];
        for (int i = 0; i < segments.length; i++) {
            String[] hostAndPort = segments[i].split(":");
            esNodes[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), "http");
        }
        return new RestHighLevelClient(RestClient.builder(esNodes));
    }
}

@RestController
@RequestMapping("/api/mockData")
public class MockDataController {
    private static final String dataFileName = "100k_products.txt";

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    //单线程写入模拟的商品数据
    //https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-document-bulk.html
    @PostMapping("/mockData1")
    public JsonResult mockData1(@RequestBody MockData1Dto request) throws IOException {
        if (!request.validateParams()) {
            return JsonResult.buildError("参数有误");
        }

        //索引名字
        String indexName = request.getIndexName();
        //一次批量写入多少数据
        int batchSize = request.getBatchSize();
        //进行批量写入的次数
        int batchTimes = request.getBatchTimes();

        //1.从txt文件里面加载10w条商品数据,大小才13M,可以全部一次读出来
        List<Map<String, Object>> skuList = loadSkusFromTxt();

        long startTime = System.currentTimeMillis();

        //2.每次随机取出batchSize个商品数据,然后批量写入,一共执行batchTimes次
        for (int i = 0; i < batchTimes; i++) {
            //把指定的batchSize条数据打包成一个BulkRequest对象
            BulkRequest bulkRequest = buildSkuBulkRequest(indexName, batchSize, skuList);
            //然后调用ES的restHighLevelClient.bulk()接口,将BulkRequest对象写入到ES里去
            restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            log.info("写入[{}]条商品数据", batchSize);
        }

        long endTime = System.currentTimeMillis();

        //3.记录统计信息
        int totalCount = batchSize * batchTimes;
        long elapsedSeconds = (endTime - startTime) / 1000;
        long perSecond = totalCount / elapsedSeconds;
        log.info("此次共写入[{}]条商品数据,耗时[{}]秒,平均每秒导入[{}]条数据", totalCount, elapsedSeconds, perSecond);

        Map<String, Object> result = new LinkedHashMap<>();
        result.put("startTime", DateUtil.format(new Date(startTime), DatePattern.NORM_DATETIME_PATTERN));
        result.put("endTime", DateUtil.format(new Date(endTime), DatePattern.NORM_DATETIME_PATTERN));
        result.put("totalCount", totalCount);
        result.put("elapsedSeconds", elapsedSeconds);
        result.put("perSecond", perSecond);
        return JsonResult.buildSuccess(result);
    }
    ...

    //读取txt文件中的sku数据
    private List<Map<String, Object>> loadSkusFromTxt() throws IOException {
        InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream(dataFileName);
        InputStreamReader inputStreamReader = new InputStreamReader(resourceAsStream);
        BufferedReader bufferedReader = new BufferedReader(inputStreamReader);

        List<Map<String, Object>> skuList = new ArrayList<>();

        //读取文件内容(一共是10万条商品数据,总共才13MB大小),一条数据的内容如下:
        //10001,房屋卫士自流平美缝剂瓷砖地砖专用双组份真瓷胶防水填缝剂镏金色,品质建材,398.00,上海,540785126782
        String line;
        Random random = new Random();
        while ((line = bufferedReader.readLine()) != null) {
            String[] segments = line.split(",");
            int id = Integer.parseInt(segments[0]);
            String skuName = segments[1];
            String category = segments[2].replace("会场", "").replace("主会场", "").replace("风格好店", "");
            int basePrice = Integer.parseInt(segments[3].substring(0, segments[3].indexOf(".")));
            if (basePrice <= 100) {
                basePrice = 200;
            }

            //10个字段
            Map<String, Object> sku = new HashMap<>();
            sku.put("skuId", id);
            sku.put("skuName", skuName);
            sku.put("category", category);
            sku.put("basePrice", basePrice);
            sku.put("vipPrice", basePrice - 100);

            sku.put("saleCount", random.nextInt(100_000));
            sku.put("commentCount", random.nextInt(100_000));
            sku.put("skuImgUrl", "http://sku_img_url.png");
            sku.put("createTime", "2021-01-04 10:00:00");
            sku.put("updateTime", "2021-01-04 10:00:00");
            skuList.add(sku);
        }
        return skuList;
    }
    ...

    //从10万个sku里面随机选择batchSize个,然后封装成一个批量写入的BulkRequest对象
    private BulkRequest buildSkuBulkRequest(String indexName, int batchSize, List<Map<String, Object>> skuList) {
        //根据索引名称indexName创建BulkRequest对象
        BulkRequest bulkRequest = new BulkRequest(indexName);
        Random random = new Random();
        for (int j = 0; j < batchSize; j++) {
            //获取0到10万中的随机数
            int index = random.nextInt(100_000);
            //根据随机出来的index获取一条数据
            Map<String, Object> map = skuList.get(index);

            //下面List的元素大概如下所示:
            //list[0] = skuId,list[1] = xx,list[2] = skuName, list[3] = xx
            List<Object> list = new ArrayList<>();
            map.forEach((k, v) -> {
                list.add(k);
                list.add(v);
            });

            IndexRequest indexRequest = new IndexRequest().source(XContentType.JSON, list.toArray());
            bulkRequest.add(indexRequest);
        }
        return bulkRequest;
    }
    ...
}

(3)多线程并发大批量写入商品数据实现

从文件中加载出10万条数据到内存后,首先会看看需要分成多少个批次来进行批量写入,以及每个批次需要写入多少条数据。然后会按照批次数batchTimes进行遍历,遍历到的每个批次对应的批量写入都会提交一个任务到线程池去执行。接着线程池的一个线程便会对一个任务中的一个批次构建一个ES的BulkRequest对象,最后会调用RestHighLevelClient的bulk()方法将BulkRequest对象写入到ES里。

其中在遍历批次数提交任务到线程池前,会根据批次数创建一个CountdownLatch。每当线程池的一个线程执行完批量写入任务时,该CountdownLatch就会减1直到0,从而使得所有批次数的批量写入任务都完成时程序才结束。

此外,还使用了一个信号量Semaphore来控制同时执行的最多任务数。即每当要提交一个批次的批量写入任务到线程池前,都要先获取一个信号量,否则就阻塞等待。

注意:当一个线程刚刚释放完Semaphore后,还要执行下一行代码。也就是线程还没释放,此时新一个任务可能就可以获取到Semaphore并提交任务到线程池了。所以线程池实际需要执行的任务数可能会比Semaphore允许数threadCount多一点,因此才将maxCorePoolSize设置为2倍的threadCount。

@RestController
@RequestMapping("/api/mockData")
public class MockDataController {
    private static final String dataFileName = "100k_products.txt";

    @Autowired
    private RestHighLevelClient restHighLevelClient;
    ...

    //多线程写入模拟的商品数据
    @PostMapping("/mockData2")
    public JsonResult mockData2(@RequestBody MockData2Dto request) throws IOException, InterruptedException {
        if (!request.validateParams()) {
            return JsonResult.buildError("参数有误");
        }

        String indexName = request.getIndexName();
        //需要进行多少次batch批量写入
        int batchTimes = request.getBatchTimes();
        //每次batch批量写入多少条数据
        int batchSize = request.getBatchSize();
        //可以同时执行batch批量写入的线程数量
        int threadCount = request.getThreadCount();
        //读取10万条数据到内存
        List<Map<String, Object>> skuList = loadSkusFromTxt();

        //CountDownLatch:一个线程完成任务后才进行countDown,最后countDown到0时才能结束
        CountDownLatch countDownLatch = new CountDownLatch(batchTimes);

        //Semaphore:一个线程可以尝试从semaphore获取一个信号,如果获取不到就阻塞等待,获取到了,信号就是这个线程的了
        //当一个线程执行完其任务之后,会把信号还回去,所以最多只能有threadCount个线程可以获取到信号量
        Semaphore semaphore = new Semaphore(threadCount);

        //虽然semaphore可以控制线程池中同时进行的任务数,但是maximumPoolSize也不能设置的和semaphore一样的大小
        //因为线程池用了SynchronousQueue队列,可能会出现实际需要执行的任务数比semaphore允许数多一两个的情况
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            threadCount,
            threadCount * 2,
            60,
            TimeUnit.SECONDS,
            new SynchronousQueue<>()
        );

        long startTime = System.currentTimeMillis();

        //batch数量可以是比线程数量多
        for (int i = 0; i < batchTimes; i++) {
            //通过semaphore保证一直最多有threadCount个线程同时在执行批量写入的操作
            //先获取一个信号量,获取到了就提交任务到线程池执行批量写入的操作,获取不到就阻塞等待有空余的信号量
            semaphore.acquireUninterruptibly();

            threadPoolExecutor.submit(() -> {
                try {
                    BulkRequest bulkRequest = buildSkuBulkRequest(indexName, batchSize, skuList);
                    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                    log.info("线程[{}]写入[{}]条商品数据", Thread.currentThread().getName(), batchSize);
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    //从下面两行代码就可以看到,当一个线程刚刚释放完semaphore后,还要执行下一行代码,还没释放线程
                    //这时新一个任务可能就获取到semaphore并提交到线程池了
                    //所以线程池实际需要执行的任务数可能会比semaphore的允许数threadCount多一点
                    semaphore.release();
                    countDownLatch.countDown();
                }
            });
        }

        long endTime = System.currentTimeMillis();

        //在这里等待一下最后一个批次的批量写入操作执行完
        countDownLatch.await();

        //现在的使用方式,在这里需要手动的把线程池给关掉
        threadPoolExecutor.shutdown();

        int totalCount = batchSize * batchTimes;
        long elapsedSeconds = (endTime - startTime) / 1000;
        long perSecond = totalCount / elapsedSeconds;
        log.info("此次共导入[{}]条商品数据,耗时[{}]秒,平均每秒导入[{}]条数据", totalCount, elapsedSeconds, perSecond);

        Map<String, Object> result = new LinkedHashMap<>();
        result.put("startTime", DateUtil.format(new Date(startTime), DatePattern.NORM_DATETIME_PATTERN));
        result.put("endTime", DateUtil.format(new Date(endTime), DatePattern.NORM_DATETIME_PATTERN));
        result.put("totalCount", totalCount);
        result.put("elapsedSeconds", elapsedSeconds);
        result.put("perSecond", perSecond);
        return JsonResult.buildSuccess(result);
    }
    ...

    private BulkRequest buildSkuBulkRequest(String indexName, int batchSize, List<Map<String, Object>> skuList) {
        //根据索引名称indexName创建BulkRequest对象
        BulkRequest bulkRequest = new BulkRequest(indexName);
        Random random = new Random();
        for (int j = 0; j < batchSize; j++) {
            //获取0到10万中的随机数
            int index = random.nextInt(100_000);
            //根据随机出来的index获取一条数据
            Map<String, Object> map = skuList.get(index);

            //下面List的元素大概如下所示:
            //list[0] = skuId,list[1] = xx,list[2] = skuName, list[3] = xx
            List<Object> list = new ArrayList<>();
            map.forEach((k, v) -> {
                list.add(k);
                list.add(v);
            });

            IndexRequest indexRequest = new IndexRequest().source(XContentType.JSON, list.toArray());
            bulkRequest.add(indexRequest);
        }
        return bulkRequest;
    }
    ...
}

(4)多批次有限线程池导入ES代码分析

代码执行流程如下:

(5)全量与增量商品数据导入ES生产方案

一.全量商品数据导入ES的生产方案

说明一:假设商品数据一共有1000万,那么如何进行查询,每次查多少?每条商品数据需要导入ES的只有部分字段,10万条这样的数据才10MB。这样10万的数据放到内存里加上对象额外的开销,总共才需要几十MB。所以可对1000万商品数据进行分批次查询导入处理,每次查询10万条数据,共100次。

说明二:在每次处理查询出来的10万条商品数据时,要拆分多少个Bulk来进行批量写入?这要看测试效果,看看往ES里进行一次Bulk写入时,写入多少MB的数据会比较好。ES官方建议每次的Bulk写入不超过15MB,这里假设每次Bulk写入不超过10MB。

假设10万商品数据到了内存后,由于商品数据对象的结构导致内存消耗膨胀到了需要100MB,那么这10万条商品数据可以拆分为10个Bulk进行批量写入,每个Bulk的批量写入任务都可以往线程池进行提交。

说明三:接下来需要考虑线程池究竟开启多少线程。如果将每次查询出来的10万条数据进行拆分成5个Bulk写入,那么可以让线程池开启5个线程。让这5个线程并发去进行Bulk写入即可,也就是threadCount可以设为5。

二.增量商品数据导入ES的生产方案

Canal会监听MySQL数据库的增删改binlog,然后把这些binlog发送给商品索引系统。接着商品索引系统从binlog中提取出相关的字段,构建出单个BulkRequest,然后把这单个BulkRequest直接写入到ES即可。

(6)suggest索引同步写入实现

suggest索引的同步写入和商品索引的同步写入差不多,其实每个线程在进行bulk批量写入时构建两个BulkRequest即可:一个是商品索引的BulkRequest,一个是suggest索引的BulkRequest。

关键代码如下:

private BulkRequest buildProductIndexBulkRequest(List<Map<String, Object>> bulkList) {
    BulkRequest bulkRequest = new BulkRequest("product_index");
    for (int i = 0; i < bulkList.size(); i++) {
        Map<String, Object> productDataMap = bulkList.get(i);
        List<Object> productDataList = new ArrayList<>();
        productDataMap.forEach((k, v) -> {
            productDataList.add(k);
            productDataList.add(v);
        });

        IndexRequest indexRequest = new IndexRequest().source(XContentType.JSON, productDataList.toArray());
        bulkRequest.add(indexRequest);
    }
    return bulkRequest;
}

private BulkRequest buildSuggestIndexBulkRequest(List<String> skuNameBulkList) throws Exception {
    BulkRequest bulkRequest = new BulkRequest("suggest_index");
    for (int i = 0; i < skuNameBulkList.size(); i++) {
    String skuName = skuNameBulkList.get(i);
        IndexRequest indexRequest = new IndexRequest().source(XContentType.JSON, "word1", skuName, "word2", skuName);
        bulkRequest.add(indexRequest);
    }
    return bulkRequest;
}

(7)全量数据双索引写入的生产代码实现

实现思路:进行分batch查询 + 每个batch的查询结果进行分bulk批量写 + 使用CountDownLatch等待所有bulk任务都执行完毕 + 使用Semaphore控制同时执行任务的线程数。

举个例子:一般会从数据库中查询出10w条数据作为一个batch,所以1000万的商品数据就需要100个batch批量写入。然后每次查出来的一批10万条数据,会拆分为多个bulk进行并发批量写入。比如每个bulk只有150条数据,于是有667次bulk批量写入,CountDownLatch大小为667。于是就会产生667个任务,每个任务由线程池执行完之后都会counDown()一下。

@RestController
@RequestMapping("/api/mockData")
public class MockDataController {
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    ...

    //多线程向ES写入生产全量商品的双索引数据
    @PostMapping("/indexAllProductData")
    public JsonResult indexAllProductData(@RequestBody MockData2Dto request) throws IOException, InterruptedException {
        if (!request.validateParams()) {
            return JsonResult.buildError("参数有误");
        }
        //1000万除以10万=100,需要进行多少次batch批量写入
        int batchCount = 100;
        //每个批次10万条数据
        int batchSize = 100_000;
        //每次bulk批量写入ES多少条数据
        int bulkSize = request.getBatchSize();
        //每个批次需要进行多少次bulk批量写入
        int bulkCount = batchSize / bulkSize + 1;
        //可以同时执行bulk批量写入的线程数量
        int threadCount = request.getThreadCount();

        for (int batchIndex = 1; batchIndex <= batchCount; batchIndex++) {
            //一般会每次从数据库中查询出10w条数据作为一个batch
            //查出来的每一批数据,都会拆分为多个bulk进行并发批量写入
            List<Map<String, Object>> batchList = queryProductBatchFromDatabase(batchIndex, batchSize);

            CountDownLatch countDownLatch = new CountDownLatch(bulkCount);
            Semaphore semaphore = new Semaphore(threadCount);

            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                threadCount,
                threadCount * 2,
                60,
                TimeUnit.SECONDS,
                new SynchronousQueue<>()
            );

            int bulkDataCurrentIndex = 0;
            for (int bulkIndex = 1; bulkIndex <= bulkCount; bulkIndex++) {
                List<Map<String, Object>> bulkList = new ArrayList<Map<String, Object>>();
                List<String> skuNameBulkList = new ArrayList<String>();
                for (int bulkDataIndex = bulkDataCurrentIndex; bulkDataIndex < bulkDataCurrentIndex + bulkSize; bulkDataIndex++) {
                    if (batchList.get(bulkDataIndex) == null) {
                        //外层循环不断break,否则CountDownLatch会少countDown()而一直等待
                        break;
                    }
                    bulkList.add(batchList.get(bulkDataIndex));
                    skuNameBulkList.add(String.valueOf(batchList.get(bulkDataIndex).get("skuName")));
                }

                bulkDataCurrentIndex += bulkSize;
                semaphore.acquireUninterruptibly();

                threadPoolExecutor.submit(() -> {
                    try {
                        if (bulkList.size() > 0) {
                            BulkRequest productIndexBulkRequest = buildProductIndexBulkRequest(bulkList);
                            restHighLevelClient.bulk(productIndexBulkRequest, RequestOptions.DEFAULT);
                        }
                        if (skuNameBulkList.size() > 0) {
                            BulkRequest suggestIndexBulkRequest = buildSuggestIndexBulkRequest(skuNameBulkList);
                            restHighLevelClient.bulk(suggestIndexBulkRequest, RequestOptions.DEFAULT);
                        }
                        log.info("线程[{}]写入[{}]条商品数据", Thread.currentThread().getName(), bulkList.size());
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                        countDownLatch.countDown();
                    }
                });
            }

            countDownLatch.await();
            threadPoolExecutor.shutdown();
        }

        long endTime = System.currentTimeMillis();
        int totalCount = batchSize * batchCount;
        long elapsedSeconds = (endTime - startTime) / 1000;
        long perSecond = totalCount / elapsedSeconds;
        log.info("此次共导入[{}]条商品数据,耗时[{}]秒,平均每秒导入[{}]条数据", totalCount, elapsedSeconds, perSecond);

        Map<String, Object> result = new LinkedHashMap<>();
        result.put("startTime", DateUtil.format(new Date(startTime), DatePattern.NORM_DATETIME_PATTERN));
        result.put("endTime", DateUtil.format(new Date(endTime), DatePattern.NORM_DATETIME_PATTERN));
        result.put("totalCount", totalCount);
        result.put("elapsedSeconds", elapsedSeconds);
        result.put("perSecond", perSecond);
        return JsonResult.buildSuccess(result);
    }

    private List<Map<String, Object>> queryProductBatchFromDatabase(int batchIndex, int batchSize) {
        //根据第几个batch,每个batch多少条数据,对数据库发起sql查询,把一批一批的数据查出来
        return new ArrayList<Map<String, Object>>();
    }
    ...
}

(8)商品数据写入双索引后的效果分析

把数据写入ES,无非就是让ES为数据建立倒排索引和正排索引。倒排索引是用来进行全文检索的,正排索引是用来进行结构化搜索和数据分析的。

比如针对商品skuName字段的检索数据,会使用ik_max_word类型尽量精细化地分词。然后基于分词建立尽可能多的倒排索引,后续就可以根据skuName来进行全文检索。当输入针对skuName的搜索词到输入框时,会使用ik_smark类型进行粗粒度分词,然后到倒排索引进行精准匹配。

8.步骤四:基于索引实现搜索功能

(1)基于suggest索引的自动补全实现

(2)输入框中的拼写纠错实现

(3)商品B端的商品搜索代码实现

(4)搜索结果为空时的自动推荐代码实现

(5)基于多条件对商品进行结构化搜索

(1)基于suggest索引的自动补全实现

实现自动补全的代码比较简单,其原理是:把搜索词汇和倒排索引里的所有前缀匹配的词条进行score比较,然后把分数最高的那些返回,其中会涉及到suggest索引的word1(IK分词器 + pinyin分词器)。

具体步骤如下:

步骤一:构建CompletionSuggestion条件
步骤二:封装搜索请求
步骤三:通过restHighLevelClient查询ElasticSearch
步骤四:获取响应中的补全的词的列表
@RestController
@RequestMapping("/api/common")
public class CommonSearchController {
    ...
    //通用服务组件
    @Autowired
    private CommonSearchService commonSearchService;

    //输入内容自动补全接口
    @GetMapping("/autoComplete")
    public JsonResult autoComplete(@RequestBody AutoCompleteRequest request) throws IOException {
        List<String> completedWords = commonSearchService.autoComplete(request);
        return JsonResult.buildSuccess(completedWords);
    }
    ...
}

@Data
public class AutoCompleteRequest {
    //索引名称
    private String indexName;
    //字段名称
    private String fieldName;
    //需要补全的词(用户输入的内容)
    private String text;
    //返回多少个补全后的词
    private int count;
}

//通用查询服务实现类
@Service
public class CommonSearchServiceImpl implements CommonSearchService {
    private static final String MY_SUGGEST = "my_suggest";

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Override
    public List<String> autoComplete(AutoCompleteRequest request) throws IOException {
        //1.构建CompletionSuggestion条件
        CompletionSuggestionBuilder completionSuggestionBuilder = SuggestBuilders.completionSuggestion(request.getFieldName());
        completionSuggestionBuilder.prefix(request.getText());
        completionSuggestionBuilder.skipDuplicates(true);
        completionSuggestionBuilder.size(request.getCount());

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
        searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, completionSuggestionBuilder));

        //2.封装搜索请求
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        //3.查询ElasticSearch
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        //4.获取响应中的补全的词的列表
        CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST);
        List<CompletionSuggestion.Entry.Option> options = completionSuggestion.getEntries().get(0).getOptions();

        List<String> result = new ArrayList<>();
        for (CompletionSuggestion.Entry.Option option : options) {
            result.add(option.getText().string());
        }

        return result;
    }
    ...
}

(2)输入框中的拼写纠错实现

实现拼写纠错的代码也比较简单,其原理是:把输入的有拼写错误的搜索词汇,先自动进行纠错。然后再和倒排索引里的所有匹配的词条进行score比较,最后把分数最高的那一条返回,其中会涉及到suggest索引的word2。

具体步骤如下:

步骤一:构建PhraseSuggestion条件
步骤二:封装搜索请求
步骤三:通过restHighLevelClient查询ElasticSearch
步骤四:获取响应中纠错后的词
@RestController
@RequestMapping("/api/common")
public class CommonSearchController {
    //通用服务组件
    @Autowired
    private CommonSearchService commonSearchService;
    ...

    //输入内容拼写纠错接口
    @GetMapping("/spellingCorrection")
    public JsonResult spellingCorrection(@RequestBody SpellingCorrectionRequest request) throws IOException {
        String correctedWord = commonSearchService.spellingCorrection(request);
        return JsonResult.buildSuccess(correctedWord);
    }
}

@Data
public class SpellingCorrectionRequest {
    //索引名称
    private String indexName;
    //字段名称
    private String fieldName;
    //用户输入的内容
    private String text;
}

//通用查询服务实现类
@Service
public class CommonSearchServiceImpl implements CommonSearchService {
    private static final String MY_SUGGEST = "my_suggest";

    @Autowired
    private RestHighLevelClient restHighLevelClient;
    ...

    @Override
    public String spellingCorrection(SpellingCorrectionRequest request) throws IOException {
        //1.构建PhraseSuggestion条件
        PhraseSuggestionBuilder phraseSuggestionBuilder = new PhraseSuggestionBuilder(request.getFieldName());
        phraseSuggestionBuilder.text(request.getText());
        phraseSuggestionBuilder.size(1);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
        searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, phraseSuggestionBuilder));

        //2.封装搜索请求
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        //3.查询ElasticSearch
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        //4.获取响应中纠错后的词
        PhraseSuggestion phraseSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST);
        List<PhraseSuggestion.Entry.Option> options = phraseSuggestion.getEntries().get(0).getOptions();

        return Optional.ofNullable(options).filter(e -> !e.isEmpty()).map(e -> e.get(0)).map(e -> e.getText().string()).orElse("");
    }
}

(3)商品B端的商品搜索代码实现

搜索流程应为:输入搜索词 -> 拼写纠错 -> 自动补全 -> 全文检索。

具体步骤如下:

步骤一:构建match条件
步骤二:设置搜索高亮配置(现在基本面向移动端,所以高亮处理也没太必要)
步骤三:设置搜索分页参数
步骤四:封装搜索请求
步骤五:调用restHighLevelClient查询ElasticSearch
步骤六:对结果进行高亮处理
@RestController
@RequestMapping("/api/product")
public class ProductSearchController {
    //商品服务组件
    @Autowired
    private ProductService productService;

    //商品全文检索接口
    @GetMapping("/fullTextSearch")
    public JsonResult fullTextSearch(@RequestBody FullTextSearchRequest request) throws IOException {
        SearchResponse searchResponse = productService.fullTextSearch(request);
        Map<String, Object> resultMap = new HashMap<>();
        SearchHit[] hits = searchResponse.getHits().getHits();
        long totalCount = searchResponse.getHits().getTotalHits().value;
        resultMap.put("hits", hits);
        resultMap.put("totalCount", totalCount);
        resultMap.put("pageNum", request.getPageNum());
        resultMap.put("pageSize", request.getPageSize());
        return JsonResult.buildSuccess(resultMap);
    }
    ...
}

@Data
public class FullTextSearchRequest {
    //索引名字
    private String indexName;
    //查询参数:key为字段的名字,value为字段的关键词,可以指定从哪些字段里检索
    private Map<String, String> queryTexts;
    //高亮字段
    private String highLightField;
    //当前页
    private int pageNum;
    //每页条数
    private int pageSize;
}

//商品查询服务实现类
@Service
public class ProductServiceImpl implements ProductService {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Override
    public SearchResponse fullTextSearch(FullTextSearchRequest request) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.trackTotalHits(true);

        //1.构建match条件
        request.getQueryTexts().forEach((field, text) -> {
            searchSourceBuilder.query(QueryBuilders.matchQuery(field, text));
        });

        //2.设置搜索高亮配置(现在基本面向移动端,所以高亮处理也没太必要)
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.field(request.getHighLightField());
        highlightBuilder.preTags("<span stype=color:red>"); //搜索结果里,商品标题和搜索词匹配的部分会显示为红色
        highlightBuilder.postTags("</span>");
        highlightBuilder.numOfFragments(0);
        searchSourceBuilder.highlighter(highlightBuilder);

        //3.设置搜索分页参数
        int from = (request.getPageNum() - 1) * request.getPageSize();
        searchSourceBuilder.from(from);
        searchSourceBuilder.size(request.getPageSize());

        //4.封装搜索请求
        SearchRequest searchRequest = new SearchRequest(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        //5.查询ElasticSearch
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        //6.对结果进行高亮处理
        SearchHits hits = searchResponse.getHits();
        for (SearchHit hit : hits) {
            HighlightField highlightField = hit.getHighlightFields().get(request.getHighLightField());
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            Text[] fragments = highlightField.fragments();
            StringBuilder builder = new StringBuilder();
            for (Text fragment : fragments) {
                builder.append(fragment.string());
            }
            sourceAsMap.put(request.getHighLightField(), builder.toString());
        }
        return searchResponse;
    }
    ...
}

(4)搜索结果为空时的自动推荐代码实现

如果全文检索的结果为空,那么可以继续调用自动推荐进行相似搜索。

搜索流程应为:输入搜索词 -> 拼写纠错(completion) -> 自动补全(phrase) -> 全文检索(match) -> 自动推荐(term)。

具体步骤如下:

步骤1:构建TermSuggestion条件
步骤2:封装搜索请求
步骤3:调用restHighLevelClient查询ElasticSearch
步骤4:获取响应中推荐给用户的词
@GetMapping("/recomendWhenMissing")
public JsonResult recommendWhenMissing(@RequestBody RecommendWhenMissingRequest request) throws IOException {
    String recommendWord = commonSearchService.recommendWhenMissing(request);
    return JsonResult.buildSuccess(recommendWord);
}

@Override
public String recommendWhenMissing(RecommendWhenMissingRequest request) throws IOException {
    //1.构建TermSuggestion条件
    TermSuggestionBuilder termSuggestionBuilder = new TermSuggestionBuilder(request.getFieldName());
    termSuggestionBuilder.text(request.getText());
    termSuggestionBuilder.analyzer(IK_SMART);
    termSuggestionBuilder.minWordLength(2);
    termSuggestionBuilder.stringDistance(TermSuggestionBuilder.StringDistanceImpl.NGRAM);

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
    searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, termSuggestionBuilder));

    //2.封装搜索请求
    SearchRequest searchRequest = new SearchRequest();
    searchRequest.indices(request.getIndexName());
    searchRequest.source(searchSourceBuilder);

    //3.查询ElasticSearch
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    //4.获取响应中推荐给用户的词
    TermSuggestion termSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST);
    List<TermSuggestion.Entry.Option> options = termSuggestion.getEntries().get(0).getOptions();

    return Optional.ofNullable(options).map(e -> e.get(0)).map(e -> e.getText().string()).orElse("");
}

(5)基于多条件对商品进行结构化搜索

具体步骤如下:

步骤1:解析queryDSL
步骤2:设置搜索分页参数
步骤3:封装搜索请求
步骤4:调用restHighLevelClient查询ElasticSearch
@RestController
@RequestMapping("/api/product")
public class ProductSearchController {
    //商品服务组件
    @Autowired
    private ProductService productService;
    ...

    //商品结构化搜索接口
    @GetMapping("/structuredSearch")
    public JsonResult structuredSearch(@RequestBody StructuredSearchRequest request) throws IOException {
        SearchResponse searchResponse = productService.structuredSearch(request);
        Map<String, Object> resultMap = new HashMap<>();
        SearchHit[] hits = searchResponse.getHits().getHits();
        long totalCount = searchResponse.getHits().getTotalHits().value;
        resultMap.put("hits", hits);
        resultMap.put("totalCount", totalCount);
        resultMap.put("pageNum", request.getPageNum());
        resultMap.put("pageSize", request.getPageSize());
        return JsonResult.buildSuccess(resultMap);
    }
}

@Data
public class StructuredSearchRequest {
    //索引名字
    private String indexName;
    //Query DSL:ES查询语法,是按照JSON来组织
    //按照ElasticSearch的规范写的Query DSL,是一个JSON对象
    //解析的时候转成JSON字符串,客户端API可以直接解析字符串
    private Map<String, Object> queryDsl;
    //当前页
    private int pageNum;
    //每页条数
    private int pageSize;
}

@Service
public class ProductServiceImpl implements ProductService {
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    ...

    @Override
    public SearchResponse structuredSearch(StructuredSearchRequest request) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.trackTotalHits(true);

        //1.解析queryDSL
        String queryDsl = JSON.toJSONString(request.getQueryDsl());
        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
        NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
        XContent xContent = XContentFactory.xContent(XContentType.JSON);
        XContentParser xContentParser = xContent.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, queryDsl);
        searchSourceBuilder.parseXContent(xContentParser);

        //2.设置搜索分页参数
        int from = (request.getPageNum() - 1) * request.getPageSize();
        searchSourceBuilder.from(from);
        searchSourceBuilder.size(request.getPageSize());

        //3.封装搜索请求
        SearchRequest searchRequest = new SearchRequest(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        //4.查询ElasticSearch
        return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    }
}

9.步骤五:大数据量写入ES和搜索性能的调优

(1)单线程将百万商品数据写入ES

(2)多线程将百万商品数据写入ES

(3)数据写入到ES的存储层原理简析

(4)将数据写入到ES的性能影响因素

(5)全量数据写入ES的性能调优方案

(6)百万商品数据写入ES的调优性能

(7)亿级商品数据的搜索性能测试

(8)ES搜索性能优化的方案分析

(1)单线程将百万商品数据写入ES

一.创建索引

PUT /demo_plan_sku_index_01
{ 
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "skuId": {
                "type": "keyword"
            },
            "skuName": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "category": {
                "type": "keyword"
            },
            "basePrice": {
                "type": "integer"
            },
            "vipPrice": {
                "type": "integer"
            },
            "saleCount": {
                "type": "integer"
            },
            "commentCount": {
                "type": "integer"
            },
            "skuImgUrl": {
                "type": "keyword",
                "index": false
            },
            "createTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            },
            "updateTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            }
        }
    } 
}

二.请求接口

/api/mockData/mockData1

三.请求参数

写入demo_plan_sku_index_01索引,每次批量插入1000条商品数据,一共执行1000次批量插入。

{
    "indexName":"demo_plan_sku_index_01",
    "batchSize":1000,
    "batchTimes":1000
}

四.请求响应

该次测试耗时62s,写入了100万条数据。每个线程每秒可以写入1.6万条数据,所以单线程每秒差不多执行了16个BulkRequest批量写入。60ms可以执行一次BulkRequest批量写入,每个BulkRequest会包含1000条数据。100万条数据大概会占用几百MB,所以很多数据都可以驻留在ES机器的OS Cache里,有利搜索。

{ 
    "success": true,
    "data": {
        "totalCount": 1000000,
        "elapsedSeconds": 62,
        "perSecond": 16130
    },
    "errorCode": null,
    "errorMessage": null
}

(2)多线程将百万商品数据写入ES

一.创建索引

//demo_plan_sku_index_02和demo_plan_sku_index_03一样的
PUT /demo_plan_sku_index_02
{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "skuId": {
                "type": "keyword"
            },
            "skuName": { 
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "category": {
                "type":"keyword"
            },
            "basePrice": {
                "type": "integer"
            },
            "vipPrice": {
                "type": "integer"
            },
            "saleCount": {
                "type": "integer"
            },
            "commentCount": {
                "type": "integer"
            },
            "skuImgUrl": {
                "type": "keyword",
                "index": false
            },
            "createTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            },
            "updateTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            }
        }
    } 
}

二.请求接口

/api/mockData/mockData2

三.请求参数

操作demo_plan_sku_index_02索引,每次批量插⼊1000条商品数据,⼀共执⾏1000次批量插⼊,使⽤30个线程同时执⾏。

{
    "indexName": "demo_plan_sku_index_02",
    "batchSize": 1000,
    "batchTimes": 1000,
    "threadCount": 30
}

操作demo_plan_sku_index_03索引,每次批量插⼊1000条商品数据,⼀共执⾏1000次批量插⼊,使⽤60个线程同时执⾏。

{
    "indexName": "demo_plan_sku_index_03",
    "batchSize": 1000,
    "batchTimes": 1000,
    "threadCount": 60
}

四.请求响应

该次测试耗时11秒,每秒写入9万条数据,总共使用11秒完成100万条数据的写入。由于有30个线程在并发地一起跑,所以每个线程每秒可以写入3000条数据。即每个线程每秒能将3个BulkRequest批量写入到ES,每个BulkRequest的写入需要300ms左右。

对比单线程写入百万数据到ES时,每个线程每秒可以写入1.6万条数据。而这里多线程写入百万数据到ES时,每个线程每秒才写入3000天数据。

可见,并不是线程数量越多越好。线程数量越多会导致对CPU负载和消耗越大,要耗费更多时间进行线程上下文切换。CPU负载高了之后,线程处理同样的任务,吞吐量和速率会下降。CPU只要不超过80%,其实都可以接受。

//下面是30个线程时的响应
{
    "success": true,
    "data": {
        "totalCount": 1000000,
        "elapsedSeconds": 11,
        "perSecond": 90909
    },
    "errorCode": null,
    "errorMessage": null
}
//下面是60个线程时的响应
{
    "success": true,
    "data": {
        "totalCount": 1000000,
        "elapsedSeconds": 10,
        "perSecond": 100000
    },
    "errorCode": null,
    "errorMessage": null
}

总结:多线程 + Bulk批量写入,10秒就可以完成百万级数据的写入。会有一个最佳的线程数,超过这个临界点,线程数越多反而效果会下降。

(3)数据写入到ES的存储层原理简析

首先ES会将收到的写入请求,将数据写到一个叫index buffer的JVM缓冲区中。然后会有一个线程,每隔1秒定时将这个JVM缓冲区的数据refresh刷新到OS Page Cache。当数据刷到OS Page Cache时,就可以被ES搜索到了。过一段时间后,OS Page Cache的数据会被flush到ES的磁盘文件里。

为了保证数据不丢失,会把数据也写入到内存translog里面,默认内存translog会每隔5秒进行刷盘到translog磁盘文件。

写入到单节点的数据还会进行副本复制到其他节点。

(4)将数据写入到ES的性能影响因素

因素一:refresh间隔,默认会每隔1秒刷新JVM缓冲的数据到OS Page Cache。这会影响数据写入的速度,在写入全量数据的场景,可以将间隔调大一点。比如120秒,通过减少频繁的refresh来提升性能。

因素二:副本复制会影响写入的速度。在写入全量数据的场景,同样没必要进行副本的复制。可以先将数据都写入到一个节点,之后再慢慢进行副本的复制。

因素三:index buffer的大小。在写入全量数据的场景,可以调大index buffer的大小。

因素四:translog的刷盘策略。在写入全量数据的场景,可以调整translog为异步刷盘,并且刷盘间隔调大一些。存放translog的内存大小也调大一些,让其存放更多的数据才去进行刷盘。

(5)全量数据写入ES的性能调优方案

下面这些参数的调整是针对写入全量数据的场景,全量写入完毕后应恢复原来的值。

一.调整refresh_interval参数(可以动态配置)。在全量写⼊数据的场景下,对"写⼊后1s就要能搜索到"的要求没有那么⾼。所以可以把这个值设置为120s,来减少频繁的refresh和lucene段合并⾏为。

二.调整number_of_replicas参数(可以动态配置)。ElasticSearch的副本数是可以动态调整的,写⼊时可以先把副本数设置为0,缩短数据写⼊的流程。批量导⼊完成之后,重新设置回副本数。

三.调整index_buffer_size参数。把JVM缓冲区的大小调大,可以让数据先写入到内存。避免JVM缓存区内存太小,很快写满而需要频繁刷盘。

四.调整translog参数(可以动态配置)。把translog的相关参数调大,避免尽量触发translog刷盘策略。

综上可知:首先在elasticsearch.yml中修改ES的配置,然后重启ES集群的三个节点。

$ vi /app/elasticsearch/elasticsearch-7.9.3/config/elasticsearch.yml
# 写⼊优化参数
indices.memory.index_buffer_size: 30%
indices.memory.min_index_buffer_size: 128m

然后在创建索引时对索引进行如下配置:

{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 0,
        "index.refresh_interval": "120s",
        "index.translog.durability": "async",
        "index.translog.sync_interval": "120s",
        "index.translog.flush_threshold_size": "2048mb"
    }
}

(6)百万商品数据写入ES的调优性能

可见,调优后的写入性能提升了一倍多。完成全量数据写入ES后,就可以动态调整索引的settings来恢复默认的配置。

(7)亿级商品数据的搜索性能测试

一.全文搜索测试

请求接口:

/api/product/fullTextSearch

请求参数:

{
    "pageNum": 1,
    "pageSize": 100,
    "indexName": "demo_plan_sku_index",
    "highLightField": "skuName",
    "queryTexts": {
        "skuName": "华为⼿机"
    }
}

比如搜索"华为手机",那么首先会对搜索词使用ik_smart进行分词,分成"华为"和"手机",之后再去倒排索引里对"华为"和"手机"这两分词进行搜索。

在上亿的商品数据里进行全文检索,耗时几百ms算是很快了,符合标准。查询多次的耗时详情如下,其中匹配的文档数有35万。

二.结构化搜索测试

请求接口:

/api/product/structuredSearch

请求参数:

{
    "pageNum": 1,
    "pageSize": 100,
    "indexName": "career_plan_sku_index",
    "queryDsl": {
        "query": {
            "bool": {
                "must": [{
                    "term": {
                        "category": {
                            "value": "⼿机"
                        }
                    }
                }],
                "filter": [{
                    "range": {
                        "basePrice": {
                            "gte": 1000,
                            "lte": 3000
                        }
                    }
                }]
            }
        },
        "sort": [{
            "basePrice": {
                "order":"desc"
            }
        }]
    } 
}

比如搜索手机分类下的商品按某价格区间倒序排列,刚开始需要花几秒。因为首先根据分类和价格区间去索引里查找数据,之后还需要按照价格排序。排序的过程可能会导致大量数据从磁盘读入内存,再写入临时磁盘文件进行排序,排序之后还需要分页提取。所以第一次整个过程比较慢。

后续再次搜索时,大量数据已经读入内存,不用再去进行磁盘IO了,所以会变快。查询多次的耗时详情如下,其中匹配的文档数有35万。

(8)ES搜索性能优化的方案分析

ES的性能是特别棒的,在合理的机器配置下,其实是不怎么需要做优化的。当我们的业务遇到查询瓶颈时再根据业务场景的特点从以下⼏点看看哪个能再去优化。而且ES比较适合全文检索,根据分词进行匹配打分排序,在上亿数据量之下也有非常好的搜索效果。但是ES面对结构化搜索则不稳定,使用多个条件来进行查询、按照指定条件进行排序,可能性能很差。因为其中可能会命中大量数据,然后产生大量的临时磁盘IO。

一.ES的查询缓存会保存在OS内存中。所以需要给操作系统的内存保留足够空间,不过一般都会把机器内存的一半给JVM,另一半给OS Cache。

二.磁盘IO性能和CPU性能。对于普通的搜索,磁盘IO的性能最影响搜索性能。对与计算⽐较多的搜索,CPU的性能会是⼀个瓶颈。

三.建立预索引Pre-Index。适⽤于数字类型的字段和经常做范围搜索的场景,比如可以把数字类型的字段转换成keyword类型的字段,把range查询转换为terms查询。

四.把long类型的skuID设置为keyword类型

五.强制合并一些只读的索引,避免从多个磁盘文件去搜索。

总结:最关键的其实是给OS Cache多预留一些内存,尽量让节点的数据都能加载到内存里。比如节点是32G内存的,16G给JVM,16G给OS Cache,然后节点的数据也控制好在16G内。否则如果OS Cache才16G,但节点的数据已经几百G了,那搜索时必然会进行大量的磁盘IO。也就是要想让ES提升搜索性能,主要靠将数据驻留在OS Cache里。所以要用大内存机器部署ES节点,尽量让每个节点上的主shard的数据量和OS Cache的内存量差不多。这样在搜索时,尽可能去OS Cache里查询数据,从而避免进行磁盘IO。

10.elasticsearch-analysis-ik⼯程的表结构

⼀共有两张表:extension_word扩展词库表,stop_word停⽤词库表。

CREATE TABLE `extension_word` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `word` varchar(64) NOT NULL,
    `create_time` datetime NOT NULL,
    `update_time` datetime NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `stop_word` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `word` varchar(64) NOT NULL,
    `create_time` datetime NOT NULL,
    `update_time` datetime NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

11.elasticsearch-analysis-ik⼯程的执行步骤

步骤一:读取数据库连接配置⽂件

步骤二:连接数据库

步骤三:查询扩展词库表和停⽤词库表

步骤四:添加到字典中

步骤五:使⽤⼀个线程周期性执⾏上⾯2-4步

12.elasticsearch-analysis-ik⼯程的代码

(1)添加的DictLoader类

(2)修改自带的Dictionary类

(1)添加的DictLoader类

代码位置:

org.wltea.analyzer.dic.DictLoader
//加载MySQL中的词库内容,单例
public class DictLoader {
    private static final Logger LOGGER = ESPluginLoggerFactory.getLogger(DictLoader.class.getName());
    private static final DictLoader INSTANCE = new DictLoader();
    private final String url;
    private final String username;
    private final String password;
    private final AtomicBoolean extensionWordFistLoad = new AtomicBoolean(false);
    private final AtomicReference<String> extensionWordLastLoadTimeRef = new AtomicReference<>(null);
    private final AtomicBoolean stopWordFistLoad = new AtomicBoolean(false);
    private final AtomicReference<String> stopWordLastLoadTimeRef = new AtomicReference<>(null);

    //单例类,构造函数是私有的
    private DictLoader() {
        //创建一个Properties配置数据对象,用来获取MySQL JDBC连接的配置
        Properties mysqlConfig = new Properties();
        //PathUtils会从指定目录下,对指定的文件名进行拼接,然后返回全路径名
        //所以这里会把"IK分词器配置目录 + jdbc.properties"拼接成"jdbc.properties的成全路径名"
        Path configPath = PathUtils.get(Dictionary.getSingleton().getDictRoot(), "jdbc.properties");
        try {
            //根据全路径名构建输入流,然后加载到mysqlConfig对象中,这样就可以从mysqlConfig对象读取配置值了
            mysqlConfig.load(new FileInputStream(configPath.toFile()));
            this.url = mysqlConfig.getProperty("jdbc.url");
            this.username = mysqlConfig.getProperty("jdbc.username");
            this.password = mysqlConfig.getProperty("jdbc.password");
        } catch (IOException e) {
            throw new IllegalStateException("加载jdbc.properties配置文件发生异常");
        }

        try {
            //加载MySQL驱动的类
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("加载数据库驱动时发生异常");
        }
    }

    public static DictLoader getInstance() {
        return INSTANCE;
    }

    public void loadMysqlExtensionWords() {
        //每次从MySQL里加载词库时会执行一条SQL语句
        //这时就必须要有一个和MySQL之间建立的网络连接,才能发送SQL语句出去

        //由于这里会每分钟执行一次SQL语句
        //所以每次执行SQL语句的时候就创建一个数据库的网络连接Connection,执行完SQL后再把该Connection释放即可

        Connection connection = null;
        Statement statement = null;
        ResultSet resultSet = null;

        String sql;

        //第一次执行时会通过CAS操作把extensionWordFistLoad变量由false改成true,并且查全量词汇
        //之后的执行,extensionWordFistLoad变量已经变为true,所以CAS操作会不成功,于是只查增量词汇
        if (extensionWordFistLoad.compareAndSet(false, true)) {
            //首次加载会从数据库查全量的词汇
            sql = "SELECT word FROM extension_word";
        } else {
            //后面按照最近的修改时间来加载增量的词
            sql = "SELECT word FROM extension_word WHERE update_time >= '" + extensionWordLastLoadTimeRef.get() + "'";
        }

        //每次生成了加载词库的SQL后,都会去设置一个本次加载的时间
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String nowString = dateFormat.format(new Date());
        //设置最近一次加载词库的时间,extensionWordLastLoadTimeRef也是Atomic变量,线程安全的
        extensionWordLastLoadTimeRef.set(nowString);

        //加载扩展词词库内容
        try {
            //使用传统的JDBC编程获取连接
            connection = DriverManager.getConnection(url, username, password);
            //创建statement
            statement = connection.createStatement();
            //执行SQL语句获取结果集
            resultSet = statement.executeQuery(sql);
            LOGGER.info("从MySQL加载extensionWord, sql={}", sql);

            Set<String> extensionWords = new HashSet<>();
            while (resultSet.next()) {
                String word = resultSet.getString("word");
                if (word != null) {
                    extensionWords.add(word);
                    //为了方便看日志,可以把加载到的扩展词全都打印出来了
                    LOGGER.info("从MySQL加载extensionWord, word={}", word);
                }
            }

            //放到字典里
            Dictionary.getSingleton().addWords(extensionWords);
        } catch (Exception e) {
            LOGGER.error("从MySQL加载extensionWord发生异常", e);
        } finally {
            //把结果集resultSet、statement、连接connection都进行释放
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }

            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }
        }
    }

    public void loadMysqlStopWords() {
        Connection connection = null;
        Statement statement = null;
        ResultSet resultSet = null;

        String sql;
        if (stopWordFistLoad.compareAndSet(false, true)) {
            sql = "SELECT word FROM stop_word";
        } else {
            sql = "SELECT word FROM stop_word WHERE update_time >= '" + stopWordLastLoadTimeRef.get() + "'";
        }

        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String nowString = dateFormat.format(new Date());
        stopWordLastLoadTimeRef.set(nowString);

        //加载词库内容
        try {
            connection = DriverManager.getConnection(url, username, password);
            statement = connection.createStatement();
            resultSet = statement.executeQuery(sql);
            LOGGER.info("从MySQL加载stopWord, sql={}", sql);

            Set<String> stopWords = new HashSet<>();
            while (resultSet.next()) {
                String word = resultSet.getString("word");
                if (word != null) {
                    stopWords.add(word);
                    LOGGER.info("从MySQL加载stopWord,word={}", word);
                }
            }
            // 放到字典里
            Dictionary.getSingleton().addStopWords(stopWords);
        } catch (Exception e) {
            LOGGER.error("从MySQL加载extensionWord发生异常", e);
        } finally {
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }

            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }
        }
    }
}

(2)修改自带的Dictionary类

代码位置:

org.wltea.analyzer.dic.Dictionary#initial

public class Dictionary {
    ...
    //词典单例实例
    private static Dictionary singleton;
    private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    ...

    //词典初始化
    //由于IK Analyzer的词典采用Dictionary类的静态方法进行词典初始化
    //只有当Dictionary类被实际调用时才会开始载入词典,这将延长首次分词操作的时间
    //该方法提供了一个在应用加载阶段就初始化字典的手段
    public static synchronized void initial(Configuration cfg) {
        if (singleton == null) {
            synchronized (Dictionary.class) {
                if (singleton == null) {
                    singleton = new Dictionary(cfg);
                    singleton.loadMainDict();
                    singleton.loadSurnameDict();
                    singleton.loadQuantifierDict();
                    singleton.loadSuffixDict();
                    singleton.loadPrepDict();
                    singleton.loadStopWordDict();

                    //在这里开启一个线程,每隔一段时间去mysql里面加载一下词库里的内容
                    new Thread(() -> {
                        while (true) {
                            try {
                                DictLoader.getInstance().loadMysqlExtensionWords();
                                DictLoader.getInstance().loadMysqlStopWords();
                                TimeUnit.SECONDS.sleep(60);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }).start();

                    if (cfg.isEnableRemoteDict()) {
                        //建立监控线程
                        for (String location : singleton.getRemoteExtDictionarys()) {
                            //10秒是初始延迟可以修改的,60是间隔时间,单位秒
                            pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
                        }
                        for (String location : singleton.getRemoteExtStopWordDictionarys()) {
                            pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
                        }
                    }
                }
            }
        }
    }
    ...
}

13.demo-product-es⼯程的介绍

(1)该⼯程⾥⾯有两个搜索相关的接⼝

(2)该工程有两个对⽤户输⼊进⾏处理的接⼝

(3)该工程有三个初始化数据的接⼝

(1)该⼯程⾥⾯有两个搜索相关的接⼝

一.全⽂搜索接⼝

二.结构化查询接⼝

(2)该工程有两个对⽤户输⼊进⾏处理的接⼝

一.输⼊内容⾃动补全接⼝

二.输⼊内容拼写纠错接⼝

(3)该工程有三个初始化数据的接⼝

一.单线程批量插⼊商品数据接⼝

二.多线程批量插⼊商品数据接⼝

三.单线程批量插⼊suggest数据接⼝

该⼯程依赖了ElasticSearch的rest⾼级客户端库:elasticsearch-rest-high-level-client,所有对ElasticSearch的操作都是通过rest⾼级客户端库来完成的。

14.demo-product-es⼯程的商品索引

(1)索引结构

(2)数据类型说明

(3)使⽤的数据类型说明

商品索引⽤来存储所有的商品信息。

(1)索引结构

商品模型的字段以满⾜测试需要为主不复杂,⼀共有10个字段。商品的索引名为:demo_plan_sku_index_序号。因为需要做多次不同的测试,有的测试是使⽤不同的索引,⽽且在实现接⼝时并没有把接⼝写死,可以指定操作那个索引,所以索引后⾯加了⼀个序号。

索引的mappings如下:

PUT /demo_plan_sku_index_15 { 
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "skuId": {
                "type": "keyword"
            },
            "skuName": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "category": {
                "type": "keyword"
            },
            "basePrice": {
                "type": "integer"
            },
            "vipPrice": {
                "type": "integer"
            },
            "saleCount": {
                "type": "integer"
            },
            "commentCount": {
                "type": "integer"
            },
            "skuImgUrl": {
                "type": "keyword",
                "index": false
            },
            "createTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            },
            "updateTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            }
        }
    } 
}

(2)数据类型说明

elasticsearch相关⽂档链接:

数据类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/mapping-types.html

text数据类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/text.html

keyword数据类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/keyword.html

数字类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html

时间类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html

(3)使⽤的数据类型说明

一.skuName商品名称

商品名称是⼀个字符串。我们要对商品名称进⾏全⽂检索,所以skuName字段使⽤了text类型。⽤analyzer指定使⽤ik_max_word分词器,这样在保存数据时商品名称会被尽可能多的分为多个词。⽤search_analyzer指定搜索时使⽤ik_smart分词器,这样尽可能做更符合⽤户期望的分词。

二.skuId商品id

商品id⼀般是⼀个long类型的数字。我们可以使⽤ElasticSearch的数字类型,但是我们使⽤的是keyword类型。因为⽂档⾥建议:如果没有要范围查询场景,且期望查询速度更快,数字类型的字段应使⽤keyword类型。对于商品id来说,正好是⽂档中所说的情况。

⽂档链接:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html

三.category商品分类

商品分类是⼀个字符串。我们不会对商品分类做全⽂检索,⽽是对商品分类做term精准匹配的操作,所以使⽤keyword类型。

四.basePrice商品价 | vipPrice商品会员价 | saleCount商品销量 | commentCount商品评论数

这⼏个字段都是数字。对于数字类型字段,⽂档中提到应在满⾜使⽤场景要求的情况下使⽤占⽤空间更⼩的类型,这⾥我们都使⽤Integer类型。

⽂档链接:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html

五.skuImgUrl商品图⽚

商品图⽚是⼀个图⽚的url地址。我们不会对这个字段做任何搜索操作,也不需要索引这个字段,所以使⽤了index:false 指定了不要索引这个字段。

⽂档链接:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/keyword.html

六.createTime创建时间和updateTime修改时间

这两个字段是时间类型的字段,对应的ElasticSearch类型为date,然后使⽤了format指定了时间的格式。

15.demo-product-es⼯程的suggest索引

(1)索引结构

(2)数据类型说明

suggest索引⽤来存储和⽤户输⼊⾃动补全、拼写纠错、搜索推荐相关的数据的索引。这里的搜索推荐指的是:当没有⽤户要搜索的商品时推荐其他的商品。

(1)索引结构

⼀共有两个字段:word1是⽤来做⾃动补全的,word2是⽤来做拼写纠错和搜索推荐的。

索引的mapping如下:

PUT /demo_plan_sku_suggest_15
{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
        "analysis": {
            "analyzer": {
                "ik_and_pinyin_analyzer": {
                    "type": "custom",
                    "tokenizer": "ik_smart",
                    "filter": "my_pinyin"
                }
            },
            "filter": {
                "my_pinyin": {
                    "type": "pinyin",
                    "keep_first_letter": true,
                    "keep_full_pinyin": true,
                    "keep_original": true,
                    "remove_duplicated_term": true
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "word1": {
                "type": "completion",
                "analyzer": "ik_and_pinyin_analyzer"
            },
            "word2": {
                "type": "text"
            }
        }
    } 
}

(2)数据类型说明

word1⽤来做⾃动补全的,ElasticSearch中有专⻔对应的completion数据类型。

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/mapping-types.html

在上⾯创建索引时我们⾃⼰定义了⼀个analyzer:ik_and_pinyin_analyzer,这个analyzer同时使⽤了ik分词器和pinyin分词器,这样⽤户输⼊汉字或者拼⾳的时候都能做⾃动补全。

16.demo-product-es⼯程的搜索相关接⼝

(1)全⽂搜索接⼝

(2)结构化查询接口

(3)用户输入自动补全处理接口

(4)输⼊内容拼写纠错接⼝

(5)输入内容推荐接口

(6)单线程批量插⼊商品数据接⼝

(7)多线程批量插⼊商品数据接⼝

(8)单线程批量插⼊suggest数据接⼝

(1)全⽂搜索接⼝

一.描述

按照⽤户输⼊的关键词在商品索引中以match query的⽅式搜索符合条件的商品。

二.controller实现

代码位置:

com.dem.elasticsearch.controller.ProductSearchController#fullTextSearch
//商品全文检索接口
@GetMapping("/fullTextSearch")
public JsonResult fullTextSearch(@RequestBody FullTextSearchRequest request) throws IOException {
    SearchResponse searchResponse = productService.fullTextSearch(request);

    Map<String, Object> resultMap = new HashMap<>();
    SearchHit[] hits = searchResponse.getHits().getHits();
    long totalCount = searchResponse.getHits().getTotalHits().value;
    resultMap.put("hits", hits);
    resultMap.put("totalCount", totalCount);
    resultMap.put("pageNum", request.getPageNum());
    resultMap.put("pageSize", request.getPageSize());

    return JsonResult.buildSuccess(resultMap);
}

三.参数说明

四.返回值说明

五.Service实现

代码位置:

com.demo.elasticsearch.service.impl.ProductServiceImpl#fullTextSearch
@Override
public SearchResponse fullTextSearch(FullTextSearchRequest request) throws IOException {
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.trackTotalHits(true);

    //1.构建match条件
    request.getQueryTexts().forEach((field, text) -> {
        searchSourceBuilder.query(QueryBuilders.matchQuery(field, text));
    });

    //2.设置搜索高亮配置(现在基本面向移动端,所以高亮处理也没太必要)
    HighlightBuilder highlightBuilder = new HighlightBuilder();
    highlightBuilder.field(request.getHighLightField());
    highlightBuilder.preTags("<span stype=color:red>"); //搜索结果里,商品标题和搜索词匹配的部分会显示为红色
    highlightBuilder.postTags("</span>");
    highlightBuilder.numOfFragments(0);
    searchSourceBuilder.highlighter(highlightBuilder);

    //3.设置搜索分页参数
    int from = (request.getPageNum() - 1) * request.getPageSize();
    searchSourceBuilder.from(from);
    searchSourceBuilder.size(request.getPageSize());

    //4.封装搜索请求
    SearchRequest searchRequest = new SearchRequest(request.getIndexName());
    searchRequest.source(searchSourceBuilder);

    //5.查询ElasticSearch
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    //6.对结果进行高亮处理
    SearchHits hits = searchResponse.getHits();
    for (SearchHit hit : hits) {
        HighlightField highlightField = hit.getHighlightFields().get(request.getHighLightField());
        Map<String, Object> sourceAsMap = hit.getSourceAsMap();
        Text[] fragments = highlightField.fragments();
        StringBuilder builder = new StringBuilder();
        for (Text fragment : fragments) {
            builder.append(fragment.string());
        }
        sourceAsMap.put(request.getHighLightField(), builder.toString());
    }

    return searchResponse;
}

六.ElasticSearch相关文档

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/query-dsl-match-query.html
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/highlighting.html
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-search.html
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-search.html#java-rest-high-search-response-search-hits
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-search.html#java-rest-high-search-response-highlighting

(2)结构化查询接口

一.描述

按照⽤户指定的过滤条件、排序⽅式以term query的⽅式搜索服务条件的商品。代码⾥没有写任何固定的根据某个字段做term query,或者根据某个字段的范围区间过滤、或者根据某个字段排序的逻辑,⽽是通过传递⼀个queryDSL脚本作为参数,解析这个queryDSL脚本来执⾏查询操作,这样接⼝更加抽象可以满⾜不同场景的搜索,只要前端拼接参数。

二.Controller实现

代码位置:

com.demo.elasticsearch.service.impl.ProductServiceImpl#structuredSearch
//商品结构化搜索接口
@GetMapping("/structuredSearch")
public JsonResult structuredSearch(@RequestBody StructuredSearchRequest request) throws IOException {
    SearchResponse searchResponse = productService.structuredSearch(request);
    Map<String, Object> resultMap = new HashMap<>();
    SearchHit[] hits = searchResponse.getHits().getHits();
    long totalCount = searchResponse.getHits().getTotalHits().value;

    resultMap.put("hits", hits);
    resultMap.put("totalCount", totalCount);
    resultMap.put("pageNum", request.getPageNum());
    resultMap.put("pageSize", request.getPageSize());

    return JsonResult.buildSuccess(resultMap);
}

三.参数说明

四.返回值说明

五.Service实现

代码位置:

com.demo.elasticsearch.service.impl.ProductServiceImpl#structuredSearch
@Override
public SearchResponse structuredSearch(StructuredSearchRequest request) throws IOException {
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.trackTotalHits(true);

    //1.解析queryDSL
    String queryDsl = JSON.toJSONString(request.getQueryDsl());
    SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
    NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
    XContent xContent = XContentFactory.xContent(XContentType.JSON);
    XContentParser xContentParser = xContent.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, queryDsl);
    searchSourceBuilder.parseXContent(xContentParser);

    //2.设置搜索分页参数
    int from = (request.getPageNum() - 1) * request.getPageSize();
    searchSourceBuilder.from(from);
    searchSourceBuilder.size(request.getPageSize());

    //3.封装搜索请求
    SearchRequest searchRequest = new SearchRequest(request.getIndexName());
    searchRequest.source(searchSourceBuilder);

    //4.查询ElasticSearch
    return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
}

(3)用户输入自动补全处理接口

一.描述

当⽤户在搜索框输⼊内容时,前端可以捕获焦点事件,调⽤此接⼝返回⽤户⼀组根据⽤户输⼊补全了的搜索词。

二.Controller实现

代码位置:

com.demo.elasticsearch.controller.CommonSearchController#autoComplete
//输入内容自动补全接口
@GetMapping("/autoComplete")
public JsonResult autoComplete(@RequestBody AutoCompleteRequest request) throws IOException {
    List<String> completedWords = commonSearchService.autoComplete(request);
    return JsonResult.buildSuccess(completedWords);
}

三.参数说明

四.返回值说明

五.Service实现

代码位置:

com.demo.elasticsearch.service.impl.CommonSearchServiceImpl#autoComplete
@Override
public List<String> autoComplete(AutoCompleteRequest request) throws IOException {
    //1.构建CompletionSuggestion条件
    CompletionSuggestionBuilder completionSuggestionBuilder = SuggestBuilders.completionSuggestion(request.getFieldName());
    completionSuggestionBuilder.prefix(request.getText());
    completionSuggestionBuilder.skipDuplicates(true);
    completionSuggestionBuilder.size(request.getCount());
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
    searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, completionSuggestionBuilder));

    //2.封装搜索请求
    SearchRequest searchRequest = new SearchRequest();
    searchRequest.indices(request.getIndexName());
    searchRequest.source(searchSourceBuilder);

    //3.查询ElasticSearch
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    //4.获取响应中的补全的词的列表
    CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST);
    List<CompletionSuggestion.Entry.Option> options = completionSuggestion.getEntries().get(0).getOptions();
    List<String> result = new ArrayList<>();
    for (CompletionSuggestion.Entry.Option option : options) {
        result.add(option.getText().string());
    }

    return result;
}

六.ElasticSearch相关文档

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/search-suggesters.html#completion-suggester

(4)输⼊内容拼写纠错接⼝

一.描述

当⽤户在搜索框输⼊内容时,前端可以捕获焦点事件,调⽤此接⼝对⽤户的输⼊进⾏纠错。

二.Controller实现

代码位置:

com.demo.elasticsearch.controller.CommonSearchController#spellingCorrection
//输入内容拼写纠错接口
@GetMapping("/spellingCorrection")
public JsonResult spellingCorrection(@RequestBody SpellingCorrectionRequest request) throws IOException {
    String correctedWord = commonSearchService.spellingCorrection(request);
    return JsonResult.buildSuccess(correctedWord);
}

三.参数说明

四.返回值说明

五.Service实现

代码位置:

com.demo.elasticsearch.service.impl.CommonSearchServiceImpl#spellingCorrection
@Override
public String spellingCorrection(SpellingCorrectionRequest request) throws IOException {
    //1.构建PhraseSuggestion条件
    PhraseSuggestionBuilder phraseSuggestionBuilder = new PhraseSuggestionBuilder(request.getFieldName());
    phraseSuggestionBuilder.text(request.getText());
    phraseSuggestionBuilder.size(1);

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
    searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, phraseSuggestionBuilder));

    //2.封装搜索请求
    SearchRequest searchRequest = new SearchRequest();
    searchRequest.indices(request.getIndexName());
    searchRequest.source(searchSourceBuilder);

    //3.查询ElasticSearch
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    //4.获取响应中纠错后的词
    PhraseSuggestion phraseSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST);
    List<PhraseSuggestion.Entry.Option> options = phraseSuggestion.getEntries().get(0).getOptions();

    return Optional.ofNullable(options).filter(e -> !e.isEmpty()).map(e -> e.get(0)).map(e -> e.getText().string()).orElse("");
}

六.ElasticSearch相关文档

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/search-suggesters.html#phrase-suggester

(5)输入内容推荐接口

一.描述

当按照⽤户的搜索词进行搜索却发现没有搜索到数据时,可以调⽤该接⼝返回推荐搜索词给⽤户。

代码位置:

com.demo.elasticsearch.controller.CommonSearchController#recommendWhenMissing
@GetMapping("/recomendWhenMissing")
public JsonResult recommendWhenMissing(@RequestBody RecommendWhenMissingRequest request) throws IOException {
    String recommendWord = commonSearchService.recommendWhenMissing(request);
    return JsonResult.buildSuccess(recommendWord);
}

三.参数说明

四.返回值说明

五.Service实现

代码位置:

com.demo.elasticsearch.service.impl.CommonSearchServiceImpl#recommendWhenMissing
@Override
public String recommendWhenMissing(RecommendWhenMissingRequest request) throws IOException {
    //1.构建TermSuggestion条件
    TermSuggestionBuilder termSuggestionBuilder = new TermSuggestionBuilder(request.getFieldName());
    termSuggestionBuilder.text(request.getText());
    termSuggestionBuilder.analyzer(IK_SMART);
    termSuggestionBuilder.minWordLength(2);
    termSuggestionBuilder.stringDistance(TermSuggestionBuilder.StringDistanceImpl.NGRAM);

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
    searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, termSuggestionBuilder));

    //2.封装搜索请求
    SearchRequest searchRequest = new SearchRequest();
    searchRequest.indices(request.getIndexName());
    searchRequest.source(searchSourceBuilder);

    //3.查询ElasticSearch
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    //4.获取响应中推荐给用户的词
    TermSuggestion termSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST);
    List<TermSuggestion.Entry.Option> options = termSuggestion.getEntries().get(0).getOptions();

    return Optional.ofNullable(options).map(e -> e.get(0)).map(e -> e.getText().string()).orElse("");
}

六.ElasticSearch相关文档

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/search-suggesters.html#term-suggester

(6)单线程批量插⼊商品数据接⼝

一.描述

⽤来初始化数据以及测试在单线程下ElasticSearch Bulk插⼊⽂档的性能。

二.Controller实现

代码位置:

com.demo.elasticsearch.controller.MockDataController#mockData1
//单线程向ES写入模拟的商品索引数据
//真正在生产环境下,不可能使用单个线程处理一个一个batch写入来实现大批量数据的写入
//实现这个方法主要是用来和接下来的多线程批量写入的方法进行性能对比
//https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-document-bulk.html
@PostMapping("/mockData1")
public JsonResult mockData1(@RequestBody MockData1Dto request) throws IOException {
    if (!request.validateParams()) {
      return JsonResult.buildError("参数有误");
    }
    //索引名字
    String indexName = request.getIndexName();
    //一次批量写入多少数据
    int batchSize = request.getBatchSize();
    //进行批量写入的次数
    int batchTimes = request.getBatchTimes();

    //1.从txt文件里面加载10w条商品数据,大小才13M,可以全部一次读出来
    List<Map<String, Object>> skuList = loadSkusFromTxt();
    long startTime = System.currentTimeMillis();

    //2.每次随机取出batchSize个商品数据,然后批量写入,一共执行batchTimes次
    for (int i = 0; i < batchTimes; i++) {
        //把指定的batchSize条数据打包成一个BulkRequest对象
        BulkRequest bulkRequest = buildSkuBulkRequest(indexName, batchSize, skuList);
        //然后调用ES的restHighLevelClient.bulk()接口,将BulkRequest对象写入到ES里去
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        log.info("写入[{}]条商品数据", batchSize);
    }
    long endTime = System.currentTimeMillis();

    //3.记录统计信息
    int totalCount = batchSize * batchTimes;
    long elapsedSeconds = (endTime - startTime) / 1000;
    long perSecond = totalCount / elapsedSeconds;
    log.info("此次共导入[{}]条商品数据,耗时[{}]秒,平均每秒导入[{}]条数据", totalCount, elapsedSeconds, perSecond);

    Map<String, Object> result = new LinkedHashMap<>();
    result.put("startTime", DateUtil.format(new Date(startTime), DatePattern.NORM_DATETIME_PATTERN));
    result.put("endTime", DateUtil.format(new Date(endTime), DatePattern.NORM_DATETIME_PATTERN));
    result.put("totalCount", totalCount);
    result.put("elapsedSeconds", elapsedSeconds);
    result.put("perSecond", perSecond);

    return JsonResult.buildSuccess(result);
}

三.参数说明

四.返回值说明

五.ElasticSearch相关文档

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-document-bulk.html

(7)多线程批量插⼊商品数据接⼝

一.描述

⽤来初始化数据以及测试在多线程下ElasticSearch Bulk插⼊⽂档的性能。

二.Controller实现

代码位置:

com.demo.elasticsearch.controller.MockDataController#mockData2
//多线程向ES写入模拟的商品索引数据
@PostMapping("/mockData2")
public JsonResult mockData2(@RequestBody MockData2Dto request) throws IOException, InterruptedException {
    if (!request.validateParams()) {
        return JsonResult.buildError("参数有误");
    }

    String indexName = request.getIndexName();
    //需要进行多少次batch批量写入
    int batchTimes = request.getBatchTimes();
    //每次batch批量写入多少条数据
    int batchSize = request.getBatchSize();
    //可以同时执行batch批量写入的线程数量
    int threadCount = request.getThreadCount();
    //读取10万条数据到内存
    List<Map<String, Object>> skuList = loadSkusFromTxt();

    //CountDownLatch:一个线程完成任务后才进行countDown,最后countDown到0时才能结束
    CountDownLatch countDownLatch = new CountDownLatch(batchTimes);

    //Semaphore:一个线程可以尝试从semaphore获取一个信号,如果获取不到就阻塞等待,获取到了,信号就是这个线程的了
    //当一个线程执行完其任务之后,会把信号还回去,所以最多只能有threadCount个线程可以获取到信号量
    Semaphore semaphore = new Semaphore(threadCount);

    //虽然semaphore可以控制线程池中同时进行的任务数,但是maximumPoolSize也不能设置的和semaphore一样的大小
    //因为线程池用了SynchronousQueue队列,可能会出现实际需要执行的任务数比semaphore允许数多一两个的情况
    //不过实际执行任务的线程数最终不会达到threadCount * 2
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        threadCount,
        threadCount * 2,
        60,
        TimeUnit.SECONDS,
        new SynchronousQueue<>()
    );

    long startTime = System.currentTimeMillis();

    //batch数量可以是比线程数量多
    for (int i = 0; i < batchTimes; i++) {
        //通过semaphore保证一直最多有threadCount个线程同时在执行批量写入的操作
        //先获取一个信号量,获取到了就提交任务到线程池执行批量写入的操作,获取不到就阻塞等待有空余的信号量
        semaphore.acquireUninterruptibly();

        threadPoolExecutor.submit(() -> {
            try {
                BulkRequest bulkRequest = buildSkuBulkRequest(indexName, batchSize, skuList);
                restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                log.info("线程[{}]写入[{}]条商品数据", Thread.currentThread().getName(), batchSize);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                //从下面两行代码就可以看到,当一个线程刚刚释放完semaphore后,还要执行下一行代码,还没释放线程
                //这时新一个任务可能就获取到semaphore并提交到线程池了
                //所以线程池实际需要执行的任务数可能会比semaphore的允许数threadCount多一点
                semaphore.release();
                countDownLatch.countDown();
            }
        });
    }

    long endTime = System.currentTimeMillis();

    //在这里等待一下最后一个批次的批量写入操作执行完
    countDownLatch.await();

    //现在的使用方式,在这里需要手动的把线程池给关掉
    threadPoolExecutor.shutdown();
    int totalCount = batchSize * batchTimes;
    long elapsedSeconds = (endTime - startTime) / 1000;
    long perSecond = totalCount / elapsedSeconds;
    log.info("此次共导入[{}]条商品数据,耗时[{}]秒,平均每秒导入[{}]条数据", totalCount, elapsedSeconds, perSecond);

    Map<String, Object> result = new LinkedHashMap<>();
    result.put("startTime", DateUtil.format(new Date(startTime), DatePattern.NORM_DATETIME_PATTERN));
    result.put("endTime", DateUtil.format(new Date(endTime), DatePattern.NORM_DATETIME_PATTERN));
    result.put("totalCount", totalCount);
    result.put("elapsedSeconds", elapsedSeconds);
    result.put("perSecond", perSecond);

    return JsonResult.buildSuccess(result);
}

三.参数说明

四.返回值说明

(8)单线程批量插⼊suggest数据接⼝

一.描述

初始化⽤来处理⽤户输⼊时使⽤的数据。

二.Controller实现

//单线程写入模拟的suggest数据
//https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-document-bulk.html
@PostMapping("/mockData3")
public JsonResult mockData3(@RequestBody MockData3Dto request) throws IOException {
    if (!request.validateParams()) {
        return JsonResult.buildError("参数有误");
    }

    String indexName = request.getIndexName();
    int batchTimes = request.getBatchTimes();
    int batchSize = request.getBatchSize();

    //1.从txt文件里面加载10w个商品名称
    List<String> skuNameList = loadSkuNamesFromTxt();
    long startTime = System.currentTimeMillis();

    //2.从第1条数据开始导入
    int index = 0;
    for (int i = 0; i < batchTimes; i++) {
        BulkRequest bulkRequest = new BulkRequest(indexName);
        for (int j = 1; j <= batchSize; j++) {
            String skuName = skuNameList.get(index);
            IndexRequest indexRequest = new IndexRequest().source(XContentType.JSON, "word1", skuName, "word2", skuName);
            System.out.println(skuName);
            bulkRequest.add(indexRequest);
            index++;
        }
        log.info("开始写入[{}]条suggest数据", batchSize);
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        log.info("完成写入[{}]条suggest数据", batchSize);
    }
    long endTime = System.currentTimeMillis();

    //3.记录统计信息
    int totalCount = batchSize * batchTimes;
    long elapsedSeconds = (endTime - startTime) / 1000;
    long perSecond = totalCount / elapsedSeconds;
    log.info("此次共导入[{}]条suggest数据,耗时[{}]秒,平均每秒导入[{}]条数据", totalCount, elapsedSeconds, perSecond);

    Map<String, Object> result = new LinkedHashMap<>();
    result.put("startTime", DateUtil.format(new Date(startTime), DatePattern.NORM_DATETIME_PATTERN));
    result.put("endTime", DateUtil.format(new Date(endTime), DatePattern.NORM_DATETIME_PATTERN));
    result.put("totalCount", totalCount);
    result.put("elapsedSeconds", elapsedSeconds);
    result.put("perSecond", perSecond);

    return JsonResult.buildSuccess(result);
}

三.参数说明

四.返回值说明

后端技术栈的基础修养 文章被收录于专栏

详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等

全部评论

相关推荐

评论
1
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务