esinputer超大文本数据入库es的工具设计思路

admin 2023年7月12日01:43:55esinputer超大文本数据入库es的工具设计思路已关闭评论10 views字数 6382阅读21分16秒阅读模式

介绍

开发工具是为了解决大量文本数据迁移的目的。迁移的主要对象为各种关系型数据库数据。

因为关系型数据库的存储和检索的方式限制,当大量数据累积在数据库中,数据查询速度会稍慢于非关系型数据库

但是当有大量数据的时候需要将原本关系型数据库 的数据迁移到非关系型数据库中 又比价麻烦。所以开发了这个名字叫做esinputer 的工具。

为保证并发效率,使用熟悉的语言进行开发,本人使用的golang进行开发,各位可按照本文思路选择自己最适合的开发路线。

阅读提示

由于代码都片段,仅仅列出关键代码,甚至会省略一些错误处理的代码,如果是没有使用过golang的同学,请忽视代码,直接阅读文字。

功能和适用环境

1、处理对象: 文本类型数据

例:test.txt

姓名---QQ号---QQ群---注册时间
李慢慢233---496672097----599032459---2022年2月2日
李慢慢---没有qq号---123321234---2022年2月2日

第一行为字段 名称,使用---进行分割。以上例子中一共4个字段,你也可以根据你自己的表进行修改。

对应的表结构如下:

esinputer超大文本数据入库es的工具设计思路

转换数据库文件为文本文件

可以使用navicat导出功能

esinputer超大文本数据入库es的工具设计思路

导出时请注意选择分隔符 最好使用不常用的多个字符进行导出

设计思路

读取本地文本

一般情况下使用golang进行文件读取处理,关键部分代码如下

```
filename := "test.txt" // 文件名
content, err := ioutil.ReadFile(filename) // 读取文件内容
if err != nil {
fmt.Println("Error reading file:", err)
return
}

lines := strings.Split(string(content), "\n") // 将内容按行分割

data := make([]map[string]string) // 定义一个map,用于存储文件内容
keys:=[]string
int i=1 //第一行作为map[string]

for , line := range lines {
if i==1{
i++
keys= strings.Split(string(line), "---") // 将内容按---分割并设置为keys
}else{
value:= strings.Split(string(line), "---")
for
,v1:=keys{
cache:=chache[key][value]
data=append(data,cache) //-----这里将key和valu进行绑定,手写的,可能有语法错误
}
}

}

```

最终希望形成的数据格式

[]map[string]string的数据结构

| map | key1 | key2 |
| -------- | -------- | -------- |
| map[0] | value0 | value0 |
| map[1] | value1 | value1 |

| data | 姓名 | QQ号 |
| --------- | ----------- | ----------- |
| data[0] | 李慢慢233 | 496672097 |
| data[1] | 李慢慢 | 599032459 |

注意 :但是由于设计目的是为了处理大量的数据。 所以一次性读取所有文件会导致内存不足。

注意: 由于为了保证处理速度一定会使用到并发技术, 所以要进行关键变量进行锁。

所以改变策略,修改以上代码。思路有两种

1、分行读取,每读取N行,就插入es。

2、创建通信缓冲区,将读取的值放入缓冲区,让其他协程序读取。如果其他协程还没处理, 就阻塞。

第一种方法没有缓冲,直接进行数据处理。 当读取文件的数据大于es处理的速度的时候容易出现错误,需要有错误回退机制。

第二种方法利用channl 信道做缓存,这里的代码只需要将读取的数据送入缓存即可。其他事情有别的函数处理。是比较合适的解耦办法。

这里采用第二种,使用channl通道,将读取的内容传出去关键代码如下:

