🧬🌸My Spring Cloud Notes 🧬 Advanced Search with ES🌸
🧬Advanced Search with ES
🧬Introduction
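What is Elasticsearch
**The history of Elasticsearch**
Lucene is a search-engine library written in Java. It is a top-level project of the Apache Software Foundation and was created by Doug Cutting in 1999.
Official site: https://lucene.apache.org/
Advantages of Lucene:
Disadvantages of Lucene:
- Java only
- Steep learning curve
- No support for horizontal scaling
In 2004, Shay Banon built Compass on top of Lucene.
In 2010, Shay Banon rewrote Compass and named it Elasticsearch.
Official site: https://www.elastic.co/cn/
The version used in these notes is 7.12.1. Compared with Lucene, Elasticsearch has the following advantages:
- Distributed, with horizontal scaling
- Provides a RESTful API that can be called from any language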
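Inverted index
Traditional databases such as MySQL use a forward index, for example an index on the id column of a goods table (tb_goods). It maps each id to its row, so finding the rows whose title contains a given word still means scanning the rows one by one.
Elasticsearch uses an inverted index instead, which maps each term to the list of documents that contain it. It is built on two concepts:
- Document: each record is a document
- Term: a word produced by splitting a document's text by meaning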
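To make the idea concrete, here is a minimal in-memory sketch of an inverted index. It assumes a trivial whitespace tokenizer standing in for a real analyzer such as IK; the class and method names are hypothetical and not part of Elasticsearch.

```java
import java.util.*;

/** Toy inverted index: maps each term to the sorted set of document IDs that contain it. */
class InvertedIndexSketch {
    private final Map<String, SortedSet<Integer>> postings = new HashMap<>();

    /** Indexes a document; splitting on whitespace stands in for a real analyzer. */
    void add(int docId, String text) {
        for (String term : text.toLowerCase(Locale.ROOT).split("\\s+")) {
            if (term.isEmpty()) continue; // leading whitespace yields an empty token
            postings.computeIfAbsent(term, t -> new TreeSet<>()).add(docId);
        }
    }

    /** Looks the term up directly instead of scanning every document. */
    SortedSet<Integer> search(String term) {
        return Collections.unmodifiableSortedSet(
                postings.getOrDefault(term.toLowerCase(Locale.ROOT), new TreeSet<>()));
    }

    /** OR-search: the union of the posting lists of all query terms. */
    SortedSet<Integer> searchAny(String query) {
        SortedSet<Integer> result = new TreeSet<>();
        for (String term : query.toLowerCase(Locale.ROOT).split("\\s+")) {
            result.addAll(search(term));
        }
        return result;
    }
}
```

A search first finds the term in the map and then returns the IDs stored under it, which is why lookups stay fast no matter how many documents are indexed.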
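ES vs. MySQL
Concepts in Elasticsearch compared with MySQL:
- Table ↔ Index: a collection of documents of the same type
- Row ↔ Document: one record, stored by Elasticsearch as JSON
- Column ↔ Field: a field of a JSON document
- Schema ↔ Mapping: constraints on the documents in an index, such as field types
- SQL ↔ DSL: the JSON-based query language provided by Elasticsearch
Architecture
MySQL: good at transactional operations and guarantees data safety and consistency
Elasticsearch: good at searching, analyzing, and computing over massive amounts of data
In practice the two are used together: writes go to MySQL, the data is synchronized to Elasticsearch, and search requests are served by Elasticsearch.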
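🧬Installation and deployment
Single-node deployment
Create a network
Because we will also deploy a Kibana container, the ES and Kibana containers need to reach each other. First create a network:
docker network create es-net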
Pull the image
docker pull elasticsearch:7.12.1
Create and run the container
Run the following Docker command to deploy a single-node ES:
docker run -d \
  --name es7 \
  -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
  -e "discovery.type=single-node" \
  -v es-data:/usr/share/elasticsearch/data \
  -v es-plugins:/usr/share/elasticsearch/plugins \
  --privileged \
  --network es-net \
  -p 9200:9200 \
  -p 9300:9300 \
  elasticsearch:7.12.1
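Option reference (cluster.name, http.host and the es-logs volume are optional and not used in the command above):
- -e "cluster.name=es-docker-cluster": set the cluster name
- -e "http.host=0.0.0.0": the address to listen on, allowing access from outside
- -e "ES_JAVA_OPTS=-Xms512m -Xmx512m": JVM heap size
- -e "discovery.type=single-node": single-node (non-cluster) mode
- -v es-data:/usr/share/elasticsearch/data: mount a named volume on the ES data directory
- -v es-logs:/usr/share/elasticsearch/logs: mount a named volume on the ES log directory
- -v es-plugins:/usr/share/elasticsearch/plugins: mount a named volume on the ES plugin directory
- --privileged: give the container extended privileges, including access to the mounted volumes
- --network es-net: join the network named es-net
- -p 9200:9200: map the HTTP (REST) port
- -p 9300:9300: map the transport port used for communication between cluster nodes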
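Setting a password: add the following to elasticsearch.yml (in the container: /usr/share/elasticsearch/config/elasticsearch.yml) and restart the container:
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
Then set the passwords of the built-in users (such as elastic) from inside the container with ./bin/elasticsearch-setup-passwords interactive.
Open http://ip:9200 in a browser to see the response from Elasticsearch.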
Deploying Kibana
Kibana provides a visual interface for Elasticsearch, which makes learning easier.
Deployment
Run the following Docker command to deploy Kibana:
docker run -d \
  --name kibana \
  -e ELASTICSEARCH_HOSTS=http://es7:9200 \
  --network=es-net \
  -p 5601:5601 \
  kibana:7.12.1
- --network=es-net: join the es-net network, the same network as Elasticsearch
- -e ELASTICSEARCH_HOSTS=http://es7:9200: the Elasticsearch address; because Kibana and Elasticsearch share a network, Kibana can reach Elasticsearch by its container name
- -p 5601:5601: port mapping
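Setting the password: when security is enabled on Elasticsearch, configure kibana.yml (in the container: /usr/share/kibana/config/kibana.yml) as follows and restart Kibana. elasticsearch.hosts must be reachable from inside the Kibana container, so use the ES container name:
server.host: "0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://es7:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"
elasticsearch.username: elastic
elasticsearch.password: elastic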
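Kibana usually starts slowly, so give it some time. Follow its log with:
docker logs -f kibana
Once the log shows that Kibana has started successfully, open http://192.168.150.101:5601 in a browser (use your own host's IP) to see it.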
DevTools
Kibana provides a DevTools page where you can write DSL statements against Elasticsearch, with auto-completion for DSL.
Installing the IK analyzer
Installing the IK plugin online
# enter the container
docker exec -it es7 /bin/bash
# download and install the plugin (its version must match ES: 7.12.1)
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip
# leave the container
exit
# restart the container
docker restart es7
Installing the IK plugin offline
Find the volume directory
Installing the plugin requires the location of Elasticsearch's plugins directory. Because it is mounted from a volume, inspect the volume:
docker volume inspect es-plugins
Output:
[
  {
    "CreatedAt": "2022-05-06T10:06:34+08:00",
    "Driver": "local",
    "Labels": null,
    "Mountpoint": "/var/lib/docker/volumes/es-plugins/_data",
    "Name": "es-plugins",
    "Options": null,
    "Scope": "local"
  }
]
This shows that the plugins directory is mounted at /var/lib/docker/volumes/es-plugins/_data.
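Unzip the analyzer package
Unzip the IK analyzer package from the course materials and rename the folder to ik.
Upload it to the plugin volume of the ES container, that is, /var/lib/docker/volumes/es-plugins/_data.
Restart the container
# 4. restart the container
docker restart es7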
Deploying an ES cluster
An ES cluster can be deployed directly with docker-compose, but your Linux VM needs at least 4 GB of memory.
First write a docker-compose.yml file with the following content:
version: '2.2'
services:
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data02:/usr/share/elasticsearch/data
    networks:
      - elastic
  es03:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic

volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge
Run docker-compose to bring up the cluster:
docker-compose up -d
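🧬IK analyzer
IK analysis modes
The IK analyzer has two modes:
- ik_smart: coarsest segmentation (fewest tokens)
- ik_max_word: finest segmentation (splits out as many words as possible)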
GET /_analyze
{
  "analyzer": "ik_max_word",
  "text": "黑马程序员学习java太棒了"
}
Result:
{
  "tokens" : [
    { "token" : "黑马", "start_offset" : 0, "end_offset" : 2, "type" : "CN_WORD", "position" : 0 },
    { "token" : "程序员", "start_offset" : 2, "end_offset" : 5, "type" : "CN_WORD", "position" : 1 },
    { "token" : "程序", "start_offset" : 2, "end_offset" : 4, "type" : "CN_WORD", "position" : 2 },
    { "token" : "员", "start_offset" : 4, "end_offset" : 5, "type" : "CN_CHAR", "position" : 3 },
    { "token" : "学习", "start_offset" : 5, "end_offset" : 7, "type" : "CN_WORD", "position" : 4 },
    { "token" : "java", "start_offset" : 7, "end_offset" : 11, "type" : "ENGLISH", "position" : 5 },
    { "token" : "太棒了", "start_offset" : 11, "end_offset" : 14, "type" : "CN_WORD", "position" : 6 },
    { "token" : "太棒", "start_offset" : 11, "end_offset" : 13, "type" : "CN_WORD", "position" : 7 },
    { "token" : "了", "start_offset" : 13, "end_offset" : 14, "type" : "CN_CHAR", "position" : 8 }
  ]
}
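The difference between the two modes can be illustrated with a toy dictionary-based segmenter. This is NOT IK's real algorithm: here "smart" is plain forward maximum matching and "maxWord" emits every dictionary word found at every position; the class name and the tiny dictionary are hypothetical.

```java
import java.util.*;

/**
 * Toy segmenter contrasting coarse and fine segmentation.
 * Characters not covered by any dictionary word are simply dropped.
 */
class ToySegmenter {
    private final Set<String> dict;
    private final int maxLen; // length of the longest dictionary word

    ToySegmenter(Collection<String> words) {
        this.dict = new HashSet<>(words);
        int m = 1;
        for (String w : words) m = Math.max(m, w.length());
        this.maxLen = m;
    }

    /** Coarse mode: take the longest word starting here, then jump past it. */
    List<String> smart(String text) {
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < text.length()) {
            int step = 1; // skip one character if no word starts here
            for (int len = Math.min(maxLen, text.length() - i); len >= 1; len--) {
                String cand = text.substring(i, i + len);
                if (dict.contains(cand)) { out.add(cand); step = len; break; }
            }
            i += step;
        }
        return out;
    }

    /** Fine mode: every dictionary word starting at every position, longest first. */
    List<String> maxWord(String text) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < text.length(); i++) {
            for (int len = Math.min(maxLen, text.length() - i); len >= 1; len--) {
                String cand = text.substring(i, i + len);
                if (dict.contains(cand)) out.add(cand);
            }
        }
        return out;
    }
}
```

With the dictionary {黑马, 程序员, 程序, 学习, 太棒了, 太棒}, "黑马程序员学习太棒了" yields [黑马, 程序员, 学习, 太棒了] in the coarse mode and [黑马, 程序员, 程序, 学习, 太棒了, 太棒] in the fine mode: finer segmentation produces more terms, so more queries can hit, at the cost of a larger index.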
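Extension dictionary
As the internet develops, new words are coined all the time, and many of them, such as "奥力给" and "传智播客", do not exist in the original vocabulary.
So the vocabulary must be updated continually, and the IK analyzer supports extending it.
1) Open the IK analyzer's config directory: /var/lib/docker/volumes/es-plugins/_data/ik/config
2) Add the following to the IKAnalyzer.cfg.xml configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
    <comment>IK Analyzer 扩展配置</comment>
    <!-- user extension dictionary, relative to the config directory -->
    <entry key="ext_dict">ext.dic</entry>
</properties>
3) Create ext.dic in the same directory (you can copy an existing .dic file from the config directory and edit it) and add one new word per line
4) Restart Elasticsearch and check the log:
docker restart es7
docker logs -f es7
The log shows that the ext.dic configuration file has been loaded.
5) Test the result:
GET /_analyze
{
  "analyzer": "ik_max_word",
  "text": "传智播客Java就业超过90%,奥力给!"
}
Note: the dictionary file must be UTF-8 encoded; do not edit it with Windows Notepad.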
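Stopword dictionary
In internet projects, content spreads across the network very quickly, so some words must not be spread at all, such as sensitive words related to religion or politics. Searches should ignore such words as well.
The IK analyzer also provides stopword support: words in the stopword list are dropped directly at indexing time.
1) Add the following to IKAnalyzer.cfg.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
    <comment>IK Analyzer 扩展配置</comment>
    <!-- user extension dictionary -->
    <entry key="ext_dict">ext.dic</entry>
    <!-- user stopword dictionary -->
    <entry key="ext_stopwords">stopword.dic</entry>
</properties>
2) Add the stopwords to stopword.dic, one per line
3) Restart Elasticsearch (and Kibana) and check the log:
docker restart es7
docker restart kibana
docker logs -f es7
The log shows that the stopword.dic configuration file has been loaded.
4) Test the result:
GET /_analyze
{
  "analyzer": "ik_max_word",
  "text": "传智播客Java就业率超过95%,习大大都点赞,奥力给!"
}
Note: the dictionary file must be UTF-8 encoded; do not edit it with Windows Notepad.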
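🧬API-Index
Mapping attributes
A mapping defines constraints on the documents in an index. Common mapping attributes:
- type: the field's data type. Common simple types:
  - String: text (analyzed full text), keyword (exact values, e.g. brand, country, IP address)
  - Numeric: long, integer, short, byte, double, float
  - Boolean: boolean
  - Date: date
  - Object: object
- index: whether to index the field (make it searchable); defaults to true
- analyzer: which analyzer to use
- properties: the field's sub-fields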
Create an index and its mapping
PUT /index_name
{
  "mappings": {
    "properties": {
      "field1": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "field2": {
        "type": "keyword",
        "index": "false"
      },
      "field3": {
        "properties": {
          "sub_field": {
            "type": "keyword"
          }
        }
      }
    }
  }
}
Example:
PUT /heima
{
  "mappings": {
    "properties": {
      "info": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "email": {
        "type": "keyword",
        "index": false
      },
      "name": {
        "properties": {
          "firstName": {
            "type": "keyword"
          }
        }
      }
      // ... other fields omitted
    }
  }
}
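Query an index
GET /index_name
Delete an index
DELETE /index_name
Modify an index: add a new field to the mapping
Existing fields in a mapping cannot be modified, but new fields can be added:
PUT /index_name/_mapping
Example:
PUT /ganga/_mapping
{
  "properties": {
    "新字段": {
      "type": "integer"
    }
  }
}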
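🧬API-document
Add a document - POST
Option 1:
POST /ganga/_doc/1
{
  "info": "尴尬酱万岁!奥里给!",
  "email": "2282514478@qq.com",
  "age": 9,
  "name": {
    "first": "尬酱",
    "last": "尴"
  },
  "newField": "666"
}
Option 2 (_create fails with a conflict error if a document with this ID already exists):
POST /ganga/_create/1
{
  "info": "尴尬酱万岁!奥里给!",
  "email": "2282514478@qq.com",
  "age": 9,
  "name": {
    "first": "尬酱",
    "last": "尴"
  },
  "newField": "666"
}
Get a document by ID - GET
GET /ganga/_doc/1
Delete a document by ID - DELETE
DELETE /ganga/_doc/1
Full update by ID - PUT (replaces the whole document; if the ID does not exist yet, the document is created):
PUT /ganga/_doc/1
{
  "info": "尴尬酱万岁!奥里给!嘤",
  "email": "2282514478@qq.com",
  "age": 9,
  "name": {
    "first": "尬酱",
    "last": "尴"
  },
  "newField": "666666"
}
In the next request, every field that is left out (or misspelled) is removed from the stored document, because PUT replaces the entire source:
PUT /ganga/_doc/1
{
  "info": "如果其他字段不写或写错了,都会被修改掉",
  "newField": "6"
}
Incremental update by ID - POST (only the fields inside doc change; all other fields keep their values):
POST /ganga/_update/1
{
  "doc": {
    "info": "尴尬酱万岁",
    "newField": "6"
  }
}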
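The difference between a full update and an incremental update can be sketched on plain maps standing in for a document's _source. This is an illustration under assumptions, not Elasticsearch code: the class name is hypothetical, and the recursive merge of nested objects mirrors how the update API merges a partial doc as far as I know.

```java
import java.util.*;

/** Simulates PUT /_doc (full replace) versus POST /_update with "doc" (merge) on an in-memory _source. */
class UpdateSemanticsSketch {
    /** Full update: the request body becomes the whole source; the old source is ignored on purpose. */
    static Map<String, Object> fullReplace(Map<String, Object> oldSource, Map<String, Object> body) {
        return new LinkedHashMap<>(body);
    }

    /** Incremental update: fields in doc overwrite or extend the old source; nested objects are merged recursively. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> partialUpdate(Map<String, Object> oldSource, Map<String, Object> doc) {
        Map<String, Object> merged = new LinkedHashMap<>(oldSource);
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            Object oldVal = merged.get(e.getKey());
            Object newVal = e.getValue();
            if (oldVal instanceof Map && newVal instanceof Map) {
                merged.put(e.getKey(),
                        partialUpdate((Map<String, Object>) oldVal, (Map<String, Object>) newVal));
            } else {
                merged.put(e.getKey(), newVal);
            }
        }
        return merged;
    }
}
```

With a source of {info, age, name} and a body of {info}, fullReplace keeps only info, while partialUpdate keeps age and name and changes info.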
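🧬API-Search
Elasticsearch provides a JSON-based DSL (Domain Specific Language) for defining queries. Common query types include:
Match all: returns all documents, mostly for testing, e.g. match_all
Full-text queries: analyze the user's input into terms and match them against the inverted index, e.g.:
- match_query
- multi_match_query
Term-level (exact) queries: find documents by exact term values, usually on keyword, numeric, date, or boolean fields, e.g.:
- term
- range
Geo queries: query by latitude and longitude, e.g.:
- geo_distance
- geo_bounding_box
Compound queries: combine the query types above into one query, e.g.:
- bool
- function_score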
Match all documents - match_all
GET /hotel/_search
{
  "query": {
    "match_all": {}
  }
}
Full-text query - match
# full-text query: match
GET /hotel/_search
{
  "query": {
    "match": {
      "all": "上海"
    }
  }
}
Full-text query - multi_match
# full-text query: multi_match
# Returns the same results as the match query on "all" above, because name, brand and city
# are copied into the all field with copy_to; searching several fields is slower, so prefer match on all
GET /hotel/_search
{
  "query": {
    "multi_match": {
      "query": "上海",
      "fields": ["name", "brand", "city"]
    }
  }
}
Term-level query - term
# exact match: term
GET /hotel/_search
{
  "query": {
    "term": {
      "starName": {
        "value": "五钻"
      }
    }
  }
}
Term-level query - range
# range query: gte = greater than or equal, lte = less than or equal
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "gte": 1000,
        "lte": 1302
      }
    }
  }
}
Geo query - geo_distance
# geo query: distance query (geo_distance), i.e. all documents "within a radius" of a point
GET /hotel/_search
{
  "query": {
    "geo_distance": {
      "distance": "10km",
      "location": "31.21, 121.5"
    }
  }
}
GET /hotel/_search
{
  "query": {
    "geo_distance": {
      "distance": "5km",
      "location": "31.21, 121.5"
    }
  }
}
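Compound query - function score
A function_score query has four parts:
- Original query: the query part. Documents are searched with this condition and scored with the BM25 algorithm; this is the original score (query score)
- Filter: the filter part; only documents matching it are re-scored
- Score function: documents matching the filter are run through this function to obtain the function score. There are four kinds of functions:
  - weight: the result is a constant
  - field_value_factor: uses the value of a field of the document as the result
  - random_score: uses a random number as the result
  - script_score: a custom scoring script
- Boost mode: how the function score and the query score are combined:
  - multiply: multiply them (the default)
  - replace: replace the query score with the function score
  - others: sum, avg, max, min
A function_score query runs as follows:
- 1) Search documents with the original query and compute the relevance score, called the query score
- 2) Filter the documents with the filter condition
- 3) For documents that pass the filter, compute the function score with the score function
- 4) Combine the query score and the function score according to the boost mode; the result is the final relevance score.
So the key points are:
- Filter: decides which documents get a modified score
- Score function: decides how the function score is computed
- Boost mode: decides how the final score is computed
# Compound query: function_score controls the relevance score and therefore the ranking
# Elasticsearch originally scored with TF-IDF; since version 5.0 the default is BM25
# replace match_all with your own search condition
GET /hotel/_search
{
  "query": {
    "function_score": {
      "query": { "match_all": {} },
      "functions": [
        {
          "filter": {
            "term": { "brand": "如家" }
          },
          "weight": 2
        }
      ],
      "boost_mode": "sum"
    }
  }
}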
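Steps 2 to 4 of the flow above can be sketched in memory with a single weight function. The class and enum names are hypothetical, and one simplification is labeled: in this sketch, documents that fail the filter simply keep their query score.

```java
import java.util.function.Predicate;

/** In-memory sketch of function_score with one weight function. */
class FunctionScoreSketch {
    enum BoostMode { MULTIPLY, REPLACE, SUM, AVG, MAX, MIN }

    /** Step 4: combine query score and function score according to the boost mode. */
    static double combine(double queryScore, double functionScore, BoostMode mode) {
        switch (mode) {
            case MULTIPLY: return queryScore * functionScore;
            case REPLACE:  return functionScore;
            case SUM:      return queryScore + functionScore;
            case AVG:      return (queryScore + functionScore) / 2;
            case MAX:      return Math.max(queryScore, functionScore);
            case MIN:      return Math.min(queryScore, functionScore);
            default: throw new IllegalArgumentException(mode.toString());
        }
    }

    /** Steps 2-4: filter the document, apply the constant weight, then combine. */
    static <D> double finalScore(D doc, double queryScore, Predicate<D> filter, double weight, BoostMode mode) {
        if (!filter.test(doc)) {
            return queryScore; // not re-scored (simplification of this sketch)
        }
        return combine(queryScore, weight, mode);
    }
}
```

For example, a 如家 hotel with query score 1.5 ends up at 3.5 with weight 2 and boost_mode sum, and at 3.0 with the default multiply.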
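Compound query - bool query
A bool query combines one or more clauses, each of which is a sub-query. Clauses can be combined as follows:
- must: every sub-query must match, like AND
- should: sub-queries may match, like OR (when there are no must or filter clauses, at least one should clause must match)
- must_not: must not match and does not contribute to the score, like NOT
- filter: must match and does not contribute to the score
GET /hotel/_search
{
  "query": {
    "bool": {
      "must": [
        { "term": { "city": "上海" } }
      ],
      "should": [
        { "term": { "brand": "皇冠假日" } },
        { "term": { "brand": "华美达" } }
      ],
      "must_not": [
        { "range": { "price": { "lte": 500 } } }
      ],
      "filter": [
        { "range": { "score": { "gte": 45 } } }
      ]
    }
  }
}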
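The matching rules of the four clause types can be sketched with predicates. Scoring is deliberately left out, and the class name is hypothetical; the sketch only decides whether a document is returned.

```java
import java.util.*;
import java.util.function.Predicate;

/** Decides whether a document matches a bool query (matching only, no scoring). */
class BoolQuerySketch<D> {
    final List<Predicate<D>> must = new ArrayList<>();
    final List<Predicate<D>> should = new ArrayList<>();
    final List<Predicate<D>> mustNot = new ArrayList<>();
    final List<Predicate<D>> filter = new ArrayList<>();

    boolean matches(D doc) {
        for (Predicate<D> p : must) if (!p.test(doc)) return false;   // AND
        for (Predicate<D> p : filter) if (!p.test(doc)) return false; // AND, unscored
        for (Predicate<D> p : mustNot) if (p.test(doc)) return false; // NOT
        // With no must/filter clauses, at least one should clause has to match;
        // otherwise should clauses would only raise the score.
        if (must.isEmpty() && filter.isEmpty() && !should.isEmpty()) {
            for (Predicate<D> p : should) if (p.test(doc)) return true;
            return false;
        }
        return true;
    }
}
```

Putting conditions that do not need scoring (price, city, brand filters) into filter instead of must keeps them out of relevance scoring, which is the design the hotel search below follows.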
🧬API-Aggregations
TODO:
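🧬Processing search results
Search results can be processed or displayed in whatever way the user specifies.
Sorting
By default Elasticsearch sorts by relevance score (_score), but it also supports custom sorting. Sortable field types include keyword, numeric, geo_point, and date. Once you sort by a field, Elasticsearch no longer computes the relevance score.
Sorting by regular fields
Sorting by keyword, numeric, and date fields uses essentially the same syntax:
# FIELD is the field name; the order is asc or desc
GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    { "FIELD": "desc" }
  ]
}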
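Geo-distance sorting
Geo-distance sorting is slightly different:
GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "_geo_distance": {
        "FIELD": "latitude,longitude",
        "order": "asc",
        "unit": "km"
      }
    }
  ]
}
This query means:
- Specify a coordinate as the target point
- For each document, compute the distance from the coordinate in the specified field (which must be of type geo_point) to the target point
- Sort by that distance
Requirement: sort the hotels by distance from your location, nearest first.
You can look up the latitude and longitude of your location here: https://lbs.amap.com/demo/jsapi-v2/example/map/click-to-get-lnglat/
Suppose my location is 31.034661,121.612282 and I want the hotels closest to me.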
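What the _geo_distance sort does on the server can be approximated client-side with the haversine great-circle formula. This is only a sketch: the class name, the {lat, lon} array layout, and the use of the mean Earth radius are assumptions, and Elasticsearch computes the distances itself.

```java
import java.util.*;

/** Sketch of geo-distance sorting with the haversine formula. */
class GeoSortSketch {
    static final double EARTH_RADIUS_KM = 6371.0088; // mean Earth radius

    /** Great-circle distance in km between two (lat, lon) points given in degrees. */
    static double distanceKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    /** Returns hotel names ordered by distance from (lat, lon), nearest first; values are {lat, lon}. */
    static List<String> sortByDistance(Map<String, double[]> hotels, double lat, double lon) {
        List<String> names = new ArrayList<>(hotels.keySet());
        names.sort(Comparator.comparingDouble(
                n -> distanceKm(lat, lon, hotels.get(n)[0], hotels.get(n)[1])));
        return names;
    }
}
```

Calling sortByDistance(hotels, 31.034661, 121.612282) corresponds to the requirement above: the hotel closest to that point comes first.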
Pagination
By default Elasticsearch returns only the top 10 hits. To get more, change the paging parameters from and size:
- from: the offset of the first document to return
- size: the number of documents to return
This is similar to LIMIT ?, ? in MySQL.
Basic paging syntax:
GET /hotel/_search
{
  "query": {
    "match_all": {}
  },
  "from": 0,
  "size": 10,
  "sort": [
    { "price": "asc" }
  ]
}
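The deep paging problem
Suppose I now want the documents ranked 991 to 1000. The query is:
GET /hotel/_search
{
  "query": {
    "match_all": {}
  },
  "from": 990,
  "size": 10,
  "sort": [
    { "price": "asc" }
  ]
}
This skips the first 990 documents and returns the next 10, i.e. ranks 991 to 1000.
Internally, however, Elasticsearch must first fetch the top 1000 documents and then cut out the last 10 of them.
Fetching the top 1000 is not a big deal when ES runs as a single node.
But in production Elasticsearch runs as a cluster. If my cluster has 5 nodes and I want the top 1000, it is not enough for each node to return its top 200,
because the global top 1000 might all sit on one node, while another node's top 200 might rank far below 1000 overall.
So to get the cluster-wide top 1000, each node (more precisely, each shard) must return its own top 1000; the coordinating node merges these results, re-sorts them, and takes the top 1000.
What if I want ranks 9,901 to 10,000? Then each node has to return its top 10,000, and all of it is merged in memory.
The deeper the page, the more data must be merged, which puts heavy pressure on memory and CPU. Elasticsearch therefore rejects requests where from + size exceeds 10,000 (the default value of the index.max_result_window setting).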
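The merge step described above can be simulated in memory to see why every shard must return from + size hits. Hits are plain integers sorted ascending (think "price asc"); the class and method names are hypothetical.

```java
import java.util.*;

/** Simulates from/size paging across shards, as done by the coordinating node. */
class DistributedPagingSketch {
    static List<Integer> page(List<List<Integer>> shards, int from, int size) {
        int window = from + size; // every shard must return this many hits
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> shard : shards) {
            shard.stream().sorted().limit(window).forEach(merged::add);
        }
        Collections.sort(merged); // re-sort the union of the shard results
        // Cut out [from, from + size) from the merged top hits.
        return merged.subList(Math.min(from, merged.size()), Math.min(window, merged.size()));
    }

    /** Upper bound on the hits the coordinator holds in memory for one request. */
    static int hitsToMerge(int shardCount, int from, int size) {
        return shardCount * (from + size);
    }
}
```

With 5 shards and from = 9990, size = 10, up to 50,000 hits are merged just to return 10, which is the cost the 10,000 limit protects against.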
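For deep paging, ES offers two solutions (see the official documentation):
- search after: requires a sort; each page starts from the sort values of the last hit on the previous page. This is the officially recommended approach.
- scroll: takes a snapshot of the sorted document IDs and keeps it in memory. No longer recommended officially.
Summary
Common ways to implement paging, with their pros and cons:
from + size:
- Pros: supports jumping to any page
- Cons: deep paging problem; by default from + size may not exceed 10,000
- Use case: search with random page jumps, as on Baidu, JD, Google, or Taobao
search after:
- Pros: no limit on paging depth (a single request's size still may not exceed 10,000)
- Cons: pages can only be fetched forward one at a time; no random jumps
- Use case: search without random page jumps, e.g. scrolling down on a phone
scroll:
- Pros: no limit on paging depth (a single request's size still may not exceed 10,000)
- Cons: extra memory consumption, and the results are not real-time
- Use case: fetching and migrating large amounts of data. Not recommended since ES 7.1; use search after instead.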
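The search after idea can be sketched over an in-memory list sorted by (price asc, id asc). The id tie-breaker makes the order total, so no hit is skipped or repeated between pages. The class names are hypothetical; the real API sends the last hit's sort values in a search_after array.

```java
import java.util.*;

/** Sketch of search_after paging: each page starts strictly after the last hit of the previous page. */
class SearchAfterSketch {
    /** A hit with its sort values. */
    static final class Hit {
        final int price;
        final long id;
        Hit(int price, long id) { this.price = price; this.id = id; }
    }

    static final Comparator<Hit> ORDER =
            Comparator.<Hit>comparingInt(h -> h.price).thenComparingLong(h -> h.id);

    /** Returns up to size hits that sort after 'after' (null means the first page). */
    static List<Hit> nextPage(List<Hit> all, Hit after, int size) {
        List<Hit> page = new ArrayList<>();
        all.stream()
           .sorted(ORDER)
           .filter(h -> after == null || ORDER.compare(h, after) > 0)
           .limit(size)
           .forEach(page::add);
        return page;
    }
}
```

A client loops by passing the last hit of each page into the next call until an empty page comes back; there is no way to jump straight to page 50, which matches the trade-off in the summary.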
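Highlighting
Highlight syntax:
GET /hotel/_search
{
  "query": {
    "match": {
      "FIELD": "TEXT"
    }
  },
  "highlight": {
    "fields": {
      "FIELD": {
        "pre_tags": "<em>",
        "post_tags": "</em>"
      }
    }
  }
}
Notes:
- Highlighting marks the search keywords, so the query must contain keywords; queries such as range queries cannot be highlighted.
- By default, the highlighted field must be the same field that was searched, otherwise nothing is highlighted.
- To highlight a field other than the searched one, add the option "require_field_match": false.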
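A rough client-side analogue of what the highlighter returns is wrapping each keyword occurrence in the pre and post tags. This is a simplification: Elasticsearch highlights analyzed terms, not raw substrings, and the class name here is hypothetical.

```java
import java.util.*;
import java.util.regex.*;

/** Wraps every occurrence of the query terms in a field value with pre/post tags. */
class HighlightSketch {
    static String highlight(String text, Collection<String> terms, String preTag, String postTag) {
        if (terms.isEmpty()) return text;
        // Longer terms first so that "programmer" wins over "program" at the same position.
        List<String> sorted = new ArrayList<>(terms);
        sorted.sort((a, b) -> Integer.compare(b.length(), a.length()));
        StringJoiner alternation = new StringJoiner("|");
        for (String t : sorted) alternation.add(Pattern.quote(t));
        Matcher m = Pattern.compile(alternation.toString()).matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            m.appendReplacement(out, Matcher.quoteReplacement(preTag + m.group() + postTag));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

For example, highlight("Rujia Hotel near Rujia Park", ["Rujia"], "<em>", "</em>") gives "<em>Rujia</em> Hotel near <em>Rujia</em> Park", the same shape of fragment the front end renders.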
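Summary
A search DSL is one large JSON object with the following properties:
- query: query conditions
- from and size: paging conditions
- sort: sorting conditions
- highlight: highlighting conditions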
🧬RestClient
Analyzing the hotel index
The table structure is given by the SQL under Database below.
Analyzing the index
Analysis logic:
- The field's type in the index
- For strings, whether the field needs to be analyzed
  - Not analyzed: use keyword
  - Analyzed: use text; which analyzer? ik_max_word / ik_smart / ...
- Whether the field takes part in search: fields are searchable by default; if a field is never searched, set "index": false
Database
CREATE TABLE `tb_hotel` (
  `id` bigint(20) NOT NULL COMMENT '酒店id',
  `name` varchar(255) NOT NULL COMMENT '酒店名称',
  `address` varchar(255) NOT NULL COMMENT '酒店地址',
  `price` int(10) NOT NULL COMMENT '酒店价格',
  `score` int(2) NOT NULL COMMENT '酒店评分',
  `brand` varchar(32) NOT NULL COMMENT '酒店品牌',
  `city` varchar(32) NOT NULL COMMENT '所在城市',
  `star_name` varchar(16) DEFAULT NULL COMMENT '酒店星级,1星到5星,1钻到5钻',
  `business` varchar(255) DEFAULT NULL COMMENT '商圈',
  `latitude` varchar(32) NOT NULL COMMENT '纬度',
  `longitude` varchar(32) NOT NULL COMMENT '经度',
  `pic` varchar(255) DEFAULT NULL COMMENT '酒店图片',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;
Index mapping (latitude and longitude are merged into one geo_point field, location; name, brand and city are copied into all with copy_to, so a single field searches all three):
{
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "name": { "type": "text", "analyzer": "ik_max_word", "copy_to": "all" },
      "address": { "type": "keyword", "index": false },
      "price": { "type": "integer" },
      "score": { "type": "integer" },
      "brand": { "type": "keyword", "copy_to": "all" },
      "city": { "type": "keyword", "copy_to": "all" },
      "starName": { "type": "keyword" },
      "business": { "type": "keyword" },
      "location": { "type": "geo_point" },
      "pic": { "type": "keyword", "index": false },
      "all": { "type": "text", "analyzer": "ik_max_word" }
    }
  }
}
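RestClient initialization
pom.xml
<properties>
    <java.version>1.8</java.version>
    <!-- override the ES version managed by Spring Boot so the client matches the 7.12.1 server -->
    <elasticsearch.version>7.12.1</elasticsearch.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
    </dependency>
</dependencies>
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

@SpringBootTest(classes = HotelDemoApplication.class)
public class IndexDSLTest {

    private RestHighLevelClient restClient;

    @BeforeEach
    public void initRest() {
        // basic-auth credentials for the secured cluster (replace user/password with your own)
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials("user", "password"));
        this.restClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("ayaka520", 9200, "http"))
                        .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                            @Override
                            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                                httpClientBuilder.disableAuthCaching();
                                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            }
                        })
        );
    }

    @AfterEach
    public void closeRest() throws IOException {
        // release the underlying HTTP connections after each test
        restClient.close();
    }
}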
Converting between entity classes
MySQL entity
package com.ganga.hotel.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("tb_hotel")
public class Hotel {
    @TableId(type = IdType.INPUT)
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String longitude;
    private String latitude;
    private String pic;
}
ES entity
package com.ganga.hotel.pojo;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String location;
    private String pic;

    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        // geo_point accepts a "lat, lon" string, so the two MySQL columns are merged
        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
        this.pic = hotel.getPic();
    }
}
Index operations
Create an index
// CREATE_INDEX_HOTEL is a String constant holding the mapping JSON shown above;
// CreateIndexRequest and GetIndexRequest come from org.elasticsearch.client.indices
@Test
public void createIndexTest() throws IOException {
    CreateIndexRequest request = new CreateIndexRequest("hotel");
    request.source(CREATE_INDEX_HOTEL, XContentType.JSON);
    restClient.indices().create(request, RequestOptions.DEFAULT);
}
Delete an index
@Test
public void deleteIndexTest() throws IOException {
    DeleteIndexRequest request = new DeleteIndexRequest("hotel");
    restClient.indices().delete(request, RequestOptions.DEFAULT);
}
Check whether an index exists
@Test
public void getIndexTest() throws IOException {
    GetIndexRequest request = new GetIndexRequest("hotel");
    boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT);
    System.out.println(exists);
}
Document operations
Add a document by id
// hotelService (IHotelService) is injected with @Autowired; id is a Long field holding a hotel id from tb_hotel;
// JSON is fastjson's com.alibaba.fastjson.JSON
@Test
public void addDocumentTest() throws IOException {
    Hotel hotel = hotelService.getById(id);
    // convert the MySQL entity to the ES document (latitude/longitude -> location)
    HotelDoc hotelDoc = new HotelDoc(hotel);
    String jsonString = JSON.toJSONString(hotelDoc);
    IndexRequest request = new IndexRequest("hotel").id(id.toString());
    request.source(jsonString, XContentType.JSON);
    restClient.index(request, RequestOptions.DEFAULT);
}
Get a document by id
@Test
public void getDocumentById() throws IOException {
    GetRequest request = new GetRequest("hotel").id(id.toString());
    GetResponse response = restClient.get(request, RequestOptions.DEFAULT);
    // read _source as a JSON string and deserialize it
    String json = response.getSourceAsString();
    HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
    System.out.println(hotelDoc);
}
Update a document by id (incremental: only the listed fields change)
@Test
public void updateDocumentById() throws IOException {
    restClient.update(
            new UpdateRequest("hotel", id.toString()).doc(
                    "name", "神里绫华--第二字段",
                    "address", "神里绫华--第二字段"
            ),
            RequestOptions.DEFAULT
    );
}
Delete a document by id
@Test
public void deleteDocumentById() throws IOException {
    restClient.delete(
            new DeleteRequest("hotel").id(id.toString()),
            RequestOptions.DEFAULT
    );
}
Bulk import documents (each hotel's id is used as the document id)
@Test
public void bulkAddDocById() throws IOException {
    List<Hotel> hotels = hotelService.list();
    BulkRequest request = new BulkRequest("hotel");
    // one IndexRequest per hotel, sent in a single bulk call
    hotels.forEach(hotel -> {
        String json = JSON.toJSONString(new HotelDoc(hotel));
        request.add(new IndexRequest("hotel")
                .id(hotel.getId().toString())
                .source(json, XContentType.JSON));
    });
    restClient.bulk(request, RequestOptions.DEFAULT);
}
Document search
Match all
@Test
public void matchAllQueryTest() throws IOException {
    SearchRequest request = new SearchRequest("hotel");
    request.source().query(QueryBuilders.matchAllQuery());
    SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
    // parse the hits
    SearchHits hits = response.getHits();
    long total = hits.getTotalHits().value;
    System.out.println("Found " + total + " hits");
    for (SearchHit hit : hits.getHits()) {
        String json = hit.getSourceAsString();
        HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
        System.out.println(hotelDoc + "\n");
    }
}
Full-text query
@Test
public void matchQueryTest() throws IOException {
    SearchRequest request = new SearchRequest("hotel");
    request.source().query(
            QueryBuilders.matchQuery("all", "如家")
    );
    SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
    handleResponse(response);
}
Compound query with term-level clauses
@Test
public void boolQueryTest() throws IOException {
    SearchRequest request = new SearchRequest("hotel");
    request.source().query(
            QueryBuilders.boolQuery()
                    .must(QueryBuilders.matchQuery("name", "和颐"))
                    .must(QueryBuilders.rangeQuery("price").gte(100).lte(300))
                    .mustNot(QueryBuilders.termQuery("starName", "一钻"))
                    .mustNot(QueryBuilders.termQuery("starName", "二钻"))
    );
    SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
    handleResponse(response);
}
Pagination and sorting
@Test
public void OrderPageQueryTest() throws IOException {
    int page = 2, size = 5;
    SearchRequest request = new SearchRequest("hotel");
    SearchSourceBuilder source = request.source();
    source.query(QueryBuilders.matchQuery("all", "如家"));
    // from is an offset: page 2 with size 5 starts at document 5
    source.from((page - 1) * size).size(size);
    source.sort("price", SortOrder.ASC);
    SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
    handleResponse(response);
}
Highlight query
@Test
public void addHighlightQueryTest() throws IOException {
    int page = 2, size = 5;
    SearchRequest request = new SearchRequest("hotel");
    SearchSourceBuilder source = request.source();
    source.query(QueryBuilders.matchQuery("all", "如家"));
    source.from((page - 1) * size).size(size);
    source.sort("price", SortOrder.ASC);
    // the query searches "all" but we highlight "name", so require_field_match must be false
    source.highlighter(
            new HighlightBuilder()
                    .field("name")
                    .requireFieldMatch(false)
    );
    SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
    handleResponse(response);
}
Parsing the result
private static void handleResponse(SearchResponse response) {
    long total = response.getHits().getTotalHits().value;
    System.out.println("Found " + total + " hits");
    for (SearchHit hit : response.getHits().getHits()) {
        String json = hit.getSourceAsString();
        HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
        // replace name with the highlighted fragment when one was returned
        Map<String, HighlightField> highlightFields = hit.getHighlightFields();
        if (!CollectionUtils.isEmpty(highlightFields)) {
            HighlightField field = highlightFields.get("name");
            if (field != null) {
                String name = field.getFragments()[0].string();
                hotelDoc.setName(name);
            }
        }
        System.out.println(hotelDoc + "\n");
    }
}
Document aggregations
TODO:
🧬Hotel case study - search
Client
@Configuration
public class RestClientConfig {

    @Bean
    public RestHighLevelClient restClient() {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials("elastic", "password"));

        return new RestHighLevelClient(
                org.elasticsearch.client.RestClient.builder(new HttpHost("ayaka520", 9200, "http"))
                        .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                            @Override
                            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                                httpClientBuilder.disableAuthCaching();
                                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            }
                        })
        );
    }
}
Entity classes
Hotel: receives MySQL data
@Data
@TableName("tb_hotel")
public class Hotel {
    @TableId(type = IdType.INPUT)
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String longitude;
    private String latitude;
    private String pic;
}
HotelDoc: receives ES data
@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String location;
    private String pic;
    // distance to the user, filled from the geo-distance sort value
    private Object distance;
    // whether the hotel has bought an ad
    private Boolean isAD;

    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
        this.pic = hotel.getPic();
    }
}
RequestParams: receives the front-end parameters
@Data
public class RequestParams {
    // search keyword and paging
    private String key;
    private Integer page;
    private Integer size;
    private String sortBy;
    // filter conditions
    private String brand;
    private String city;
    private String starName;
    private Integer minPrice;
    private Integer maxPrice;
    // the user's location as "lat, lon", used for distance sorting
    private String location;
}
PageResult: returned to the front end
@Data
public class PageResult {

    private Long total;
    private List<HotelDoc> hotels;

    public PageResult() {
    }

    public PageResult(Long total, List<HotelDoc> hotels) {
        this.total = total;
        this.hotels = hotels;
    }
}
Simulation
Simulating hotels that bought ads
@Autowired
private RestHighLevelClient restClient;

// ADD_AD_TEST_0x_ID are String constants holding the ids of the hotels chosen for the test
@Test
public void addADTest() throws IOException {
    BulkRequest request = new BulkRequest("hotel");
    // partial updates: set isAD = true on three hotels in a single bulk request
    request.add(new UpdateRequest("hotel", ADD_AD_TEST_01_ID).doc("isAD", true));
    request.add(new UpdateRequest("hotel", ADD_AD_TEST_02_ID).doc("isAD", true));
    request.add(new UpdateRequest("hotel", ADD_AD_TEST_03_ID).doc("isAD", true));
    restClient.bulk(request, RequestOptions.DEFAULT);
}
API
POST request: /hotel/list
@RestController
@RequestMapping("/hotel")
public class HotelController {

    @Autowired
    private HotelService hotelService;

    @PostMapping("/list")
    public PageResult queryList(@RequestBody RequestParams requestParams) {
        return hotelService.searchList(requestParams);
    }
}
Service implementation
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {

    @Autowired
    private RestHighLevelClient restClient;

    @Override
    public PageResult searchList(RequestParams requestParams) {
        try {
            String key = requestParams.getKey();
            int page = requestParams.getPage();
            int size = requestParams.getSize();
            SearchRequest request = new SearchRequest("hotel");
            SearchSourceBuilder source = request.source();

            // 1. bool query: keyword search plus filter conditions
            BoolQueryBuilder boolQuery = new BoolQueryBuilder();
            if (ObjectUtils.isEmpty(key)) {
                boolQuery.must(QueryBuilders.matchAllQuery());
            } else {
                boolQuery.must(QueryBuilders.matchQuery("all", key));
            }
            if (ObjectUtils.isNotEmpty(requestParams.getBrand())) {
                boolQuery.filter(QueryBuilders.termQuery("brand", requestParams.getBrand()));
            }
            if (ObjectUtils.isNotEmpty(requestParams.getCity())) {
                boolQuery.filter(QueryBuilders.termQuery("city", requestParams.getCity()));
            }
            if (ObjectUtils.isNotEmpty(requestParams.getStarName())) {
                boolQuery.filter(QueryBuilders.termQuery("starName", requestParams.getStarName()));
            }
            if (ObjectUtils.isNotEmpty(requestParams.getMinPrice())
                    && ObjectUtils.isNotEmpty(requestParams.getMaxPrice())) {
                boolQuery.filter(QueryBuilders.rangeQuery("price")
                        .gte(requestParams.getMinPrice())
                        .lte(requestParams.getMaxPrice()));
            }

            // 2. function_score: hotels with isAD = true get weight 10 (default boost_mode: multiply)
            FunctionScoreQueryBuilder functionScore = QueryBuilders.functionScoreQuery(
                    boolQuery,
                    new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                            new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                    QueryBuilders.termQuery("isAD", true),
                                    ScoreFunctionBuilders.weightFactorFunction(10)
                            )
                    });
            source.query(functionScore);

            // 3. sort by distance when the user's location is known
            if (ObjectUtils.isNotEmpty(requestParams.getLocation())) {
                source.sort(SortBuilders.geoDistanceSort("location", new GeoPoint(requestParams.getLocation()))
                        .order(SortOrder.ASC)
                        .unit(DistanceUnit.KILOMETERS));
            }

            // 4. paging and highlighting
            source.from((page - 1) * size).size(size);
            source.highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));

            SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
            return this.handleResponse(response);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private PageResult handleResponse(SearchResponse response) {
        long total = response.getHits().getTotalHits().value;
        ArrayList<HotelDoc> hotels = new ArrayList<>();
        for (SearchHit hit : response.getHits().getHits()) {
            String json = hit.getSourceAsString();
            HotelDoc hotel = JSON.parseObject(json, HotelDoc.class);
            // with a geo-distance sort, the first sort value is the distance in km
            Object[] sorts = hit.getSortValues();
            if (ObjectUtils.isNotEmpty(sorts)) {
                Object sort = sorts[0];
                hotel.setDistance(sort);
            }
            // replace name with its highlighted fragment, if any
            Map<String, HighlightField> highlights = hit.getHighlightFields();
            if (ObjectUtils.isNotEmpty(highlights)) {
                HighlightField field = highlights.get("name");
                if (ObjectUtils.isNotEmpty(field)) {
                    hotel.setName(field.getFragments()[0].string());
                }
            }
            hotels.add(hotel);
        }
        return new PageResult(total, hotels);
    }
}
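🧬Hotel data - synchronization
Analysis
The hotel data in Elasticsearch comes from the MySQL database, so whenever the MySQL data changes, Elasticsearch must change with it. This is data synchronization between Elasticsearch and MySQL.
There are three common synchronization approaches:
Approach 1: synchronous call
Basic steps:
- hotel-demo exposes an API for modifying the data in Elasticsearch
- after finishing its database operation, the hotel admin service directly calls the API provided by hotel-demo
Approach 2: asynchronous notification
Flow:
- after hotel-admin inserts, deletes, or updates data in MySQL, it sends an MQ message
- hotel-demo listens to the MQ and updates Elasticsearch when it receives a message
Approach 3: listening to the binlog
Flow:
- enable binlog in MySQL
- every insert, delete, and update in MySQL is recorded in the binlog
- hotel-demo uses canal to listen for binlog changes and updates Elasticsearch in real time
**Choosing an approach**
Approach 1: synchronous call
- Pros: simple to implement
- Cons: high coupling between the services
Approach 2: asynchronous notification
- Pros: low coupling, moderate implementation difficulty
- Cons: depends on the reliability of the MQ
Approach 3: listening to the binlog
- Pros: completely decouples the services
- Cons: enabling binlog adds load to the database; high implementation complexity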
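Approach 2 can be sketched without any infrastructure: an in-memory queue stands in for RabbitMQ, and two maps stand in for MySQL and the Elasticsearch index. Only the routing keys mirror the hotel.insert / hotel.delete keys defined in MqConstant; the class and everything else is hypothetical.

```java
import java.util.*;

/** Stand-in for asynchronous notification: DB write, then publish an id; the consumer updates the index. */
class AsyncSyncSketch {
    static final String INSERT_KEY = "hotel.insert";
    static final String DELETE_KEY = "hotel.delete";

    final Map<Long, String> mysql = new HashMap<>();          // id -> hotel name (the "database")
    final Map<Long, String> esIndex = new HashMap<>();        // id -> indexed document
    private final Deque<Object[]> queue = new ArrayDeque<>(); // messages: {routingKey, id}

    /** hotel-admin side: write to the database, then publish only the id. */
    void save(long id, String name) {
        mysql.put(id, name);
        queue.add(new Object[]{INSERT_KEY, id});
    }

    void delete(long id) {
        mysql.remove(id);
        queue.add(new Object[]{DELETE_KEY, id});
    }

    /** hotel-demo side: consume pending messages; on insert, re-read the row by id and (re)index it. */
    void consumeAll() {
        Object[] msg;
        while ((msg = queue.poll()) != null) {
            long id = (Long) msg[1];
            if (INSERT_KEY.equals(msg[0])) {
                String row = mysql.get(id);
                if (row != null) esIndex.put(id, row); // insert and update are both an index operation
            } else if (DELETE_KEY.equals(msg[0])) {
                esIndex.remove(id);
            }
        }
    }
}
```

Sending only the id keeps messages small and lets the consumer read the latest row, so an insert and an update can share the same queue; the price is that the index is stale until the message is consumed, which is the MQ dependency listed above.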
Dependencies / configuration
Insert/update/delete side: message sender
pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yaml
spring:
  rabbitmq:
    host: xxx
    username: xxx
    password: xxx
    virtual-host: /
Message receiver
pom.xml
<properties>
    <java.version>1.8</java.version>
    <elasticsearch.version>7.12.1</elasticsearch.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
    </dependency>
</dependencies>
Injecting the RestClient
package com.ganga.hotel.clients;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RestClientConfig {

    @Bean
    public RestHighLevelClient restClient() {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials("username", "password"));

        return new RestHighLevelClient(
                org.elasticsearch.client.RestClient.builder(new HttpHost("xxx", 9200, "http"))
                        .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                            @Override
                            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                                httpClientBuilder.disableAuthCaching();
                                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            }
                        })
        );
    }
}
MqConstant constants
package com.ganga.hotel.constant;

public class MqConstant {

    public static final String HOTEL_EXCHANGE = "hotel.exchange";

    public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue";

    public static final String HOTEL_INSERT_KEY = "hotel.insert";
    public static final String HOTEL_DELETE_KEY = "hotel.delete";
}
MqConfig配置
package com.ganga.hotel.config;

import com.ganga.hotel.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {

    // Serialize message payloads as JSON instead of using the default SimpleMessageConverter
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Durable topic exchange, not auto-deleted
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstant.HOTEL_EXCHANGE, true, false);
    }

    // Durable queue for insert/update events
    @Bean
    public Queue insertQueue() {
        return new Queue(MqConstant.HOTEL_INSERT_QUEUE, true);
    }

    // Durable queue for delete events
    @Bean
    public Queue deleteQueue() {
        return new Queue(MqConstant.HOTEL_DELETE_QUEUE, true);
    }

    // hotel.insert -> hotel.insert.queue
    @Bean
    public Binding bindingInsertQueue() {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstant.HOTEL_INSERT_KEY);
    }

    // hotel.delete -> hotel.delete.queue
    @Bean
    public Binding bindingDeleteQueue() {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstant.HOTEL_DELETE_KEY);
    }
}
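`MqConfig` binds each queue to the topic exchange with an exact routing key. As a result, `hotel.insert` reaches only `hotel.insert.queue`, and `hotel.delete` reaches only `hotel.delete.queue`. The stdlib sketch below models RabbitMQ's topic-matching rules to show why:

- words are separated by `.`;
- `*` matches exactly one word;
- `#` matches zero or more words.

It is an in-memory illustration, not RabbitMQ's own implementation. The class name `TopicRouting` is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of topic-exchange routing, for illustration only.
final class TopicRouting {

    // queue name -> binding pattern (insertion order kept for predictable results)
    private final Map<String, String> bindings = new LinkedHashMap<>();

    void bind(String queue, String pattern) {
        bindings.put(queue, pattern);
    }

    // Returns every queue whose binding pattern matches the routing key
    List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> e : bindings.entrySet()) {
            if (matches(e.getValue(), routingKey)) {
                queues.add(e.getKey());
            }
        }
        return queues;
    }

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // key used up while the pattern still has words
        }
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && match(p, i + 1, k, j + 1);
    }
}
```

With the two bindings from `MqConfig`, `route("hotel.insert")` yields only `hotel.insert.queue`. A wildcard binding such as `hotel.*` would let one queue receive both kinds of events; two exact keys keep each listener method responsible for a single operation.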
HotelController: sending messages
package com.ganga.hotel.web;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.ganga.hotel.pojo.Hotel;
import com.ganga.hotel.pojo.PageResult;
import com.ganga.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.security.InvalidParameterException;

import static com.ganga.hotel.constant.MqConstant.*;

@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id) {
        return hotelService.getById(id);
    }

    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ) {
        Page<Hotel> result = hotelService.page(new Page<>(page, size));
        return new PageResult(result.getTotal(), result.getRecords());
    }

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel) {
        boolean isSuccess = hotelService.save(hotel);
        // Debug output: after save() the generated id should be filled into hotel
        System.out.println(hotel.getId());
        if (isSuccess) {
            // Publish only the id; the listener reloads the row from MySQL
            rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_INSERT_KEY, hotel.getId());
        }
    }

    @PutMapping
    public void updateById(@RequestBody Hotel hotel) {
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id must not be null");
        }
        boolean isSuccess = hotelService.updateById(hotel);
        if (isSuccess) {
            // Updates reuse the insert key: indexing a document with an existing id overwrites it
            rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_INSERT_KEY, hotel.getId());
        }
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        boolean isSuccess = hotelService.removeById(id);
        if (isSuccess) {
            rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_DELETE_KEY, id);
        }
    }
}
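The controller publishes only after the MySQL write reports success, and it sends just the hotel id. POST and PUT can share `hotel.insert` because the consumer answers with an `IndexRequest`, which creates the document if the id is new and overwrites it otherwise. The stdlib sketch below replays that publish decision. A `HashMap` stands in for the MySQL table, and a list records what would be sent to RabbitMQ. `HotelWriteFlow` and its methods are hypothetical names, not part of the project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for HotelController: "table" plays MySQL and
// "published" records what would go to RabbitMQ, as "routingKey:id".
final class HotelWriteFlow {

    static final String INSERT_KEY = "hotel.insert"; // same value as MqConstant.HOTEL_INSERT_KEY
    static final String DELETE_KEY = "hotel.delete"; // same value as MqConstant.HOTEL_DELETE_KEY

    private final Map<Long, String> table = new HashMap<>();
    final List<String> published = new ArrayList<>();

    void save(long id, String name) {
        boolean isSuccess = table.putIfAbsent(id, name) == null; // fails if the id already exists
        if (isSuccess) {
            publish(INSERT_KEY, id);
        }
    }

    void update(long id, String name) {
        boolean isSuccess = table.replace(id, name) != null; // fails if no row has this id
        if (isSuccess) {
            publish(INSERT_KEY, id); // updates reuse the insert key
        }
    }

    void delete(long id) {
        boolean isSuccess = table.remove(id) != null; // fails if the row is already gone
        if (isSuccess) {
            publish(DELETE_KEY, id);
        }
    }

    private void publish(String routingKey, long id) {
        published.add(routingKey + ":" + id);
    }
}
```

Failed writes, such as updating an unknown id or deleting a row twice, publish nothing. This mirrors the `if (isSuccess)` guards in the controller.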
MqListener: listening for messages
package com.ganga.hotel.mq;

import com.alibaba.fastjson.JSON;
import com.ganga.hotel.constant.MqConstant;
import com.ganga.hotel.pojo.Hotel;
import com.ganga.hotel.pojo.HotelDoc;
import com.ganga.hotel.service.impl.HotelService;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class MqListener {

    @Autowired
    private HotelService hotelService;
    @Autowired
    private RestHighLevelClient restClient;

    // Insert and update events both arrive here: load the latest row and (re)index it
    @RabbitListener(queues = MqConstant.HOTEL_INSERT_QUEUE)
    public void insertOrUpdateQueue(Long id) {
        Hotel hotel = hotelService.getById(id);
        if (hotel == null) {
            // The row no longer exists (e.g. deleted in the meantime), so there is nothing to index
            return;
        }
        HotelDoc hotelDoc = new HotelDoc(hotel);
        System.err.println(JSON.toJSONString(hotelDoc));
        try {
            // An IndexRequest with an explicit id creates the document or overwrites the existing one
            IndexRequest request = new IndexRequest("hotel").id(id.toString());
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            restClient.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // The controller publishes a Long id, so receive it with the same type as the insert listener
    @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE)
    public void deleteQueue(Long id) {
        System.err.println(id);
        try {
            DeleteRequest request = new DeleteRequest("hotel").id(id.toString());
            restClient.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
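On the consuming side each message carries only an id, so the listener re-reads the current row and then writes the whole document or deletes it. This makes duplicate or redelivered messages harmless: handling the same insert event twice leaves the index in the same state. The sketch below models this behavior with two in-memory maps, one standing in for MySQL and one for the `hotel` index. `HotelIndexSync` and its methods are hypothetical; the real listener uses `IndexRequest` and `DeleteRequest` as shown above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of MqListener: "db" plays MySQL, "index" plays the ES "hotel" index.
final class HotelIndexSync {

    final Map<Long, String> db = new HashMap<>();
    final Map<String, String> index = new HashMap<>(); // ES document ids are strings

    // Mirrors insertOrUpdateQueue: read the latest row, then create or overwrite the document
    void onInsert(long id) {
        String row = db.get(id);
        if (row == null) {
            return; // row already deleted; the delete event cleans up the index
        }
        index.put(Long.toString(id), row);
    }

    // Mirrors deleteQueue: in this model, removing a missing document is simply a no-op
    void onDelete(long id) {
        index.remove(Long.toString(id));
    }
}
```

Because the listener reads the row at processing time rather than trusting the message body, the index always converges to the latest state of the row.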