通过知识图谱,我们可以构建一个简单的医疗问答系统,在问答过程中可以方便地检索问题对应的答案。

GitHub:https://github.com/cshmzin/zstp-project/tree/main/%E5%8C%BB%E7%96%97%E6%9C%BA%E5%99%A8%E4%BA%BA

实验环境

  • neo4j数据库
  • py_aho_corasick模块

简介

  • 数据提取模块(从互联网获取数据)
  • 知识图谱数据库构建模块(将数据清洗构建知识图谱)
  • 节点匹配模块(匹配节点获取关系)
  • 问题匹配模块(匹配问题构建查询)
  • 回答构建模块(输出)

实验代码

数据集的构建

数据集不做展示,可在github提取。

我们构建如下7类实体(另有一个保存疾病属性信息的列表)和11类关系:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
        # Containers for the entity values collected from the data set.
        drugs = [] # drugs
        foods = [] # foods
        checks = [] # medical checks / examinations
        departments = [] # hospital departments
        producers = [] # original note says "drug categories"; rels_drug_producer below describes it as producers (manufacturers) — TODO confirm which is meant
        diseases = [] # diseases
        symptoms = []# symptoms
        disease_infos = []# disease information records


        # Containers for the relations between entities.
        rels_department = [] # department - department relation
        rels_noteat = [] # disease - food to avoid
        rels_doeat = [] # disease - food that is fine to eat
        rels_recommandeat = [] # disease - recommended food
        rels_commonddrug = [] # disease - commonly used drug
        rels_recommanddrug = [] # disease - popular drug
        rels_check = [] # disease - check relation
        rels_drug_producer = [] # producer (manufacturer) - drug relation
        rels_symptom = [] # disease - symptom relation
        rels_acompany = [] # disease - accompanying disease (complication) relation
        rels_category = [] # disease - department relation

然后将数据从json文件中提取出来:

  • 疾病属性的获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    def diseases_property(self,disease,data_json):
        disease_dict = {}
        disease_dict['name'] = disease
        disease_dict['desc'] = ''  # 描述
        disease_dict['prevent'] = ''  # 解决方法
        disease_dict['cause'] = ''  # 造成原因
        disease_dict['get_prob'] = ''  # 疾病发生率
        disease_dict['easy_get'] = ''  # 病病易发人群
        disease_dict['cure_way'] = ''  # 治疗方法
        disease_dict['cure_lasttime'] = ''  # 治疗时间
        disease_dict['cured_prob'] = ''  # 治疗成功率
        if 'desc' in data_json:
            disease_dict['desc'] = data_json['desc']
        if 'prevent' in data_json:
            disease_dict['prevent'] = data_json['prevent']
        if 'cause' in data_json:
            disease_dict['cause'] = data_json['cause']
        if 'get_prob' in data_json:
            disease_dict['get_prob'] = data_json['get_prob']
        if 'easy_get' in data_json:
            disease_dict['easy_get'] = data_json['easy_get']
        if 'cure_way' in data_json:
            disease_dict['cure_way'] = data_json['cure_way']
        if 'cure_lasttime' in data_json:
            disease_dict['cure_lasttime'] = data_json['cure_lasttime']
        if 'cured_prob' in data_json:
            disease_dict['cured_prob'] = data_json['cured_prob']
        return disease_dict
  • 获取实体及其关系(例子):

1
2
3
4
5
6
7
8
9
            # Build the departments and their relations
            if 'cure_department' in data_json:
                cure_department = data_json['cure_department']
                if len(cure_department) == 1: # a single entry means there is no parent/child level
                    rels_category.append([disease, cure_department[0]])
                if len(cure_department) == 2: # two entries mean there is a parent/child level
                    # The pair is stored as [second entry, first entry], and the
                    # disease is attached to the second entry.
                    rels_department.append([cure_department[1], cure_department[0]])
                    rels_category.append([disease, cure_department[1]])
                # Every listed department is collected, whatever the list length.
                departments += cure_department
  • 创建node

1
2
3
4
5
    def create_node(self,label,nodes):
        for node_name in nodes:
            node = Node(label, name=node_name)
            self.link.create(node)
        print(f'创建节点:{label},共{len(nodes)}个')
  • 循环将全部数据创建实体节点

