🧬🌸我的SpringCloud笔记🧬高级搜索ES🌸

img

🧬高级搜索ES

🧬简单介绍

什么是elasticsearch

image-20230226213611635


image-20230226213652461


image-20230226213724571

**elasticsearch的发展 **

Lucene是一个Java语言的搜索引擎类库,是Apache公司的顶级项目,由DougCutting于1999年研发 。

官网地址:https://lucene.apache.org/

Lucene的优势:

  • 易扩展
  • 高性能(基于倒排索引)

Lucene的缺点:

  • 只限于Java语言开发
  • 学习曲线陡峭
  • 不支持水平扩展

2004年Shay Banon基于Lucene开发了Compass

2010年Shay Banon 重写了Compass,取名为Elasticsearch。

官网地址: https://www.elastic.co/cn/

目前最新的版本是:7.12.1 相比与lucene,elasticsearch具备下列优势:

  • 支持分布式,可水平扩展
  • 提供Restful接口,可被任何语言调用

倒排索引

传统数据库(如MySQL)采用正向索引,例如给下表(tb_goods)中的id创建索引:

image-20230226214309182


elasticsearch采用倒排索引:

  • 文档(document):每条数据就是一个文档
  • 词条(term):文档按照语义分成的词语

image-20230226214844496


ES与MySQL

Elasticsearch与mysql的概念对比:

image-20230226214549511

image-20230226214913497

架构

Mysql:擅长事务类型操作,可以确保数据的安全和一致性

Elasticsearch:擅长海量数据的搜索、分析、计算

image-20230226215207273


🧬安装部署

单机部署

创建网络

因为我们还需要部署kibana容器,因此需要让es和kibana容器互联。这里先创建一个网络:

docker network create es-net

加载镜像

docker pull elasticsearch:7.12.1

创建容器并运行

运行docker命令,部署单点es:

docker run -d \
--name es7 \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
-e "discovery.type=single-node" \
-v es-data:/usr/share/elasticsearch/data \
-v es-plugins:/usr/share/elasticsearch/plugins \
--privileged \
--network es-net \
-p 9200:9200 \
-p 9300:9300 \
elasticsearch:7.12.1

命令解释:

  • -e "cluster.name=es-docker-cluster":设置集群名称
  • -e "http.host=0.0.0.0":监听的地址,可以外网访问
  • -e "ES_JAVA_OPTS=-Xms512m -Xmx512m":内存大小
  • -e "discovery.type=single-node":非集群模式
  • -v es-data:/usr/share/elasticsearch/data:挂载逻辑卷,绑定es的数据目录
  • -v es-logs:/usr/share/elasticsearch/logs:挂载逻辑卷,绑定es的日志目录
  • -v es-plugins:/usr/share/elasticsearch/plugins:挂载逻辑卷,绑定es的插件目录
  • --privileged:授予逻辑卷访问权
  • --network es-net :加入一个名为es-net的网络中
  • -p 9200:9200:端口映射配置

设置密码:

# 先进入容器,编写 config/elasticsearch.yml
# docker exec -it elasticsearch bash


# 此处开启xpack
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true


# 推出容器 重启容器 进入容器 运行命令后设置密码
# exit
# docker restart es
# docker exec -it elasticsearch bash
# elasticsearch-setup-passwords interactive

设密码参考

在浏览器中输入:http://ip:9200 即可看到elasticsearch的响应结果。


部署kibana

kibana可以给我们提供一个elasticsearch的可视化界面,便于我们学习。

部署

运行docker命令,部署kibana

docker run -d \
--name kibana \
-e ELASTICSEARCH_HOSTS=http://es7:9200 \
--network=es-net \
-p 5601:5601 \
kibana:7.12.1
  • --network es-net :加入一个名为es-net的网络中,与elasticsearch在同一个网络中
  • -e ELASTICSEARCH_HOSTS=http://es:9200":设置elasticsearch的地址,因为kibana已经与elasticsearch在一个网络,因此可以用容器名直接访问elasticsearch
  • -p 5601:5601:端口映射配置

设置密码:

#
# ** THIS IS AN AUTO-GENERATED FILE **
#

# Default Kibana configuration for docker target
server.host: "0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://172.0.0.1:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"
# 此处设置elastic的用户名和密码
elasticsearch.username: elastic
elasticsearch.password: elastic

kibana启动一般比较慢,需要多等待一会,可以通过命令:

docker logs -f kibana

查看运行日志,当查看到下面的日志,说明成功:

image-20210109105135812

此时,在浏览器输入地址访问:http://192.168.150.101:5601,即可看到结果

DevTools

kibana中提供了一个DevTools界面:

image-20210506102630393

这个界面中可以编写DSL来操作elasticsearch。并且对DSL语句有自动补全功能。


安装IK分词器

在线安装ik插件

# 进入容器内部
docker exec -it elasticsearch /bin/bash

# 在线下载并安装
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip

#退出
exit
#重启容器
docker restart elasticsearch

离线安装ik插件

查看数据卷目录

安装插件需要知道elasticsearch的plugins目录位置,而我们用了数据卷挂载,因此需要查看elasticsearch的数据卷目录,通过下面命令查看:

docker volume inspect es-plugins

显示结果:

[
{
"CreatedAt": "2022-05-06T10:06:34+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/es-plugins/_data",
"Name": "es-plugins",
"Options": null,
"Scope": "local"
}
]

说明plugins目录被挂载到了:/var/lib/docker/volumes/es-plugins/_data 这个目录中。

解压缩分词器安装包

下面我们需要把课前资料中的ik分词器解压缩,重命名为ik

image-20210506110249144

传到es容器的插件数据卷中

也就是/var/lib/docker/volumes/es-plugins/_data

image-20210506110704293

重启容器

