Golang Pipeline

Linux 中一个很经典的管道命令:

cat log.txt | grep abc
# 基本概念是每一个进程的输出(stdout)直接作为下一个进程的输入(stdin)

例题:

从一个切片中找出所有的偶数,然后将每个偶数乘以 10。

常用做法:

func main() {

   list := []int{2, 3, 6, 12, 22, 16, 4, 9, 23, 64, 62}
   fmt.Println(Multiply(Evens(list)))

}
// Evens returns a new slice holding the even elements of list in their
// original order. The result is never nil, even when nothing matches.
func Evens(list []int) (ret []int) {
	ret = []int{}
	for i := range list {
		if list[i]%2 != 0 {
			continue
		}
		ret = append(ret, list[i])
	}
	return ret
}
// Multiply returns a new slice in which every element of list has been
// multiplied by 10. The result is never nil, even for an empty input.
func Multiply(list []int) (ret []int) {
	ret = []int{}
	for i := range list {
		ret = append(ret, list[i]*10)
	}
	return ret
}

管道函数

// Cmd is one stage of the slice-based pipeline: it maps a slice to a new slice.
type Cmd func(list []int) (ret []int)

// p is the pipe function: it feeds the result of c1(args) into c2 and returns
// c2's output, mirroring `c1 | c2` in a shell.
func p(args []int, c1 Cmd, c2 Cmd) []int {
	return c2(c1(args))
}

// main runs Evens and then Multiply over the sample slice through the pipe
// helper p and prints the result.
func main() {
	nums := []int{2, 3, 6, 12, 22, 16, 4, 9, 23, 64, 62}
	result := p(nums, Evens, Multiply)
	fmt.Println(result)
}

将其修改为基于 channel 的管道模式:

// main streams the sample slice through Evens and Multiply and prints each
// value, space-separated, as it arrives.
func main() {
	nums := []int{2, 3, 6, 12, 22, 16, 4, 9, 23, 64, 62}
	out := Pipe(nums, Evens, Multiply)
	// The loop ends when Multiply closes its output channel.
	for v := range out {
		fmt.Printf("%d ", v)
	}
}
// Evens starts a goroutine that sends every even element of list, in order, on
// the returned unbuffered channel and closes the channel after the last one.
func Evens(list []int) chan int {
	evens := make(chan int)
	go func() {
		defer close(evens)
		for i := range list {
			if v := list[i]; v%2 == 0 {
				evens <- v
			}
		}
	}()
	return evens
}
// Multiply starts a goroutine that reads in until it is closed and sends each
// value multiplied by 10 on the returned unbuffered channel. It closes that
// channel once in is exhausted.
func Multiply(in chan int) chan int {
	scaled := make(chan int)
	go func() {
		defer close(scaled)
		for {
			v, ok := <-in
			if !ok {
				return
			}
			scaled <- v * 10
		}
	}()
	return scaled
}

// Cmd is the source stage: it turns a slice into a stream of values.
type Cmd func(list []int) chan int

// PipeCmd is a transform stage: it consumes one stream and produces another.
type PipeCmd func(chan int) chan int

// Pipe connects the source c1 to the single transform c2 and returns c2's
// output channel.
func Pipe(args []int, c1 Cmd, c2 PipeCmd) chan int {
	src := c1(args)
	return c2(src)
}

支持多级管道(可串联任意数量的处理函数):

// main chains Evens with three Multiply stages in series, so every even
// number is scaled by 1000, and prints the results space-separated.
func main() {
	nums := []int{2, 3, 6, 12, 22, 16, 4, 9, 23, 64, 62}
	out := Pipe(nums, Evens, Multiply, Multiply, Multiply)
	for v := range out {
		fmt.Printf("%d ", v)
	}
}
// Evens streams the even elements of list, in order, on a new unbuffered
// channel fed by its own goroutine. The channel is closed after the last value.
func Evens(list []int) chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < len(list); i++ {
			if list[i]%2 != 0 {
				continue
			}
			out <- list[i]
		}
	}()
	return out
}
// Multiply forwards every value from in, multiplied by 10, onto a new
// unbuffered channel. A goroutine does the forwarding and closes the channel
// once in is drained.
func Multiply(in chan int) chan int {
	res := make(chan int)
	go func() {
		defer close(res)
		for {
			v, ok := <-in
			if !ok {
				return
			}
			res <- 10 * v
		}
	}()
	return res
}

