Contents
一. What is Named Entity Recognition
二. Dataset description
三. Data preprocessing
    1. Extracting the entity categories
    2. Converting entity annotations to label encodings
    3. Text segmentation
    4. Handling long and short sentences
四. Automated entity tagging
    1. Generating the label lists for the split sentences
    2. Extracting part-of-speech and word-boundary features
    3. Extracting pinyin features
    4. Character-level tagging and data storage
    5. Processing multiple files
五. Complete code
    1. data_process.py
    2. prepare_data.py
六. Summary
一. What is Named Entity Recognition
二. Dataset description
三. Data preprocessing
1. Extracting the entity categories
#encoding:utf-8
import os
#----------------------------功能:获取实体类别及个数---------------------------------
def get_entities(dir):
entities = {} #字段实体类别
files = os.listdir(dir) #遍历路径
return files
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
path = "data/train_data"
print(get_entities(path))
#encoding:utf-8
import os
#----------------------------功能:获取实体类别及个数---------------------------------
def get_entities(dirPath):
entities = {} #字段实体类别
files = os.listdir(dirPath) #遍历路径
#获取所有文件的名字并去重 0.ann => 0
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
#print(filenames)
#重新构造ANN文件名并遍历文件
for filename in filenames:
path = os.path.join(dirPath, filename+".ann")
print(path)
#读文件
with open(path, 'r', encoding='utf8') as f:
for line in f.readlines():
#TAB键分割获取实体类型
name = line.split('\t')[1]
print(name)
return filenames
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
path = "data/train_data"
print(get_entities(path))
data/train_data\126_20.ann
Disease 6938 6940;6941 6945
Disease 6998 7000;7001 7005
Disease 7053 7059
Disease 7873 7879
Anatomy 7144 7148
Drug 33 37
Drug 158 162
Drug 324 328
Drug 450 454
.....
#encoding:utf-8
import os
#----------------------------功能:获取实体类别及个数---------------------------------
def get_entities(dirPath):
entities = {} #存储实体类别
files = os.listdir(dirPath) #遍历路径
#获取所有文件的名字并去重 0.ann => 0
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
#print(filenames)
#重新构造ANN文件名并遍历文件
for filename in filenames:
path = os.path.join(dirPath, filename+".ann")
print(path)
#读文件
with open(path, 'r', encoding='utf8') as f:
for line in f.readlines():
#TAB键分割获取实体类型
name = line.split('\t')[1]
#print(name)
value = name.split(' ')[0]
#print(value)
#实体加入字典并统计个数
if value in entities:
entities[value] += 1 #在实体集合中数量加1
else:
entities[value] = 1 #创建键值且值为1
#返回实体集
return entities
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
path = "data/train_data"
print(get_entities(path))
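As a quick sanity check, the function above can be exercised on a tiny, hand-made annotation file. The directory name and the three entity rows below are invented purely for illustration, and the snippet assumes get_entities is already defined as above; the rows follow the same tab-separated layout as the real .ann files.
import os

# Build a throwaway directory with one fake .ann file (illustrative data only).
os.makedirs('data/demo_data', exist_ok=True)
with open('data/demo_data/0.ann', 'w', encoding='utf8') as f:
    f.write('T1\tDisease 1845 1850\t1型糖尿病\n')
    f.write('T2\tDrug 33 37\t二甲双胍\n')
    f.write('T3\tDisease 10 12\t糖尿病\n')

print(get_entities('data/demo_data'))   # expected: {'Disease': 2, 'Drug': 1}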
2. Converting entity annotations to label encodings
#----------------------------功能:命名实体BIO标注--------------------------------
def get_labelencoder(entities):
#排序
entities = sorted(entities.items(), key=lambda x: x[1], reverse=True)
print(entities)
#获取实体类别名称
entities = [x[0] for x in entities]
print(entities)
#encoding:utf-8
import os
#----------------------------功能:获取实体类别及个数---------------------------------
def get_entities(dirPath):
entities = {} #存储实体类别
files = os.listdir(dirPath) #遍历路径
#获取所有文件的名字并去重 0.ann => 0
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
#print(filenames)
#重新构造ANN文件名并遍历文件
for filename in filenames:
path = os.path.join(dirPath, filename+".ann")
#print(path)
#读文件
with open(path, 'r', encoding='utf8') as f:
for line in f.readlines():
#TAB键分割获取实体类型
name = line.split('\t')[1]
#print(name)
value = name.split(' ')[0]
#print(value)
#实体加入字典并统计个数
if value in entities:
entities[value] += 1 #在实体集合中数量加1
else:
entities[value] = 1 #创建键值且值为1
#返回实体集
return entities
#----------------------------功能:命名实体BIO标注--------------------------------
def get_labelencoder(entities):
#排序
entities = sorted(entities.items(), key=lambda x: x[1], reverse=True)
print(entities)
#获取实体类别名称
entities = [x[0] for x in entities]
print(entities)
#标记实体
id2label = []
id2label.append('O')
#生成实体标记
for entity in entities:
id2label.append('B-'+entity)
id2label.append('I-'+entity)
#字典键值生成
label2id = {id2label[i]:i for i in range(len(id2label))}
return id2label, label2id
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
path = "data/train_data"
#获取实体类别及个数
entities = get_entities(path)
print(entities)
print(len(entities))
#完成实体标记 列表 字典
#得到标签和下标的映射
label, label_dic = get_labelencoder(entities)
print(label)
print(len(label))
print(label_dic)
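To make the mapping concrete, here is a small worked example with invented entity counts, assuming get_labelencoder is defined as above: 'O' always takes index 0, and each entity type contributes a B- tag and an I- tag in descending order of frequency.
# Hypothetical entity counts, for illustration only.
entities = {'Disease': 300, 'Drug': 120, 'Test': 80}
id2label, label2id = get_labelencoder(entities)
print(id2label)            # ['O', 'B-Disease', 'I-Disease', 'B-Drug', 'I-Drug', 'B-Test', 'I-Test']
print(label2id['I-Drug'])  # 4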
3. Text segmentation
import re
#-------------------------功能:自定义分隔符文本分割------------------------------
def split_text(text):
pattern = '。|,|,|;|?'
#获取字符的下标位置
for m in re.finditer(pattern, text):
print(m)
start = m.span()[0] #标点符号位置
print(text[start])
start = m.span()[0] - 5
end = m.span()[1] + 5
print('****', text[start:end], '****')
break
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
path = "data/train_data"
#自定义分割文本
text = path + "/0.txt"
print(text)
with open(text, 'r', encoding='utf8') as f:
text = f.read()
split_text(text)
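If the training file is not at hand, the same re.finditer logic can be tried on an inline string. The sentence below is made up; it simply shows that m.span() returns the (start, end) index pair of every matched delimiter, which is the information the splitting code builds on.
import re

demo = '患者血糖升高,建议复查。同时调整用药,注意饮食?'
for m in re.finditer('。|,|,|;|?', demo):
    start, end = m.span()
    print(m.group(), start, end, '->', demo[max(0, start-3):end+3])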
#encoding:utf-8
import os
import re
#----------------------------功能:获取实体类别及个数---------------------------------
def get_entities(dirPath):
entities = {} #存储实体类别
files = os.listdir(dirPath) #遍历路径
#获取所有文件的名字并去重 0.ann => 0
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
#print(filenames)
#重新构造ANN文件名并遍历文件
for filename in filenames:
path = os.path.join(dirPath, filename+".ann")
#print(path)
#读文件
with open(path, 'r', encoding='utf8') as f:
for line in f.readlines():
#TAB键分割获取实体类型
name = line.split('\t')[1]
#print(name)
value = name.split(' ')[0]
#print(value)
#实体加入字典并统计个数
if value in entities:
entities[value] += 1 #在实体集合中数量加1
else:
entities[value] = 1 #创建键值且值为1
#返回实体集
return entities
#----------------------------功能:命名实体BIO标注--------------------------------
def get_labelencoder(entities):
#排序
entities = sorted(entities.items(), key=lambda x: x[1], reverse=True)
print(entities)
#获取实体类别名称
entities = [x[0] for x in entities]
print(entities)
#标记实体
id2label = []
id2label.append('O')
#生成实体标记
for entity in entities:
id2label.append('B-'+entity)
id2label.append('I-'+entity)
#字典键值生成
label2id = {id2label[i]:i for i in range(len(id2label))}
return id2label, label2id
#-------------------------功能:自定义分隔符文本分割------------------------------
def split_text(text):
pattern = '。|,|,|;|;|\?|?|\.'
#获取字符的下标位置
for m in re.finditer(pattern, text):
"""
print(m)
start = m.span()[0] #标点符号位置
print(text[start])
start = m.span()[0] - 5
end = m.span()[1] + 5
print('****', text[start:end], '****')
"""
#特殊符号下标
idx = m.span()[0]
#判断是否断句
if text[idx-1]=='\n': #当前符号前是换行符
print(path)
print('****', text[idx-20:idx+20], '****')
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
dirPath = "data/train_data"
#获取实体类别及个数
entities = get_entities(dirPath)
print(entities)
print(len(entities))
#完成实体标记 列表 字典
#得到标签和下标的映射
label, label_dic = get_labelencoder(entities)
print(label)
print(len(label))
print(label_dic, '\n\n')
#遍历路径
files = os.listdir(dirPath)
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
for filename in filenames:
path = os.path.join(dirPath, filename+".txt") #TXT文件
#print(path)
with open(path, 'r', encoding='utf8') as f:
text = f.read()
#分割文本
split_text(text)
print("n")
if text[idx-1].isdigit() and text[idx+1].isdigit():
continue
if text[idx-1].isdigit() and text[idx+1].isspace() and text[idx+2].isdigit():
continue
if text[idx-1].islower() and text[idx+1].islower():
continue
if text[idx-1].islower() and text[idx+1].isdigit():
continue
if text[idx-1].isupper() and text[idx+1].isdigit():
continue
if text[idx-1].isdigit() and text[idx+1].islower():
continue
if text[idx-1].isdigit() and text[idx+1].isupper():
continue
pattern2 = '\([一二三四五六七八九十零]\)|[一二三四五六七八九十零]、|'
pattern2 += '注:|附录 |表 \d|Tab \d+|\[摘要\]|\[提要\]|表\d[^。,,;;]+?\n|'
pattern2 += '图 \d|Fig \d|\[Abdtract\]|\[Summary\]|前 言|【摘要】|【关键词】|'
pattern2 += '结 果|讨 论|and |or |with |by |because of |as well as '
for m in re.finditer(pattern2, text):
idx = m.span()[0]
print('****', text[idx-20:idx+20], '****')
#------------------------功能:判断字符是不是汉字-----------------------
def ischinese(char):
if '\u4e00' <= char <= '\u9fff':
return True
return False
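The check relies on the CJK Unified Ideographs block (U+4E00 to U+9FFF). A quick illustration, assuming the ischinese helper above is in scope:
# Digits, ASCII letters and punctuation fall outside the CJK range, so only the 汉字 return True.
for ch in '糖尿病A1c,。':
    print(ch, ischinese(ch))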
def split_text(dirPath):
.....
#判断序列且包含汉字的分割(2.接下来...) 同时小数不进行切割
pattern3 = '\d\.' #数字+点
for m in re.finditer(pattern3, text):
idx = m.span()[0]
if ischinese(text[idx+2]): #第三个字符为中文汉字
print('****', text[idx-20:idx+20], '****')
#encoding:utf-8
import os
import re
#----------------------------功能:获取实体类别及个数---------------------------------
def get_entities(dirPath):
entities = {} #存储实体类别
files = os.listdir(dirPath) #遍历路径
#获取所有文件的名字并去重 0.ann => 0
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
#print(filenames)
#重新构造ANN文件名并遍历文件
for filename in filenames:
path = os.path.join(dirPath, filename+".ann")
#print(path)
#读文件
with open(path, 'r', encoding='utf8') as f:
for line in f.readlines():
#TAB键分割获取实体类型
name = line.split('\t')[1]
#print(name)
value = name.split(' ')[0]
#print(value)
#实体加入字典并统计个数
if value in entities:
entities[value] += 1 #在实体集合中数量加1
else:
entities[value] = 1 #创建键值且值为1
#返回实体集
return entities
#----------------------------功能:命名实体BIO标注--------------------------------
def get_labelencoder(entities):
#排序
entities = sorted(entities.items(), key=lambda x: x[1], reverse=True)
print(entities)
#获取实体类别名称
entities = [x[0] for x in entities]
print(entities)
#标记实体
id2label = []
id2label.append('O')
#生成实体标记
for entity in entities:
id2label.append('B-'+entity)
id2label.append('I-'+entity)
#字典键值生成
label2id = {id2label[i]:i for i in range(len(id2label))}
return id2label, label2id
#-------------------------功能:自定义分隔符文本分割------------------------------
def split_text(text):
#分割后的下标
split_index = []
#--------------------------------------------------------------------
# 文本分割
#--------------------------------------------------------------------
#第一部分 按照符号分割
pattern = '。|,|,|;|;|\?|?|\.'
#获取字符的下标位置
for m in re.finditer(pattern, text):
"""
print(m)
start = m.span()[0] #标点符号位置
print(text[start])
start = m.span()[0] - 5
end = m.span()[1] + 5
print('****', text[start:end], '****')
"""
#特殊符号下标
idx = m.span()[0]
#判断是否断句 contniue表示不能直接分割句子
if text[idx-1]=='\n': #当前符号前是换行符
continue
if text[idx-1].isdigit() and text[idx+1].isdigit(): #前后都是数字或数字+空格
continue
if text[idx-1].isdigit() and text[idx+1].isspace() and text[idx+2].isdigit():
continue
if text[idx-1].islower() and text[idx+1].islower(): #前后都是小写字母
continue
if text[idx-1].isupper() and text[idx+1].isupper(): #前后都是大写字母
continue
if text[idx-1].islower() and text[idx+1].isdigit(): #前面是小写字母 后面是数字
continue
if text[idx-1].isupper() and text[idx+1].isdigit(): #前面是大写字母 后面是数字
continue
if text[idx-1].isdigit() and text[idx+1].islower(): #前面是数字 后面是小写字母
continue
if text[idx-1].isdigit() and text[idx+1].isupper(): #前面是数字 后面是大写字母
continue
if text[idx+1] in set('.。;;,,'): #前后都是标点符号
continue
if text[idx-1].isspace() and text[idx-2].isspace() and text[idx-3].isupper():
continue #HBA1C 。两个空格+字母
if text[idx-1].isspace() and text[idx-3].isupper():
continue
#print(path)
#print('****', text[idx-20:idx+20], '****')
#将分句的下标存储至列表中 -> 标点符号后面的字符
split_index.append(idx+1)
#--------------------------------------------------------------------
#第二部分 按照自定义符号分割
#下列形式进行句子分割
pattern2 = '\([一二三四五六七八九十零]\)|[一二三四五六七八九十零]、|'
pattern2 += '注:|附录 |表 \d|Tab \d+|\[摘要\]|\[提要\]|表\d[^。,,;;]+?\n|'
pattern2 += '图 \d|Fig \d|\[Abdtract\]|\[Summary\]|前 言|【摘要】|【关键词】|'
pattern2 += '结 果|讨 论|and |or |with |by |because of |as well as '
#print(pattern2)
for m in re.finditer(pattern2, text):
idx = m.span()[0]
#print('****', text[idx-20:idx+20], '****')
#连接词位于单词中间不能分割 如 goodbye
if (text[idx:idx+2] in ['or','by'] or text[idx:idx+3]=='and' or text[idx:idx+4]=='with') \
and (text[idx-1].islower() or text[idx-1].isupper()):
continue
split_index.append(idx) #注意这里不加1 找到即分割
#--------------------------------------------------------------------
#第三部分 中文字符+数字分割
#判断序列且包含汉字的分割(2.接下来...) 同时小数不进行切割
pattern3 = '\n\d\.' #数字+点
for m in re.finditer(pattern3, text):
idx = m.span()[0]
if ischinese(text[idx+3]): #第四个字符为中文汉字 含换行
#print('****', text[idx-20:idx+20], '****')
split_index.append(idx+1)
#换行+数字+括号 (1)总体治疗原则:淤在选择降糖药物时
for m in re.finditer('\n\(\d\)', text):
idx = m.span()[0]
split_index.append(idx+1)
#--------------------------------------------------------------------
#获取句子分割下标后进行排序操作 增加第一行和最后一行
split_index = sorted(set([0, len(text)] + split_index))
split_index = list(split_index)
#print(split_index)
#计算机最大值和最小值
lens = [split_index[i+1]-split_index[i] for i in range(len(split_index)-1)]
print(max(lens), min(lens))
#输出切割的句子
#for i in range(len(split_index)-1):
# print(i, '******', text[split_index[i]:split_index[i+1]])
#---------------------------功能:判断字符是不是汉字-------------------------------
def ischinese(char):
if '\u4e00' <= char <= '\u9fff':
return True
return False
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
dirPath = "data/train_data"
#获取实体类别及个数
entities = get_entities(dirPath)
print(entities)
print(len(entities))
#完成实体标记 列表 字典
#得到标签和下标的映射
label, label_dic = get_labelencoder(entities)
print(label)
print(len(label))
print(label_dic, '\n\n')
#遍历路径
files = os.listdir(dirPath)
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
for filename in filenames:
path = os.path.join(dirPath, filename+".txt") #TXT文件
#print(path)
with open(path, 'r', encoding='utf8') as f:
text = f.read()
#分割文本
print(path)
split_text(text)
print("n")
From the printed output we can read off the longest and shortest sentence length of each TXT file after splitting.
4. Handling long and short sentences
#encoding:utf-8
import os
import re
#----------------------------功能:获取实体类别及个数---------------------------------
def get_entities(dirPath):
entities = {} #存储实体类别
files = os.listdir(dirPath) #遍历路径
#获取所有文件的名字并去重 0.ann => 0
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
#print(filenames)
#重新构造ANN文件名并遍历文件
for filename in filenames:
path = os.path.join(dirPath, filename+".ann")
#print(path)
#读文件
with open(path, 'r', encoding='utf8') as f:
for line in f.readlines():
#TAB键分割获取实体类型
name = line.split('\t')[1]
#print(name)
value = name.split(' ')[0]
#print(value)
#实体加入字典并统计个数
if value in entities:
entities[value] += 1 #在实体集合中数量加1
else:
entities[value] = 1 #创建键值且值为1
#返回实体集
return entities
#----------------------------功能:命名实体BIO标注--------------------------------
def get_labelencoder(entities):
#排序
entities = sorted(entities.items(), key=lambda x: x[1], reverse=True)
print(entities)
#获取实体类别名称
entities = [x[0] for x in entities]
print(entities)
#标记实体
id2label = []
id2label.append('O')
#生成实体标记
for entity in entities:
id2label.append('B-'+entity)
id2label.append('I-'+entity)
#字典键值生成
label2id = {id2label[i]:i for i in range(len(id2label))}
return id2label, label2id
#-------------------------功能:自定义分隔符文本分割------------------------------
def split_text(text, outfile):
#分割后的下标
split_index = []
#文件写入
fw = open(outfile, 'w', encoding='utf8')
#--------------------------------------------------------------------
# 文本分割
#--------------------------------------------------------------------
#第一部分 按照符号分割
pattern = '。|,|,|;|;|\?|?|\.'
#获取字符的下标位置
for m in re.finditer(pattern, text):
"""
print(m)
start = m.span()[0] #标点符号位置
print(text[start])
start = m.span()[0] - 5
end = m.span()[1] + 5
print('****', text[start:end], '****')
"""
#特殊符号下标
idx = m.span()[0]
#判断是否断句 contniue表示不能直接分割句子
if text[idx-1]=='\n': #当前符号前是换行符
continue
if text[idx-1].isdigit() and text[idx+1].isdigit(): #前后都是数字或数字+空格
continue
if text[idx-1].isdigit() and text[idx+1].isspace() and text[idx+2].isdigit():
continue
if text[idx-1].islower() and text[idx+1].islower(): #前后都是小写字母
continue
if text[idx-1].isupper() and text[idx+1].isupper(): #前后都是大写字母
continue
if text[idx-1].islower() and text[idx+1].isdigit(): #前面是小写字母 后面是数字
continue
if text[idx-1].isupper() and text[idx+1].isdigit(): #前面是大写字母 后面是数字
continue
if text[idx-1].isdigit() and text[idx+1].islower(): #前面是数字 后面是小写字母
continue
if text[idx-1].isdigit() and text[idx+1].isupper(): #前面是数字 后面是大写字母
continue
if text[idx+1] in set('.。;;,,'): #前后都是标点符号
continue
if text[idx-1].isspace() and text[idx-2].isspace() and text[idx-3].isupper():
continue #HBA1C 。两个空格+字母
if text[idx-1].isspace() and text[idx-3].isupper():
continue
#print('****', text[idx-20:idx+20], '****')
#将分句的下标存储至列表中 -> 标点符号后面的字符
split_index.append(idx+1)
#--------------------------------------------------------------------
#第二部分 按照自定义符号分割
#下列形式进行句子分割
pattern2 = '\([一二三四五六七八九十零]\)|[一二三四五六七八九十零]、|'
pattern2 += '注:|附录 |表 \d|Tab \d+|\[摘要\]|\[提要\]|表\d[^。,,;;]+?\n|'
pattern2 += '图 \d|Fig \d|\[Abdtract\]|\[Summary\]|前 言|【摘要】|【关键词】|'
pattern2 += '结 果|讨 论|and |or |with |by |because of |as well as '
#print(pattern2)
for m in re.finditer(pattern2, text):
idx = m.span()[0]
#print('****', text[idx-20:idx+20], '****')
#连接词位于单词中间不能分割 如 goodbye
if (text[idx:idx+2] in ['or','by'] or text[idx:idx+3]=='and' or text[idx:idx+4]=='with') \
and (text[idx-1].islower() or text[idx-1].isupper()):
continue
split_index.append(idx) #注意这里不加1 找到即分割
#--------------------------------------------------------------------
#第三部分 中文字符+数字分割
#判断序列且包含汉字的分割(2.接下来...) 同时小数不进行切割
pattern3 = '\n\d\.' #数字+点
for m in re.finditer(pattern3, text):
idx = m.span()[0]
if ischinese(text[idx+3]): #第四个字符为中文汉字 含换行
#print('****', text[idx-20:idx+20], '****')
split_index.append(idx+1)
#换行+数字+括号 (1)总体治疗原则:淤在选择降糖药物时
for m in re.finditer('\n\(\d\)', text):
idx = m.span()[0]
split_index.append(idx+1)
#--------------------------------------------------------------------
#获取句子分割下标后进行排序操作 增加第一行和最后一行
split_index = sorted(set([0, len(text)] + split_index))
split_index = list(split_index)
#print(split_index)
#计算机最大值和最小值
lens = [split_index[i+1]-split_index[i] for i in range(len(split_index)-1)]
#print(max(lens), min(lens))
#--------------------------------------------------------------------
# 长短句处理
#--------------------------------------------------------------------
#遍历每一个句子 (一)xxxx 分割
other_index = []
for i in range(len(split_index)-1):
begin = split_index[i]
end = split_index[i+1]
#print("-----", text[begin:end])
#print(begin, end)
if (text[begin] in '一二三四五六七八九十零') or \
(text[begin]=='(' and text[begin+1] in '一二三四五六七八九十零'):
for j in range(begin,end):
if text[j]=='\n':
other_index.append(j+1)
#补充+排序
split_index += other_index
split_index = list(sorted(set([0, len(text)] + split_index)))
#--------------------------------------------------------------------
#第一部分 长句处理:句子长度超过150进行拆分
other_index = []
for i in range(len(split_index)-1):
begin = split_index[i]
end = split_index[i+1]
other_index.append(begin)
#句子长度超过150切割 并且最短15个字符
if end-begin>150:
for j in range(begin,end):
#这一次下标位置比上一次超过15分割
if(j+1-other_index[-1])>15:
#换行分割
if text[j]=='\n':
other_index.append(j+1)
#空格+前后数字
if text[j]==' ' and text[j-1].isnumeric() and text[j+1].isnumeric():
other_index.append(j+1)
split_index += other_index
split_index = list(sorted(set([0, len(text)] + split_index)))
#--------------------------------------------------------------------
#第二部分 删除空格的句子
for i in range(1, len(split_index)-1):
idx = split_index[i]
#当前下标和上一个下标对比 如果等于空格继续比较
while idx>split_index[i-1]-1 and text[idx-1].isspace():
idx -= 1
split_index[i] = idx
split_index = list(sorted(set([0, len(text)] + split_index)))
#--------------------------------------------------------------------
#第三部分 短句处理-拼接
temp_idx = []
i = 0
while i<(len(split_index)-1):
begin = split_index[i]
end = split_index[i+1]
#先统计句子中中文字符和英文字符个数
num_ch = 0
num_en = 0
if end - begin <15:
for ch in text[begin:end]:
if ischinese(ch):
num_ch += 1
elif ch.islower() or ch.isupper():
num_en += 1
if num_ch + 0.5*num_en>5: #大于5说明长度够用
temp_idx.append(begin)
i += 1 #注意break前i加1 否则死循环
break
#长度小于等于5和后面的句子合并
if num_ch + 0.5*num_en<=5:
temp_idx.append(begin)
i += 2
else:
temp_idx.append(begin) #大于15直接添加下标
i += 1
split_index = list(sorted(set([0, len(text)] + temp_idx)))
#查看句子长度 由于存在n换行一个字符
lens = [split_index[i+1]-split_index[i] for i in range(len(split_index)-1)][:-1] #删除最后一个换行
print(max(lens), min(lens))
#for i in range(len(split_index)-1):
# print(i, '****', text[split_index[i]:split_index[i+1]])
#存储结果
result = []
for i in range(len(split_index)-1):
result.append(text[split_index[i]:split_index[i+1]])
fw.write(text[split_index[i]:split_index[i+1]])
fw.close()
#检查:预处理后字符是否减少
s = ''
for r in result:
s += r
assert len(s)==len(text) #断言
return result
#---------------------------功能:判断字符是不是汉字-------------------------------
def ischinese(char):
if '\u4e00' <= char <= '\u9fff':
return True
return False
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
dirPath = "data/train_data"
outPath = 'data/train_data_pro'
#获取实体类别及个数
entities = get_entities(dirPath)
print(entities)
print(len(entities))
#完成实体标记 列表 字典
#得到标签和下标的映射
label, label_dic = get_labelencoder(entities)
print(label)
print(len(label))
print(label_dic, '\n\n')
#遍历路径
files = os.listdir(dirPath)
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
for filename in filenames:
path = os.path.join(dirPath, filename+".txt") #TXT文件
outfile = os.path.join(outPath, filename+"_pro.txt")
#print(path)
with open(path, 'r', encoding='utf8') as f:
text = f.read()
#分割文本
print(path)
split_text(text, outfile)
print("n")
四. Automated entity tagging
1. Generating the label lists for the split sentences
#encoding:utf-8
import os
import pandas as pd
from collections import Counter
from data_process import split_text
from tqdm import tqdm #进度条 pip install tqdm
#词性标注
import jieba.posseg as psg
train_dir = "train_data"
#----------------------------功能:文本预处理---------------------------------
train_dir = "train_data"
def process_text(idx, split_method=None):
"""
功能: 读取文本并切割,接着打上标记及提取词边界、词性、偏旁部首、拼音等特征
param idx: 文件的名字 不含扩展名
param split_method: 切割文本方法
return
"""
#定义字典 保存所有字的标记、边界、词性、偏旁部首、拼音等特征
data = {}
#--------------------------------------------------------------------
#获取句子
if split_method is None:
#未给文本分割函数 -> 读取文件
with open(f'data/{train_dir}/{idx}.txt', encoding='utf8') as f: #f表示文件路径
texts = f.readlines()
else:
#给出文本分割函数 -> 按函数分割
with open(f'data/{train_dir}/{idx}.txt', encoding='utf8') as f:
outfile = f'data/train_data_pro/{idx}_pro.txt'
print(outfile)
texts = f.read()
texts = split_method(texts, outfile)
#提取句子
data['word'] = texts
print(texts)
#--------------------------------------------------------------------
#获取标签
tag_list = ['O' for s in texts for x in s] #双层循环遍历每句话中的汉字
return tag_list
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
print(process_text('0',split_method=split_text))
tag = pd.read_csv(f'data/{train_dir}/{idx}.ann', header=None, sep='\t')
return tag
#读取ANN文件获取每个实体的类型、起始位置和结束位置
tag = pd.read_csv(f'data/{train_dir}/{idx}.ann', header=None, sep='\t') #Pandas读取 分隔符为tab键
for i in range(tag.shape[0]): #tag.shape[0]为行数
tag_item = tag.iloc[i][1].split(' ') #每一行的第二列 空格分割
print(tag_item)
#encoding:utf-8
import os
import pandas as pd
from collections import Counter
from data_process import split_text
from tqdm import tqdm #进度条 pip install tqdm
#词性标注
import jieba.posseg as psg
train_dir = "train_data"
#----------------------------功能:文本预处理---------------------------------
train_dir = "train_data"
def process_text(idx, split_method=None):
"""
功能: 读取文本并切割,接着打上标记及提取词边界、词性、偏旁部首、拼音等特征
param idx: 文件的名字 不含扩展名
param split_method: 切割文本方法
return
"""
#定义字典 保存所有字的标记、边界、词性、偏旁部首、拼音等特征
data = {}
#--------------------------------------------------------------------
#获取句子
#--------------------------------------------------------------------
if split_method is None:
#未给文本分割函数 -> 读取文件
with open(f'data/{train_dir}/{idx}.txt', encoding='utf8') as f: #f表示文件路径
texts = f.readlines()
else:
#给出文本分割函数 -> 按函数分割
with open(f'data/{train_dir}/{idx}.txt', encoding='utf8') as f:
outfile = f'data/train_data_pro/{idx}_pro.txt'
print(outfile)
texts = f.read()
texts = split_method(texts, outfile)
#提取句子
data['word'] = texts
print(texts)
#--------------------------------------------------------------------
# 获取标签
#--------------------------------------------------------------------
#初始时将所有汉字标记为O
tag_list = ['O' for s in texts for x in s] #双层循环遍历每句话中的汉字
#读取ANN文件获取每个实体的类型、起始位置和结束位置
tag = pd.read_csv(f'data/{train_dir}/{idx}.ann', header=None, sep='\t') #Pandas读取 分隔符为tab键
#0 T1 Disease 1845 1850 1型糖尿病
for i in range(tag.shape[0]): #tag.shape[0]为行数
tag_item = tag.iloc[i][1].split(' ') #每一行的第二列 空格分割
#print(tag_item)
#存在某些实体包括两段位置区间 仅获取起始位置和结束位置
cls, start, end = tag_item[0], int(tag_item[1]), int(tag_item[-1])
#print(cls,start,end)
#对tag_list进行修改
tag_list[start] = 'B-' + cls
for j in range(start+1, end):
tag_list[j] = 'I-' + cls
return tag_list
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
print(process_text('0',split_method=split_text))
#--------------------------------------------------------------------
# 分割后句子匹配标签
#--------------------------------------------------------------------
tags = []
start = 0
end = 0
#遍历文本
for s in texts:
length = len(s)
end += length
tags.append(tag_list[start:end])
start += length
return tag_list, tags
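The re-chunking loop above relies only on the sentence lengths. A toy illustration with invented sentences and tags shows the idea:
texts = ['糖尿病,', '用药。']                                               # two split sentences (made up)
tag_list = ['B-Disease', 'I-Disease', 'I-Disease', 'O', 'O', 'O', 'O']      # one flat tag per character
tags, start, end = [], 0, 0
for s in texts:
    end += len(s)
    tags.append(tag_list[start:end])
    start += len(s)
print(tags)   # [['B-Disease', 'I-Disease', 'I-Disease', 'O'], ['O', 'O', 'O']]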
2. Extracting part-of-speech and word-boundary features
#--------------------------------------------------------------------
# 提取词性和词边界
#--------------------------------------------------------------------
#初始标记为M
word_bounds = ['M' for item in tag_list] #边界 M表示中间
word_flags = [] #词性
#分词
for text in texts:
#带词性的结巴分词
for word, flag in psg.cut(text):
if len(word)==1: #1个长度词
start = len(word_flags)
word_bounds[start] = 'S' #单个字
word_flags.append(flag)
else:
start = len(word_flags)
word_bounds[start] = 'B' #开始边界
word_flags += [flag]*len(word) #保证词性和字一一对应
end = len(word_flags) - 1
word_bounds[end] = 'E' #结束边界
#存储
bounds = []
flags = []
start = 0
end = 0
for s in texts:
length = len(s)
end += length
bounds.append(word_bounds[start:end])
flags.append(word_flags[start:end])
start += length
data['bound'] = bounds
data['flag'] = flags
#return texts, tags, bounds, flags
return texts[0], tags[0], bounds[0], flags[0]
(
'中国成人2型糖尿病HBA1C c控制目标的专家共识\n目前,',
['O', 'O', 'O', 'O', 'B-Disease', 'I-Disease', 'I-Disease',
'I-Disease', 'I-Disease', 'B-Test', 'I-Test', 'I-Test',
'I-Test', 'I-Test', 'O', 'O', 'O', 'O', 'O', 'O',
'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O'],
['B', 'E', 'B', 'E', 'S', 'S', 'B', 'M', 'E',
'B', 'M', 'M', 'M', 'E', 'S', 'S', 'S', 'B',
'M', 'M', 'E', 'S', 'B', 'E', 'B', 'E', 'S', 'B', 'E', 'S'],
['ns', 'ns', 'n', 'n', 'm', 'k', 'n', 'n', 'n',
'eng', 'eng', 'eng', 'eng', 'eng', 'x', 'x',
'x', 'n', 'n', 'n', 'n', 'uj', 'n', 'n', 'n', 'n', 'x', 't', 't', 'x']
)
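The flag list above has exactly one entry per character because the part-of-speech tag that jieba assigns to a word is copied onto every character of that word, and the boundary list marks where each word begins and ends. A small, hedged demonstration of that replication (the sentence is invented, and jieba's segmentation of it may differ between versions):
import jieba.posseg as psg

sentence = '控制血糖目标'
flags, bounds = [], []
for word, flag in psg.cut(sentence):
    if len(word) == 1:
        bounds.append('S')                                   # single-character word
        flags.append(flag)
    else:
        bounds += ['B'] + ['M'] * (len(word) - 2) + ['E']    # begin / middle / end of the word
        flags += [flag] * len(word)                          # repeat the POS tag for each character
print(list(sentence))
print(bounds)
print(flags)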
3. Extracting pinyin features
from cnradical import Radical, RunOption
radical = Radical(RunOption.Radical)
pinyin = Radical(RunOption.Pinyin)
text = '你好,今天早上吃饭了吗?Eastmount'
radical_out = [radical.trans_ch(ele) for ele in text]
pinyin_out = [pinyin.trans_ch(ele) for ele in text]
print(radical_out)
print(pinyin_out)
radical_out = radical.trans_str(text)
pinyin_out = pinyin.trans_str(text)
print(radical_out)
print(pinyin_out)
#encoding:utf-8
import os
import pandas as pd
from collections import Counter
from data_process import split_text
from tqdm import tqdm #进度条 pip install tqdm
#词性标注
import jieba.posseg as psg
#获取字的偏旁和拼音
from cnradical import Radical, RunOption
train_dir = "train_data"
#----------------------------功能:文本预处理---------------------------------
train_dir = "train_data"
def process_text(idx, split_method=None):
"""
功能: 读取文本并切割,接着打上标记及提取词边界、词性、偏旁部首、拼音等特征
param idx: 文件的名字 不含扩展名
param split_method: 切割文本方法
return
"""
#定义字典 保存所有字的标记、边界、词性、偏旁部首、拼音等特征
data = {}
#--------------------------------------------------------------------
#获取句子
#--------------------------------------------------------------------
if split_method is None:
#未给文本分割函数 -> 读取文件
with open(f'data/{train_dir}/{idx}.txt', encoding='utf8') as f: #f表示文件路径
texts = f.readlines()
else:
#给出文本分割函数 -> 按函数分割
with open(f'data/{train_dir}/{idx}.txt', encoding='utf8') as f:
outfile = f'data/train_data_pro/{idx}_pro.txt'
print(outfile)
texts = f.read()
texts = split_method(texts, outfile)
#提取句子
data['word'] = texts
print(texts)
#--------------------------------------------------------------------
# 获取标签
#--------------------------------------------------------------------
#初始时将所有汉字标记为O
tag_list = ['O' for s in texts for x in s] #双层循环遍历每句话中的汉字
#读取ANN文件获取每个实体的类型、起始位置和结束位置
tag = pd.read_csv(f'data/{train_dir}/{idx}.ann', header=None, sep='\t') #Pandas读取 分隔符为tab键
#0 T1 Disease 1845 1850 1型糖尿病
for i in range(tag.shape[0]): #tag.shape[0]为行数
tag_item = tag.iloc[i][1].split(' ') #每一行的第二列 空格分割
#print(tag_item)
#存在某些实体包括两段位置区间 仅获取起始位置和结束位置
cls, start, end = tag_item[0], int(tag_item[1]), int(tag_item[-1])
#print(cls,start,end)
#对tag_list进行修改
tag_list[start] = 'B-' + cls
for j in range(start+1, end):
tag_list[j] = 'I-' + cls
#断言 两个长度不一致报错
assert len([x for s in texts for x in s])==len(tag_list)
#print(len([x for s in texts for x in s]))
#print(len(tag_list))
#--------------------------------------------------------------------
# 分割后句子匹配标签
#--------------------------------------------------------------------
tags = []
start = 0
end = 0
#遍历文本
for s in texts:
length = len(s)
end += length
tags.append(tag_list[start:end])
start += length
print(len(tags))
#标签数据存储至字典中
data['label'] = tags
#--------------------------------------------------------------------
# 提取词性和词边界
#--------------------------------------------------------------------
#初始标记为M
word_bounds = ['M' for item in tag_list] #边界 M表示中间
word_flags = [] #词性
#分词
for text in texts:
#带词性的结巴分词
for word, flag in psg.cut(text):
if len(word)==1: #1个长度词
start = len(word_flags)
word_bounds[start] = 'S' #单个字
word_flags.append(flag)
else:
start = len(word_flags)
word_bounds[start] = 'B' #开始边界
word_flags += [flag]*len(word) #保证词性和字一一对应
end = len(word_flags) - 1
word_bounds[end] = 'E' #结束边界
#存储
bounds = []
flags = []
start = 0
end = 0
for s in texts:
length = len(s)
end += length
bounds.append(word_bounds[start:end])
flags.append(word_flags[start:end])
start += length
data['bound'] = bounds
data['flag'] = flags
#--------------------------------------------------------------------
# 获取拼音特征
#--------------------------------------------------------------------
radical = Radical(RunOption.Radical) #提取偏旁部首
pinyin = Radical(RunOption.Pinyin) #提取拼音
#提取拼音和偏旁 None用特殊符号替代
radical_out = [[radical.trans_ch(x) if radical.trans_ch(x) is not None else 'PAD' for x in s] for s in texts]
pinyin_out = [[pinyin.trans_ch(x) if pinyin.trans_ch(x) is not None else 'PAD' for x in s] for s in texts]
#赋值
data['radical'] = radical_out
data['pinyin'] = pinyin_out
#return texts, tags, bounds, flags
return texts[0], tags[0], bounds[0], flags[0], radical_out[0], pinyin_out[0]
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
print(process_text('0', split_method=split_text))
('中国成人2型糖尿病HBA1C c控制目标的专家共识\n目前,',
['O', 'O', 'O', 'O', 'B-Disease', 'I-Disease', 'I-Disease', 'I-Disease', 'I-Disease',
'B-Test', 'I-Test', 'I-Test', 'I-Test', 'I-Test', 'O', 'O', 'O',
'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O'],
['B', 'E', 'B', 'E', 'S', 'S', 'B', 'M', 'E', 'B', 'M',
'M', 'M', 'E', 'S', 'S', 'S', 'B', 'M', 'M', 'E', 'S',
'B', 'E', 'B', 'E', 'S', 'B', 'E', 'S'],
['ns', 'ns', 'n', 'n', 'm', 'k', 'n', 'n', 'n', 'eng',
'eng', 'eng', 'eng', 'eng', 'x', 'x', 'x', 'n', 'n',
'n', 'n', 'uj', 'n', 'n', 'n', 'n', 'x', 't', 't', 'x'],
['丨', '囗', '戈', '人', 'PAD', '土', '米', '尸', '疒', 'PAD',
'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', '扌', '刂',
'目', '木', '白', '一', '宀', '八', '讠', 'PAD', '目', '刂', 'PAD'],
['zhōng', 'guó', 'chéng', 'rén', 'PAD', 'xíng', 'táng', 'niào', 'bìng', 'PAD',
'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'PAD', 'kòng', 'zhì', 'mù', 'biāo',
'dí', 'zhuān', 'jiā', 'gòng', 'shí', 'PAD', 'mù', 'qián', 'PAD'])
4. Character-level tagging and data storage
#--------------------------------------------------------------------
# 存储数据
#--------------------------------------------------------------------
#获取样本数量
num_samples = len(texts) #行数
num_col = len(data.keys()) #列数 字典自定义类别数
print(num_samples)
print(num_col)
dataset = []
for i in range(num_samples):
records = list(zip(*[list(v[i]) for v in data.values()])) #压缩
records = list(zip(*[list(v[0]) for v in data.values()]))
for r in records:
print(r)
#return texts, tags, bounds, flags
#return texts[0], tags[0], bounds[0], flags[0], radical_out[0], pinyin_out[0]
#--------------------------------------------------------------------
# 存储数据
#--------------------------------------------------------------------
#获取样本数量
num_samples = len(texts) #行数
num_col = len(data.keys()) #列数 字典自定义类别数 6
print(num_samples)
print(num_col)
dataset = []
for i in range(num_samples):
records = list(zip(*[list(v[i]) for v in data.values()])) #压缩
dataset += records+[['sep']*num_col] #每处理一句话sep分割
#records = list(zip(*[list(v[0]) for v in data.values()]))
#for r in records:
# print(r)
#最后一行sep删除
dataset = dataset[:-1]
#转换成dataframe 增加表头
dataset = pd.DataFrame(dataset,columns=data.keys())
#保存文件 测试集 训练集
save_path = f'data/prepare/{split_name}/{idx}.csv'
dataset.to_csv(save_path,index=False,encoding='utf-8')
#--------------------------------------------------------------------
# 处理换行符 w表示一个字
#--------------------------------------------------------------------
def clean_word(w):
if w=='\n':
return 'LB'
if w in [' ','\t','\u2003']: #中文空格\u2003
return 'SPACE'
if w.isdigit(): #将所有数字转换为一种符号 数字训练会造成干扰
return 'NUM'
return w
#对dataframe应用函数
dataset['word'] = dataset['word'].apply(clean_word)
#存储数据
dataset.to_csv(save_path,index=False,encoding='utf-8')
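To see what actually lands on disk, the per-character CSV written above can be read back with pandas. The path follows the save_path pattern in the code and assumes that file 0 has already been processed into the train split:
import pandas as pd

df = pd.read_csv('data/prepare/train/0.csv', encoding='utf-8')
print(df.columns.tolist())   # ['word', 'label', 'bound', 'flag', 'radical', 'pinyin']
print(df.head(10))           # one row per character; sentences are separated by 'sep' rows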
5. Processing multiple files
#----------------------------功能:预处理所有文本---------------------------------
def multi_process(split_method=None,train_ratio=0.8):
"""
功能: 对所有文本尽心预处理操作
param split_method: 切割文本方法
param train_ratio: 训练集和测试集划分比例
return
"""
#删除目录
if os.path.exists('data/prepare/'):
shutil.rmtree('data/prepare/')
#创建目录
if not os.path.exists('data/prepare/train/'):
os.makedirs('data/prepare/train/')
os.makedirs('data/prepare/test/')
#获取所有文件名
idxs = set([file.split('.')[0] for file in os.listdir('data/'+train_dir)])
idxs = list(idxs)
#随机划分训练集和测试集
shuffle(idxs) #打乱顺序
index = int(len(idxs)*train_ratio) #获取训练集的截止下标
#获取训练集和测试集文件名集合
train_ids = idxs[:index]
test_ids = idxs[index:]
#--------------------------------------------------------------------
# 引入多进程
#--------------------------------------------------------------------
#线程池方式调用
import multiprocessing as mp
num_cpus = mp.cpu_count() #获取机器CPU的个数
pool = mp.Pool(num_cpus)
results = []
#训练集处理
for idx in train_ids:
result = pool.apply_async(process_text, args=(idx,split_method,'train'))
results.append(result)
#测试集处理
for idx in test_ids:
result = pool.apply_async(process_text, args=(idx,split_method,'test'))
results.append(result)
#关闭进程池
pool.close()
pool.join()
[r.get() for r in results]
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
#print(process_text('0',split_method=split_text,split_name='train'))
multi_process(split_text)
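One caveat about the multiprocessing call: apply_async swallows any exception raised inside a worker until the result's get() method is invoked, which is why the results are collected with r.get() at the end; without the parentheses a failing file would go unnoticed. A hedged sketch of an equivalent collection step:
# Collect the results and re-raise any exception that occurred in a worker process.
for r in results:
    r.get()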
五. Complete code
1. data_process.py
#encoding:utf-8
import os
import re
#----------------------------功能:获取实体类别及个数---------------------------------
def get_entities(dirPath):
entities = {} #存储实体类别
files = os.listdir(dirPath) #遍历路径
#获取所有文件的名字并去重 0.ann => 0
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
#print(filenames)
#重新构造ANN文件名并遍历文件
for filename in filenames:
path = os.path.join(dirPath, filename+".ann")
#print(path)
#读文件
with open(path, 'r', encoding='utf8') as f:
for line in f.readlines():
#TAB键分割获取实体类型
name = line.split('\t')[1]
#print(name)
value = name.split(' ')[0]
#print(value)
#实体加入字典并统计个数
if value in entities:
entities[value] += 1 #在实体集合中数量加1
else:
entities[value] = 1 #创建键值且值为1
#返回实体集
return entities
#----------------------------功能:命名实体BIO标注--------------------------------
def get_labelencoder(entities):
#排序
entities = sorted(entities.items(), key=lambda x: x[1], reverse=True)
print(entities)
#获取实体类别名称
entities = [x[0] for x in entities]
print(entities)
#标记实体
id2label = []
id2label.append('O')
#生成实体标记
for entity in entities:
id2label.append('B-'+entity)
id2label.append('I-'+entity)
#字典键值生成
label2id = {id2label[i]:i for i in range(len(id2label))}
return id2label, label2id
#-------------------------功能:自定义分隔符文本分割------------------------------
def split_text(text, outfile):
#分割后的下标
split_index = []
#文件写入
fw = open(outfile, 'w', encoding='utf8')
#--------------------------------------------------------------------
# 文本分割
#--------------------------------------------------------------------
#第一部分 按照符号分割
pattern = '。|,|,|;|;|\?|?|\.'
#获取字符的下标位置
for m in re.finditer(pattern, text):
"""
print(m)
start = m.span()[0] #标点符号位置
print(text[start])
start = m.span()[0] - 5
end = m.span()[1] + 5
print('****', text[start:end], '****')
"""
#特殊符号下标
idx = m.span()[0]
#判断是否断句 contniue表示不能直接分割句子
if text[idx-1]=='\n': #当前符号前是换行符
continue
if text[idx-1].isdigit() and text[idx+1].isdigit(): #前后都是数字或数字+空格
continue
if text[idx-1].isdigit() and text[idx+1].isspace() and text[idx+2].isdigit():
continue
if text[idx-1].islower() and text[idx+1].islower(): #前后都是小写字母
continue
if text[idx-1].isupper() and text[idx+1].isupper(): #前后都是大写字母
continue
if text[idx-1].islower() and text[idx+1].isdigit(): #前面是小写字母 后面是数字
continue
if text[idx-1].isupper() and text[idx+1].isdigit(): #前面是大写字母 后面是数字
continue
if text[idx-1].isdigit() and text[idx+1].islower(): #前面是数字 后面是小写字母
continue
if text[idx-1].isdigit() and text[idx+1].isupper(): #前面是数字 后面是大写字母
continue
if text[idx+1] in set('.。;;,,'): #前后都是标点符号
continue
if text[idx-1].isspace() and text[idx-2].isspace() and text[idx-3].isupper():
continue #HBA1C 。两个空格+字母
if text[idx-1].isspace() and text[idx-3].isupper():
continue
#print('****', text[idx-20:idx+20], '****')
#将分句的下标存储至列表中 -> 标点符号后面的字符
split_index.append(idx+1)
#--------------------------------------------------------------------
#第二部分 按照自定义符号分割
#下列形式进行句子分割
pattern2 = '\([一二三四五六七八九十零]\)|[一二三四五六七八九十零]、|'
pattern2 += '注:|附录 |表 \d|Tab \d+|\[摘要\]|\[提要\]|表\d[^。,,;;]+?\n|'
pattern2 += '图 \d|Fig \d|\[Abdtract\]|\[Summary\]|前 言|【摘要】|【关键词】|'
pattern2 += '结 果|讨 论|and |or |with |by |because of |as well as '
#print(pattern2)
for m in re.finditer(pattern2, text):
idx = m.span()[0]
#print('****', text[idx-20:idx+20], '****')
#连接词位于单词中间不能分割 如 goodbye
if (text[idx:idx+2] in ['or','by'] or text[idx:idx+3]=='and' or text[idx:idx+4]=='with') \
and (text[idx-1].islower() or text[idx-1].isupper()):
continue
split_index.append(idx) #注意这里不加1 找到即分割
#--------------------------------------------------------------------
#第三部分 中文字符+数字分割
#判断序列且包含汉字的分割(2.接下来...) 同时小数不进行切割
pattern3 = '\n\d\.' #数字+点
for m in re.finditer(pattern3, text):
idx = m.span()[0]
if ischinese(text[idx+3]): #第四个字符为中文汉字 含换行
#print('****', text[idx-20:idx+20], '****')
split_index.append(idx+1)
#换行+数字+括号 (1)总体治疗原则:淤在选择降糖药物时
for m in re.finditer('\n\(\d\)', text):
idx = m.span()[0]
split_index.append(idx+1)
#--------------------------------------------------------------------
#获取句子分割下标后进行排序操作 增加第一行和最后一行
split_index = sorted(set([0, len(text)] + split_index))
split_index = list(split_index)
#print(split_index)
#计算机最大值和最小值
lens = [split_index[i+1]-split_index[i] for i in range(len(split_index)-1)]
#print(max(lens), min(lens))
#--------------------------------------------------------------------
# 长短句处理
#--------------------------------------------------------------------
#遍历每一个句子 (一)xxxx 分割
other_index = []
for i in range(len(split_index)-1):
begin = split_index[i]
end = split_index[i+1]
#print("-----", text[begin:end])
#print(begin, end)
if (text[begin] in '一二三四五六七八九十零') or \
(text[begin]=='(' and text[begin+1] in '一二三四五六七八九十零'):
for j in range(begin,end):
if text[j]=='\n':
other_index.append(j+1)
#补充+排序
split_index += other_index
split_index = list(sorted(set([0, len(text)] + split_index)))
#--------------------------------------------------------------------
#第一部分 长句处理:句子长度超过150进行拆分
other_index = []
for i in range(len(split_index)-1):
begin = split_index[i]
end = split_index[i+1]
other_index.append(begin)
#句子长度超过150切割 并且最短15个字符
if end-begin>150:
for j in range(begin,end):
#这一次下标位置比上一次超过15分割
if(j+1-other_index[-1])>15:
#换行分割
if text[j]=='\n':
other_index.append(j+1)
#空格+前后数字
if text[j]==' ' and text[j-1].isnumeric() and text[j+1].isnumeric():
other_index.append(j+1)
split_index += other_index
split_index = list(sorted(set([0, len(text)] + split_index)))
#--------------------------------------------------------------------
#第二部分 删除空格的句子
for i in range(1, len(split_index)-1):
idx = split_index[i]
#当前下标和上一个下标对比 如果等于空格继续比较
while idx>split_index[i-1]-1 and text[idx-1].isspace():
idx -= 1
split_index[i] = idx
split_index = list(sorted(set([0, len(text)] + split_index)))
#--------------------------------------------------------------------
#第三部分 短句处理-拼接
temp_idx = []
i = 0
while i<(len(split_index)-1):
begin = split_index[i]
end = split_index[i+1]
#先统计句子中中文字符和英文字符个数
num_ch = 0
num_en = 0
if end - begin <15:
for ch in text[begin:end]:
if ischinese(ch):
num_ch += 1
elif ch.islower() or ch.isupper():
num_en += 1
if num_ch + 0.5*num_en>5: #大于5说明长度够用
temp_idx.append(begin)
i += 1 #注意break前i加1 否则死循环
break
#长度小于等于5和后面的句子合并
if num_ch + 0.5*num_en<=5:
temp_idx.append(begin)
i += 2
else:
temp_idx.append(begin) #大于15直接添加下标
i += 1
split_index = list(sorted(set([0, len(text)] + temp_idx)))
#查看句子长度 由于存在n换行一个字符
lens = [split_index[i+1]-split_index[i] for i in range(len(split_index)-1)][:-1] #删除最后一个换行
print(max(lens), min(lens))
#for i in range(len(split_index)-1):
# print(i, '****', text[split_index[i]:split_index[i+1]])
#存储结果
result = []
for i in range(len(split_index)-1):
result.append(text[split_index[i]:split_index[i+1]])
fw.write(text[split_index[i]:split_index[i+1]])
fw.close()
#检查:预处理后字符是否减少
s = ''
for r in result:
s += r
assert len(s)==len(text) #断言
return result
#---------------------------功能:判断字符是不是汉字-------------------------------
def ischinese(char):
if '\u4e00' <= char <= '\u9fff':
return True
return False
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
dirPath = "data/train_data"
outPath = 'data/train_data_pro'
#获取实体类别及个数
entities = get_entities(dirPath)
print(entities)
print(len(entities))
#完成实体标记 列表 字典
#得到标签和下标的映射
label, label_dic = get_labelencoder(entities)
print(label)
print(len(label))
print(label_dic, '\n\n')
#遍历路径
files = os.listdir(dirPath)
filenames = set([file.split('.')[0] for file in files])
filenames = list(filenames)
for filename in filenames:
path = os.path.join(dirPath, filename+".txt") #TXT文件
outfile = os.path.join(outPath, filename+"_pro.txt")
#print(path)
with open(path, 'r', encoding='utf8') as f:
text = f.read()
#分割文本
print(path)
split_text(text, outfile)
print("n")
2. prepare_data.py
#encoding:utf-8
import os
import pandas as pd
from collections import Counter
from data_process import split_text
from tqdm import tqdm #进度条 pip install tqdm
#词性标注
import jieba.posseg as psg
#获取字的偏旁和拼音
from cnradical import Radical, RunOption
#删除目录
import shutil
#随机划分训练集和测试集
from random import shuffle
train_dir = "train_data"
#----------------------------功能:文本预处理---------------------------------
train_dir = "train_data"
def process_text(idx, split_method=None, split_name='train'):
"""
功能: 读取文本并切割,接着打上标记及提取词边界、词性、偏旁部首、拼音等特征
param idx: 文件的名字 不含扩展名
param split_method: 切割文本方法
param split_name: 存储数据集 默认训练集, 还有测试集
return
"""
#定义字典 保存所有字的标记、边界、词性、偏旁部首、拼音等特征
data = {}
#--------------------------------------------------------------------
# 获取句子
#--------------------------------------------------------------------
if split_method is None:
#未给文本分割函数 -> 读取文件
with open(f'data/{train_dir}/{idx}.txt', encoding='utf8') as f: #f表示文件路径
texts = f.readlines()
else:
#给出文本分割函数 -> 按函数分割
with open(f'data/{train_dir}/{idx}.txt', encoding='utf8') as f:
outfile = f'data/train_data_pro/{idx}_pro.txt'
print(outfile)
texts = f.read()
texts = split_method(texts, outfile)
#提取句子
data['word'] = texts
print(texts)
#--------------------------------------------------------------------
# 获取标签(实体类别、起始位置)
#--------------------------------------------------------------------
#初始时将所有汉字标记为O
tag_list = ['O' for s in texts for x in s] #双层循环遍历每句话中的汉字
#读取ANN文件获取每个实体的类型、起始位置和结束位置
tag = pd.read_csv(f'data/{train_dir}/{idx}.ann', header=None, sep='\t') #Pandas读取 分隔符为tab键
#0 T1 Disease 1845 1850 1型糖尿病
for i in range(tag.shape[0]): #tag.shape[0]为行数
tag_item = tag.iloc[i][1].split(' ') #每一行的第二列 空格分割
#print(tag_item)
#存在某些实体包括两段位置区间 仅获取起始位置和结束位置
cls, start, end = tag_item[0], int(tag_item[1]), int(tag_item[-1])
#print(cls,start,end)
#对tag_list进行修改
tag_list[start] = 'B-' + cls
for j in range(start+1, end):
tag_list[j] = 'I-' + cls
#断言 两个长度不一致报错
assert len([x for s in texts for x in s])==len(tag_list)
#print(len([x for s in texts for x in s]))
#print(len(tag_list))
#--------------------------------------------------------------------
# 分割后句子匹配标签
#--------------------------------------------------------------------
tags = []
start = 0
end = 0
#遍历文本
for s in texts:
length = len(s)
end += length
tags.append(tag_list[start:end])
start += length
print(len(tags))
#标签数据存储至字典中
data['label'] = tags
#--------------------------------------------------------------------
# 提取词性和词边界
#--------------------------------------------------------------------
#初始标记为M
word_bounds = ['M' for item in tag_list] #边界 M表示中间
word_flags = [] #词性
#分词
for text in texts:
#带词性的结巴分词
for word, flag in psg.cut(text):
if len(word)==1: #1个长度词
start = len(word_flags)
word_bounds[start] = 'S' #单个字
word_flags.append(flag)
else:
start = len(word_flags)
word_bounds[start] = 'B' #开始边界
word_flags += [flag]*len(word) #保证词性和字一一对应
end = len(word_flags) - 1
word_bounds[end] = 'E' #结束边界
#存储
bounds = []
flags = []
start = 0
end = 0
for s in texts:
length = len(s)
end += length
bounds.append(word_bounds[start:end])
flags.append(word_flags[start:end])
start += length
data['bound'] = bounds
data['flag'] = flags
#--------------------------------------------------------------------
# 获取拼音和偏旁特征
#--------------------------------------------------------------------
radical = Radical(RunOption.Radical) #提取偏旁部首
pinyin = Radical(RunOption.Pinyin) #提取拼音
#提取拼音和偏旁 None用特殊符号替代
radical_out = [[radical.trans_ch(x) if radical.trans_ch(x) is not None else 'PAD' for x in s] for s in texts]
pinyin_out = [[pinyin.trans_ch(x) if pinyin.trans_ch(x) is not None else 'PAD' for x in s] for s in texts]
#赋值
data['radical'] = radical_out
data['pinyin'] = pinyin_out
#--------------------------------------------------------------------
# 存储数据
#--------------------------------------------------------------------
#获取样本数量
num_samples = len(texts) #行数
num_col = len(data.keys()) #列数 字典自定义类别数 6
print(num_samples)
print(num_col)
dataset = []
for i in range(num_samples):
records = list(zip(*[list(v[i]) for v in data.values()])) #压缩
dataset += records+[['sep']*num_col] #每处理一句话sep分割
#records = list(zip(*[list(v[0]) for v in data.values()]))
#for r in records:
# print(r)
#最后一行sep删除
dataset = dataset[:-1]
#转换成dataframe 增加表头
dataset = pd.DataFrame(dataset,columns=data.keys())
#保存文件 测试集 训练集
save_path = f'data/prepare/{split_name}/{idx}.csv'
dataset.to_csv(save_path,index=False,encoding='utf-8')
#--------------------------------------------------------------------
# 处理换行符 w表示一个字
#--------------------------------------------------------------------
def clean_word(w):
if w=='\n':
return 'LB'
if w in [' ','\t','\u2003']: #中文空格\u2003
return 'SPACE'
if w.isdigit(): #将所有数字转换为一种符号 数字训练会造成干扰
return 'NUM'
return w
#对dataframe应用函数
dataset['word'] = dataset['word'].apply(clean_word)
#存储数据
dataset.to_csv(save_path,index=False,encoding='utf-8')
#return texts, tags, bounds, flags
#return texts[0], tags[0], bounds[0], flags[0], radical_out[0], pinyin_out[0]
#----------------------------功能:预处理所有文本---------------------------------
def multi_process(split_method=None,train_ratio=0.8):
"""
功能: 对所有文本尽心预处理操作
param split_method: 切割文本方法
param train_ratio: 训练集和测试集划分比例
return
"""
#删除目录
if os.path.exists('data/prepare/'):
shutil.rmtree('data/prepare/')
#创建目录
if not os.path.exists('data/prepare/train/'):
os.makedirs('data/prepare/train/')
os.makedirs('data/prepare/test/')
#获取所有文件名
idxs = set([file.split('.')[0] for file in os.listdir('data/'+train_dir)])
idxs = list(idxs)
#随机划分训练集和测试集
shuffle(idxs) #打乱顺序
index = int(len(idxs)*train_ratio) #获取训练集的截止下标
#获取训练集和测试集文件名集合
train_ids = idxs[:index]
test_ids = idxs[index:]
#--------------------------------------------------------------------
# 引入多进程
#--------------------------------------------------------------------
#线程池方式调用
import multiprocessing as mp
num_cpus = mp.cpu_count() #获取机器CPU的个数
pool = mp.Pool(num_cpus)
results = []
#训练集处理
for idx in train_ids:
result = pool.apply_async(process_text, args=(idx,split_method,'train'))
results.append(result)
#测试集处理
for idx in test_ids:
result = pool.apply_async(process_text, args=(idx,split_method,'test'))
results.append(result)
#关闭进程池
pool.close()
pool.join()
[r.get() for r in results]
#-------------------------------功能:主函数--------------------------------------
if __name__ == '__main__':
#print(process_text('0',split_method=split_text,split_name='train'))
multi_process(split_text)
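For reference, a minimal driver script is sketched below. The paths follow the constants used above (data/train_data for the raw corpus, data/train_data_pro for the split text, data/prepare/train and data/prepare/test for the per-character CSVs), and it assumes data_process.py and prepare_data.py sit in the same directory; the file name run_prepare.py is just a suggestion.
# run_prepare.py -- hedged example of invoking the preprocessing pipeline end to end.
from data_process import split_text
from prepare_data import multi_process

if __name__ == '__main__':
    # Split every document, tag it per character, and write an 80/20 train/test split.
    multi_process(split_method=split_text, train_ratio=0.8)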
六. Summary
This article was first published on the WeChat public account 娜璋AI安全之家: Python Artificial Intelligence | 26. Medical Named Entity Recognition Based on BiLSTM-CRF (Part 1): Data Preprocessing.