@heavysheep
2020-11-03T03:15:29.000000Z
字数 12014
阅读 789
代码质量
项目: 北京网信办数据中心
归属人: 王子
业务逻辑: 审核文本的多个方法类,包含对不同关键词、规则的匹配、同音、象形等匹配方法。
代码链接: https://git.datagrand.com/wbyy_group/bjwxb/bjwxb_alarm/tree/master/app/controller
位置: 各方法类
class WordController(object):
    """Exact keyword matcher.

    Loads a word dictionary, compiles it into an Aho-Corasick automaton and
    scores texts by how often each configured word is hit.
    """

    data_path = "data"

    def __init__(self):
        self._element = "match"
        self._word_dict = self._load_dict()
        self._ac = self._compile_ac()

    def match(self, text: str):
        ...

    def parser(self, text):
        """Match *text* and map every hit list to a confidence score.

        A word hit more than once scores 0.99, otherwise 0.9.
        """
        hits = self.match(text)
        for word in hits:
            hits[word] = 0.99 if len(hits[word]) > 1 else 0.9
        return hits

    def _load_dict(self):
        ...

    def _compile_ac(self):
        """Build the Aho-Corasick automaton from non-blank dictionary words."""
        automaton = ahocorasick.Automaton()
        for word in self._word_dict:
            if word.strip():
                automaton.add_word(word, word)
        automaton.make_automaton()
        return automaton
class PinYinController(object):
    """Pinyin (homophone) matcher.

    Converts dictionary words to pinyin, compiles them into an Aho-Corasick
    automaton and scores texts matched in pinyin space.
    """

    data_path = "data"

    def __init__(self):
        self._element = "pinyin"
        self._pinyin_dict = self._load_dict()
        self._ac = self._compile_ac()

    def match(self, text: str):
        ...

    def parser(self, text):
        """Match *text* and assign every matched key the score 2.2**k / 100.

        NOTE(review): k is the number of matched KEYS, not the number of hits
        for this key — every key gets the same score, and the value exceeds
        1.0 once six or more keys match. Confirm this is intended (compare
        WordController.parser, which scores per-key hit counts).
        """
        hits = self.match(text)
        score = pow(2.2, len(hits)) / 100
        for key in hits:
            hits[key] = score
        return hits

    def _load_dict(self):
        ...
        return pinyin_dict

    def _compile_ac(self):
        """Build the Aho-Corasick automaton from non-blank pinyin keys."""
        automaton = ahocorasick.Automaton()
        for key in self._pinyin_dict:
            if key.strip():
                automaton.add_word(key, key)
        automaton.make_automaton()
        return automaton

    @staticmethod
    def _join(word: str, join_str: str = "_"):
        """Join the lazy-pinyin syllables of *word* with *join_str*."""
        return join_str.join(pypinyin.lazy_pinyin(word))
项目: 北京网信办数据中心
归属人: 韩伟
业务逻辑: 从ES中读取批量数据并聚类相似事件
代码链接: https://git.datagrand.com/wbyy_group/bjwxb/bjwxb_alarm/tree/master/app/controller
位置: 全文件
class ThreadWithReturn(threading.Thread):
    """Thread subclass that captures the target callable's return value.

    Args:
        func: callable executed in the thread.
        args: positional arguments passed to ``func``.
    """

    def __init__(self, func, args=()):
        super(ThreadWithReturn, self).__init__()
        self.func = func
        self.args = args
        # Initialized up front so get_result() never hits AttributeError when
        # run() raised or was never started (the original relied on a broad
        # except to paper over the missing attribute).
        self.result = None

    def run(self):
        self.result = self.func(*self.args)

    def get_result(self):
        """Return the captured result; None if the thread failed or never ran."""
        return self.result