# 4、重启容器
docker restart es
# 查看es日志
docker logs -f es

部署es集群

部署es集群可以直接使用docker-compose来完成,不过要求你的Linux虚拟机至少有4G的内存空间

首先编写一个docker-compose文件,内容如下:

version: '2.2'
services:
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,es03
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data01:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- elastic
es02:
image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data02:/usr/share/elasticsearch/data
networks:
- elastic
es03:
image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data03:/usr/share/elasticsearch/data
networks:
- elastic

volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local

networks:
elastic:
driver: bridge

Run docker-compose to bring up the cluster:

docker-compose up

🧬IK分词器

IK分模式

IK分词器包含两种模式:

  • ik_smart:最少切分

  • ik_max_word:最细切分

GET /_analyze
{
"analyzer": "ik_max_word",
"text": "黑马程序员学习java太棒了"
}

结果:

{
"tokens" : [
{
"token" : "黑马",
"start_offset" : 0,
"end_offset" : 2,
"type" : "CN_WORD",
"position" : 0
},
{
"token" : "程序员",
"start_offset" : 2,
"end_offset" : 5,
"type" : "CN_WORD",
"position" : 1
},
{
"token" : "程序",
"start_offset" : 2,
"end_offset" : 4,
"type" : "CN_WORD",
"position" : 2
},
{
"token" : "员",
"start_offset" : 4,
"end_offset" : 5,
"type" : "CN_CHAR",
"position" : 3
},
{
"token" : "学习",
"start_offset" : 5,
"end_offset" : 7,
"type" : "CN_WORD",
"position" : 4
},
{
"token" : "java",
"start_offset" : 7,
"end_offset" : 11,
"type" : "ENGLISH",
"position" : 5
},
{
"token" : "太棒了",
"start_offset" : 11,
"end_offset" : 14,
"type" : "CN_WORD",
"position" : 6
},
{
"token" : "太棒",
"start_offset" : 11,
"end_offset" : 13,
"type" : "CN_WORD",
"position" : 7
},
{
"token" : "了",
"start_offset" : 13,
"end_offset" : 14,
"type" : "CN_CHAR",
"position" : 8
}
]
}

扩展词词典

随着互联网的发展,“造词运动”也越发的频繁。出现了很多新的词语,在原有的词汇列表中并不存在。比如:“奥力给”,“传智播客” 等。

所以我们的词汇也需要不断的更新,IK分词器提供了扩展词汇的功能。

1)打开IK分词器config目录: /var/lib/docker/volumes/es-plugins/_data

image-20210506112225508

2)在IKAnalyzer.cfg.xml配置文件内容添加:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 *** 添加扩展词典-->
<entry key="ext_dict">ext.dic</entry>
</properties>

3)新建一个 ext.dic,可以参考config目录下复制一个配置文件进行修改

传智播客
奥力给

4)重启elasticsearch

docker restart es

# 查看 日志
docker logs -f elasticsearch

image-20201115230900504

日志中已经成功加载ext.dic配置文件

5)测试效果:

GET /_analyze
{
"analyzer": "ik_max_word",
"text": "传智播客Java就业超过90%,奥力给!"
}

注意当前文件的编码必须是 UTF-8 格式,严禁使用Windows记事本编辑

停用词词典

在互联网项目中,在网络间传输的速度很快,所以很多语言是不允许在网络上传递的,如:关于宗教、政治等敏感词语,那么我们在搜索时也应该忽略当前词汇。

IK分词器也提供了强大的停用词功能,让我们在索引时就直接忽略当前的停用词汇表中的内容。

1)IKAnalyzer.cfg.xml配置文件内容添加:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典-->
<entry key="ext_dict">ext.dic</entry>
<!--用户可以在这里配置自己的扩展停止词字典 *** 添加停用词词典-->
<entry key="ext_stopwords">stopword.dic</entry>
</properties>

3)在 stopword.dic 添加停用词




4)重启elasticsearch

# 重启服务
docker restart elasticsearch
docker restart kibana

# 查看 日志
docker logs -f elasticsearch

日志中已经成功加载stopword.dic配置文件

5)测试效果:

GET /_analyze
{
"analyzer": "ik_max_word",
"text": "传智播客Java就业率超过95%,习大大都点赞,奥力给!"
}

注意当前文件的编码必须是 UTF-8 格式,严禁使用Windows记事本编辑


🧬API-Index

mapping属性

mapping是对索引库中文档的约束,常见的mapping属性包括:

  • type:字段数据类型,常见的简单类型有:
    • 字符串:text(可分词的文本)、keyword(精确值,例如:品牌、国家、ip地址)
    • 数值:longintegershortbytedoublefloat
    • 布尔:boolean
    • 日期:date
    • 对象:object
  • index:是否创建索引,默认为true
  • analyzer:使用哪种分词器
  • properties:该字段的子字段

创建索引库和映射

PUT /索引库名称
{
  "mappings": {
    "properties": {
      "字段名":{
        "type": "text",
        "analyzer": "ik_smart"
      },
      "字段名2":{
        "type": "keyword",
        "index": "false"
      },
      "字段名3":{
        "properties": {
          "子字段": {
            "type": "keyword"
          }
        }
      }
// ...略
    }
  }
}

示例:

PUT /heima
{
  "mappings": {
    "properties": {
      "info":{
        "type""text",
        "analyzer""ik_smart"
      },
      "email":{
        "type""keyword",
        "index""falsae"
      },
      "name":{
        "properties": {
          "firstName": {
            "type""keyword"
          }
        }
      },
// ... 略
    }
  }
}

查询文档

GET /index ### 

删除文档

DELETE /index

修改文档–添加新映射

PUT /incex/_mapping

PUT /ganga/_mapping
{
"properties":{
"新字段":{
"type": "integer"
}
}
}

