Elasticsearch from Zero to Practice: A Site Search System
1. Background and Concepts
1.1 How an Inverted Index Works
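At the core of Elasticsearch is the inverted index. Instead of scanning documents sequentially, it maintains a mapping from each term to the list of documents that contain it:
Forward index:  doc1 → ["deep", "understanding", "computer", "systems"]
Inverted index: "computer" → [doc1, doc3, doc7]
                "systems"  → [doc1, doc4, doc9]
For the query "computer systems", the engine looks up the postings list of each term and takes their intersection (AND) or union (OR). The term lookup itself is fast, and the cost of combining the lists grows with the length of those lists, not with the total number of documents.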
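The lookup-and-combine idea can be sketched in a few lines of Python. This is a toy model under stated assumptions, not how Lucene stores its index: the documents are hypothetical and already tokenized, and the names `build_inverted_index` and `search` are made up for illustration.

```python
from collections import defaultdict


def build_inverted_index(docs):
    """Map each term to the sorted list of doc ids that contain it."""
    postings = defaultdict(set)
    for doc_id, terms in docs.items():
        for term in terms:
            postings[term].add(doc_id)
    return {term: sorted(ids) for term, ids in postings.items()}


def search(index, terms, mode="and"):
    """Combine the postings lists of `terms` by intersection or union."""
    lists = [set(index.get(term, [])) for term in terms]
    if not lists:
        return []
    combine = set.intersection if mode == "and" else set.union
    return sorted(combine(*lists))


# Hypothetical, pre-tokenized documents (a real analyzer would produce the terms).
docs = {
    1: ["deep", "understanding", "computer", "systems"],
    3: ["computer", "networks"],
    4: ["operating", "systems"],
}
index = build_inverted_index(docs)
print(index["computer"])                              # [1, 3]
print(search(index, ["computer", "systems"], "and"))  # [1]
print(search(index, ["computer", "systems"], "or"))   # [1, 3, 4]
```

Only the postings lists of the queried terms are touched, so documents that contain neither term are never read.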
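1.2 Core Concepts at a Glance
| ES concept | SQL analogy | Description |
|---|---|---|
| Index | Database | A collection of documents |
| Type (deprecated in 7.x, removed in 8.0) | Table | An index no longer holds multiple types |
| Document | Row | A record in JSON format |
| Field | Column | An attribute of a document |
| Mapping | Schema | Field type definitions |
| Shard | Partition | A horizontal slice of an index's data |
| Replica | Replica | A copy of a shard, kept for high availability |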
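To make the idea of horizontal sharding concrete, here is a minimal sketch of hash-based routing: each document lands on a primary shard derived from a hash of its routing value (by default the document id) modulo the number of primary shards. The function name `pick_shard` is hypothetical, and `zlib.crc32` is only a stand-in for the hash function Elasticsearch uses internally.

```python
import zlib
from collections import Counter


def pick_shard(doc_id, num_primary_shards):
    """Assign a document to a primary shard from a hash of its routing value."""
    # crc32 is a stand-in hash for this sketch, not the function Elasticsearch uses.
    return zlib.crc32(str(doc_id).encode("utf-8")) % num_primary_shards


# Spread 1000 hypothetical document ids over 3 primary shards.
counts = Counter(pick_shard(i, 3) for i in range(1, 1001))
print(sorted(counts.items()))
```

Because the target shard depends on the primary shard count, that count is chosen when the index is created, while the number of replicas can be adjusted later.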
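2. Step-by-Step Practice: Building E-commerce Product Search
Scenario
The e-commerce platform needs to support keyword search, category filtering, price ranges, sorting, and autocomplete.
Step 1: Design the Index Mapping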
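The ik_smart tokenizer used here comes from the IK analysis plugin (elasticsearch-analysis-ik), which must be installed on the cluster before the index is created. The custom analyzer built on it is applied to the name field so that Chinese product names are split into words.
PUT /products
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "analysis": {
      "analyzer": {
        "ik_smart_analyzer": {
          "type": "custom",
          "tokenizer": "ik_smart"
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "ik_smart_analyzer",
        "fields": {
          "keyword": {"type": "keyword"}
        }
      },
      "category": {"type": "keyword"},
      "price": {"type": "double"},
      "brand": {"type": "keyword"},
      "rating": {"type": "float"},
      "stock": {"type": "integer"},
      "description": {"type": "text"},
      "created_at": {"type": "date"}
    }
  }
}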
Step 2: Insert Product Data
import random

from elasticsearch import Elasticsearch
from faker import Faker

# Connect to a local single-node cluster.
es = Elasticsearch(['http://localhost:9200'])
fake = Faker()  # default locale (en_US), so names and descriptions are English words

categories = ['电子产品', '图书', '服装', '食品', '家居']
brands = ['华为', '苹果', '小米', '索尼', '三星', '美的']

# Index 100 random products with ids 1..100.
for i in range(1, 101):
    product = {
        'name': f"{fake.word().capitalize()} {random.choice(['Pro', 'Max', 'Lite', 'Air'])}",
        'category': random.choice(categories),
        'price': round(random.uniform(9.9, 9999), 2),
        'brand': random.choice(brands),
        'rating': round(random.uniform(1, 5), 1),
        'stock': random.randint(0, 500),
        'description': fake.sentence(),
        'created_at': fake.date_this_year().isoformat()
    }
    # `body=` is the 7.x-style argument; recent clients also accept `document=`.
    es.index(index='products', id=i, body=product)

# Refresh so the new documents are visible to search right away.
es.indices.refresh(index='products')
print(f"Indexed 100 products, total: {es.count(index='products')['count']}")
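Indexing one document per request is fine for 100 products, but larger loads are usually sent in batches. The sketch below shows one way to do that with the client's `helpers.bulk`; the names `generate_actions` and `bulk_index` are hypothetical, and the product fields are simplified placeholders instead of Faker output.

```python
import random


def generate_actions(n, index_name="products"):
    """Yield one bulk action per product, in the shape the bulk helpers expect."""
    categories = ['电子产品', '图书', '服装', '食品', '家居']
    for i in range(1, n + 1):
        yield {
            "_index": index_name,
            "_id": i,
            "_source": {
                "name": f"Product {i}",  # placeholder instead of Faker output
                "category": random.choice(categories),
                "price": round(random.uniform(9.9, 9999), 2),
                "stock": random.randint(0, 500),
            },
        }


def bulk_index(client, n=100):
    """Send all actions in batched bulk requests; returns (success_count, errors)."""
    # Imported here so the generator above can be used without the client installed.
    from elasticsearch import helpers
    return helpers.bulk(client, generate_actions(n))


actions = list(generate_actions(3))
print(actions[0]["_index"], actions[0]["_id"])  # products 1
```

With a live cluster, `bulk_index(es)` would replace the loop of individual `es.index` calls, followed by the same refresh.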
Step 3: Implement Search
def search_products(keyword, category=None, min_price=None, max_price=None, sort_by=None):
    """Search products by keyword, with optional category, price range, and sort order."""
    must = []     # clauses that contribute to the relevance score
    filters = []  # yes/no clauses that do not affect the score

    if keyword:
        must.append({
            'multi_match': {
                'query': keyword,
                # Boost matches in name (x3) and brand (x2) over description.
                'fields': ['name^3', 'description', 'brand^2']
            }
        })
    else:
        must.append({'match_all': {}})  # no keyword: match every document

    if category:
        filters.append({'term': {'category': category}})

    if min_price is not None or max_price is not None:
        price_range = {}
        # Compare against None so that a bound of 0 is not silently dropped.
        if min_price is not None:
            price_range['gte'] = min_price
        if max_price is not None:
            price_range['lte'] = max_price
        filters.append({'range': {'price': price_range}})

    body = {
        'query': {
            'bool': {
                'must': must,
                'filter': filters
            }
        },
        'highlight': {
            'fields': {'name': {}, 'description': {}}
        }
    }

    # Sorting; without sort_by, results are ordered by relevance score.
    if sort_by == 'price_asc':
        body['sort'] = [{'price': 'asc'}]
    elif sort_by == 'price_desc':
        body['sort'] = [{'price': 'desc'}]
    elif sort_by == 'rating':
        body['sort'] = [{'rating': 'desc'}]

    result = es.search(index='products', body=body)
    return result['hits']


# Search example. The sample data has random English names and descriptions,
# so a Chinese keyword such as '手机' may well return no hits.
hits = search_products(keyword='手机', category='电子产品', min_price=100, max_price=5000, sort_by='rating')
print(f"Found {hits['total']['value']} results")
for hit in hits['hits']:
    print(f"  {hit['_source']['name']} - ¥{hit['_source']['price']} - ⭐{hit['_source']['rating']}")
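The search request asks for highlighting, but the loop above prints only `_source`. As a small sketch of how the highlighted fragments could be consumed, the hypothetical helper `format_hit` below prefers the highlighted name when one is returned. The sample hits are hand-written in the shape of a search response, with illustrative values.

```python
def format_hit(hit):
    """Return a one-line summary, preferring the highlighted name when present."""
    source = hit["_source"]
    # `highlight` only contains fields that actually matched the query.
    fragments = hit.get("highlight", {}).get("name")
    name = fragments[0] if fragments else source["name"]
    return f"{name} - ¥{source['price']} - ⭐{source['rating']}"


# Hand-written hits in the shape of a search response (illustrative values).
sample_hits = [
    {"_source": {"name": "Nova Pro", "price": 1999.0, "rating": 4.5},
     "highlight": {"name": ["<em>Nova</em> Pro"]}},
    {"_source": {"name": "Echo Lite", "price": 299.0, "rating": 3.8}},
]
for hit in sample_hits:
    print(format_hit(hit))
```

By default the matched terms are wrapped in `<em>` tags, which a web front end can style directly.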
Step 4: Aggregations
# Count products per category, compute the average price,
# and bucket prices into bins of width 500.
agg_result = es.search(index='products', body={
    'size': 0,  # return aggregation results only, no hits
    'aggs': {
        'category_stats': {'terms': {'field': 'category'}},
        'avg_price': {'avg': {'field': 'price'}},
        'price_histogram': {
            'histogram': {'field': 'price', 'interval': 500}
        }
    }
})
print(agg_result['aggregations']['avg_price'])
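The aggregation response is nested: `terms` and `histogram` return a list of buckets, while `avg` returns a single value. The sketch below flattens the three results into plain Python values. The helper name `summarize_aggregations` is hypothetical, and the sample response is hand-written with illustrative numbers, not real output.

```python
def summarize_aggregations(aggs):
    """Flatten the three aggregations into plain Python values."""
    return {
        "by_category": {b["key"]: b["doc_count"] for b in aggs["category_stats"]["buckets"]},
        "avg_price": aggs["avg_price"]["value"],
        "price_bins": [(b["key"], b["doc_count"]) for b in aggs["price_histogram"]["buckets"]],
    }


# Hand-written response fragment in the shape Elasticsearch returns (illustrative numbers).
sample = {
    "category_stats": {"buckets": [{"key": "图书", "doc_count": 2}, {"key": "食品", "doc_count": 1}]},
    "avg_price": {"value": 150.0},
    "price_histogram": {"buckets": [{"key": 0.0, "doc_count": 2}, {"key": 500.0, "doc_count": 1}]},
}
summary = summarize_aggregations(sample)
print(summary["by_category"])  # {'图书': 2, '食品': 1}
print(summary["avg_price"])    # 150.0
```

With a live cluster the call would be `summarize_aggregations(agg_result['aggregations'])`. Note that a `terms` aggregation returns only the top 10 buckets unless its `size` is raised.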
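3. Questions for Reflection
- How does Chinese search differ from English search? Why does Chinese text need the IK analyzer?
- When the catalog grows to hundreds of millions of products, how should the sharding strategy be adjusted to keep search fast?
- To implement search-keyword autocomplete, which mapping field type is needed?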