@heavysheep
2020-11-03T11:15:29.000000Z
Code Quality
Project: Beijing Cyberspace Administration Data Center (北京网信办数据中心)
Owner: 王子
Business logic: several method classes for text moderation, covering matching against different keywords and rules, plus homophone (pinyin) and glyph-similarity matching.
Code link: https://git.datagrand.com/wbyy_group/bjwxb/bjwxb_alarm/tree/master/app/controller
Location: each method class
import ahocorasick
import pypinyin


class WordController(object):
    data_path = "data"

    def __init__(self):
        self._element = "match"
        self._word_dict = self._load_dict()
        self._ac = self._compile_ac()

    def match(self, text: str):
        ...

    def parser(self, text):
        match_result = self.match(text)
        for k in match_result:
            # score 0.99 when a keyword hits more than once, else 0.9
            match_result[k] = 0.99 if len(match_result[k]) > 1 else 0.9
        return match_result

    def _load_dict(self):
        ...

    def _compile_ac(self):
        ac = ahocorasick.Automaton()
        for w in self._word_dict.keys():
            if w.strip():
                ac.add_word(w, w)
        ac.make_automaton()
        return ac


class PinYinController(object):
    data_path = "data"

    def __init__(self):
        self._element = "pinyin"
        self._pinyin_dict = self._load_dict()
        self._ac = self._compile_ac()

    def match(self, text: str):
        ...

    def parser(self, text):
        match_result = self.match(text)
        for k in match_result:
            # score grows exponentially with the number of matched keys
            match_result[k] = pow(2.2, len(match_result)) / 100
        return match_result

    def _load_dict(self):
        ...
        return pinyin_dict

    def _compile_ac(self):
        ac = ahocorasick.Automaton()
        for w in self._pinyin_dict.keys():
            if w.strip():
                ac.add_word(w, w)
        ac.make_automaton()
        return ac

    @staticmethod
    def _join(word: str, join_str: str = "_"):
        return join_str.join(pypinyin.lazy_pinyin(word))
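The match and _load_dict bodies are elided above. As a rough, hypothetical sketch (demo_dict and demo_match are invented names, not the project's API), the compiled pyahocorasick automaton can be queried like this, with pinyin keys built the same way as PinYinController._join:

    import ahocorasick
    import pypinyin

    # hypothetical dictionary; real entries come from the elided _load_dict
    demo_dict = {"keyword_a": "rule_1", "keyword_b": "rule_2"}

    ac = ahocorasick.Automaton()
    for w in demo_dict:
        ac.add_word(w, w)
    ac.make_automaton()

    def demo_match(text):
        """Collect the start offsets of every dictionary hit in text."""
        hits = {}
        # Automaton.iter() yields (end_index, stored_value) per match
        for end_idx, word in ac.iter(text):
            hits.setdefault(word, []).append(end_idx - len(word) + 1)
        return hits

    # pinyin keys, as in PinYinController._join: "你好" -> "ni_hao"
    assert "_".join(pypinyin.lazy_pinyin("你好")) == "ni_hao"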
Project: Beijing Cyberspace Administration Data Center
Owner: 韩伟
Business logic: read data in batches from ES and cluster similar events.
Code link: https://git.datagrand.com/wbyy_group/bjwxb/bjwxb_alarm/tree/master/app/controller
Location: entire file
import threading
import traceback


class ThreadWithReturn(threading.Thread):
    """
    Thread subclass that captures the target function's return value.

    Args:
        func: callable to run in the thread
        args: positional arguments passed to func
    """

    def __init__(self, func, args=()):
        super(ThreadWithReturn, self).__init__()
        self.func = func
        self.args = args
        self.result = None  # set by run(); stays None if the thread never ran

    def run(self):
        self.result = self.func(*self.args)

    def get_result(self):
        return self.result


class SimiFinder:
    def __init__(self):
        self.es_recall = ESRecall()
        self.text_simi_ = TextSimi()

    @time_cost
    def simi_finder(self, text_list):
        try:
            info_dict = self.process_data(text_list)
            # run the ES recall and the in-batch self-similarity in parallel
            t_es = ThreadWithReturn(self._get_es_simi, args=(info_dict,))
            t_es.start()
            t_self_simi = ThreadWithReturn(self._get_self_simi, args=(info_dict,))
            t_self_simi.start()
            t_es.join()
            es_simi_dict = t_es.get_result()
            t_self_simi.join()
            self_simi_dict = t_self_simi.get_result()
            for i in range(len(text_list)):
                term_dict = text_list[i]
                item_id = term_dict['fields']['itemid']
                item_es_simi_list = es_simi_dict.get(item_id, [])
                item_self_simi_list = self_simi_dict.get(item_id, [])
                simi_result = item_es_simi_list + item_self_simi_list
                term_dict['fields'].setdefault('has_repeated', bool(simi_result))
                term_dict['fields'].setdefault('repeatedItems', simi_result)
                text_list[i] = term_dict
        except Exception:
            logger.error('Get simi error: {}'.format(traceback.format_exc()))
            text_list = []
        return text_list
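For reference, the ThreadWithReturn pattern in isolation; slow_square is a made-up stand-in for _get_es_simi/_get_self_simi:

    import time

    def slow_square(x):
        time.sleep(0.1)   # simulate I/O such as an ES round-trip
        return x * x

    t = ThreadWithReturn(slow_square, args=(7,))
    t.start()
    t.join()              # always join() before get_result(), or the result may still be None
    assert t.get_result() == 49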
Project: a sentiment classification project
Owner: 房悦竹
Business logic: compute the sentiment score of review content.
Code link: https://git.datagrand.com/wbyy_group/text_sentiment/blob/master/text_sentiment/src/sentiment_dict.py
Location: lines 88:207
...
# @timing
def get_review_score(self, review):
    """
    Score a review against the general-purpose sentiment dictionaries.
    :param review: review text
    :return: review_score: [x, y], x = positive score, y = negative score;
             result: {0: x, 1: y}, x = negative share, y = positive share;
             hit_word_list: dictionary words hit during scoring
    """
    sentences = tp.split_sentence(review)
    senti_score = []
    hit_word_list = []
    for sen in sentences:
        zhuanzhe_flag = 0  # 转折 (adversative conjunction) flag
        # seg = self.wordseg.cut(sen, with_stop_word=False, model=None)
        seg = self.wordseg.lcut(sen)
        seg = process_contain_no(seg)  # special handling of the negation "不" in the segmentation result
        cur_word_pos = 0  # current word position
        senti_word_pos = 0  # sentiment word position
        pos_score = 0
        neg_score = 0
        seg_len = len(seg)
        for word in seg:
            current_neg = 0
            current_pos = 0
            if word in self.zhuanzhe_dict:
                zhuanzhe_flag = self.zhuanzhe_dict[word]
                hit_word = 'zhuanzhe:' + word
                hit_word_list.append(hit_word)
            if word in self.pos_dict:
                hit_word = 'pos:' + word
                hit_word_list.append(hit_word)
                doubt_flag = False  # whether a doubt word has been found
                current_pos += 1
                start_pos = max(cur_word_pos - 5, senti_word_pos)
                for w in seg[start_pos:cur_word_pos]:
                    current_pos = self.match_adv(w, current_pos, hit_word_list)
                if cur_word_pos != 0:
                    s_idx = max([cur_word_pos - 2, 0])  # look back up to two words
                    for w in seg[s_idx:cur_word_pos]:
                        current_pos, doubt_flag, hit_word_list = self.match_doubt_forward(
                            w, current_pos, hit_word_list)
                        if doubt_flag:
                            break
                if not doubt_flag:  # once a doubt word is found, stop searching ahead
                    if cur_word_pos < seg_len - 1:
                        e_idx = min([cur_word_pos + 3, seg_len])  # look ahead up to two words
                        for w in seg[cur_word_pos + 1:e_idx]:
                            current_pos, doubt_flag, hit_word_list = self.match_doubt_backward(
                                w, current_pos, hit_word_list)
                            if doubt_flag:
                                break
                pos_score += current_pos
                senti_word_pos = cur_word_pos
            elif word in self.neg_dict:
                hit_word = 'neg:' + word
                hit_word_list.append(hit_word)
                doubt_flag = False  # whether a doubt word has been found
                current_neg += 1
                start_pos = max(cur_word_pos - 5, senti_word_pos)
                for w in seg[start_pos:cur_word_pos]:
                    current_neg = self.match_adv(w, current_neg, hit_word_list)
                if cur_word_pos != 0:
                    s_idx = max([cur_word_pos - 2, 0])  # look back up to two words
                    for w in seg[s_idx:cur_word_pos]:
                        current_neg, doubt_flag, hit_word_list = self.match_doubt_forward(
                            w, current_neg, hit_word_list)
                        if doubt_flag:
                            break
                if not doubt_flag:  # once a doubt word is found, stop searching ahead
                    if cur_word_pos < seg_len - 1:
                        e_idx = min([cur_word_pos + 3, seg_len])  # look ahead up to two words
                        for w in seg[cur_word_pos + 1:e_idx]:
                            current_neg, doubt_flag, hit_word_list = self.match_doubt_backward(
                                w, current_neg, hit_word_list)
                            if doubt_flag:
                                break
                neg_score += current_neg
                senti_word_pos = cur_word_pos
            elif word in self.insult_dict:
                hit_word = 'insult:' + word
                hit_word_list.append(hit_word)
                current_neg += 1 * 2  # words in insult_dict get a weight of 2.0
                neg_score += current_neg
            cur_word_pos += 1
        pos_score, neg_score = zhuanzhe_process(zhuanzhe_flag, pos_score, neg_score, senti_score)
        senti_score.append(transform_to_positive_num(pos_score, neg_score))
    # pay more attention to the last sentence
    if senti_score[-1][0] > senti_score[-1][1]:
        senti_score[-1][0] *= 1.5
    if senti_score[-1][1] > senti_score[-1][0]:
        senti_score[-1][1] *= 1.5
    review_score = sum_sentences_score(senti_score)  # [Pos, Neg]
    result = {}
    if review_score[0] == 0 and review_score[1] == 0:
        result[0] = 0.5
        result[1] = 0.5
    elif review_score[0] == 0:  # dictionary positive score is 0
        result[0] = 0.75  # originally 1; softened so the score is not absolute
        result[1] = 0.25
    elif review_score[1] == 0:  # dictionary negative score is 0
        result[0] = 0.25
        result[1] = 0.75
    else:
        temp_sum = review_score[0] + review_score[1]
        result[0] = round(review_score[1] / temp_sum, 2)
        result[1] = round(review_score[0] / temp_sum, 2)
    return review_score, result, hit_word_list
...
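As a quick worked example of the final normalization branch above (the scores are made up):

    # review_score = [pos, neg]; with [3, 1] the output flips into {0: neg share, 1: pos share}
    review_score = [3, 1]
    temp_sum = review_score[0] + review_score[1]          # 4
    result = {0: round(review_score[1] / temp_sum, 2),    # 1 / 4 = 0.25 (negative share)
              1: round(review_score[0] / temp_sum, 2)}    # 3 / 4 = 0.75 (positive share)
    assert result == {0: 0.25, 1: 0.75}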
Project: VOC core
Business logic: wrap the ES query DSL.
Code link: https://git.datagrand.com/voc/voc_api/blob/master/app/common/es.py
Location: lines 25:368
import json
import math
from copy import deepcopy


class ESQuery(object):
    def __init__(self, index: str = None, doc_type: str = None, bool: dict = None, collapse: dict = None,
                 dsl: dict = None):
        """
        ES query DSL wrapper.
        :param index: ES index
        :param doc_type: ES doc_type (if any)
        :param bool: bool DSL
        :param collapse: collapse (field collapsing) DSL
        :param dsl: raw DSL; when passed, it is used as the request body directly,
                    which bypasses the bool/collapse parsing parameters
        example:
            # [build condition]
            filter_condition = Query.term(field="is_match", value=True)
            agg_condition = [Aggs.terms(name="group", field="website.keyword", size=500)]
            # [input query]
            query = ESQuery(index="index_1", doc_type="type_1", bool=bools(filter=filter_condition))
            query.aggs(condition=agg_condition)
            # [modify attr]
            query.size(0)
            query.sort("datetime:desc")
            # [get result]
            query_result, _ = query.search()
        """
        if dsl and bool:
            raise ValueError("'dsl' and 'bool' must not be passed together")
        if dsl and collapse:
            raise ValueError("'dsl' and 'collapse' must not be passed together")
        self.index = index
        self.doc_type = doc_type
        self.params = {}
        self.bool = bool
        self.dsl = dsl if dsl is not None else {}
        if bool:
            self.dsl["query"] = bool
        if collapse:
            self.dsl["collapse"] = collapse

    def bools(self, must: (dict, list) = None, filter: (dict, list) = None, should: (dict, list) = None,
              must_not: (dict, list) = None):
        """
        bool conditions.
        :param must: must conditions
        :param filter: filter conditions
        :param should: should conditions
        :param must_not: must_not conditions
        """
        if self.dsl.get("query") is None:
            self.dsl["query"] = {"bool": {}}
        if self.dsl["query"].get("bool") is None:
            self.dsl["query"]["bool"] = {}
        for name, condition_info in (("must", must), ("filter", filter), ("should", should), ("must_not", must_not)):
            if condition_info is not None:
                condition = _parser_condition(condition_info)
                if name not in self.dsl["query"]["bool"]:
                    self.dsl["query"]["bool"][name] = condition
                else:
                    self.dsl["query"]["bool"][name].extend(condition)

    def aggs(self, condition):
        if isinstance(condition, list):
            condition = {k: v for c in condition for k, v in c.items()}
        self.dsl["aggs"] = condition

    def range(self, range_condition: dict):
        """
        Range condition, corresponding to the DSL range query.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/query-dsl-range-query.html]
        :param range_condition: range condition
        """
        if self.dsl.get("query") is None:
            self.dsl["query"] = {"bool": {}}
        if self.dsl["query"].get("bool") is None:
            self.dsl["query"]["bool"] = {}
        if self.dsl["query"]["bool"].get("must") is None:
            # keep "must" a list so the replace/append logic below works
            self.dsl["query"]["bool"]["must"] = [range_condition]
            return
        for i, condition in enumerate(self.dsl["query"]["bool"]["must"]):
            if "range" in condition.keys():
                self.dsl["query"]["bool"]["must"][i] = range_condition
                return
        self.dsl["query"]["bool"]["must"].append(range_condition)

    def source(self, include: (str, list) = None, exclude: (str, list) = None):
        """
        Field selection, corresponding to the DSL _source.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-source-filtering.html]
        :param include: fields to keep
        :param exclude: fields to drop
        """
        self.dsl["_source"] = {}
        if include:
            self.dsl["_source"]["includes"] = include
        if exclude:
            self.dsl["_source"]["excludes"] = exclude

    def pagination(self, page: int = 1, per_page: int = 20):
        """
        Pagination, corresponding to the DSL from/size.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-from-size.html]
        :param page: page number, starting at 1 (default 1)
        :param per_page: page size (default 20)
        """
        if page is None:
            page = 1
        if per_page is None:
            per_page = 20
        from_ = (page - 1) * per_page
        self.dsl["from"] = from_
        self.dsl["size"] = per_page

    def sort(self, condition: (str, dict)):
        """
        Sorting, corresponding to the DSL sort.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-sort.html]
        Note: the sort is attached to this query object as a request parameter, not in the body.
        :param condition: sort condition
        """
        if isinstance(condition, dict):
            condition = ",".join(("{}:{}".format(c_k, c_v) for c_k, c_v in condition.items()))
        self.params["sort"] = condition

    def size(self, num: int):
        """
        Result size, corresponding to the DSL size.
        Note: conflicts with pagination.
        :param num: size
        """
        self.dsl.update({"size": num})

    def scroll(self, scroll: str):
        """
        Scrolling, corresponding to the scroll parameter.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-scroll.html]
        :param scroll: scroll keep-alive window, e.g. "1m"
        """
        self.params.update({"scroll": scroll})

    def search(self, debug=False):
        """
        Search, corresponding to the ES request body search.
        ES docs: [https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-uri-request.html]
        :param debug: whether to log (at info level) the DSL request body
        """
        if debug:
            logger.info(json.dumps(self.dsl, ensure_ascii=False, indent=4))
        if self.dsl.get("size") is None:
            self.size(50)
        result = es.search(body=self.dsl, index=self.index, doc_type=self.doc_type, params=self.params)
        result.update({"pager_info": self._get_page_info(result)})
        return result

    def _get_page_info(self, result: dict):
        """
        Pagination info derived from a query result.
        :param result: ES query result
        :return: pagination info
        """
        pager_info = {}
        if self.dsl.get("from") is not None:
            # report the 1-based page number rather than the raw offset
            pager_info["page"] = self.dsl["from"] // self.dsl["size"] + 1
            pager_info["per_page"] = self.dsl["size"]
            pager_info["total"] = result["hits"]["total"]
            pager_info["pages"] = math.ceil(pager_info["total"] / pager_info["per_page"])
        if self.params.get("sort") is not None:
            pager_info["order_by"] = self.params["sort"]
        return pager_info

    def clone(self):
        return ESQuery(index=self.index, doc_type=self.doc_type, dsl=deepcopy(self.dsl))
Wrapping the ES DSL in one place avoids describing query logic through raw dict manipulation and makes the code considerably more readable.
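A small usage sketch, assuming _parser_condition (not shown here) normalizes a single condition dict into a list; the index and field names are invented:

    # hypothetical query build; "my_index" and the fields are illustrative only
    query = ESQuery(index="my_index")
    query.bools(filter={"term": {"is_match": True}})
    query.range({"range": {"datetime": {"gte": "2020-01-01"}}})
    query.pagination(page=2, per_page=20)
    query.sort("datetime:desc")
    # query.dsl now holds a request body along the lines of:
    # {"query": {"bool": {"filter": [...], "must": [{"range": {...}}]}},
    #  "from": 20, "size": 20}
    result = query.search(debug=True)
    print(result["pager_info"])   # {"page": 2, "per_page": 20, ...}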
Project: BERT text similarity
Business logic: business method for text-similarity inference.
Code link: https://git.datagrand.com/wbyy_group/text_simi_bert/blob/master/app/src/text_simi.py
Location: lines 79:120
import jieba
import numpy as np
from gensim.corpora import Dictionary
from gensim.models import TfidfModel
from gensim.similarities import SparseMatrixSimilarity


def get_simi(self, query_list, candidate_list=None, simi='bert'):
    """
    Compute pairwise similarity.
    :param query_list: list of query texts
    :param candidate_list: list of candidate texts; when omitted, query_list is compared with itself
    :param simi: similarity backend, 'bert' (default) or the TF-IDF fallback
    :return: similarity of the two lists, shape [len(query_list), len(candidate_list)]
    """
    if candidate_list:
        corpus_list = query_list + candidate_list
    else:
        corpus_list = query_list
    if simi != 'bert':
        # tokenize and drop stop words
        feature_list = [[word for word in jieba.cut(text) if word not in self.stop_word_dict]
                        for text in corpus_list]
        # build the dictionary
        dictionary = Dictionary(feature_list)
        corpus_features = [dictionary.doc2bow(feature) for feature in feature_list]
        # build the TF-IDF model
        tf_idf_model = TfidfModel(corpus_features)
        query_tfidf = tf_idf_model[corpus_features[:len(query_list)]]
        candidate_tfidf = tf_idf_model[corpus_features[len(query_list):]]
        # compute similarity
        sparse_matrix = SparseMatrixSimilarity(candidate_tfidf, len(dictionary.token2id))
        simi_result = sparse_matrix.get_similarities(query_tfidf)
    else:
        query_vec = self._get_vecs(query_list)
        if candidate_list:
            corpus_vec = self._get_vecs(candidate_list)
        else:
            corpus_vec = query_vec
        # L2-normalize so the dot product below is cosine similarity
        query_norm = query_vec / (query_vec ** 2).sum(axis=1, keepdims=True) ** 0.5
        corpus_norm = corpus_vec / (corpus_vec ** 2).sum(axis=1, keepdims=True) ** 0.5
        simi_result = np.dot(query_norm, corpus_norm.T)
    return simi_result
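A self-contained check of the cosine step in the 'bert' branch; random vectors stand in for the _get_vecs output:

    import numpy as np

    rng = np.random.default_rng(0)
    q = rng.normal(size=(3, 8))   # pretend query embeddings
    c = rng.normal(size=(5, 8))   # pretend candidate embeddings

    q_norm = q / (q ** 2).sum(axis=1, keepdims=True) ** 0.5
    c_norm = c / (c ** 2).sum(axis=1, keepdims=True) ** 0.5
    simi = np.dot(q_norm, c_norm.T)   # shape (3, 5), values in [-1, 1]

    # cross-check one entry against the textbook cosine formula
    i, j = 1, 2
    cos_ij = np.dot(q[i], c[j]) / (np.linalg.norm(q[i]) * np.linalg.norm(c[j]))
    assert np.isclose(simi[i, j], cos_ij)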