🧬API-document

添加文档 - POST

方式一:

POST /ganga/_doc/1
{
"info": "尴尬酱万岁!奥里给!",
"email": "2282514478@qq.com",
"age": 9,
"name": {
"first": "尬酱",
"last": "尴"
},
"newField": "666"
}

方式二:

POST /ganga/_create/1
{
"info": "尴尬酱万岁!奥里给!",
"email": "2282514478@qq.com",
"age": 9,
"name": {
"first": "尬酱",
"last": "尴"
},
"newField": "666"
}

根据ID查询文档 - GET

GET /ganga/_doc/1

根据ID删除文档 - DELETE

DELETE /ganga/_doc/1

根据ID全量修改 - PUT

PUT /ganga/_doc/1
{
"info": "尴尬酱万岁!奥里给!嘤",
"email": "2282514478@qq.com",
"age": 9,
"name": {
"first": "尬酱",
"last": "尴"
},
"newField": "666666"
}

PUT /ganga/_doc/1
{
"info": "如果其他字段不写或写错了,都会被修改掉",
"newField": "6"
}

根据ID增量修改 - POST

POST /ganga/_update/1

{
"doc": {
"info": "尴尬酱万岁",
"newField": "6"
}
}

Elasticsearch提供了基于JSON的DSL(Domain Specific Language)来定义查询。常见的查询类型包括:

  • 查询所有:查询出所有数据,一般测试用。例如:match_all

  • 全文检索(full text)查询:利用分词器对用户输入内容分词,然后去倒排索引库中匹配。例如:

    • match_query
    • multi_match_query
  • 精确查询:根据精确词条值查找数据,一般是查找keyword、数值、日期、boolean等类型字段。例如:

    • ids
    • range
    • term
  • 地理(geo)查询:根据经纬度查询。例如:

    • geo_distance
    • geo_bounding_box
  • 复合(compound)查询:复合查询可以将上述各种查询条件组合起来,合并查询条件。例如:

    • bool
    • function_score

查询所有文档-match_all

GET /hotel/_search
{
"query": {
"match_all": {
}
}
}

全文检索查询-match

# 全文检索查询 match
GET /hotel/_search
{
"query": {
"match":{
"all": "上海"
}
}
}

全文检索查询-multi_match

# 全文检索查询 multi_match
# 这种方式相对与match 结果相同 但是性能会很差 match利用copy_to复制到了all字段中
GET /hotel/_search
{
"query": {
"multi_match": {
"query": "上海",
"fields": ["name","brand","city"]
}
}
}

精确查找-term

# 精确查找 term
GET /hotel/_search
{
"query": {
"term": {
"starName": {
"value": "五钻" //不进行分词的字段查询
}
}
}
}

精确查找-range

# 精确查找 range
GET /hotel/_search
{
"query": {
"range":{
"price": {
"gte": 1000, // "gte"大于等于 "gt"大于
"lte": 1302 // "lte"小于等于 "lt"小于
}
}
}
}

地理坐标查询-geo_distance

# 地理坐标查询
# 附近查询,也叫做距离查询(geo_distance)俗称:"方圆内"
GET /hotel/_search
{
"query": {
"geo_distance":{
"distance": "10km",
"location": "31.21, 121.5"
}
}
}
GET /hotel/_search
{
"query": {
"geo_distance":{
"distance": "5km",
"location": "31.21, 121.5"
}
}
}

复合查询-算分函数

function score 查询中包含四部分内容:

  • 原始查询条件:query部分,基于这个条件搜索文档,并且基于BM25算法给文档打分,原始算分(query score)
  • 过滤条件:filter部分,符合该条件的文档才会重新算分
  • 算分函数:符合filter条件的文档要根据这个函数做运算,得到的函数算分(function score),有四种函数
    • weight:函数结果是常量
    • field_value_factor:以文档中的某个字段值作为函数结果
    • random_score:以随机数作为函数结果
    • script_score:自定义算分函数算法
  • 运算模式:算分函数的结果、原始查询的相关性算分,两者之间的运算方式,包括:
    • multiply:相乘
    • replace:用function score替换query score
    • 其它,例如:sum、avg、max、min

function score的运行流程如下:

  • 1)根据原始条件查询搜索文档,并且计算相关性算分,称为原始算分(query score)
  • 2)根据过滤条件,过滤文档
  • 3)符合过滤条件的文档,基于算分函数运算,得到函数算分(function score)
  • 4)将原始算分(query score)和函数算分(function score)基于运算模式做运算,得到最终结果,作为相关性算分。

因此,其中的关键点是:

  • 过滤条件:决定哪些文档的算分被修改
  • 算分函数:决定函数算分的算法
  • 运算模式:决定最终算分结果

image-20210721193458182

# 复合查询 fuction score 算分函数查询,可以控制文档相关性算分,控制文档排名
# 早期使用的打分算法是TF-IDF算法
# 在后来的5.1版本升级中,elasticsearch将算法改进为BM25算法
GET /hotel/_search
GET /hotel/_search
{
  "query": {
    "function_score": {
      "query": { .... }, // 原始查询,可以是任意条件
      "functions": [ // 算分函数
        {
          "filter": { // 满足的条件,品牌必须是如家
            "term": {
              "brand": "如家"
            }
          },
          "weight": 2 // 算分权重为2
        }
      ],
"boost_mode": "sum" // 加权模式,求和
    }
  }
}

复合查询-布尔查询

布尔查询是一个或多个查询子句的组合,每一个子句就是一个子查询。子查询的组合方式有:

  • must:必须匹配每个子查询,类似“与”
  • should:选择性匹配子查询,类似“或”
  • must_not:必须不匹配,不参与算分,类似“非”
  • filter:必须匹配,不参与算分