1
2
3
4
5
6
7
8
        # The first element returned by read_data() is unpacked into the entity
        # collections and the disease information records.
        Drugs, Foods, Checks, Departments, Producers, Symptoms, Diseases, disease_infos = self.read_data()[0]
        # Disease nodes are created from disease_infos by a dedicated method;
        # the Diseases value is not used in this excerpt.
        self.create_diseases_nodes(disease_infos)
        # Every other entity type gets plain nodes under its own label.
        self.create_node('Drug', Drugs)
        self.create_node('Food', Foods)
        self.create_node('Check', Checks)
        self.create_node('Department', Departments)
        self.create_node('Producer', Producers)
        self.create_node('Symptom', Symptoms)
  • 创建关系

1
2
3
4
5
6
7
8
9
    def create_relationship(self, start_node, end_node, edges, rel_type, rel_name):
        # 去重处理
        edges = list(set([tuple(edge) for edge in edges]))
        edges = [list(edge) for edge in edges]

        for edge in edges:
            p,q = edge
            query = "match(p:%s),(q:%s) where p.name='%s'and q.name='%s' create (p)-[rel:%s{name:'%s'}]->(q)" % (start_node, end_node, p, q, rel_type, rel_name)
            self.link.run(query)
  • 循环将全部数据创建关系

1
2
3
4
5
6
7
8
9
10
11
12
        # The second element returned by read_data() is unpacked into the eleven relation lists.
        rels_check, rels_recommandeat, rels_noteat, rels_doeat, rels_department, rels_commonddrug, rels_drug_producer, rels_recommanddrug,rels_symptom, rels_acompany, rels_category = self.read_data()[1]
        # Arguments: start label, end label, edge list, relationship type, relationship name.
        self.create_relationship('Disease', 'Food', rels_recommandeat, 'recommand_eat', '推荐食谱')
        self.create_relationship('Disease', 'Food', rels_noteat, 'no_eat', '忌吃')
        self.create_relationship('Disease', 'Food', rels_doeat, 'do_eat', '宜吃')
        self.create_relationship('Department', 'Department', rels_department, 'belongs_to', '属于')
        self.create_relationship('Disease', 'Drug', rels_commonddrug, 'common_drug', '常用药品')
        self.create_relationship('Producer', 'Drug', rels_drug_producer, 'drugs_of', '生产药品')
        self.create_relationship('Disease', 'Drug', rels_recommanddrug, 'recommand_drug', '好评药品')
        self.create_relationship('Disease', 'Check', rels_check, 'need_check', '诊断检查')
        self.create_relationship('Disease', 'Symptom', rels_symptom, 'has_symptom', '症状')
        self.create_relationship('Disease', 'Disease', rels_acompany, 'acompany_with', '并发症')
        # NOTE: this reuses the 'belongs_to' type already used for Department -> Department above;
        # the two differ by node labels and by the relationship name.
        self.create_relationship('Disease', 'Department', rels_category, 'belongs_to', '所属科室')

数据集构建完成后可以查看如下图:

构建对话系统

在主类中我们构建了三个模块:

  • QuestionClassifier() 【对问题进行分类,确定问题种类】
  • QuestionPaser() 【对问题进行解析,获取相对应的查询语句】
  • AnswerSearcher() 【通过查询语句搜索数据库,构造回答】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class ChatBotGraph:
    """Question-answering entry point that chains three stages.

    A sentence is classified, the classification is turned into queries, and
    the queries are searched to produce the answer lines.
    """

    def __init__(self):
        self.classifier = QuestionClassifier()
        self.parser = QuestionPaser()
        self.searcher = AnswerSearcher()

    def chat_main(self, sent):
        """Answer one sentence.

        Args:
            sent: The sentence to answer.

        Returns:
            The answer lines joined with newlines, or a placeholder string when
            the classification result or the list of answers is empty.
        """
        fallback = '。。。。。。。。。。'

        classified = self.classifier.classify(sent)
        if classified:
            queries = self.parser.parser_main(classified)
            replies = self.searcher.search_main(queries)
            if replies:
                return '\n'.join(replies)

        return fallback

问题分类

