@heavysheep
2020-11-06T04:00:51.000000Z
字数 8905
阅读 714
代码
class ESQuery(object):
    def __init__(self, index: str = None, doc_type: str = None, bool: dict = None, collapse: dict = None,
                 dsl: dict = None):
        """Builder for an Elasticsearch search request (DSL body plus URL params).

        :param index: ES index
        :param doc_type: ES doc_type (if any)
        :param bool: bool query, e.g. the output of the module-level ``bools()``;
            stored as ``dsl["query"]``
        :param collapse: field-collapsing clause, stored as ``dsl["collapse"]``
        :param dsl: a ready-made DSL body used as-is; cannot be combined with
            ``bool`` or ``collapse``
        :raises ValueError: if ``dsl`` is combined with ``bool`` or ``collapse``

        example::

            # [build condition]
            filter_condition = Query.term(field="is_match", value=True)
            agg_condition = [Aggs.terms(name="group", field="website.keyword", size=500)]
            # [input condition]
            query = ESQuery(index="index_1", doc_type="type_1", bool=bools(filter=filter_condition))
            query.aggs(condition=agg_condition)
            # [modify attr]
            query.size(num=5)
            query.source(include=["title", "content", "website"])
            query.sort(condition="data_time:desc")
            # [get result]
            query_result = query.search()
        """
        if dsl and bool:
            raise ValueError("不允许同时传入'dsl' 'bool'参数")
        if dsl and collapse:
            raise ValueError("不允许同时传入'dsl' 'collapse'参数")
        self.index = index
        self.doc_type = doc_type
        # URL-level request params (sort, scroll), kept separate from the request body.
        self.params = {}
        self.bool = bool
        # NOTE: a caller-supplied ``dsl`` dict is used, and later mutated, in place.
        self.dsl = dsl if dsl is not None else {}
        if bool:
            self.dsl["query"] = bool
        if collapse:
            self.dsl["collapse"] = collapse

    def _ensure_bool_query(self) -> dict:
        """Return ``self.dsl["query"]["bool"]``, creating any missing level on the way."""
        query = self.dsl.get("query")
        if query is None:
            query = self.dsl["query"] = {}
        bool_query = query.get("bool")
        if bool_query is None:
            bool_query = query["bool"] = {}
        return bool_query

    def bools(self, must: (dict, list) = None, filter: (dict, list) = None, should: (dict, list) = None,
              must_not: (dict, list) = None):
        """Add clauses to the query's bool section, appending to any existing ones.

        Always ensures ``dsl["query"]["bool"]`` exists, even when no clause is given.
        Empty conditions are ignored rather than written out as ``None``.

        :param must: must clause(s), a single dict or a list of dicts
        :param filter: filter clause(s)
        :param should: should clause(s)
        :param must_not: must_not clause(s)
        """
        bool_query = self._ensure_bool_query()
        for name, condition_info in (("must", must), ("filter", filter), ("should", should),
                                     ("must_not", must_not)):
            if condition_info is None:
                continue
            condition = _parser_condition(condition_info)
            if not condition:
                # e.g. must=[] — writing "must": None would produce an invalid DSL.
                continue
            existing = bool_query.get(name)
            if existing is None:
                bool_query[name] = condition
            elif isinstance(existing, list):
                existing.extend(condition)
            else:
                # A clause given as a single dict (allowed by ES); convert it to a list first.
                bool_query[name] = [existing] + condition

    def aggs(self, condition):
        """Set the top-level ``aggs`` section, replacing any previous one.

        :param condition: an aggs dict, or a list of dicts (as produced by ``Aggs.*``)
            that is merged into a single mapping
        """
        if isinstance(condition, list):
            condition = {k: v for c in condition for k, v in c.items()}
        self.dsl["aggs"] = condition

    def range(self, range_condition: dict):
        """Put a range condition into the bool ``must`` list.

        Replaces the first existing ``range`` clause in ``must`` (whatever its field),
        otherwise appends.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-range-query.html]

        :param range_condition: a range clause, e.g. the output of ``Query.range``
        """
        bool_query = self._ensure_bool_query()
        must = bool_query.get("must")
        if must is None:
            # Store a list (not a bare dict) so later range()/bools() calls can iterate/extend it.
            bool_query["must"] = [range_condition]
            return
        if isinstance(must, dict):
            must = bool_query["must"] = [must]
        for i, condition in enumerate(must):
            if isinstance(condition, dict) and "range" in condition:
                must[i] = range_condition
                return
        must.append(range_condition)

    def source(self, include: (str, list) = None, exclude: (str, list) = None):
        """Configure returned fields (DSL ``_source``), replacing any previous setting.

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-source-filtering.html]

        :param include: field(s) to keep
        :param exclude: field(s) to drop
        """
        self.dsl["_source"] = {}
        if include:
            self.dsl["_source"]["includes"] = include
        if exclude:
            self.dsl["_source"]["excludes"] = exclude

    def pagination(self, page: int = 1, per_page: int = 20):
        """Paginate via DSL ``from``/``size``.

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-from-size.html]

        :param page: 1-based page number, default 1 (None also means 1)
        :param per_page: page size, default 20 (None also means 20)
        """
        if page is None:
            page = 1
        if per_page is None:
            per_page = 20
        self.dsl["from"] = (page - 1) * per_page
        self.dsl["size"] = per_page

    def sort(self, condition: (str, dict)):
        """Set the ``sort`` URL param (not the DSL body).

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-sort.html]

        :param condition: ``"field:order,field2:order"`` string, or a ``{field: order}`` dict
            that is converted to that string form
        """
        if isinstance(condition, dict):
            condition = ",".join("{}:{}".format(c_k, c_v) for c_k, c_v in condition.items())
        self.params["sort"] = condition

    def size(self, num: int):
        """Set DSL ``size``. Note: overrides the page size set by ``pagination``.

        :param num: number of hits to return
        """
        self.dsl.update({"size": num})

    def scroll(self, scroll: str):
        """Set the ``scroll`` URL param.

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-scroll.html]

        :param scroll: how long ES keeps the scroll context alive, e.g. ``"1m"``
            (a time value, not a scroll ID)
        """
        self.params.update({"scroll": scroll})

    def search(self, debug=False):
        """Execute the query as a request-body search.

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-body.html]

        If no ``size`` was set, defaults it to 50 (this mutates ``self.dsl``).

        :param debug: if true, log the DSL body at INFO level before sending
        :return: the raw ES response dict plus a ``"pager_info"`` entry
        """
        # ``logger`` and ``es`` are module-level objects defined outside this view.
        if debug:
            logger.info(json.dumps(self.dsl, ensure_ascii=False, indent=4))
        if self.dsl.get("size") is None:
            self.size(50)
        result = es.search(body=self.dsl, index=self.index, doc_type=self.doc_type, params=self.params)
        result.update({"pager_info": self._get_page_info(result)})
        return result

    def _get_page_info(self, result: dict):
        """Build pagination metadata for a search response.

        The page fields are only filled in when a ``from`` offset is set (see ``pagination``).

        :param result: the ES search response
        :return: dict with ``page`` (1-based), ``per_page``, ``total``, ``pages`` and,
            if a sort was set, ``order_by``
        """
        pager_info = {}
        offset = self.dsl.get("from")
        if offset is not None:
            per_page = self.dsl.get("size")
            total = result["hits"]["total"]
            # ES 7+ reports the hit count as {"value": n, "relation": ...} rather than an int.
            if isinstance(total, dict):
                total = total.get("value", 0)
            # ``from`` is an offset; convert it back to the 1-based page given to pagination().
            pager_info["page"] = offset // per_page + 1 if per_page else 1
            pager_info["per_page"] = per_page
            pager_info["total"] = total
            # Guard size=0 (count-only queries) against ZeroDivisionError.
            pager_info["pages"] = math.ceil(total / per_page) if per_page else 0
        if self.params.get("sort") is not None:
            pager_info["order_by"] = self.params["sort"]
        return pager_info

    def clone(self):
        """Return an independent copy of this query; the DSL body and URL params are deep-copied."""
        cloned = ESQuery(index=self.index, doc_type=self.doc_type, dsl=deepcopy(self.dsl))
        # sort/scroll live in ``params``, not in the DSL, so copy them explicitly.
        cloned.params = deepcopy(self.params)
        return cloned