GET /ganga/_search
{
  "query": {
    "bool": {
      "must": [
        {"term": {"city": "上海" }}
      ],
      "should": [
        {"term": {"brand": "皇冠假日" }},
{"term": {"brand": "华美达" }}
      ],
      "must_not": [
        { "range": { "price": { "lte": 500 } }}
      ],
      "filter": [
        { "range": {"score": { "gte": 45 } }}
      ]
    }
  }
}

🧬API-聚合函数

TODO:


🧬搜索结果处理

搜索的结果可以按照用户指定的方式去处理或展示。

排序

elasticsearch默认是根据相关度算分(_score)来排序,但是也支持自定义方式对搜索结果排序。可以排序字段类型有:keyword类型、数值类型、地理坐标类型、日期类型等。

普通字段排序

keyword、数值、日期类型排序的语法基本一致。

GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "FIELD": "desc"  // 排序字段、排序方式ASC、DESC
    }
  ]
}

地理坐标排序

地理坐标排序略有不同。

GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "_geo_distance" : {
          "FIELD" : "纬度,经度", // 文档中geo_point类型的字段名、目标坐标点
          "order" : "asc", // 排序方式
          "unit" : "km" // 排序的距离单位
      }
    }
  ]
}

这个查询的含义是:

  • 指定一个坐标,作为目标点
  • 计算每一个文档中,指定字段(必须是geo_point类型)的坐标 到目标点的距离是多少
  • 根据距离排序

需求描述:实现对酒店数据按照到你的位置坐标的距离升序排序

获取你的位置的经纬度的方式:https://lbs.amap.com/demo/jsapi-v2/example/map/click-to-get-lnglat/

假设我的位置是:31.034661,121.612282,寻找我周围距离最近的酒店。

image-20210721200214690


分页

elasticsearch 默认情况下只返回top10的数据。而如果要查询更多数据就需要修改分页参数了。elasticsearch中通过修改from、size参数来控制要返回的分页结果:

  • from:从第几个文档开始
  • size:总共查询几个文档

类似于mysql中的limit ?, ?

分页的基本语法如下:

GET /hotel/_search
{
  "query": {
    "match_all": {}
  },
  "from": 0, // 分页开始的位置,默认为0
  "size": 10, // 期望获取的文档总数
  "sort": [
    {"price": "asc"}
  ]
}

深度分页问题

现在,我要查询990~1000的数据,查询逻辑要这么写:

GET /hotel/_search
{
  "query": {
    "match_all": {}
  },
  "from": 990, // 分页开始的位置,默认为0
  "size": 10, // 期望获取的文档总数
  "sort": [
    {"price": "asc"}
  ]
}

这里是查询990开始的数据,也就是 第990~第1000条 数据。

不过,elasticsearch内部分页时,必须先查询 0~1000条,然后截取其中的990 ~ 1000的这10条:

image-20210721200643029

查询TOP1000,如果es是单点模式,这并无太大影响。

但是elasticsearch将来一定是集群,例如我集群有5个节点,我要查询TOP1000的数据,并不是每个节点查询200条就可以了。

因为节点A的TOP200,在另一个节点可能排到10000名以外了。

因此要想获取整个集群的TOP1000,必须先查询出每个节点的TOP1000,汇总结果后,重新排名,重新截取TOP1000。

image-20210721201003229

那如果我要查询9900~10000的数据呢?是不是要先查询TOP10000呢?那每个节点都要查询10000条?汇总到内存中?

当查询分页深度较大时,汇总数据过多,对内存和CPU会产生非常大的压力,因此elasticsearch会禁止from+ size 超过10000的请求。

针对深度分页,ES提供了两种解决方案,官方文档

  • search after:分页时需要排序,原理是从上一次的排序值开始,查询下一页数据。官方推荐使用的方式。
  • scroll:原理将排序后的文档id形成快照,保存在内存。官方已经不推荐使用。

小结

分页查询的常见实现方案以及优缺点:

  • from + size

    • 优点:支持随机翻页
    • 缺点:深度分页问题,默认查询上限(from + size)是10000
    • 场景:百度、京东、谷歌、淘宝这样的随机翻页搜索
  • after search

    • 优点:没有查询上限(单次查询的size不超过10000)
    • 缺点:只能向后逐页查询,不支持随机翻页
    • 场景:没有随机翻页需求的搜索,例如手机向下滚动翻页
  • scroll

    • 优点:没有查询上限(单次查询的size不超过10000)
    • 缺点:会有额外内存消耗,并且搜索结果是非实时的
    • 场景:海量数据的获取和迁移。从ES7.1开始不推荐,建议用 after search方案。

高亮

高亮的语法

GET /hotel/_search
{
  "query": {
    "match": {
      "FIELD": "TEXT" // 查询条件,高亮一定要使用全文检索查询
    }
  },
  "highlight": {
    "fields": { // 指定要高亮的字段
      "FIELD": {
        "pre_tags": "<em>",  // 用来标记高亮字段的前置标签
        "post_tags": "</em>" // 用来标记高亮字段的后置标签
      }
    }
  }
}

注意:

  • 高亮是对关键字高亮,因此搜索条件必须带有关键字,而不能是范围这样的查询。
  • 默认情况下,高亮的字段,必须与搜索指定的字段一致,否则无法高亮
  • 如果要对非搜索字段高亮,则需要添加一个属性:required_field_match=false

示例

image-20210721203349633


总结

查询的DSL是一个大的JSON对象,包含下列属性:

  • query:查询条件
  • from和size:分页条件
  • sort:排序条件
  • highlight:高亮条件

image-20210721203657850

-


🧬RestClient


酒店数据索引库分析

这是表结构:

image-20230228234510099

分析索引

