介绍
开发工具是为了解决大量文本数据迁移的目的。迁移的主要对象为各种关系型数据库数据。
因为关系型数据库
的存储和检索的方式限制,当大量数据累积在数据库中,数据查询速度会稍慢于非关系型数据库
。
但是当有大量数据的时候需要将原本关系型数据库
的数据迁移到非关系型数据库中
又比价麻烦。所以开发了这个名字叫做esinputer
的工具。
为保证并发效率,使用熟悉的语言进行开发,本人使用的golang
进行开发,各位可按照本文思路选择自己最适合的开发路线。
阅读提示
由于代码都片段,仅仅列出关键代码,甚至会省略一些错误处理的代码,如果是没有使用过golang的同学,请忽视代码,直接阅读文字。
功能和适用环境
1、处理对象: 文本类型数据
例:test.txt
姓名---QQ号---QQ群---注册时间
李慢慢233---496672097----599032459---2022年2月2日
李慢慢---没有qq号---123321234---2022年2月2日
第一行为字段
名称,使用---
进行分割。以上例子中一共4个字段,你也可以根据你自己的表进行修改。
对应的表结构如下:
转换数据库文件为文本文件
可以使用navicat
的导出
功能
导出时请注意选择分隔符
最好使用不常用的多个字符进行导出
设计思路
读取本地文本
一般情况下使用golang
进行文件读取处理,关键部分代码如下
```
filename := "test.txt" // 文件名
content, err := ioutil.ReadFile(filename) // 读取文件内容
if err != nil {
fmt.Println("Error reading file:", err)
return
}
lines := strings.Split(string(content), "\n") // 将内容按行分割
data := make([]map[string]string) // 定义一个map,用于存储文件内容
keys:=[]string
int i=1 //第一行作为map[string]
for , line := range lines {
if i==1{
i++
keys= strings.Split(string(line), "---") // 将内容按---分割并设置为keys
}else{
value:= strings.Split(string(line), "---")
for ,v1:=keys{
cache:=chache[key][value]
data=append(data,cache) //-----这里将key和valu进行绑定,手写的,可能有语法错误
}
}
}
```
最终希望形成的数据格式
[]map[string]string的数据结构
| map | key1 | key2 |
| -------- | -------- | -------- |
| map[0] | value0 | value0 |
| map[1] | value1 | value1 |
| data | 姓名 | QQ号 |
| --------- | ----------- | ----------- |
| data[0] | 李慢慢233 | 496672097 |
| data[1] | 李慢慢 | 599032459 |
注意 :但是由于设计目的是为了处理大量的数据。 所以一次性读取所有文件会导致内存不足。
注意: 由于为了保证处理速度一定会使用到并发技术, 所以要进行关键变量进行锁。
所以改变策略,修改以上代码。思路有两种
1、分行读取,每读取N行,就插入es。
2、创建通信缓冲区,将读取的值放入缓冲区,让其他协程序读取。如果其他协程还没处理, 就阻塞。
第一种方法没有缓冲,直接进行数据处理。 当读取文件的数据大于es处理的速度的时候容易出现错误,需要有错误回退机制。
第二种方法利用channl
信道做缓存,这里的代码只需要将读取的数据送入缓存即可。其他事情有别的函数处理。是比较合适的解耦办法。
这里采用第二种,使用channl
通道,将读取的内容传出去关键代码如下:
```
// 定义要读取的文件路径和每N个map[string]string打包的大小
// 打开文件
file, err := os.Open(filePath)
if err != nil {
log.Println("open file error:", err)
panic(err)
}
defer file.Close()
// 创建一个scanner来逐行读取文件
scanner := bufio.NewScanner(file)
// 定义一个切片用于存储map[string]string
dataSlice := make([]map[string]string, 0)
// 逐行读取文件,并将每一行按照"----"符号分割并组装成map[string]string
var Desc []string
isdesc := true
for scanner.Scan() {
line := scanner.Text()
if repalcestr != "" {
line = strings.ReplaceAll(line, repalcestr, repalcedstr) //字符\N替换为空
}
parts := strings.Split(line, splstr)
if isdesc {
Desc = parts
isdesc = false
log.Println("插入的字段如下:", Desc)
continue
}
// 组装map[string]string
data := make(map[string]string)
if len(Desc) > len(parts) {
lostdata++
continue
}
for i := 0; i < len(Desc); i++ {
data[Desc[i]] = parts[i]
}
// 过滤空值
// 将map[string]string添加到切片中
if len(dataSlice) != batchSize-1 {
dataSlice = append(dataSlice, data)
} else {
dataSlice = append(dataSlice, data)
//最后把读取的数据,通过channl
送出去。如果channl满了。就阻塞着呗。 等着那版向es发送完了再处理
channl<-dataSlice
dataSlice = make([]map[string]string, 0)
}
}
if len(dataSlice) != 0 {
//最后把读取的数据,通过channl
送出去。如果channl满了。就阻塞着呗。 等着那版向es发送完了再处理
channl<-dataSlice
}
//注意, 这里还需要一个在清空数据之后清理掉dataSlice,再重新读取
```
具体内容需要自己去解耦和不全。
组装成es的api请求结构(等待协程调用)
发送的路径由http.MethodPost, postdata.EsUrl+"/_bulk"
使用的是es的_bulk
api。
目的是为了保证发送效率。
正常情况下,发送的json数据为类似于{索引:xxx,字段1:值1,字段2:值2}
,每次只能处理一条。
而es所提供的bulk
提供的是一种在一个json中传输多个数组api。可以将多条类似数据进行打包。
```
{ "index" : { "_index" : "test"} }
{"salt":"测试盐","密码":"测试密码","账号":"测试账号","邮箱":"测试邮箱"}
{ "index" : { "_index" : "test"} }
{"salt":"测试盐1","密码":"测试密码1","账号":"测试账号1","邮箱":"测试邮箱1"}
```
关键代码如下:
``
{ "index" : { "_index" : "%s" } }%s`, postdata.Index, "\n"))
num := 0
postdata := <-msg
// 构建批量插入的请求体
var buf bytes.Buffer
for _, d := range postdata.Data {
meta := []byte(fmt.Sprintf(
doc, err := json.Marshal(d)
if err != nil {
fmt.Println("Error json:", err)
return
}
buf.Grow(len(meta) + len(doc))
buf.Write(meta)
buf.Write(doc)
buf.WriteByte('\n')
}
//fmt.Println(buf.String())
// 发送批量插入请求
req, err := http.NewRequest(http.MethodPost, postdata.EsUrl+"/_bulk", strings.NewReader(buf.String()))
if err != nil {
log.Println("Error request:", err)
return
}
req.Header.Set("Content-Type", "application/json")
RETRY:
if num > 3 {
log.Println("数据", postdata.Data, "重试3次失败,放弃发送")
return
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Println("client:", err)
return
}
//fmt.Println(buf.String())
defer resp.Body.Close()
// 处理响应
if resp.StatusCode != http.StatusOK {
log.Println("Error: unexpected status code:", resp.StatusCode)
fmt.Println("ES访问出错了:")
fmt.Println("Error: unexpected status code:", resp.StatusCode)
return
}
//将返回body转换为结构体
err = json.NewDecoder(resp.Body).Decode(&resps)
if err != nil {
log.Println("Error decoding response body:", err)
return
}
if resps.Errors {
//延迟100毫秒后重试
log.Println("Error: bulk insert failed 准备重新发送")
num++
time.Sleep(100 * time.Millisecond)
goto RETRY
}
```
多协程处理
步骤有3个,
1、确认数据大小,如果上面一次性读取的太多,还是要分包处理。因为es的 _bulk
有个最高限制。(可调)
2、读取上面第一步读取本地文件 的信道传输过来的数据
3、创建一个协程,并使用channl
通道向协程发送需要传输的数据。调用 组装成es的api请求结构(等待协程调用)
```
// 定义一个waitGroup来等待所有goroutine完成
var wg sync.WaitGroup
// 分批处理map[string]string
for i := 0; i < time; i ++{
wg.Add(1)
go func(batch []map[string]string) {
defer wg.Done()
InsertEsByBulk(sendch)
ch <- msg{Id: 1, msg: "ok"}
}(batch)
}
// 等待所有goroutine完成
go func() {
for {
select {
case <-ch:
mutex.Lock()
counter++
fmt.Printf("已经完成线程:%d\r", counter)
mutex.Unlock()
}
}
}()
wg.Wait()
```
这里的time
为你需开启多少个携程。
还需要协成将读取的文件 sendch \<- Sendmap{Data: batch, Index: conf.IndexName, EsUrl: conf.ElasticHost}
最后在 InsertEsByBulk
中使用postdata := \<-msg读取发送过来的数据, 并将数据发送到es
其他拓展功能
在数据量大的情况下,导出的文件过大的时候。 按照以上的代码,需要咋首行设置字段信息。 但是由于导出的时候是不包含字段信息的, 所以需要在运行前在首行添加一行。 但是由于文件过大,打开就得花费几分钟了。 更别说还得修改保证。 内存小点的电脑估计打开都困难。 为了解决这个问题,需要开发一个少量文件查看、少量文件修改的功能
查看文件前20行内容
int i=0
scanner := bufio.NewScanner(file)
if i>50 {
return //超过50行了就结束打印
}
fmt.Println("前", num, "行文件内容如下")
for scanner.Scan() {
line := scanner.Text()
parts := strings.Split(line, splstr)
if isdesc {
Desc = parts
isdesc = false
log.Println("插入的字段如下:", Desc)
fmt.Println("插入的字段如下:", Desc)
i++
continue
}
这里读取第一行作为字段信息, 然后按照字段信息打印每一行内容。
将第一行修改为我们想要的格式(字段)
```
func InsertStrInFirstLine(filename string, insertStr string) error {
// 打开文件
file, err := os.OpenFile(filename, os.O_RDWR, 0644)
if err != nil {
return err
}
defer file.Close()
// 读取文件第一行
reader := bufio.NewReader(file)
line, err := reader.ReadString('\n')
if err != nil {
log.Println(err)
return err
}
// 回退到文件开头
_, err = file.Seek(0, 0)
if err != nil {
log.Println(err)
return err
}
// 写入新的第一行
writer := bufio.NewWriter(file)
_, err = writer.WriteString(insertStr + "\n")
if err != nil {
log.Println(err)
return err
}
// 写入原来的第一行之后的内容
_, err = writer.WriteString(line)
if err != nil {
log.Println(err)
return err
}
// 刷新缓冲区并保存文件
err = writer.Flush()
if err != nil {
log.Println(err)
return err
}
return nil
}
```
测试插入数据是否成功
运行之后,如果首行的字段信息设置的不好,可能出现插入错误数据的问题。 为了解决这个问题。 还需要一个测试插入的功能。 即,插入50行试一试。 这个就不贴代码了。 就是将设计思路
中的第三项修改为执行N50次即可。或者用其他办法执行插入命令也行。
总结
在实际开发的过程中。
1、还需要处理log,保证运行数据可追踪。
2、注意读取本地文件
、组装es数据
、 多协程处理
之间的通信
3、导出的文件格式
4、设计提示和交互
5、设计配置文件
。。。。。 言不尽在此。
但是只要有设计思路即可慢慢做完。
运行效果
大致的页面效果如下:
第一步:运行
第二步 查看data
文件夹下的数据
第三步 导入(如果没问题的话,第一行没有设置好就输入3就行)
AMD 2600 16G内存, 10亿数据导入事件大概26小时。
你可以在conf
配置中自行修改协程与数量。
是否开源
本文章只是分享一个设计思路 ,具体的还需要自己去做。 所以不开源。
如果有需要的可以来我的群自取编译好的文件。我的群不告诉你在哪里。
重要说明
仅做运维支持工具 ,请勿用作非法用途。
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论