class SimiFinder:
    """Finds repeated/similar events for a batch of items by combining ES
    recall and in-batch self-similarity, each computed in its own thread."""

    def __init__(self):
        self.es_recall = ESRecall()
        self.text_simi_ = TextSimi()

    @time_cost
    def simi_finder(self, text_list):
        """Annotate each item in *text_list* with 'has_repeated'/'repeatedItems'.

        Best-effort: on any error the failure is logged and [] is returned
        (the whole batch is dropped, preserving the original contract).

        :param text_list: list of dicts, each with a 'fields' dict containing
                          at least 'itemid'
        :return: the annotated list, or [] on error
        """
        try:
            info_dict = self.process_data(text_list)
            # Run ES recall and in-batch similarity concurrently.
            t_es = ThreadWithReturn(self._get_es_simi, args=(info_dict,))
            t_es.start()
            t_self_simi = ThreadWithReturn(self._get_self_simi, args=(info_dict,))
            t_self_simi.start()
            t_es.join()
            es_simi_dict = t_es.get_result()
            t_self_simi.join()
            self_simi_dict = t_self_simi.get_result()
            # term_dict is the same object stored in text_list, so in-place
            # mutation is enough (the original's text_list[i] = term_dict was
            # a no-op).
            for term_dict in text_list:
                item_id = term_dict['fields']['itemid']
                simi_result = (es_simi_dict.get(item_id, [])
                               + self_simi_dict.get(item_id, []))
                # setdefault keeps any value already present, as before.
                term_dict['fields'].setdefault('has_repeated', bool(simi_result))
                term_dict['fields'].setdefault('repeatedItems', simi_result)
        except Exception:
            # Was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt; narrowed to Exception.
            import traceback
            logger.error('Get simi error: {}'.format(traceback.format_exc()))
            text_list = []
        return text_list