分析逻辑:

  1. 字段的索引类型
  2. 如果是字符串,是否需要分词
  3. 不分词用keyword
  4. 分词用test,分词器analyzer是什么?ik_max_word/ik_smart/...
  5. 该字段是进行搜索,默认搜索,不搜索:"index": false

数据库

CREATE TABLE `tb_hotel` (
`id` bigint(20) NOT NULL COMMENT '酒店id',
`name` varchar(255) NOT NULL COMMENT '酒店名称',
`address` varchar(255) NOT NULL COMMENT '酒店地址',
`price` int(10) NOT NULL COMMENT '酒店价格',
`score` int(2) NOT NULL COMMENT '酒店评分',
`brand` varchar(32) NOT NULL COMMENT '酒店品牌',
`city` varchar(32) NOT NULL COMMENT '所在城市',
`star_name` varchar(16) DEFAULT NULL COMMENT '酒店星级,1星到5星,1钻到5钻',
`business` varchar(255) DEFAULT NULL COMMENT '商圈',
`latitude` varchar(32) NOT NULL COMMENT '纬度',
`longitude` varchar(32) NOT NULL COMMENT '经度',
`pic` varchar(255) DEFAULT NULL COMMENT '酒店图片',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

索引映射

{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text",
"analyzer": "ik_max_word",
"copy_to": "all"
},
"address": {
"type": "keyword",
"index": false
},
"price": {
"type": "integer"
},
"score": {
"type": "integer"
},
"brand": {
"type": "keyword",
"copy_to": "all"
},
"city": {
"type": "keyword",
"copy_to": "all"
},
"starName": {
"type": "keyword"
},
"business": {
"type": "keyword"
},
"location": {
"type": "geo_point"
},
"pic": {
"type": "keyword",
"index": false
},
"all": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}

RestClient初始化配置

pom.xml

<properties>
<java.version>1.8</java.version>
<!--由于springboot管理了es 这里要指定es的版本 与 es服务段版本一致-->
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>

<dependencies>
<!--es-high依赖 - RestAPI-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
</dependencies>
@SpringBootTest(classes = HotelDemoApplication.class)
public class IndexDSLTest {

private RestHighLevelClient restClient;

@BeforeEach //创建 RestClient
public void initRest() {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("user", "password"));
this.restClient = new RestHighLevelClient(
RestClient.builder(
new HttpHost("ayaka520", 9200, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
})
);
}

@AfterEach //清除客户端
public void closeRest() throws IOException {
restClient.close();
}

//Test

}

实体类的转换

MySQL

package com.ganga.hotel.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("tb_hotel")
public class Hotel {
@TableId(type = IdType.INPUT)
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String longitude;
private String latitude;
private String pic;
}

ES实体类

package com.ganga.hotel.pojo;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;

public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
}
}

对索引库-DSL

添加索引库

/**
* 添加索引库
* @throws IOException
*/
@Test
public void createIndexTest() throws IOException {
//1.设置Request对象,声明请求方式和路径 : GET /hotel
CreateIndexRequest request = new CreateIndexRequest("hotel");
//2.设置请求参数,也就是DSL语句。
request.source(CREATE_INDEX_HOTEL, XContentType.JSON);
//3.发起请求
restClient.indices().create(request, RequestOptions.DEFAULT);
}

删除索引库

/**
* 删除索引
* @throws IOException
*/
@Test
public void deleteIndexTest() throws IOException {
//1.设置Request对象,声明请求方式和路径 : DELETE /hotel
DeleteIndexRequest request = new DeleteIndexRequest("hotel");
//2.设置请求参数,也就是DSL语句。 无参数
//3.发起请求
restClient.indices().delete(request, RequestOptions.DEFAULT);
}

查询索引是否存在

/**
* 查询索引是否存在
* @throws IOException
*/
@Test
public void getIndexTest() throws IOException {
//1.设置Request对象,声明请求方式和路径 : GET /hotel
GetIndexRequest request = new GetIndexRequest("hotel");
//2.设置请求参数,也就是DSL语句。 无参数
//3.发起请求
boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT);
System.out.println(exists);
}


对文档的-DSL

根据id添加文档

/**
* 根据 id 添加文档
*
* @throws IOException
*/
@Test
public void addDocumentTest() throws IOException {
//1.从数据库中查询店铺
Hotel hotel = hotelService.getById(id);
//2.转换为 HotelDoc 类型
HotelDoc hotelDoc = new HotelDoc(hotel);
//3.序列化为 JSON
String jsonString = JSON.toJSONString(hotelDoc);
//1.创建request对象
IndexRequest request = new IndexRequest("hotel").id(id.toString());//注意要加上.id()
//2.准备DSL语句
request.source(jsonString, XContentType.JSON);
//3.发起请求
restClient.index(request, RequestOptions.DEFAULT);
}

根据id查询文档

/**
* 根据 id 查询文档
*
* @throws IOException
*/
@Test
public void getDocumentById() throws IOException {
//1.创建request对象
GetRequest request = new GetRequest("hotel").id(id.toString());
//2.准备DSL语句
//request.id("197488318");
//3.发起请求
GetResponse response = restClient.get(request, RequestOptions.DEFAULT);
//4.反序列化
String json = response.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println(hotelDoc);
}



根据id修改文档

/**
* 根据id 修改文档
*
* @throws IOException
*/
@Test
public void updateDocumentById() throws IOException {
//这是增量修改!!!
restClient.update( //坑: 两个参数 一个是 index 一个是id 而不是链式..... 坑:
new UpdateRequest("hotel", id.toString()).doc(
"name", "神里绫华--第二字段",
"address", "神里绫华--第二字段"
),
RequestOptions.DEFAULT
);
}

根据id删除文档