class Aggs(object):
    @staticmethod
    def terms(name: str, field: str, size: int = None, include: (str, dict, list) = None,
              exclude: (str, list) = None, order: dict = None, child: (dict, list) = None):
        """Terms (bucket) aggregation.

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-aggregations-bucket-terms-aggregation.html]

        :param name: aggregation name
        :param field: field to bucket on
        :param size: return the top ``size`` buckets
        :param include: only bucket terms matching this
        :param exclude: skip terms matching this
        :param order: bucket ordering, e.g. ``{"_count": "desc"}``
        :param child: sub-aggregation(s), a dict or a list of dicts merged together
        :return: ``{name: {"terms": {...}[, "aggs": {...}]}}``
        """
        field_condition = {"field": field}
        for key, value in (("size", size), ("include", include), ("exclude", exclude), ("order", order)):
            if value is not None:
                field_condition[key] = value
        if child is None:
            return {name: {"terms": field_condition}}
        aggs = {}
        for sub in ([child] if isinstance(child, dict) else child):
            aggs.update(sub)
        return {name: {"terms": field_condition, "aggs": aggs}}

    @staticmethod
    def date_histogram(name: str, field: str, interval: str, format="yyyy-MM-dd", start: str = None,
                       end: str = None):
        """Date histogram aggregation.

        When both ``start`` and ``end`` are given they are sent as ``extended_bounds``.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-aggregations-bucket-datehistogram-aggregation.html]

        :param name: aggregation name
        :param field: date field
        :param interval: bucket interval
        :param format: output date format
        :param start: lower bound of ``extended_bounds``
        :param end: upper bound of ``extended_bounds``
        :return: ``{name: {"date_histogram": {...}}}``
        """
        field_condition = {"field": field, "interval": interval, "format": format}
        if start and end:
            field_condition.update({"extended_bounds": {"min": start, "max": end}})
        return {name: {"date_histogram": field_condition}}

    @staticmethod
    def cardinality(name: str, field: str):
        """Cardinality (approximate distinct count) aggregation on ``field``."""
        return {name: {"cardinality": {"field": field}}}

    @staticmethod
    def top_hits(name: str, includes: list = None, sort: list = None, size: int = None):
        """Top hits aggregation; intended to be used as a ``child`` of ``terms``.

        :param includes: ``_source`` fields to return
        :param sort: sort clauses for the hits
        :param size: hits per bucket
        """
        field_condition = {}
        if sort:
            field_condition["sort"] = sort
        if includes is not None:
            field_condition["_source"] = {"includes": includes}
        if size is not None:
            field_condition["size"] = size
        return {name: {"top_hits": field_condition}}

    @staticmethod
    def trans_terms_order(order_by: str):
        """Parse ``"field[:order],field2[:order]"`` into a list of ``{field: order}`` dicts.

        A missing order defaults to ``"asc"``. Whitespace around parts is ignored, and empty
        segments (e.g. from a trailing comma) are skipped.

        :raises KeyError: if a segment has more than one ``:``
        """
        order_condition = []
        for part in order_by.split(","):
            part = part.strip()
            if not part:
                continue
            pieces = [p.strip() for p in part.split(":")]
            if len(pieces) == 1:
                order_condition.append({pieces[0]: "asc"})
            elif len(pieces) == 2:
                order_condition.append({pieces[0]: pieces[1]})
            else:
                raise KeyError("order_by参数解析失败")
        return order_condition