项目: 某情感分类项目
归属人: 房悦竹
业务逻辑: 获取评论内容的情感得分
代码链接: https://git.datagrand.com/wbyy_group/text_sentiment/blob/master/text_sentiment/src/sentiment_dict.py
位置: 88:207
# @timing
def get_review_score(self, review):
    """Return a review's sentiment score based on the generic-dictionary rules.

    :param review: review text
    :return: review_score: [x, y] with x = positive score, y = negative score;
             result: {0: x, 1: y} with x = negative share, y = positive share;
             hit_word_list: list of "<kind>:<word>" trigger words that fired
    """
    sentences = tp.split_sentence(review)
    senti_score = []
    hit_word_list = []
    for sen in sentences:
        zhuanzhe_flag = 0  # transition ("however"-type) word flag for this sentence
        # seg = self.wordseg.cut(sen, with_stop_word=False, model=None)
        seg = self.wordseg.lcut(sen)
        seg = process_contain_no(seg)  # special handling of the "不" (not) token in the segmentation
        cur_word_pos = 0  # current word position
        senti_word_pos = 0  # position of the most recent sentiment word
        pos_score = 0
        neg_score = 0
        seg_len = len(seg)
        for word in seg:
            current_neg = 0
            current_pos = 0
            if word in self.zhuanzhe_dict:
                zhuanzhe_flag = self.zhuanzhe_dict[word]
                hit_word = 'zhuanzhe:' + word
                hit_word_list.append(hit_word)
            if word in self.pos_dict:
                hit_word = 'pos:' + word
                hit_word_list.append(hit_word)
                doubt_flag = False  # whether a doubt/question word was found
                current_pos += 1
                # Apply adverb modifiers between the previous sentiment word
                # (at most 5 tokens back) and the current word.
                start_pos = max(cur_word_pos - 5, senti_word_pos)
                for w in seg[start_pos:cur_word_pos]:
                    current_pos = self.match_adv(w, current_pos, hit_word_list)
                if cur_word_pos != 0:
                    s_idx = max([cur_word_pos - 2, 0])  # look back two words
                    for w in seg[s_idx:cur_word_pos]:
                        current_pos, doubt_flag, hit_word_list = self.match_doubt_forward(w, current_pos,
                                                                                         hit_word_list)
                        if doubt_flag:
                            break
                if not doubt_flag:  # once a doubt word is found, do not search further
                    if cur_word_pos < seg_len - 1:
                        e_idx = min([cur_word_pos + 3, seg_len])  # look ahead two words
                        for w in seg[cur_word_pos + 1:e_idx]:
                            current_pos, doubt_flag, hit_word_list = self.match_doubt_backward(w, current_pos,
                                                                                              hit_word_list)
                            if doubt_flag:
                                break
                pos_score += current_pos
                senti_word_pos = cur_word_pos
            elif word in self.neg_dict:
                # Mirror of the positive branch, accumulating into current_neg.
                hit_word = 'neg:' + word
                hit_word_list.append(hit_word)
                doubt_flag = False  # whether a doubt/question word was found
                current_neg += 1
                start_pos = max(cur_word_pos - 5, senti_word_pos)
                for w in seg[start_pos:cur_word_pos]:
                    current_neg = self.match_adv(w, current_neg, hit_word_list)
                if cur_word_pos != 0:
                    s_idx = max([cur_word_pos - 2, 0])  # look back two words
                    for w in seg[s_idx:cur_word_pos]:
                        current_neg, doubt_flag, hit_word_list = self.match_doubt_forward(w, current_neg,
                                                                                         hit_word_list)
                        if doubt_flag:
                            break
                if not doubt_flag:  # once a doubt word is found, do not search further
                    if cur_word_pos < seg_len - 1:
                        e_idx = min([cur_word_pos + 3, seg_len])  # look ahead two words
                        for w in seg[cur_word_pos + 1:e_idx]:
                            current_neg, doubt_flag, hit_word_list = self.match_doubt_backward(w, current_neg,
                                                                                              hit_word_list)
                            if doubt_flag:
                                break
                neg_score += current_neg
                senti_word_pos = cur_word_pos
            elif word in self.insult_dict:
                hit_word = 'insult:' + word
                hit_word_list.append(hit_word)
                current_neg += 1 * 2  # insult-dictionary words get a weight of 2.0
                neg_score += current_neg
            cur_word_pos += 1
        pos_score, neg_score = zhuanzhe_process(zhuanzhe_flag, pos_score, neg_score, senti_score)
        senti_score.append(transform_to_positive_num(pos_score, neg_score))
    # pay more attention to last sentence
    # NOTE(review): the second `if` re-checks against the value boosted by the
    # first, so only one of the two boosts can ever apply — confirm intended.
    if senti_score[-1][0] > senti_score[-1][1]:
        senti_score[-1][0] *= 1.5
    if senti_score[-1][1] > senti_score[-1][0]:
        senti_score[-1][1] *= 1.5
    review_score = sum_sentences_score(senti_score)  # [Pos, Neg]
    result = {}
    if review_score[0] == 0 and review_score[1] == 0:
        result[0] = 0.5
        result[1] = 0.5
    elif review_score[0] == 0:  # dictionary positive score is 0
        result[0] = 0.75  # NOTE(review): stale comment said "originally 1, changed to 0.8" — code uses 0.75
        result[1] = 0.25
    elif review_score[1] == 0:  # dictionary negative score is 0
        result[0] = 0.25
        result[1] = 0.75
    else:
        temp_sum = review_score[0] + review_score[1]
        result[0] = round(review_score[1] / temp_sum, 2)
        result[1] = round(review_score[0] / temp_sum, 2)
    return review_score, result, hit_word_list