```
// 定义要读取的文件路径和每N个map[string]string打包的大小

// 打开文件
file, err := os.Open(filePath)
if err != nil {
log.Println("open file error:", err)
panic(err)
}
defer file.Close()
// 创建一个scanner来逐行读取文件
scanner := bufio.NewScanner(file)

// 定义一个切片用于存储map[string]string
dataSlice := make([]map[string]string, 0)

// 逐行读取文件,并将每一行按照"----"符号分割并组装成map[string]string
var Desc []string
isdesc := true
for scanner.Scan() {
line := scanner.Text()
if repalcestr != "" {
line = strings.ReplaceAll(line, repalcestr, repalcedstr) //字符\N替换为空
}
parts := strings.Split(line, splstr)
if isdesc {
Desc = parts
isdesc = false
log.Println("插入的字段如下:", Desc)
continue
}
// 组装map[string]string
data := make(map[string]string)
if len(Desc) > len(parts) {
lostdata++
continue
}
for i := 0; i < len(Desc); i++ {
data[Desc[i]] = parts[i]
}
// 过滤空值

// 将map[string]string添加到切片中
if len(dataSlice) != batchSize-1 {
dataSlice = append(dataSlice, data)
} else {
dataSlice = append(dataSlice, data)
//最后把读取的数据,通过channl送出去。如果channl满了。就阻塞着呗。 等着那版向es发送完了再处理
channl<-dataSlice
dataSlice = make([]map[string]string, 0)
}
}
if len(dataSlice) != 0 {
//最后把读取的数据,通过channl送出去。如果channl满了。就阻塞着呗。 等着那版向es发送完了再处理
channl<-dataSlice
}
//注意, 这里还需要一个在清空数据之后清理掉dataSlice,再重新读取
```

具体内容需要自己去解耦和不全。

组装成es的api请求结构(等待协程调用)

发送的路径由http.MethodPost, postdata.EsUrl+"/_bulk" 使用的是es的_bulk api。

目的是为了保证发送效率。

正常情况下,发送的json数据为类似于{索引:xxx,字段1:值1,字段2:值2} ,每次只能处理一条。

而es所提供的bulk 提供的是一种在一个json中传输多个数组api。可以将多条类似数据进行打包。

```
{ "index" : { "_index" : "test"} }
{"salt":"测试盐","密码":"测试密码","账号":"测试账号","邮箱":"测试邮箱"}
{ "index" : { "_index" : "test"} }
{"salt":"测试盐1","密码":"测试密码1","账号":"测试账号1","邮箱":"测试邮箱1"}

```

关键代码如下:

