总结下实现思路:

创新互联长期为超过千家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为资源企业提供专业的成都网站设计、成都网站建设,资源网站改版等技术服务。拥有10多年丰富建站经验和众多成功案例,为您定制开发。
package pipeline
import (
"encoding/binary"
"fmt"
"io"
"math/rand"
"sort"
"time"
)
var startTime time.Time
func Init() {
startTime = time.Now()
}
//内部处理方法
//这里是排序:异步处理容器元素排序
func InMemSort(in <-chan int) <-chan int {
out := make(chan int, 1024)
go func() {
a := []int{}
for v := range in {
a = append(a, v)
}
fmt.Println("Read done:", time.Since(startTime))
sort.Ints(a)
fmt.Println("InMemSort done:", time.Since(startTime))
for _, v := range a {
out <- v
}
close(out)
}()
return out
}
//两路和并,每路通过内部方法异步处理
//这里是排序:in1,in2元素需要排好序(经过内部方法InMemSort异步处理)的容器单元(channel 异步容器/队列)
func Merge(in1, in2 <-chan int) <-chan int {
out := make(chan int, 1024)
// go func() {
// v1, ok1 := <-in1
// v2, ok2 := <-in2
// for {
// if ok1 || ok2 {
// if !ok2 || (ok1 && v1 <= v2) { //v2无值或v1值比v2大
// out <- v1
// v1, ok1 = <-in1
// } else {
// out <- v2
// v2, ok2 = <-in2
// }
// } else {
// close(out)
// break
// }
// }
// }()
go func() {
v1, ok1 := <-in1
v2, ok2 := <-in2
for ok1 || ok2 {
if !ok2 || (ok1 && v1 <= v2) { //v2无值或v1值比v2大
out <- v1
v1, ok1 = <-in1
} else {
out <- v2
v2, ok2 = <-in2
}
}
close(out)
fmt.Println("Merge done:", time.Since(startTime))
}()
return out
}
//读取原数据
//chunkSize=-1全读
func ReadSource(r io.Reader, chunkSize int) <-chan int {
out := make(chan int, 1024)
go func() {
buffer := make([]byte, 8) //int长度根据操作系统来的,64位为int64,64位8个字节
bytesRead := 0
for { //持续读取
n, err := r.Read(buffer) //读取一个int 8byte
bytesRead += n
if n > 0 {
out <- int(binary.BigEndian.Uint64(buffer)) //字节数组转int
}
if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) { //-1全读
break
}
}
close(out)
}()
return out
}
//写处理后(排序)数据
func WriteSink(w io.Writer, in <-chan int) {
for v := range in {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(v))
w.Write(buffer)
}
}
//随机生成数据源
func RandomSource(count int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < count; i++ {
out <- rand.Int()
}
close(out)
}()
return out
}
//多路两两归并,每路通过内部方法异步处理
//这里是排序:ins元素需要排好序(经过内部方法InMemSort异步处理)的容器单元(channel 异步容器/队列)
func MergeN(ins ...<-chan int) <-chan int {
if len(ins) == 1 {
return ins[0]
}
m := len(ins) / 2
return Merge(
MergeN(ins[:m]...),
MergeN(ins[m:]...)) //chennel异步并发归并
}