class Query(object):
    @staticmethod
    def terms(field: str, value: list):
        """Terms query (field matches any of ``value``), wrapped in ``constant_score`` (no scoring).

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-terms-query.html]
        """
        return {"constant_score": {"filter": {"terms": {field: value}}}}

    @staticmethod
    def match(field: str, value: (str, int), operator: str = "and"):
        """Full-text match query on ``field``.

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-match-query.html]

        :param field: field name
        :param value: query text
        :param operator: ``"and"`` / ``"or"`` between analysed terms
        """
        return {"match": {field: {"query": value, "operator": operator}}}

    @staticmethod
    def match_phrase(field: str, value: (str, int)):
        """Match phrase query.

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-match-query-phrase.html]

        :param field: field name
        :param value: phrase
        """
        return {"match_phrase": {field: value}}

    @staticmethod
    def term(field: str, value: (str, int)):
        """Exact-value term query.

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-term-query.html]

        :param field: field name
        :param value: exact value
        """
        return {"term": {field: value}}

    @staticmethod
    def range(field: str, start: str, end: str, left: str = "gte", right: str = "lt"):
        """Range query on ``field``.

        ``left``/``right`` take one of: gte (>=), gt (>), lte (<=), lt (<).
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-range-query.html]

        :param field: field name
        :param start: lower bound value
        :param end: upper bound value
        :param left: operator for ``start``
        :param right: operator for ``end``
        """
        return {"range": {field: {left: start, right: end}}}

    @staticmethod
    def collapse(field: str, name: str = None, size: int = None, sort: list = None):
        """Field-collapsing clause; ``inner_hits`` is added only if any of name/size/sort is given.

        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-collapse.html]
        """
        field_condition = {"field": field}
        if name is not None or size is not None or sort is not None:
            inner_hits = {}
            if name:
                inner_hits["name"] = name
            if size is not None:
                inner_hits["size"] = size
            if sort:
                inner_hits["sort"] = sort
            field_condition["inner_hits"] = inner_hits
        return field_condition


def bools(must: (dict, list) = None, filter: (dict, list) = None, should: (dict, list) = None,
          must_not: (dict, list) = None):
    """Build a ``{"bool": {...}}`` query from the given clauses; returns ``{}`` if all are empty."""
    query = {}
    if must:
        query["must"] = _parser_condition(must)
    if filter:
        query["filter"] = _parser_condition(filter)
    if should:
        query["should"] = _parser_condition(should)
    if must_not:
        query["must_not"] = _parser_condition(must_not)
    if not query:
        return {}
    return {"bool": query}


def _parser_condition(condition):
    """Normalise a clause argument to a list of clauses: dict -> [dict], list -> truthy items, empty -> []."""
    if not condition:
        return []
    if isinstance(condition, dict):
        return [condition]
    return [c for c in condition if c]