/**
* 根据 id 删除文档
*
* @throws IOException
*/
@Test
public void deleteDocumentById() throws IOException {
restClient.delete(
new DeleteRequest("hotel").id(id.toString()),
RequestOptions.DEFAULT
);
}

根据id批量导入文档

/**
* 根据id 批量导入文档
*/
@Test
public void bulkAddDocById() throws IOException {
//从 MySql中获取所有数据
List<Hotel> hotels = hotelService.list();
//创建 BulkRequest对象
BulkRequest request = new BulkRequest("hotel");
hotels.forEach(hotel -> {
//转换为 HotelDoc 并 序列化为 JSON
String json = JSON.toJSONString(new HotelDoc(hotel));
//使用 add() 方法添加 单个 request方法 //坑:别忘了加 .id()
request.add(new IndexRequest("hotel").id(hotel.getId().toString()).source(json, XContentType.JSON));
});
//这里使用 restClient.bulk()
restClient.bulk(request, RequestOptions.DEFAULT);
}

查询所有

/**
* match_all 查询所有
*
* @throws IOException
*/
@Test
public void matchAllQueryTest() throws IOException {
//1.准备request请求
SearchRequest request = new SearchRequest("hotel");
//2.准备DSL语句
request.source().query(QueryBuilders.matchAllQuery());
//3.发起请求
SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);


//解析相应结果 应封装为一个函数 下面有
SearchHits hits = response.getHits();
long total = hits.getTotalHits().value;
System.out.println("共搜索到" + total + "条数据");
//解析数据
for (SearchHit hit : hits.getHits()) {
String json = hit.getSourceAsString();
//反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println(hotelDoc + "\n");
}
}
// #### 解析结果 👇

检索查询

/**
* 检索查询
*
* @throws IOException
*/
@Test
public void matchQueryTest() throws IOException {
//发起请求
SearchRequest request = new SearchRequest("hotel");
request.source().query(
QueryBuilders.matchQuery("all", "如家")
);
SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
//解析结果
handleResponse(response);
}

复合查询

精确查询

/**
* 复合查询 (布尔查询)
* 精确查询 (term、range)
*
* @throws IOException
*/
@Test
public void boolQueryTest() throws IOException {
SearchRequest request = new SearchRequest("hotel");
request.source().query(
//先构建一个 boolQuery() 布尔查询
QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("name", "和颐")) //match 检索查询
.must(QueryBuilders.rangeQuery("price").gte(100).lte(300)) //range 数值查询
.mustNot(QueryBuilders.termQuery("starName", "一钻")) //term 精确查询
.mustNot(QueryBuilders.termQuery("starName", "二钻"))
//.must(QueryBuilders.) // 必须匹配
//.should(QueryBuilders.) // 选择性匹配
//.mustNot(QueryBuilders.) // 必须不匹配 不参与算分
//.filter(QueryBuilders.) // 必须匹配 不参与算分
);
SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
// 解析结果
handleResponse(response);
}

分页查询

排序查询

/**
* 分页查询
* 排序查询
* @throws IOException
*/
@Test
public void OrderPageQueryTest() throws IOException {
// 接收分页参数
int page = 2, size = 5; // 查第二页 每页5条
// 创建 request
SearchRequest request = new SearchRequest("hotel");
// source 相当于 json数据的 { } query,from,size,...都在其内部
SearchSourceBuilder source = request.source();
// 查询条件
source.query(QueryBuilders.matchQuery("all", "如家"));
// 分页
source.from((page - 1) * size).size(size); // 以后 page 会变得 第一页就为 0
// 排序
source.sort("price", SortOrder.ASC);
// 发送
SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
// 结果处理
handleResponse(response);
}

高亮查询

/**
* 高亮查询
* 高亮数据解析 this.handleResponse(response)
* @throws IOException
*/
@Test
public void addHighlightQueryTest() throws IOException {
// 接收分页参数
int page = 2, size = 5; // 查第二页 每页5条
// 创建 request
SearchRequest request = new SearchRequest("hotel");
// source 相当于 json数据的 { } query,from,size,...都在其内部
SearchSourceBuilder source = request.source();
// 查询条件
source.query(QueryBuilders.matchQuery("all", "如家"));
// 分页
source.from((page - 1) * size).size(size); // 以后 page 会变得 第一页就为 0
// 排序
source.sort("price", SortOrder.ASC);
// 高亮
source.highlighter(
new HighlightBuilder() // 这是要 new 一个 HighlightBuilder()
.field("name") // field 指定高亮的字段
.requireFieldMatch(false) // false 对非搜索字段高亮,默认为 true 不对非搜索字段高亮
);
// 发送
SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
// 结果处理
handleResponse(response);
// 为了实现高亮 还需要对结果解析进行 继续 处理
}

解析结果

/**
* 解析结果
*
* @param response
*/
private static void handleResponse(SearchResponse response) {
//解析结果
long total = response.getHits().getTotalHits().value;
System.out.println("共搜索到" + total + "条数据");
for (SearchHit hit : response.getHits().getHits()) {
// 获取查询数据
String json = hit.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
// 获取高亮数据
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
if (!CollectionUtils.isEmpty(highlightFields)){
// 获取高亮字段
HighlightField field = highlightFields.get("name");
if (field != null){
String name = field.getFragments()[0].string();
//替换原有数据
hotelDoc.setName(name);
}
}
System.out.println(hotelDoc + "\n");
}
}

对文档的-聚合

TODO:


🧬酒店案例 - 搜索

客户端

@Configuration
public class RestClientConfig {

@Bean
public RestHighLevelClient restClient() {
// Http 地址端口 及 用户认证
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "password"));

//构建RestHighLevelClient对象
return new RestHighLevelClient(
org.elasticsearch.client.RestClient.builder(
new HttpHost("ayaka520", 9200, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
})
);
}
}

实体类

