@heavysheep
2020-11-06T12:00:51.000000Z
字数 8905
阅读 551
代码
class ESQuery(object):
    """Mutable builder for an Elasticsearch request-body search.

    The request body is accumulated in ``self.dsl`` and URL-level parameters
    (sort, scroll) in ``self.params``; ``search()`` sends both through the
    module-level ``es`` client.
    """

    def __init__(self, index: str = None, doc_type: str = None, bool: dict = None, collapse: dict = None,
                 dsl: dict = None):
        """
        Wrap an Elasticsearch query DSL.

        :param index: ES index
        :param doc_type: ES doc_type (if any)
        :param bool: bool DSL, stored under the "query" key of the request body
        :param collapse: collapse DSL, stored under the "collapse" key
        :param dsl: ready-made request body. It is used as-is (not copied), so later builder
                    calls mutate the dict that was passed in. Mutually exclusive with
                    ``bool`` and ``collapse``.
        :raises ValueError: if ``dsl`` is combined with ``bool`` or ``collapse``
        example:
            # [build condition]
            filter_condition = Query.term(field="is_match", value=True)
            agg_condition = [Aggs.terms(name="group", field="website.keyword", size=500)]
            # [input condition]
            query = ESQuery(index="index_1", doc_type="type_1", bool=bools(filter=filter_condition))
            query.aggs(condition=agg_condition)
            # [modify attr]
            query.size(num=5)
            query.source(include=["title", "content", "website"])
            query.sort(condition="data_time:desc")
            # [get result]
            query_result = query.search()
        """
        if dsl and bool:
            raise ValueError("不允许同时传入'dsl' 'bool'参数")
        if dsl and collapse:
            raise ValueError("不允许同时传入'dsl' 'collapse'参数")
        self.index = index
        self.doc_type = doc_type
        self.params = {}
        self.bool = bool
        self.dsl = dsl if dsl is not None else {}
        if bool:
            self.dsl["query"] = bool
        if collapse:
            self.dsl["collapse"] = collapse

    def _bool_clauses(self) -> dict:
        """Return the ``query.bool`` dict of the DSL, creating the nesting when it is missing."""
        if self.dsl.get("query") is None:
            self.dsl["query"] = {"bool": {}}
        if self.dsl["query"].get("bool") is None:
            self.dsl["query"]["bool"] = {}
        return self.dsl["query"]["bool"]

    def bools(self, must: (dict, list) = None, filter: (dict, list) = None, should: (dict, list) = None,
              must_not: (dict, list) = None):
        """
        Add bool conditions to the query; conditions are appended to clauses that already exist.

        :param must: must condition(s)
        :param filter: filter condition(s)
        :param should: should condition(s)
        :param must_not: must_not condition(s)
        """
        # The query/bool skeleton is created even when every argument is None.
        bool_clauses = self._bool_clauses()
        for name, condition_info in (("must", must), ("filter", filter), ("should", should), ("must_not", must_not)):
            if condition_info is None:
                continue
            # An empty condition may normalise to None; treat that as "nothing to add".
            condition = _parser_condition(condition_info) or []
            existing = bool_clauses.get(name)
            if existing is None:
                bool_clauses[name] = condition
            elif isinstance(existing, dict):
                # A clause stored as a single dict is promoted to a list before merging.
                bool_clauses[name] = [existing] + condition
            else:
                existing.extend(condition)

    def aggs(self, condition):
        """
        Set the "aggs" section of the request body, replacing any previous one.

        :param condition: one aggregation dict, or a list of aggregation dicts that are merged
                          into a single dict (later entries win on duplicate names)
        """
        if isinstance(condition, list):
            condition = {k: v for c in condition for k, v in c.items()}
        self.dsl["aggs"] = condition

    def range(self, range_condition: dict):
        """
        Set the range condition inside ``query.bool.must``.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-range-query.html]

        The first existing "range" entry in ``must`` is replaced (whatever field it targets);
        when there is none the condition is appended.

        :param range_condition: range condition, e.g. the result of ``Query.range(...)``
        """
        bool_clauses = self._bool_clauses()
        must = bool_clauses.get("must")
        if must is None:
            # Always store a list so that later calls can iterate and append.
            bool_clauses["must"] = [range_condition]
            return
        if isinstance(must, dict):
            must = bool_clauses["must"] = [must]
        for i, condition in enumerate(must):
            if isinstance(condition, dict) and "range" in condition:
                must[i] = range_condition
                return
        must.append(range_condition)

    def source(self, include: (str, list) = None, exclude: (str, list) = None):
        """
        Configure returned fields; maps to DSL _source. Each call resets the previous setting.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-source-filtering.html]

        :param include: fields to keep
        :param exclude: fields to drop
        """
        source_filter = {}
        if include:
            source_filter["includes"] = include
        if exclude:
            source_filter["excludes"] = exclude
        self.dsl["_source"] = source_filter

    def pagination(self, page: int = 1, per_page: int = 20):
        """
        Configure paging; maps to DSL from/size.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-from-size.html]

        :param page: page number, starting at 1 (None falls back to 1)
        :param per_page: page capacity (None falls back to 20)
        """
        if page is None:
            page = 1
        if per_page is None:
            per_page = 20
        self.dsl["from"] = (page - 1) * per_page
        self.dsl["size"] = per_page

    def sort(self, condition: (str, dict)):
        """
        Configure sorting; stored as the "sort" request parameter.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-sort.html]
        Note that this sorts the query hits.

        :param condition: "field:direction[,field:direction]" string, or a {field: direction} dict
                          that is converted to that string form
        """
        if isinstance(condition, dict):
            condition = ",".join("{}:{}".format(c_k, c_v) for c_k, c_v in condition.items())
        self.params["sort"] = condition

    def size(self, num: int):
        """
        Set the number of hits to return; maps to DSL size.
        Writes the same key as pagination(), so whichever is called last wins.

        :param num: number of hits
        """
        self.dsl["size"] = num

    def scroll(self, scroll: str):
        """
        Set the "scroll" request parameter.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-scroll.html]

        :param scroll: value sent as the ``scroll`` parameter. NOTE(review): per the ES docs this is
                       the keep-alive duration (e.g. "1m"), not a scroll id - confirm against callers.
        """
        self.params["scroll"] = scroll

    def search(self, debug=False):
        """
        Run the request-body search through the module-level ``es`` client.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-uri-request.html]

        When no size has been configured a default of 50 is applied.

        :param debug: log (info level) the DSL request body before sending it
        :return: the client response with an extra "pager_info" entry (see ``_get_page_info``)
        """
        # Apply the default first so that the logged body is exactly what is sent.
        if self.dsl.get("size") is None:
            self.size(50)
        if debug:
            logger.info(json.dumps(self.dsl, ensure_ascii=False, indent=4))
        result = es.search(body=self.dsl, index=self.index, doc_type=self.doc_type, params=self.params)
        result.update({"pager_info": self._get_page_info(result)})
        return result

    def _get_page_info(self, result: dict):
        """
        Build the paging summary attached to a search result.

        :param result: ES search response
        :return: dict with "page" (1-based), "per_page", "total" and "pages" when paging was
                 configured, plus "order_by" when a sort was configured
        """
        pager_info = {}
        offset = self.dsl.get("from")
        if offset is not None:
            per_page = self.dsl.get("size")
            total = result["hits"]["total"]
            if isinstance(total, dict):
                # Newer ES versions return {"value": ..., "relation": ...} instead of an int.
                total = total.get("value", 0)
            # "from" is a hit offset; convert it back to the 1-based page number.
            pager_info["page"] = offset // per_page + 1 if per_page else 1
            pager_info["per_page"] = per_page
            pager_info["total"] = total
            pager_info["pages"] = math.ceil(total / per_page) if per_page else 0
        if self.params.get("sort") is not None:
            pager_info["order_by"] = self.params["sort"]
        return pager_info

    def clone(self):
        """
        Return a new ESQuery with the same index/doc_type and a deep copy of the DSL.

        NOTE: request parameters set through sort()/scroll() are not carried over.
        """
        return ESQuery(index=self.index, doc_type=self.doc_type, dsl=deepcopy(self.dsl))