// Cmd is the source stage: it turns a slice into a stream of values.
type Cmd func(list []int) chan int

// PipeCmd is a transform stage: it consumes one stream and produces another.
type PipeCmd func(chan int) chan int

// Pipe runs c1 on args and threads its output through each stage in c2, in
// order, feeding every stage the channel produced by the previous one.
// It returns the last stage's channel, or c1's channel when c2 is empty.
func Pipe(args []int, c1 Cmd, c2 ...PipeCmd) chan int {
	c := c1(args)
	// Only the most recent channel is needed to wire up the next stage, so no
	// slice of intermediate channels is kept.
	for _, stage := range c2 {
		c = stage(c)
	}
	return c
}

多路复用

多个函数同时从同一个 channel 中读取数据,直至该 channel 被关闭。
package main

import (
   "fmt"
   "sync"
   "time"
)

// main fans the even numbers of the sample slice out to three concurrent
// Multiply workers via PipeMux and prints the merged results. Because the
// workers run concurrently, the printed order may differ between runs.
func main() {
	nums := []int{2, 3, 6, 12, 22, 16, 4, 9, 23, 64, 62}
	merged := PipeMux(nums, Evens, Multiply, Multiply, Multiply)
	for v := range merged {
		fmt.Printf("%d ", v)
	}
}
// Evens starts a goroutine that emits the even elements of list, in order, on
// the returned unbuffered channel and then closes it.
func Evens(list []int) chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, n := range list {
			if n%2 != 0 {
				continue
			}
			ch <- n
		}
	}()
	return ch
}
// Multiply starts a goroutine that, for each value read from in, sleeps 300ms
// and then sends the value times 10 on the returned channel. The sleep
// presumably makes the gain from running several workers visible. The channel
// is closed once in is drained.
func Multiply(in chan int) chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			num, ok := <-in
			if !ok {
				return
			}
			time.Sleep(300 * time.Millisecond)
			out <- num * 10
		}
	}()
	return out
}

// Cmd is the source stage: it turns a slice into a stream of values.
type Cmd func(list []int) chan int

// PipeCmd is a transform stage: it consumes one stream and produces another.
type PipeCmd func(chan int) chan int

// Pipe runs c1 on args and threads its output through each stage in c2 in
// series, so every stage reads the channel produced by the previous one.
// It returns the last stage's channel, or c1's channel when c2 is empty.
func Pipe(args []int, c1 Cmd, c2 ...PipeCmd) chan int {
	c := c1(args)
	// Keep only the latest channel; earlier ones are owned by their stages.
	for _, stage := range c2 {
		c = stage(c)
	}
	return c
}

// PipeMux runs c1 on args and fans its channel out to every stage in c2
// (multiplexing). All stages read concurrently from the same source channel,
// so each value sent by c1 is received by only one of them. Their outputs are
// merged onto the returned channel, which is closed after every stage's output
// channel has been drained. With no stages, c1's channel is returned as is.
// The order of merged values is not deterministic.
func PipeMux(args []int, c1 Cmd, c2 ...PipeCmd) chan int {
	src := c1(args)
	if len(c2) == 0 {
		return src
	}
	merged := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(c2))
	for _, stage := range c2 {
		go func(results chan int) {
			defer wg.Done()
			for v := range results {
				merged <- v
			}
		}(stage(src))
	}
	// Close the merged channel only once every forwarder has finished.
	go func() {
		defer close(merged)
		wg.Wait()
	}()
	return merged
}

从 MySQL 中分页读取数据,并写入 CSV 文件:

package getData

import (
   "demo/demo/pipeline/dbInit"
   "encoding/csv"
   "fmt"
   "log"
   "os"
   "strconv"
)