Hotel : 接收 MySQL 数据

@Data
@TableName("tb_hotel")
public class Hotel {
@TableId(type = IdType.INPUT)
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String longitude;
private String latitude;
private String pic;
}

HotelDoc : 用来接收 ES 数据

@Data
@NoArgsConstructor
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;
//用户相对距离值 非映射
private Object distance;
//是否开通了广告 true->开通
private Boolean isAD;

public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
}
}

RequestParams : 接收前端数据

@Data
public class RequestParams {

private String key;
private Integer page;
private Integer size;
private String sortBy;

private String brand;
private String city;
private String starName;
private Integer minPrice;
private Integer maxPrice;
private String location;

}

PageResult : 用来返回前端数据

@Data
public class PageResult {

private Long total;
private List<HotelDoc> hotels;

public PageResult() {
}

public PageResult(Long total, List<HotelDoc> hotels) {
this.total = total;
this.hotels = hotels;
}
}

模拟

模拟开通广告

@Autowired
private RestHighLevelClient restClient;
/**
* 给几个hotel添加广告
* @throws IOException
*/
@Test
public void addADTest() throws IOException {
BulkRequest request = new BulkRequest("hotel");
request.add(new UpdateRequest("hotel",ADD_AD_TEST_01_ID).doc("isAD",true));
request.add(new UpdateRequest("hotel",ADD_AD_TEST_02_ID).doc("isAD",true));
request.add(new UpdateRequest("hotel",ADD_AD_TEST_03_ID).doc("isAD",true));
restClient.bulk(request, RequestOptions.DEFAULT);
}

接口

Post请求 /hotel/list

@RestController()
@RequestMapping("/hotel")
public class HotelController {

@Autowired
private HotelService hotelService;

@PostMapping("/list")
public PageResult queryList(@RequestBody RequestParams requestParams){
return hotelService.searchList(requestParams);
}
}

业务实现

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {

@Autowired
private RestHighLevelClient restClient;

@Override
public PageResult searchList(RequestParams requestParams) {

try {
// 解析参数
String key = requestParams.getKey();
int page = requestParams.getPage();
int size = requestParams.getSize();
// 创建请求
SearchRequest request = new SearchRequest("hotel");
SearchSourceBuilder source = request.source();
// 布尔查询 - 原始查询
BoolQueryBuilder boolQuery = new BoolQueryBuilder();
if (ObjectUtils.isEmpty(key)) {
// 无搜索内容
boolQuery.must(QueryBuilders.matchAllQuery());
} else {
// 有搜索内容
boolQuery.must(QueryBuilders.matchQuery("all", key));
}
// 条件搜索 brand 精确查询
if (ObjectUtils.isNotEmpty(requestParams.getBrand())) {
boolQuery.filter(QueryBuilders.termQuery("brand", requestParams.getBrand()));
}
// 条件搜索 city 精确查询
if (ObjectUtils.isNotEmpty(requestParams.getCity())) {
boolQuery.filter(QueryBuilders.termQuery("city", requestParams.getCity()));
}
// 条件搜索 starName 精确查询
if (ObjectUtils.isNotEmpty(requestParams.getStarName())) {
boolQuery.filter(QueryBuilders.termQuery("starName", requestParams.getStarName()));
}
// 条件查询 min/maxPrice 范围查询
if (ObjectUtils.isNotEmpty(requestParams.getMinPrice()) && ObjectUtils.isNotEmpty(requestParams.getMaxPrice())) {
boolQuery.filter(QueryBuilders.rangeQuery("price")
.gte(requestParams.getMinPrice())
.lte(requestParams.getMaxPrice()));
}
// 排序 算分函数 广告优先
FunctionScoreQueryBuilder functionScore = QueryBuilders.functionScoreQuery(
// 原始查询,相关性算分的查询
boolQuery,
// function score的数组
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
// 其中的一个function score 元素
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
// 过滤条件
QueryBuilders.termQuery("isAD", true),
// 算分函数
ScoreFunctionBuilders.weightFactorFunction(10)
)
});
// source 内的 query 内的 functionScoreQuery 内的 boolQuery
// 原始查询boolQuery放入functionScore functionScore放入query query放入source
source.query(functionScore);


// 排序 位置
if (ObjectUtils.isNotEmpty(requestParams.getLocation())) {
source.sort(SortBuilders.geoDistanceSort("location", new GeoPoint(requestParams.getLocation()))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS)
);
}
// 分页
source.from((page - 1) * size).size(size);
// 高亮
source.highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
// 发送请求
SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
// 请求结果处理
return this.handleResponse(response);
} catch (IOException e) {
throw new RuntimeException(e);
}

}


/**
* 解析 封装数据
*
* @param response es查询全部数据
* @return PageResult 数据
*/
private PageResult handleResponse(SearchResponse response) {
// 解析查询条数
long total = response.getHits().getTotalHits().value;
// 解析酒店数据
ArrayList<HotelDoc> hotels = new ArrayList<>();
for (SearchHit hit : response.getHits().getHits()) {
String json = hit.getSourceAsString();
HotelDoc hotel = JSON.parseObject(json, HotelDoc.class);
//解析排序字段 --> 距离
Object[] sorts = hit.getSortValues();
if (ObjectUtils.isNotEmpty(sorts)) {
Object sort = sorts[0];
hotel.setDistance(sort);
}
//高亮
Map<String, HighlightField> highlights = hit.getHighlightFields();
if (ObjectUtils.isNotEmpty(highlights)) {
HighlightField field = highlights.get("name");
if (ObjectUtils.isNotEmpty(field)) {
hotel.setName(field.getFragments()[0].string());
}
}
//添加到集合
hotels.add(hotel);
}
// 封装 返回
return new PageResult(total, hotels);
}
}