项目: VOC核心
业务逻辑: 封装ES DSL
代码链接: https://git.datagrand.com/voc/voc_api/blob/master/app/common/es.py
位置: 25:368行
class ESQuery(object):
    def __init__(self, index: str = None, doc_type: str = None, bool: dict = None, collapse: dict = None,
                 dsl: dict = None):
        """Builder for an Elasticsearch request-body search DSL.

        :param index: ES index
        :param doc_type: ES doc_type (if any)
        :param bool: bool dsl
        :param collapse: collapse dsl (field collapsing)
        :param dsl: source dsl. When given, it is used directly and must not be
                    combined with the bool/collapse parse parameters.

        example:
            # [build condition]
            filter_condition = Query.term(field="is_match", value=True)
            agg_condition = [Aggs.terms(name="group", field="website.keyword", size=500)]
            # [input query]
            query = ESQuery(index="index_1", doc_type="type_1", bool=bools(filter=filter_condition))
            query.aggs(condition=agg_condition)
            # [modify attr]
            query.size(0)
            query.sort("datetime:desc")
            # [get result]
            query_result, _ = query.search()
        """
        if dsl and bool:
            raise ValueError("不允许同时传入'dsl' 'bool'参数")
        if dsl and collapse:
            raise ValueError("不允许同时传入'dsl' 'collapse'参数")
        self.index = index
        self.doc_type = doc_type
        self.params = {}
        self.bool = bool
        self.dsl = dsl if dsl is not None else {}
        if bool:
            self.dsl["query"] = bool
        if collapse:
            self.dsl["collapse"] = collapse

    def bools(self, must: (dict, list) = None, filter: (dict, list) = None, should: (dict, list) = None,
              must_not: (dict, list) = None):
        """Add bool clauses.

        :param must: must conditions
        :param filter: filter conditions
        :param should: should conditions
        :param must_not: must_not conditions
        """
        # Ensure the query/bool scaffolding exists once, before merging clauses.
        if self.dsl.get("query") is None:
            self.dsl["query"] = {"bool": {}}
        if self.dsl["query"].get("bool") is None:
            self.dsl["query"]["bool"] = {}
        for name, condition_info in (("must", must), ("filter", filter), ("should", should), ("must_not", must_not)):
            if condition_info is not None:
                condition = _parser_condition(condition_info)
                if name not in self.dsl["query"]["bool"]:
                    self.dsl["query"]["bool"][name] = condition
                else:
                    self.dsl["query"]["bool"][name].extend(condition)

    def aggs(self, condition):
        """Set the aggs section; a list of single-key dicts is merged into one dict."""
        if isinstance(condition, list):
            condition = {k: v for c in condition for k, v in c.items()}
        self.dsl["aggs"] = condition

    def range(self, range_condition: dict):
        """Set/replace the range clause — DSL `range` query (docstring fixed:
        previously mislabeled as `_source`).
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-range-query.html]

        :param range_condition: range condition, e.g. {"range": {...}}
        """
        if self.dsl.get("query") is None:
            self.dsl["query"] = {"bool": {}}
        if self.dsl["query"].get("bool") is None:
            self.dsl["query"]["bool"] = {}
        if self.dsl["query"]["bool"].get("must") is None:
            # FIX: store as a one-element list (was the bare dict), so later
            # range()/bools() calls can iterate/append without crashing.
            self.dsl["query"]["bool"]["must"] = [range_condition]
            return
        # Replace an existing range clause in place, if any.
        for i, condition in enumerate(self.dsl["query"]["bool"]["must"]):
            if "range" in condition.keys():
                self.dsl["query"]["bool"]["must"][i] = range_condition
                return
        self.dsl["query"]["bool"]["must"].append(range_condition)

    def source(self, include: (str, list) = None, exclude: (str, list) = None):
        """Configure returned fields — DSL `_source`.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-source-filtering.html]

        :param include: fields to keep
        :param exclude: fields to drop
        """
        self.dsl["_source"] = {}
        if include:
            self.dsl["_source"]["includes"] = include
        if exclude:
            self.dsl["_source"]["excludes"] = exclude

    def pagination(self, page: int = 1, per_page: int = 20):
        """Pagination — DSL from/size.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-from-size.html]

        :param page: 1-based page number, defaults to 1
        :param per_page: page size, defaults to 20
        """
        if page is None:
            page = 1
        if per_page is None:
            per_page = 20
        from_ = (page - 1) * per_page
        self.dsl["from"] = from_
        self.dsl["size"] = per_page

    def sort(self, condition: (str, dict)):
        """Sorting — DSL sort (applied via request params on this query object).
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-sort.html]

        :param condition: "field:order" string or {field: order} dict
        """
        if isinstance(condition, dict):
            condition = ",".join(("{}:{}".format(c_k, c_v) for c_k, c_v in condition.items()))
        self.params["sort"] = condition

    def size(self, num: int):
        """Set result size — DSL size. Conflicts with pagination().

        :param num: result size
        """
        self.dsl.update({"size": num})

    def scroll(self, scroll: str):
        """Scrolling — DSL scroll.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-scroll.html]

        :param scroll: NOTE(review): this looks like the scroll keep-alive
                       window (e.g. "2m"), not a scroll id — confirm with callers.
        """
        self.params.update({"scroll": scroll})

    def search(self, debug=False):
        """Execute the request-body search.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-uri-request.html]

        :param debug: when True, log (info) the DSL request body
        :return: ES response dict extended with a "pager_info" key
        """
        if debug:
            logger.info(json.dumps(self.dsl, ensure_ascii=False, indent=4))
        if self.dsl.get("size") is None:
            self.size(50)
        result = es.search(body=self.dsl, index=self.index, doc_type=self.doc_type, params=self.params)
        result.update({"pager_info": self._get_page_info(result)})
        return result

    def _get_page_info(self, result: dict):
        """Build pagination info from the ES response.

        :param result: ES search result
        :return: dict with page, per_page, total, pages (and order_by when sorted)
        """
        pager_info = {}
        if self.dsl.get("from") is not None:
            # FIX: report the 1-based page number (was the raw `from` offset,
            # inconsistent with the pagination() contract).
            pager_info["page"] = self.dsl["from"] // self.dsl["size"] + 1
            pager_info["per_page"] = self.dsl["size"]
            pager_info["total"] = result["hits"]["total"]
            pager_info["pages"] = math.ceil(pager_info["total"] / pager_info["per_page"])
        if self.params.get("sort") is not None:
            pager_info["order_by"] = self.params["sort"]
        return pager_info

    def clone(self):
        """Return an independent copy (deep-copied dsl) of this query."""
        return ESQuery(index=self.index, doc_type=self.doc_type, dsl=deepcopy(self.dsl))
