[关闭]
@heavysheep 2020-11-06T12:00:51.000000Z 字数 8905 阅读 551

ES封装

代码


  1. class ESQuery(object):
  2. def __init__(self, index: str = None, doc_type: str = None, bool: dict = None, collapse: dict = None,
  3. dsl: dict = None):
  4. """
  5. ES 查询dsl封装
  6. :param index: ES index
  7. :param doc_type: ES doc_type(如有)
  8. :param bool: bool dsl
  9. :param collapse: collapse dsl.折叠dsl
  10. :param dsl: source dsl.传入时直接生成dsl,和bool collapse等解析参数有所出入
  11. example:
  12. # [build condition]
  13. filter_condition = Query.term(field="is_match", value=True)
  14. agg_condition = [Aggs.terms(name="group", field="website.keyword", size=500)]
  15. # [input condition]
  16. query = ESQuery(index="index_1", doc_type="type_1", bool=bools(filter=filter_condition))
  17. query.aggs(condition=agg_condition)
  18. # [modify attr]
  19. query.size(num=5)
  20. query.source(include=["title", "content", "website"])
  21. query.sort(condition="data_time:desc")
  22. # [get result]
  23. query_result = query.search()
  24. """
  25. if dsl and bool:
  26. raise ValueError("不允许同时传入'dsl' 'bool'参数")
  27. if dsl and collapse:
  28. raise ValueError("不允许同时传入'dsl' 'collapse'参数")
  29. self.index = index
  30. self.doc_type = doc_type
  31. self.params = {}
  32. self.bool = bool
  33. self.dsl = dsl if dsl is not None else {}
  34. if bool:
  35. self.dsl["query"] = bool
  36. if collapse:
  37. self.dsl["collapse"] = collapse
  38. def bools(self, must: (dict, list) = None, filter: (dict, list) = None, should: (dict, list) = None,
  39. must_not: (dict, list) = None):
  40. """
  41. bool 条件
  42. :param must: must条件
  43. :param filter: filter条件
  44. :param should: should条件
  45. :param must_not: must not条件
  46. """
  47. for name, condition_info in (("must", must), ("filter", filter), ("should", should), ("must_not", must_not)):
  48. if self.dsl.get("query") is None:
  49. self.dsl["query"] = {"bool": {}}
  50. if self.dsl["query"].get("bool") is None:
  51. self.dsl["query"]["bool"] = {}
  52. if condition_info is not None:
  53. condition = _parser_condition(condition_info)
  54. if name not in self.dsl["query"]["bool"]:
  55. self.dsl["query"]["bool"][name] = condition
  56. else:
  57. self.dsl["query"]["bool"][name].extend(condition)
  58. def aggs(self, condition):
  59. if isinstance(condition, list):
  60. condition = {k: v for c in condition for k, v in c.items()}
  61. self.dsl["aggs"] = condition
  62. def range(self, range_condition: dict):
  63. """
  64. 区间条件配置方法 对应DSL _source
  65. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-range-query.html]
  66. :param range_condition: 区间条件
  67. """
  68. if self.dsl.get("query") is None:
  69. self.dsl["query"] = {"bool": {}}
  70. if self.dsl["query"].get("bool") is None:
  71. self.dsl["query"]["bool"] = {}
  72. if self.dsl["query"]["bool"].get("must") is None:
  73. self.dsl["query"]["bool"]["must"] = range_condition
  74. return
  75. for i, condition in enumerate(self.dsl["query"]["bool"]["must"]):
  76. if "range" in condition.keys():
  77. self.dsl["query"]["bool"]["must"][i] = range_condition
  78. return
  79. self.dsl["query"]["bool"]["must"].append(range_condition)
  80. def source(self, include: (str, list) = None, exclude: (str, list) = None):
  81. """
  82. 字段配置方法 对应DSL _source
  83. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-source-filtering.html]
  84. :param include: 保留字段
  85. :param exclude: 不保留字段
  86. """
  87. self.dsl["_source"] = {}
  88. if include:
  89. self.dsl["_source"]["includes"] = include
  90. if exclude:
  91. self.dsl["_source"]["excludes"] = exclude
  92. def pagination(self, page: int = 1, per_page: int = 20):
  93. """
  94. 分页方法 对应DSL from/size
  95. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-from-size.html]
  96. :param page: 页数,从1开始,默认1
  97. :param per_page: 页容量,默认20
  98. """
  99. if page is None:
  100. page = 1
  101. if per_page is None:
  102. per_page = 20
  103. from_ = (page - 1) * per_page
  104. self.dsl["from"] = from_
  105. self.dsl["size"] = per_page
  106. def sort(self, condition: (str, dict)):
  107. """
  108. 排序方法,对应DSL sort
  109. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-sort.html]
  110. 注意此方法是对query对象的排序方法
  111. :param condition: 排序条件
  112. """
  113. if isinstance(condition, dict):
  114. condition = ",".join(("{}:{}".format(c_k, c_v) for c_k, c_v in condition.items()))
  115. self.params["sort"] = condition
  116. def size(self, num: int):
  117. """
  118. 单容量方法,对应DSL size
  119. 注意和pagination有冲突
  120. :param num: 容量
  121. """
  122. self.dsl.update({"size": num})
  123. def scroll(self, scroll: str):
  124. """
  125. 滚动方法 对应DSL scroll
  126. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-scroll.html]
  127. :param scroll: 滚动ID
  128. """
  129. self.params.update({"scroll": scroll})
  130. def search(self, debug=False):
  131. """
  132. search方法 对应ES request body search方法
  133. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-uri-request.html]
  134. :param debug: 是否打印(info日志)DSL request body
  135. """
  136. if debug:
  137. logger.info(json.dumps(self.dsl, ensure_ascii=False, indent=4))
  138. if self.dsl.get("size") is None:
  139. self.size(50)
  140. result = es.search(body=self.dsl, index=self.index, doc_type=self.doc_type, params=self.params)
  141. result.update({"pager_info": self._get_page_info(result)})
  142. return result
  143. def _get_page_info(self, result: dict):
  144. """
  145. 分页信息方法,用于返回分页相关情况说明
  146. :param result: ES查询结果
  147. :return: 分页相关信息
  148. """
  149. pager_info = {}
  150. if self.dsl.get("from") is not None:
  151. pager_info["page"] = self.dsl["from"]
  152. pager_info["per_page"] = self.dsl["size"]
  153. pager_info["total"] = result["hits"]["total"]
  154. pager_info["pages"] = math.ceil(pager_info["total"] / pager_info["per_page"])
  155. if self.params.get("sort") is not None:
  156. pager_info["order_by"] = self.params["sort"]
  157. return pager_info
  158. def clone(self):
  159. return ESQuery(index=self.index, doc_type=self.doc_type, dsl=deepcopy(self.dsl))
  160. class Aggs(object):
  161. @staticmethod
  162. def terms(name: str, field: str, size: int = None, include: (str, dict, list) = None, exclude: (str, list) = None,
  163. order: dict = None, child: (dict, list) = None):
  164. """
  165. 基础聚类方法
  166. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-term-query.html]
  167. :param order:
  168. :param name: 聚类名称
  169. :param field: 聚类字段名
  170. :param size: 聚类返回前size个
  171. :param include: 计入条件(匹配条件则计入聚类)
  172. :param exclude: 不计入条件(匹配条件则不计入聚类)
  173. :param child: aggs子条件
  174. """
  175. field_condition = {"field": field}
  176. if size is not None:
  177. field_condition.update({"size": size})
  178. if include is not None:
  179. field_condition.update({"include": include})
  180. if exclude is not None:
  181. field_condition.update({"exclude": exclude})
  182. if order is not None:
  183. field_condition.update({"order": order})
  184. if child is not None:
  185. aggs = {}
  186. if isinstance(child, dict):
  187. aggs.update(child)
  188. else:
  189. for c in child:
  190. aggs.update(c)
  191. return {name: {"terms": field_condition, "aggs": aggs}}
  192. return {name: {"terms": field_condition}}
  193. @staticmethod
  194. def date_histogram(name: str, field: str, interval: str, format="yyyy-MM-dd", start: str = None, end: str = None):
  195. """
  196. 时间聚类方法
  197. 当有start和end时,强制返回start和end区间所有时间
  198. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-aggregations-bucket-datehistogram-aggregation.html]
  199. :param name: 聚类名称
  200. :param field: 聚类字段名
  201. :param interval: 聚类周期
  202. :param format: 返回时间格式
  203. :param start: 返回时间区间起始
  204. :param end: 返回时间区间结束
  205. :return 返回本对象,以支持进一步聚类
  206. """
  207. field_condition = {"field": field, "interval": interval, "format": format}
  208. if start and end:
  209. field_condition.update({"extended_bounds": {"min": start, "max": end}})
  210. return {name: {"date_histogram": field_condition}}
  211. @staticmethod
  212. def cardinality(name: str, field: str):
  213. return {name: {"cardinality": {"field": field}}}
  214. @staticmethod
  215. def top_hits(name: str, includes: list = None, sort: list = None, size: int = None):
  216. """
  217. top hits必须是terms的一个child
  218. """
  219. field_condition = {}
  220. if sort:
  221. field_condition["sort"] = sort
  222. if includes is not None:
  223. field_condition["_source"] = {"includes": includes}
  224. if size is not None:
  225. field_condition["size"] = size
  226. return {name: {"top_hits": field_condition}}
  227. @staticmethod
  228. def trans_terms_order(order_by: str):
  229. order_condition = []
  230. for oc in order_by.split(","):
  231. oc_split = oc.split(":")
  232. if len(oc_split) == 1:
  233. order_condition.append({oc_split[0]: "asc"})
  234. elif len(oc_split) == 2:
  235. order_condition.append({oc_split[0]: oc_split[1]})
  236. else:
  237. raise KeyError("order_by参数解析失败")
  238. return order_condition
  239. class Query(object):
  240. @staticmethod
  241. def terms(field: str, value: list):
  242. return {"constant_score": {"filter": {"terms": {field: value}}}}
  243. @staticmethod
  244. def match(field: str, value: (str, int), operator: str = "and"):
  245. """
  246. 查询ES中field包含value的数据
  247. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-match-query.html]
  248. :param field: 字段名称
  249. :param value: 字段值
  250. :param operator: 或or/与and条件
  251. """
  252. return {"match": {field: {"query": value, "operator": operator}}}
  253. @staticmethod
  254. def match_phrase(field: str, value: (str, int)):
  255. """
  256. 短语匹配,对应DSL Match Phrase Query
  257. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-match-query-phrase.html]
  258. :param field: 字段名
  259. :param value: 字段值
  260. :return:
  261. """
  262. return {"match_phrase": {field: value}}
  263. @staticmethod
  264. def term(field: str, value: (str, int)):
  265. """
  266. 数据筛选,对应DSL term
  267. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-term-query.html]
  268. :param field: 字段名
  269. :param value: 字段值
  270. """
  271. return {"term": {field: value}}
  272. @staticmethod
  273. def range(field: str, start: str, end: str, left: str = "gte", right: str = "lt"):
  274. """
  275. 筛选ES中符合range条件的数据
  276. left/right支持:
  277. gte: 大于等于
  278. gt: 大于
  279. lte: 小于等于
  280. lt: 小于
  281. ES文档: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-range-query.html]
  282. :param field: 字段名称
  283. :param start: 起始值
  284. :param end: 结束值
  285. :param left: 起始值匹配条件
  286. :param right: 结束值匹配条件
  287. """
  288. return {"range": {field: {left: start, right: end}}}
  289. @staticmethod
  290. def collapse(field: str, name: str = None, size: int = None, sort: list = None):
  291. field_condition = {"field": field}
  292. if name is not None or size is not None or sort is not None:
  293. inner_hits = {}
  294. if name:
  295. inner_hits["name"] = name
  296. if size is not None:
  297. inner_hits["size"] = size
  298. if sort:
  299. inner_hits["sort"] = sort
  300. field_condition["inner_hits"] = inner_hits
  301. return field_condition
  302. def bools(must: (dict, list) = None, filter: (dict, list) = None, should: (dict, list) = None,
  303. must_not: (dict, list) = None):
  304. query = {}
  305. if must:
  306. query["must"] = _parser_condition(must)
  307. if filter:
  308. query["filter"] = _parser_condition(filter)
  309. if should:
  310. query["should"] = _parser_condition(should)
  311. if must_not:
  312. query["must_not"] = _parser_condition(must_not)
  313. if not query:
  314. return {}
  315. return {"bool": query}
  316. def _parser_condition(condition):
  317. if condition:
  318. if isinstance(condition, dict):
  319. return [condition]
  320. else:
  321. return [c for c in condition if c]
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注