class Aggs(object):
    """Static factories that build aggregation clauses shaped as ``{name: {...}}`` dicts."""

    @staticmethod
    def terms(name: str, field: str, size: int = None, include: (str, dict, list) = None, exclude: (str, list) = None,
              order: dict = None, child: (dict, list) = None):
        """
        Build a terms (bucket) aggregation.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-aggregations-bucket-terms-aggregation.html]

        :param name: aggregation name
        :param field: field to aggregate on
        :param size: return only the top ``size`` buckets
        :param include: include filter (matching terms are counted)
        :param exclude: exclude filter (matching terms are not counted)
        :param order: bucket ordering
        :param child: sub-aggregation dict, or a list of such dicts merged into one "aggs" section
        :return: ``{name: {"terms": {...}}}``, with an "aggs" entry when ``child`` is given
        """
        terms_body = {"field": field}
        for key, value in (("size", size), ("include", include), ("exclude", exclude), ("order", order)):
            if value is not None:
                terms_body[key] = value
        aggregation = {"terms": terms_body}
        if child is not None:
            sub_aggs = {}
            for sub in ([child] if isinstance(child, dict) else child):
                sub_aggs.update(sub)
            aggregation["aggs"] = sub_aggs
        return {name: aggregation}

    @staticmethod
    def date_histogram(name: str, field: str, interval: str, format="yyyy-MM-dd", start: str = None, end: str = None):
        """
        Build a date histogram aggregation.
        When both start and end are given they are sent as ``extended_bounds``.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-aggregations-bucket-datehistogram-aggregation.html]

        :param name: aggregation name
        :param field: field to aggregate on
        :param interval: bucket interval
        :param format: date format of the returned keys
        :param start: lower bound of the returned time range
        :param end: upper bound of the returned time range
        :return: ``{name: {"date_histogram": {...}}}``
        """
        histogram_body = {"field": field, "interval": interval, "format": format}
        if start and end:
            histogram_body["extended_bounds"] = {"min": start, "max": end}
        return {name: {"date_histogram": histogram_body}}

    @staticmethod
    def cardinality(name: str, field: str):
        """
        Build a cardinality aggregation on ``field``.

        :param name: aggregation name
        :param field: field whose distinct values are counted
        :return: ``{name: {"cardinality": {"field": field}}}``
        """
        return {name: {"cardinality": {"field": field}}}

    @staticmethod
    def top_hits(name: str, includes: list = None, sort: list = None, size: int = None):
        """
        Build a top_hits aggregation. It must be used as a child of a terms aggregation.

        :param name: aggregation name
        :param includes: source fields to return for each hit
        :param sort: sort specification for the hits
        :param size: number of hits per bucket
        :return: ``{name: {"top_hits": {...}}}``
        """
        hits_body = {}
        if sort:
            hits_body["sort"] = sort
        if includes is not None:
            hits_body["_source"] = {"includes": includes}
        if size is not None:
            hits_body["size"] = size
        return {name: {"top_hits": hits_body}}

    @staticmethod
    def trans_terms_order(order_by: str):
        """
        Convert a "field:direction,field:direction" string into a list of ``{field: direction}`` dicts.

        A segment without a direction defaults to "asc". Surrounding whitespace is ignored and
        empty segments (e.g. from a trailing comma or an empty string) are skipped.

        :param order_by: comma separated order specification
        :return: list of single-entry dicts, in the order given
        :raises KeyError: if a segment contains more than one ":"
        """
        order_condition = []
        for segment in order_by.split(","):
            if not segment.strip():
                continue
            parts = [part.strip() for part in segment.split(":")]
            if len(parts) == 1:
                order_condition.append({parts[0]: "asc"})
            elif len(parts) == 2:
                order_condition.append({parts[0]: parts[1]})
            else:
                raise KeyError("order_by参数解析失败")
        return order_condition
