主要功能:
- 输入操作(用户输入):
- A操作(添加任务数据):
- 一个协程程序读取源文件夹下所有文件绝对路径,生成相应的目标文件夹下的绝对路径。
- 将源文件绝对路径和目标文件绝对路径,存储在数据队列中。
- B操作(读取数据,并发比对备份文件):
- 通过循环判断数据队列中是否有数据,或者是否添加数据完成。如果没数据,同时B操作添加数据已完成,退出循环(退出C操作)。
- 如果添加数据未完成,循环读取数据队列中的值,如果有数据,按照输入的同步协程数,同步读取队列中数据,进行如下操作:
- 目标已经有重名文件,比对文件的md5和sha256(防止哈希碰撞),如果相同,证明文件相同,此文件不做任何操作,退出本次循环。
- 如果md5和sha256有不同,证明两文件只是文件名相同,文件内容有不同,对目标文件重命名,如果重命名后,继续循环上一操作,直到目标中没有重命名后的文件。(重命名就是对原文件名后加“(1)”,多次重命名后就是:“文件名(1)(1)(1).扩展名”)
- 目标没有重名文件,或者重命名完成后的文件,备份文件。
代码缺点
- 问题:数据队列没有数据时,消费者需要不停的循环判断等待。
- 希望有解决方案:改为堵塞等待,性能会有进一步提升。欢迎提出宝贵意见。
主文件代码
package mainimport ("fmt""io/fs""os""path/filepath""sync"
)
var wq = NewWorkQueue()
var fz = false
var wgroup = sync.WaitGroup{}
var readMax = 5
var ch = make(chan struct{}, readMax)
var cn = NewCountNum()
func main() {var srcDir, dstDir stringfmt.Print("源文件路径:")fmt.Scanln(&srcDir)fmt.Print("目标文件路径:")fmt.Scanln(&dstDir)srcAbs, _ := filepath.Abs(srcDir) dstAbs, _ := filepath.Abs(dstDir) fmt.Print("同步数量:")fmt.Scanln(&readMax)wgroup.Add(1) go A(srcAbs, dstAbs)B() wgroup.Wait() logstr := fmt.Sprintf("[完成] %s %s 错误:%d,已存在:%d,成功:%d!\n", srcAbs, dstAbs, cn.ErrGet(), cn.WarnGet(), cn.OkGet())SaveLog(logstr)
}
func A(srcAbs, dstAbs string) {os.MkdirAll(dstAbs, os.ModeDir) filepath.WalkDir(srcAbs, func(path string, d fs.DirEntry, err error) error {if err != nil {return err}dstPath := DstPaht(path, srcAbs, dstAbs) if d.IsDir() { os.MkdirAll(dstPath, os.ModeDir) } else { wq.Add(map[string]string{"src": path, "dst": dstPath})}return nil})fz = true wgroup.Done()
}
func B() {for {if fz && wq.Size() == 0 { return } if wq.Size() > 0 { go func() {defer wgroup.Done() wgroup.Add(1) data := wq.Pop() if data != nil { ch <- struct{}{} wgroup.Add(1) go func(data interface{}) { defer func() {<-ch wgroup.Done() }()val, _ := data.(map[string]string) srcPath := val["src"] dstPath := val["dst"] for IsFileExist(dstPath) { srcMd5, srcSha256, _ := FileHash(srcPath)dstMd5, dstSha256, _ := FileHash(dstPath)if srcMd5 == dstMd5 && srcSha256 == dstSha256 { cn.WarnAdd() fmt.Print("\r"+dstPath, " -> 已存在!")return } else {dstPath = FileRename(dstPath) }}CopyFile(srcPath, dstPath) cn.OkAdd() fmt.Print("\r"+dstPath, " -> 完成!") }(data)}}()}}
}
type CountNum struct {okNum int warnNum int errNum int mutex sync.Mutex
}func NewCountNum() *CountNum {return &CountNum{okNum: 0, warnNum: 0, errNum: 0, mutex: sync.Mutex{}}
}
func (cn *CountNum) OkAdd() {cn.mutex.Lock()defer cn.mutex.Unlock()cn.okNum++
}
func (cn *CountNum) OkGet() int {return cn.okNum
}
func (cn *CountNum) WarnAdd() {cn.mutex.Lock()defer cn.mutex.Unlock()cn.warnNum++
}
func (cn *CountNum) WarnGet() int {return cn.warnNum
}
func (cn *CountNum) ErrAdd() {cn.mutex.Lock()defer cn.mutex.Unlock()cn.errNum++
}
func (cn *CountNum) ErrGet() int {return cn.errNum
}
文件、目录操作代码
package mainimport ("crypto/md5""crypto/sha256""encoding/hex""fmt""io""log""os""path/filepath""strings"
)
func DstPaht(srcPath, srcAbs, dstAbs string) string {return strings.Replace(srcPath, srcAbs, dstAbs, -1)
}
func FileRename(filePath string) string {nameExt := filepath.Ext(filePath) nameSrart := strings.TrimSuffix(filePath, nameExt) return fmt.Sprint(nameSrart, "(1)", nameExt)
}
func FileHash(path string) (string, string, error) {f, err := os.Open(path) if err != nil {return "", "", err}defer f.Close()h5 := md5.New() if _, err := io.Copy(h5, f); err != nil { return "", "", err}h256 := sha256.New() f.Seek(0, 0) if _, err := io.Copy(h256, f); err != nil { return "", "", err}return hex.EncodeToString(h5.Sum(nil)), hex.EncodeToString(h256.Sum(nil)), err
}
func IsFileExist(path string) bool {_, err := os.Stat(path)if err != nil { return false}return true
}
func CopyFile(src, dst string) {rFile, err := os.Open(src) defer rFile.Close()if err != nil {log := "[err-a] " + src + " 空 读取源文件错误"cn.ErrAdd() SaveLog(log)}wFile, err := os.OpenFile(dst, os.O_CREATE|os.O_WRONLY, 0777) defer wFile.Close()if err != nil {log := "[err-b] 空 " + dst + " 创建目标文件错误"cn.ErrAdd() SaveLog(log)}_, err = io.Copy(wFile, rFile) if err != nil {log := "[err-c] " + src + " " + dst + " 复制错误"cn.ErrAdd() SaveLog(log)}
}
func SaveLog(loginfo string) {file, _ := os.OpenFile("run.Log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0777) defer file.Close()logger := log.New(file, "", log.Ldate|log.Ltime)logger.Println(loginfo)logger.SetOutput(os.Stdout)logger.Print("\r" + loginfo)
}
数据队列代码
package mainimport ("sync"
)
type DataNode struct {data interface{} next *DataNode
}
type WorkQueue struct {root *DataNode size int mutex sync.Mutex
}
func NewWorkQueue() *WorkQueue {wq := &WorkQueue{root: nil, size: 0}return wq
}
func (wq *WorkQueue) Add(data interface{}) {wq.mutex.Lock() defer wq.mutex.Unlock() if wq.root == nil { wq.root = new(DataNode) wq.root.data = data } else {dn := new(DataNode) dn.data = data node := wq.root for node.next != nil { node = node.next }node.next = dn }wq.size++
}
func (wq *WorkQueue) Pop() interface{} {wq.mutex.Lock() defer wq.mutex.Unlock() if wq.root == nil { return nil} else {node := wq.root v := node.data wq.root = node.next wq.size-- return v }
}
func (wq *WorkQueue) Size() int {return wq.size
}