🧬酒店数据 - 同步

思路分析

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步

思路分析

常见的数据同步方案有三种:

  • 同步调用
  • 异步通知
  • 监听binlog

方案一:同步调用

image-20210723214931869

基本步骤如下:

  • hotel-demo对外提供接口,用来修改elasticsearch中的数据
  • 酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口,

方案二:异步通知

image-20210723215140735

流程如下:

  • hotel-admin对mysql数据库数据完成增、删、改后,发送MQ消息
  • hotel-demo监听MQ,接收到消息后完成elasticsearch数据修改

方案三:监听binlog

image-20210723215518541

流程如下:

  • 给mysql开启binlog功能
  • mysql完成增、删、改操作都会记录在binlog中
  • hotel-demo基于canal监听binlog变化,实时更新elasticsearch中的内容

**选择: **

方式一:同步调用

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方式二:异步通知

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

方式三:监听binlog

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高

依赖 / 配置

增删改 - 发送方

pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yaml

spring:
rabbitmq:
host: xxx
username: xxx
password: xxx
virtual-host: /

消息接收方

pom.xml

<properties>
<java.version>1.8</java.version>
<!--由于springboot管理了es 这里要指定es的版本 与 es服务段版本一致-->
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>

<dependencies>

<!--es-high依赖 - RestAPI-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>

</dependencies>
</properties>

注入RestClient

package com.ganga.hotel.clients;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RestClientConfig {

@Bean
public RestHighLevelClient restClient() {
// Http 地址端口 及 用户认证
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials("username", "password"));

//构建RestHighLevelClient对象
return new RestHighLevelClient(
org.elasticsearch.client.RestClient.builder(
new HttpHost("xxx", 9200, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
})
);
}

}

MqConstant常量

package com.ganga.hotel.constant;

public class MqConstant {

/**
* 交换机
*/
public static final String HOTEL_EXCHANGE = "hotel.exchange";

/**
* 消息队列
*/
public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue";


/**
* 新增 或 修改 的 RoutingKey
*/
public static final String HOTEL_INSERT_KEY = "hotel.insert";
public static final String HOTEL_DELETE_KEY = "hotel.delete";
}

MqConfig配置

package com.ganga.hotel.config;

import com.ganga.hotel.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {

/**
* 自定义 消息转换器
* @return new Jackson2JsonMessageConverter()
*/
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}

/**
*
* @return Topic交换机
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstant.HOTEL_EXCHANGE, true, false);
}

/**
*
* @return 新增 / 修改 的消息队列
*/
@Bean
public Queue insertQueue() {
return new Queue(MqConstant.HOTEL_INSERT_QUEUE, true);
}

/**
* 删除 的消息队列
* @return
*/
@Bean
public Queue deleteQueue() {
return new Queue(MqConstant.HOTEL_DELETE_QUEUE, true);
}

/**
* 消息队列 与 交换机 绑定
* 注意:别忘了加上 RoutingKey !!!
* @return Binding
*/
@Bean
public Binding bindingInsertQueue() {
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstant.HOTEL_INSERT_KEY);
}

@Bean
public Binding bindingDeleteQueue(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstant.HOTEL_DELETE_KEY);
}

}

HotelController 发送消息

package com.ganga.hotel.web;

import com.ganga.hotel.pojo.Hotel;
import com.ganga.hotel.pojo.PageResult;
import com.ganga.hotel.service.IHotelService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.security.InvalidParameterException;

import static com.ganga.hotel.constant.MqConstant.*;

@RestController
@RequestMapping("hotel")
public class HotelController {

@Autowired
private IHotelService hotelService;

//注入 RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id) {
return hotelService.getById(id);
}

@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
) {
Page<Hotel> result = hotelService.page(new Page<>(page, size));

return new PageResult(result.getTotal(), result.getRecords());
}

/**
* 新增
*
* @param hotel
*/
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {

//数据库新增数据
boolean isSuccess = hotelService.save(hotel);
System.out.println(hotel.getId());
//发送消息 更新索引
if (isSuccess) {
// HOTEL_EXCHANGE 交换机, KEY , 数据 -> 为了减小消息体积 只发个id就行
rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_INSERT_KEY, hotel.getId());
}
}

/**
* 修改
*
* @param hotel
*/
@PutMapping()
public void updateById(@RequestBody Hotel hotel) {
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
boolean isSuccess = hotelService.updateById(hotel);
if (isSuccess) {
// 发送消息
rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_INSERT_KEY, hotel.getId());
}
}

/**
* 删除
*
* @param id
*/
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
boolean isSuccess = hotelService.removeById(id);
if (isSuccess) {
// 发送消息
rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_DELETE_KEY, id);
}
}
}

MqListener-监听消息

package com.ganga.hotel.mq;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.ganga.hotel.constant.MqConstant;
import com.ganga.hotel.pojo.Hotel;
import com.ganga.hotel.pojo.HotelDoc;
import com.ganga.hotel.service.impl.HotelService;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListeners;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class MqListener {

@Autowired
private HotelService hotelService;
@Autowired
private RestHighLevelClient restClient;

@RabbitListener(queues = MqConstant.HOTEL_INSERT_QUEUE)
public void insertOrUpdateQueue(Long id) {
//根据队列中的 id 查询数据库
Hotel hotel = hotelService.getById(id);
HotelDoc hotelDoc = new HotelDoc(hotel);
System.err.println(JSON.toJSONString(hotelDoc));
try {
//更新 ES 文档
IndexRequest request = new IndexRequest("hotel").id(id.toString());
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
restClient.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}

}

@RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE)
public void deleteQueue(String id) {
System.err.println(id);
try {
// 根据 id 删除 ES 中的文档
DeleteRequest request = new DeleteRequest("hotel").id(id);
restClient.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}