type (
	// Book is one row of the books table; the gorm tags map each field to
	// its column.
	Book struct {
		BookId   int    `gorm:"column:book_id"`
		BookName string `gorm:"column:book_name"`
	}

	// BookList holds the books of one query page together with the page
	// number they were read from. Field order matters: callers build it
	// with positional literals.
	BookList struct {
		Data []*Book
		Page int
	}
)

// sql selects one page of books ordered by book_id; the two placeholders are
// the LIMIT (page size) and the OFFSET.
const sql = "select * from books order by book_id limit ? offset ? "

// ReadData exports the books table page by page (1000 rows per page, starting
// at page 1). Each page is written to its own CSV file via SaveData.
// It stops at the first empty page or at the first query error; a query error
// is logged instead of silently ending the export. A SaveData failure is
// logged and the export continues with the next page.
func ReadData() {
	const pagesize = 1000
	for page := 1; ; page++ {
		booklist := &BookList{make([]*Book, 0), page}
		db := dbInit.GetDB().Raw(sql, pagesize, (page-1)*pagesize).Find(&booklist.Data)
		if db.Error != nil {
			log.Printf("reading page %d: %v", page, db.Error)
			return
		}
		if db.RowsAffected == 0 {
			// No more rows: the export is complete.
			return
		}
		if err := SaveData(booklist); err != nil {
			log.Println(err)
		}
	}
}

// SaveData writes one page of books to ./src/pipeline/csv/<page>.csv, resolved
// relative to the working directory, truncating any existing file.
// The file starts with a "book_id,book_name" header row, followed by one row
// per book. It returns the first error from opening, writing, or closing the
// file.
func SaveData(data *BookList) (err error) {
	file := fmt.Sprintf("./src/pipeline/csv/%d.csv", data.Page)
	csvFile, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		return err
	}
	// Close can report a failed write-back; surface it unless an earlier
	// error is already being returned.
	defer func() {
		if cerr := csvFile.Close(); err == nil {
			err = cerr
		}
	}()
	export := make([][]string, 0, len(data.Data)+1)
	export = append(export, []string{"book_id", "book_name"})
	for _, d := range data.Data {
		export = append(export, []string{strconv.Itoa(d.BookId), d.BookName})
	}
	// WriteAll flushes the writer itself and returns any write or flush error.
	return csv.NewWriter(csvFile).WriteAll(export)
}

多路复用:分页读取数据,并由多个协程并发写入 CSV 文件

package getData

import (
   "demo/demo/pipeline/dbInit"
   "encoding/csv"
   "fmt"
   "os"
   "strconv"
   "sync"
)

type (
	// Book is one row of the books table; the gorm tags map each field to
	// its column.
	Book struct {
		BookId   int    `gorm:"column:book_id"`
		BookName string `gorm:"column:book_name"`
	}

	// BookList holds the books of one query page together with the page
	// number they were read from. Field order matters: callers build it
	// with positional literals.
	BookList struct {
		Data []*Book
		Page int
	}
)

type (
	// InChan is the pipeline input: pages of books flowing to the workers.
	InChan chan *BookList

	// Result reports the outcome for one page: its page number and the
	// error, if any, from saving it.
	Result struct {
		Page  int
		Error error
	}

	// OutChan is the pipeline output: one Result per processed page.
	OutChan chan *Result

	// DataCmd is the source command type: it produces the input channel.
	DataCmd func() InChan

	// DataPipeCmd is a worker command type: it consumes pages from in and
	// produces results.
	DataPipeCmd func(in InChan) OutChan
)