``
num := 0
postdata := <-msg
// 构建批量插入的请求体
var buf bytes.Buffer
for _, d := range postdata.Data {
meta := []byte(fmt.Sprintf(
{ "index" : { "_index" : "%s" } }%s`, postdata.Index, "\n"))
doc, err := json.Marshal(d)
if err != nil {
fmt.Println("Error json:", err)
return
}
buf.Grow(len(meta) + len(doc))
buf.Write(meta)
buf.Write(doc)
buf.WriteByte('\n')
}
//fmt.Println(buf.String())
// 发送批量插入请求
req, err := http.NewRequest(http.MethodPost, postdata.EsUrl+"/_bulk", strings.NewReader(buf.String()))
if err != nil {
log.Println("Error request:", err)
return
}
req.Header.Set("Content-Type", "application/json")
RETRY:
if num > 3 {
log.Println("数据", postdata.Data, "重试3次失败,放弃发送")
return
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Println("client:", err)
return
}
//fmt.Println(buf.String())
defer resp.Body.Close()

// 处理响应
if resp.StatusCode != http.StatusOK {
log.Println("Error: unexpected status code:", resp.StatusCode)
fmt.Println("ES访问出错了:")
fmt.Println("Error: unexpected status code:", resp.StatusCode)
return
}
//将返回body转换为结构体
err = json.NewDecoder(resp.Body).Decode(&resps)
if err != nil {
log.Println("Error decoding response body:", err)
return
}
if resps.Errors {
//延迟100毫秒后重试
log.Println("Error: bulk insert failed 准备重新发送")
num++
time.Sleep(100 * time.Millisecond)
goto RETRY
}
```

多协程处理

步骤有3个,

1、确认数据大小,如果上面一次性读取的太多,还是要分包处理。因为es的 _bulk 有个最高限制。(可调)

2、读取上面第一步读取本地文件 的信道传输过来的数据

3、创建一个协程,并使用channl通道向协程发送需要传输的数据。调用 组装成es的api请求结构(等待协程调用)

```
// 定义一个waitGroup来等待所有goroutine完成
var wg sync.WaitGroup

// 分批处理map[string]string
for i := 0; i < time; i ++{

wg.Add(1)
go func(batch []map[string]string) {

defer wg.Done()
InsertEsByBulk(sendch)
ch <- msg{Id: 1, msg: "ok"}
}(batch)
}
// 等待所有goroutine完成
go func() {
for {
select {
case <-ch:
mutex.Lock()
counter++
fmt.Printf("已经完成线程:%d\r",
counter)
mutex.Unlock()
}
}
}()

wg.Wait()
```

这里的time为你需开启多少个携程。

还需要协成将读取的文件 sendch \<- Sendmap{Data: batch, Index: conf.IndexName, EsUrl: conf.ElasticHost}

最后在 InsertEsByBulk中使用postdata := \<-msg读取发送过来的数据, 并将数据发送到es

其他拓展功能

在数据量大的情况下,导出的文件过大的时候。 按照以上的代码,需要咋首行设置字段信息。 但是由于导出的时候是不包含字段信息的, 所以需要在运行前在首行添加一行。 但是由于文件过大,打开就得花费几分钟了。 更别说还得修改保证。 内存小点的电脑估计打开都困难。 为了解决这个问题,需要开发一个少量文件查看、少量文件修改的功能

查看文件前20行内容

int i=0
scanner := bufio.NewScanner(file)
if i>50 {
return //超过50行了就结束打印
}
fmt.Println("前", num, "行文件内容如下")
for scanner.Scan() {
line := scanner.Text()
parts := strings.Split(line, splstr)
if isdesc {
Desc = parts
isdesc = false
log.Println("插入的字段如下:", Desc)
fmt.Println("插入的字段如下:", Desc)
i++
continue
}

这里读取第一行作为字段信息, 然后按照字段信息打印每一行内容。

将第一行修改为我们想要的格式(字段)

```
func InsertStrInFirstLine(filename string, insertStr string) error {
// 打开文件
file, err := os.OpenFile(filename, os.O_RDWR, 0644)
if err != nil {
return err
}
defer file.Close()
// 读取文件第一行
reader := bufio.NewReader(file)
line, err := reader.ReadString('\n')
if err != nil {
log.Println(err)
return err
}

// 回退到文件开头
_, err = file.Seek(0, 0)
if err != nil {
log.Println(err)
return err
}

// 写入新的第一行
writer := bufio.NewWriter(file)
_, err = writer.WriteString(insertStr + "\n")
if err != nil {
log.Println(err)
return err
}

// 写入原来的第一行之后的内容
_, err = writer.WriteString(line)
if err != nil {
log.Println(err)
return err
}

// 刷新缓冲区并保存文件
err = writer.Flush()
if err != nil {
log.Println(err)
return err
}

return nil
}

```

测试插入数据是否成功

运行之后,如果首行的字段信息设置的不好,可能出现插入错误数据的问题。 为了解决这个问题。 还需要一个测试插入的功能。 即,插入50行试一试。 这个就不贴代码了。 就是将设计思路中的第三项修改为执行N50次即可。或者用其他办法执行插入命令也行。

总结

在实际开发的过程中。

1、还需要处理log,保证运行数据可追踪。

2、注意读取本地文件组装es数据多协程处理之间的通信

3、导出的文件格式

4、设计提示和交互

5、设计配置文件

。。。。。 言不尽在此。

但是只要有设计思路即可慢慢做完。

运行效果

大致的页面效果如下:

第一步:运行

esinputer超大文本数据入库es的工具设计思路

第二步 查看data文件夹下的数据

esinputer超大文本数据入库es的工具设计思路

第三步 导入(如果没问题的话,第一行没有设置好就输入3就行)

esinputer超大文本数据入库es的工具设计思路

AMD 2600 16G内存, 10亿数据导入事件大概26小时。

你可以在conf 配置中自行修改协程与数量。

是否开源

本文章只是分享一个设计思路 ,具体的还需要自己去做。 所以不开源。

如果有需要的可以来我的群自取编译好的文件。我的群不告诉你在哪里。

重要说明

仅做运维支持工具 ,请勿用作非法用途。

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年7月12日01:43:55
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   esinputer超大文本数据入库es的工具设计思路http://cn-sec.com/archives/1868856.html