统一封装了ES DSL,避免操作字典描述逻辑,更具可读性。
项目: BERT文本相似度
业务逻辑: 推断文本相似度业务方法
代码链接: https://git.datagrand.com/wbyy_group/text_simi_bert/blob/master/app/src/text_simi.py
位置: 79:120行
def get_simi(self, query_list, candidate_list=None, simi='bert'):
    """Return a pairwise similarity matrix between queries and candidates.

    Fixed: the docstring documented a nonexistent ``self_simi`` parameter, and
    the tf-idf branch produced an empty candidate matrix when
    ``candidate_list`` was None (self-similarity only worked in bert mode —
    now both branches fall back to comparing query_list against itself).

    :param query_list: list of query texts
    :param candidate_list: list of candidate texts; when None, similarity is
                           computed among query_list itself
    :param simi: 'bert' uses dense vectors from self._get_vecs; any other
                 value uses a tf-idf sparse-matrix similarity
    :return: similarity matrix of shape [len(query_list), len(candidates)]
    """
    if candidate_list:
        corpus_list = query_list + candidate_list
    else:
        corpus_list = query_list
    if simi != 'bert':
        # Tokenize and drop stop words.
        feature_list = [[word for word in jieba.cut(text) if word not in self.stop_word_dict]
                        for text in corpus_list]
        # Build the dictionary and bag-of-words features over the full corpus.
        dictionary = Dictionary(feature_list)
        corpus_features = [dictionary.doc2bow(feature) for feature in feature_list]
        # Build tf-idf.
        tf_idf_model = TfidfModel(corpus_features)
        query_tfidf = tf_idf_model[corpus_features[:len(query_list)]]
        # FIX: with no candidate_list, compare queries against themselves
        # (was an empty slice, yielding a degenerate similarity index).
        candidate_features = corpus_features[len(query_list):] if candidate_list else corpus_features
        candidate_tfidf = tf_idf_model[candidate_features]
        # Compute cosine similarities against the sparse index.
        sparse_matrix = SparseMatrixSimilarity(candidate_tfidf, len(dictionary.token2id))
        simi_result = sparse_matrix.get_similarities(query_tfidf)
    else:
        query_vec = self._get_vecs(query_list)
        corpus_vec = self._get_vecs(candidate_list) if candidate_list else query_vec
        # L2-normalize rows, then cosine similarity via dot product.
        # (Kept a distinct name for the normalized matrix instead of
        # clobbering corpus_list.)
        query_norm = query_vec / (query_vec ** 2).sum(axis=1, keepdims=True) ** 0.5
        corpus_norm = corpus_vec / (corpus_vec ** 2).sum(axis=1, keepdims=True) ** 0.5
        simi_result = np.dot(query_norm, corpus_norm.T)
    return simi_result