// Pipe starts the source command c1 and fans its channel out to every worker
// in cs. All workers read from the same input channel, so each page is
// received by only one of them. Their results are merged onto the returned
// channel, which is closed once every worker's output channel has been
// drained, so callers can simply range over it.
//
// NOTE(review): with no workers nothing reads c1's channel, so a producer
// goroutine started by c1 may block forever; the returned channel is closed
// immediately in that case.
func Pipe(c1 func() InChan, cs ...DataPipeCmd) OutChan {
	in := c1()
	out := make(OutChan)
	var wg sync.WaitGroup
	for _, c := range cs {
		wg.Add(1)
		go func(input OutChan) {
			defer wg.Done()
			for v := range input {
				out <- v
			}
		}(c(in))
	}
	// Without this close, a caller ranging over out blocks forever after the
	// last result.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// WriteData is a worker command. It saves every page received from in via
// SaveData and reports one Result (the page number plus SaveData's error) per
// page on the returned channel. The channel is closed once in is closed and
// drained.
func WriteData(in InChan) OutChan {
	results := make(OutChan)
	go func() {
		defer close(results)
		for page := range in {
			saveErr := SaveData(page)
			results <- &Result{Page: page.Page, Error: saveErr}
		}
	}()
	return results
}

// sql selects one page of books ordered by book_id; the two placeholders are
// the LIMIT (page size) and the OFFSET.
const sql = "select * from books order by book_id limit ? offset ? "

// ReadData starts a producer goroutine that reads the books table page by
// page (1000 rows per page, starting at page 1) and sends each page on the
// returned channel. It stops at the first empty page or query error and then
// closes the channel, so consumers ranging over it can terminate.
// A query error is reported on stderr.
func ReadData() InChan {
	in := make(InChan)
	go func() {
		// Closing is what ends the consumers' range loops; without it every
		// worker reading this channel blocks forever after the last page.
		defer close(in)
		const pagesize = 1000
		for page := 1; ; page++ {
			booklist := &BookList{make([]*Book, 0), page}
			db := dbInit.GetDB().Raw(sql, pagesize, (page-1)*pagesize).Find(&booklist.Data)
			if db.Error != nil {
				// Report the failure instead of ending the stream silently.
				fmt.Fprintf(os.Stderr, "reading page %d: %v\n", page, db.Error)
				return
			}
			if db.RowsAffected == 0 {
				return
			}
			in <- booklist
		}
	}()
	return in
}

// SaveData writes one page of books to a CSV file named after data.Page,
// truncating any existing file. The file starts with a "book_id,book_name"
// header row, followed by one row per book. It returns the first error from
// opening, writing, or closing the file.
//
// NOTE(review): the output directory is a hard-coded absolute Windows path
// with mixed separators; consider making it configurable and building it with
// filepath.Join.
func SaveData(data *BookList) (err error) {
	file := fmt.Sprintf("D:\\go_work\\go\\src\\demo\\demo\\pipeline\\csv/%d.csv", data.Page)
	csvFile, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		return err
	}
	// Close can report a failed write-back; surface it unless an earlier
	// error is already being returned.
	defer func() {
		if cerr := csvFile.Close(); err == nil {
			err = cerr
		}
	}()
	export := make([][]string, 0, len(data.Data)+1)
	export = append(export, []string{"book_id", "book_name"})
	for _, d := range data.Data {
		export = append(export, []string{strconv.Itoa(d.BookId), d.BookName})
	}
	// WriteAll flushes the writer itself and returns any write or flush error.
	return csv.NewWriter(csvFile).WriteAll(export)
}

// Test runs the multiplexed export with four concurrent WriteData workers and
// prints one line for every page whose result arrives.
func Test() {
	results := Pipe(ReadData, WriteData, WriteData, WriteData, WriteData)
	for res := range results {
		fmt.Printf("%d文件执行完成,结果%v\n", res.Page, res.Error)
	}
}

封装一个通用的管道方法:

package getdata

import "sync"

// InChan is the channel produced by a Pipe's source command.
type InChan chan interface{}

// OutChan is the channel produced by each of a Pipe's workers.
type OutChan chan interface{}

// CmdFunc is a source command; it receives the arguments passed to Exec.
type CmdFunc func(args ...interface{}) InChan

// PipeCmdFunc is a worker command that consumes an InChan and produces an OutChan.
type PipeCmdFunc func(in InChan) OutChan

// Pipe is a reusable fan-out pipeline. One source command (Cmd) feeds Count
// concurrent copies of a worker command (PipeCmd), and their outputs are
// merged.
type Pipe struct {
	Cmd     CmdFunc
	PipeCmd PipeCmdFunc
	Count   int
}