我们将问题分为如下几类,通过字段匹配和关键词匹配完成。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
   def classify(self, question):
        """Work out which question types a question belongs to.

        The entities found in the question are combined with question-word
        checks to collect question type names.

        NOTE(review): this listing is cut short after the food branch; the
        remaining branches and the final return value are not shown here.

        Args:
            question: The question text.

        Returns:
            An empty dict when ``check_medical`` finds nothing. The return
            value of the full method is not visible in this excerpt.
        """
        data = {}

        medical_dict = self.check_medical(question)
        if medical_dict == {}:return {}
        data['args'] = medical_dict
        # Collect the entity types involved in the question
        # (each value of medical_dict is treated as a list of type names).
        types = []
        for type_ in medical_dict.values():
            types += type_

        question_types = []

        # Symptoms
        if self.check_words(self.symptom_qwds, question) and ('disease' in types):
            question_type = 'disease_symptom'
            question_types.append(question_type)
        if self.check_words(self.symptom_qwds, question) and ('symptom' in types):
            question_type = 'symptom_disease'
            question_types.append(question_type)

        # Causes
        if self.check_words(self.cause_qwds, question) and ('disease' in types):
            question_type = 'disease_cause'
            question_types.append(question_type)
        # Complications
        if self.check_words(self.acompany_qwds, question) and ('disease' in types):
            question_type = 'disease_acompany'
            question_types.append(question_type)

        # Recommended food
        if self.check_words(self.food_qwds, question) and 'disease' in types:
            # A hit on the deny words switches the type to 'disease_not_food'.
            deny_status = self.check_words(self.deny_words, question)
            if deny_status:
                question_type = 'disease_not_food'
            else:
                question_type = 'disease_do_food'
            question_types.append(question_type)




。。。。。。。。。。。。。。。。。。。。。

问题解析

我们将分类后的问题进行解析,为每一类问题构建相应的查询语句:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
        # Build the queries for each question type. The entity list handed to
        # sql_transfer is the one stored under the matching key of entity_dict
        # (None when that key is absent, because .get() is used).
        for question_type in question_types:
            sql_ = {}
            sql_['question_type'] = question_type
            sql = []
            if question_type == 'disease_symptom':
                sql = self.sql_transfer(question_type, entity_dict.get('disease'))

            elif question_type == 'symptom_disease':
                # The only branch shown here that is driven by symptom entities.
                sql = self.sql_transfer(question_type, entity_dict.get('symptom'))

            elif question_type == 'disease_cause':
                sql = self.sql_transfer(question_type, entity_dict.get('disease'))

            elif question_type == 'disease_acompany':
                sql = self.sql_transfer(question_type, entity_dict.get('disease'))


。。。。。。。。。。。。

回答构造

根据对应的question_type,调用相应的回复模板:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    def answer_prettify(self, question_type, answers):
        """Format query results with the reply template of the question type.

        NOTE(review): this listing is cut short after the 'disease_prevent'
        branch; the remaining branches and the final return are not shown here.

        Args:
            question_type: Name of the question type selecting the template.
            answers: Sequence of result records indexed by keys such as
                ``'m.name'`` and ``'n.name'``.

        Returns:
            An empty string when ``answers`` is empty. The return value of the
            full method is not visible in this excerpt.
        """
        final_answer = []
        if not answers:
            return ''
        # Every branch below de-duplicates the values with set() and keeps at
        # most self.num_limit of them. Because a set is unordered, which values
        # are kept and in what order is not fixed.
        if question_type == 'disease_symptom':
            desc = [i['n.name'] for i in answers]
            subject = answers[0]['m.name']
            final_answer = '{0}的症状包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

        elif question_type == 'symptom_disease':
            # Here the subject is read from 'n.name' and the values from 'm.name'.
            desc = [i['m.name'] for i in answers]
            subject = answers[0]['n.name']
            final_answer = '症状{0}可能染上的疾病有:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

        elif question_type == 'disease_cause':
            desc = [i['m.cause'] for i in answers]
            subject = answers[0]['m.name']
            final_answer = '{0}可能的成因有:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

        elif question_type == 'disease_prevent':
            desc = [i['m.prevent'] for i in answers]
            subject = answers[0]['m.name']
            final_answer = '{0}的预防措施包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。

测试

 
目前共有0条评论
  • 暂无Trackback
你目前的身份是游客,评论请输入昵称和电邮!