class Query(object):
    """Static factories that build individual query clauses as plain dicts."""

    @staticmethod
    def terms(field: str, value: list):
        """
        Match documents whose ``field`` equals any of the given values.
        The terms clause is wrapped in a ``constant_score`` filter.

        :param field: field name
        :param value: list of accepted values
        """
        return {"constant_score": {"filter": {"terms": {field: value}}}}

    @staticmethod
    def match(field: str, value: (str, int), operator: str = "and"):
        """
        Match documents whose ``field`` contains ``value``.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-match-query.html]

        :param field: field name
        :param value: value to match
        :param operator: "or" / "and" combination of the analysed terms
        """
        return {"match": {field: {"query": value, "operator": operator}}}

    @staticmethod
    def match_phrase(field: str, value: (str, int)):
        """
        Phrase match; maps to the DSL Match Phrase Query.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-match-query-phrase.html]

        :param field: field name
        :param value: phrase to match
        """
        return {"match_phrase": {field: value}}

    @staticmethod
    def term(field: str, value: (str, int)):
        """
        Exact-value filter; maps to the DSL term query.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-term-query.html]

        :param field: field name
        :param value: field value
        """
        return {"term": {field: value}}

    @staticmethod
    def range(field: str, start: str, end: str, left: str = "gte", right: str = "lt"):
        """
        Match documents whose ``field`` lies within a range.
        left/right accept:
            gte: greater than or equal
            gt: greater than
            lte: less than or equal
            lt: less than
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-range-query.html]

        A bound passed as None is left out of the clause, which makes the range open-ended
        on that side.

        :param field: field name
        :param start: lower bound value, or None for no lower bound
        :param end: upper bound value, or None for no upper bound
        :param left: comparison operator applied to ``start``
        :param right: comparison operator applied to ``end``
        """
        bounds = {}
        if start is not None:
            bounds[left] = start
        if end is not None:
            bounds[right] = end
        return {"range": {field: bounds}}

    @staticmethod
    def collapse(field: str, name: str = None, size: int = None, sort: list = None):
        """
        Build a collapse clause on ``field``.

        An "inner_hits" section is attached as soon as any of name/size/sort is not None;
        only non-empty name/sort values and a non-None size are written into it.

        :param field: field to collapse on
        :param name: inner_hits name
        :param size: number of inner hits per collapsed group
        :param sort: sort specification for the inner hits
        """
        collapse_body = {"field": field}
        if name is None and size is None and sort is None:
            return collapse_body
        inner_hits = {}
        if name:
            inner_hits["name"] = name
        if size is not None:
            inner_hits["size"] = size
        if sort:
            inner_hits["sort"] = sort
        collapse_body["inner_hits"] = inner_hits
        return collapse_body
def bools(must: (dict, list) = None, filter: (dict, list) = None, should: (dict, list) = None,
          must_not: (dict, list) = None):
    """
    Assemble a ``{"bool": {...}}`` query from the given clause groups.

    Each non-empty argument is normalised with ``_parser_condition`` and stored under its
    clause name; empty or missing arguments are skipped.

    :param must: must condition(s)
    :param filter: filter condition(s)
    :param should: should condition(s)
    :param must_not: must_not condition(s)
    :return: the bool query dict, or an empty dict when no clause was supplied
    """
    named_clauses = (("must", must), ("filter", filter), ("should", should), ("must_not", must_not))
    clauses = {key: _parser_condition(given) for key, given in named_clauses if given}
    return {"bool": clauses} if clauses else {}
def _parser_condition(condition):
if condition:
if isinstance(condition, dict):
return [condition]
else:
return [c for c in condition if c]