// NewPipe returns a Pipe with Count set to 1 and no commands configured.
// SetCmd and SetPipeCmd must be called before Exec.
func NewPipe() *Pipe {
	return &Pipe{Count: 1}
}

// SetCmd sets the source command.
func (p *Pipe) SetCmd(c CmdFunc) {
	p.Cmd = c
}

// SetPipeCmd sets the worker command and how many concurrent copies of it
// Exec starts.
func (p *Pipe) SetPipeCmd(c PipeCmdFunc, count int) {
	p.PipeCmd = c
	p.Count = count
}

// Exec calls Cmd with args and starts Count workers that all read from Cmd's
// channel, so each value is received by only one worker. It merges the
// workers' outputs onto the returned channel, which is closed once every
// worker's output has been drained. Output order is not deterministic when
// Count > 1.
//
// NOTE(review): with Count <= 0 no worker reads Cmd's channel, so the returned
// channel closes immediately and the source goroutine may block forever.
func (p *Pipe) Exec(args ...interface{}) OutChan {
	// Spread args so Cmd sees the caller's individual arguments rather than a
	// single []interface{} wrapping all of them.
	in := p.Cmd(args...)
	out := make(OutChan)
	var wg sync.WaitGroup
	for i := 0; i < p.Count; i++ {
		wg.Add(1)
		go func(input OutChan) {
			defer wg.Done()
			for v := range input {
				out <- v
			}
		}(p.PipeCmd(in))
	}
	go func() {
		defer close(out)
		wg.Wait()
	}()
	return out
}

测试:

package getdata

import (
   "fmt"
   "goplus/src/pipeline/AppInit"
   "time"

   "log"
)

// sql selects one page of books ordered by book_id; the two placeholders are
// the LIMIT (page size) and the OFFSET.
const sql = "select * from books order by book_id limit ? offset ?"

// GetPage is a source command that ignores its arguments. It emits the page
// numbers 1 through 80 (as int) on the returned channel and then closes it.
func GetPage(args ...interface{}) InChan {
	pages := make(InChan)
	go func() {
		defer close(pages)
		page := 1
		for page <= 80 {
			pages <- page
			page++
		}
	}()
	return pages
}
// GetData is a worker command. For every page number (an int) received from
// in, it queries one page of 1000 books and sends that page's []*Book on the
// returned channel. A query error is logged, and whatever Find left in the
// slice is still sent, so downstream stages see one item per page. The channel
// is closed once in is drained.
func GetData(in InChan) OutChan {
	const pagesize = 1000
	out := make(OutChan)
	go func() {
		defer close(out)
		for item := range in {
			page := item.(int)
			books := &BookList{make([]*Book, 0), page}
			res := AppInit.GetDB().Raw(sql, pagesize, (page-1)*pagesize).Find(&books.Data)
			if res.Error != nil {
				log.Println(res.Error)
			}
			out <- books.Data
		}
	}()
	return out
}

// DoData simulates processing. For every []*Book received from in, it waits
// one second and then sends a summary string with the slice length and the
// current Unix time. The returned channel is closed once in is drained.
func DoData(in InChan) OutChan {
	out := make(OutChan)
	go func() {
		defer close(out)
		for item := range in {
			books := item.([]*Book)
			time.Sleep(time.Second)
			msg := fmt.Sprintf("处理了%d条数据,%d\n", len(books), time.Now().Unix())
			out <- msg
		}
	}()
	return out
}
// PipeTest chains two Pipes. The first fans GetPage's page numbers out to 5
// GetData workers; the second feeds those results to 2 DoData workers and
// prints every summary they produce.
func PipeTest() {
	fetch := NewPipe()
	fetch.SetCmd(GetPage)
	fetch.SetPipeCmd(GetData, 5)
	pages := fetch.Exec()

	process := NewPipe()
	// The first pipe's merged output becomes the second pipe's input.
	process.SetCmd(func(args ...interface{}) InChan {
		return InChan(pages)
	})
	process.SetPipeCmd(DoData, 2)

	for item := range process.Exec() {
		fmt.Println(item)
	}
}

Q.E.D.


勤俭节约,艰苦奋斗。