内容简介:此文已由作者杨望暑授权网易云社区发布。欢迎访问网易云社区,了解更多网易技术产品运营经验。在服务端查看log会经常使用到tail -f命令实时跟踪文件变化. 那么问题来了, 如果自己写一个同样功能的, 该何处写起呢? 如果你用过ELK里的beats/filebeat的话, 应该知道filebeat做的事情就是监控日志变化, 并把最新数据,按照自定义配置处理后, 发送给ElasticSearch/kafka/... 对, 本文就是想介绍如何自己实现一个简易版filebeat, 只要日志内容发生变化(appen
此文已由作者杨望暑授权网易云社区发布。
欢迎访问网易云社区,了解更多网易技术产品运营经验。
背景
在服务端查看log会经常使用到tail -f命令实时跟踪文件变化. 那么问题来了, 如果自己写一个同样功能的, 该何处写起呢? 如果你用过ELK里的beats/filebeat的话, 应该知道filebeat做的事情就是监控日志变化, 并把最新数据,按照自定义配置处理后, 发送给ElasticSearch/kafka/... 对, 本文就是想介绍如何自己实现一个简易版filebeat, 只要日志内容发生变化(append new line), 能触发一个消息, 实现对这一行数据的预处理, 打印, 接入kafka等动作, 还有一个功能是, 当这个 工具 重启后, 依然能从上次读取的位置开始读.
工具
Golang IDEA
大致流程
具体实现
从流程图中可以看出, 我们需要解决下面几个问题
-
记录上一次程序关闭前,文件读取位置,下次程序启动时候加载这个位置信息.
-
文件定位并按行读取, 并发布读取的行
-
监测文件内容变化,并发出通知
记录上次读取位置
这个问题关键应该是什么时候记录上次读取的offset.
-
读取并发布后记录 如果发布后,做记录前,程序挂了,那么重启程序后,这行数据会重新被读一次.
-
读取后马上记录,记录成功后,才对外发布. 这样会产生另一个问题, 发布前程序挂了, 重启后, 那条未必发送的消息,外部是拿不到了.
如果没理解错, elastic的filebeat选的就是第一种,且没做相应的异常处理, 他是设置一个channel池, 接收并异步写入位置信息, 如果写入失败, 则打印一条error日志就继续走了
logp.Err("Writing of registry returned error: %v. Continuing...", err)复制代码
文件定位并按行读取, 并发布读取的行
要读取一个文件, 首先要有一个reader
func (tail *Tailf) openReader() { tail.file, _ = os.Open(tail.FileName) tail.reader = bufio.NewReader(tail.file) }复制代码
对于从文件位置(offset)=0处开始读一行, 这没什么问题, 直接用下面这个方法就可以了.
func (tail *Tailf) readLine() (string, error) { line, err := tail.reader.ReadString('\n') if err != nil { return line, err } line = strings.TrimRight(line, "\n") return line, err }复制代码
但是, 对于文件内容增加了, 但是还没到一行,也就是没出现\n 却出现了EOF(end of file), 那这个情况下, 我们是要等待的,offset必须保持在这一行的行头.
func (tail *Tailf) getOffset() (offset int64, err error) { offset, err = tail.file.Seek(0, os.SEEK_CUR) offset -= int64(tail.reader.Buffered()) return}func (tail *Tailf) beginWatch() { tail.openReader() var offset int64 for { //取上一次读取位置(行头) offset, _ = tail.getOffset() line, err := tail.readLine() if err == nil { tail.publishLine(line) } else if err == io.EOF { //读到了EOF, offset设置回到行头 tail.seekTo(Seek{offset: offset, whence: 0}) //block and wait for changes tail.waitChangeEvent() } else { fmt.Println(err) return } } }func (tail *Tailf) seekTo(pos Seek) error { tail.file.Seek(pos.offset, pos.whence) //一旦改变了offset, 这个reader必须reset一下才能生效 tail.reader.Reset(tail.file) return nil}// 这里是发布一个消息, 因为是demo,所以只是简单的往channel里一扔func (tail *Tailf) publishLine(line string) { tail.Lines <- line }复制代码
下面说说waitChangeEvent
如何监视文件内容变化,并通知
监测文件内容增加的方式大体有2种
-
监测文件最后修改时间以及文件大小的变化,俗称poll--轮询
-
利用 linux 的inotify命令实现监测,他会在文件发生状态改变后触发事件
这里采用第一种方式, filebeat也用的第一种. 我们自己怎么实现呢?
//currReadPos: 文件末尾的offset,也就是当前文件大小func (w *PollWatcher) ChangeEvent(currReadPos int64) (*ChangeEvent, error) { watchingFile, err := os.Stat(w.FileName) if err != nil { return nil, err } changes := NewChangeEvent() //当前的大小 w.FileSize = currReadPos //之前的修改时间 previousModTime := watchingFile.ModTime() //轮询 go func() { previousSize := w.FileSize for { time.Sleep(POLL_DURATION) //这里省略很多代码, 假设文件是存在的,且没被重命名,删除之类的情况, 文件是像日志文件一样不断append的 file, _ := os.Stat(w.FileName) // ... 省略一大段代码 if previousSize > 0 && previousSize < w.FileSize { //文件肥了 changes.NotifyModified() previousSize = w.FileSize continue } previousSize = w.FileSize // 处理 原本没内容, 但是加入了内容, 所以要用修改时间 modTime := file.ModTime() if modTime != previousModTime { previousModTime = modTime changes.NotifyModified() } } }() return changes, nil}复制代码
这里的changes.NotifyModified方法只是往下面实例里Modified Channel 放入 ce.Modified <- true
type ChangeEvent struct { Modified chan bool Truncated chan bool Deleted chan bool}复制代码
也正是这个动作, 在主线程中, 就能收到文件被修改的通知, 从而继续出发readLine动作
// 上面有个beginWatch方法代码,结合这个代码来看func (tail *Tailf) waitChangeEvent() error { // ... 省略初始化动作 select { //只测试文件内容增加 case <-tail.changes.Modified: fmt.Println(">> find Modified") return nil // ... 省略其他 } }复制代码
有了这个一连串的代码后, 我们就能在main里监视文件变化了
func main() { t, _ := tailf.NewTailf("/Users/yws/Desktop/test.log") for line := range t.Lines { //这里会block住,有新行到来,就会输出新行 fmt.Println(line) } }复制代码
扩展点
这个扩展点, 和filebeat一样.
-
在读取时候, 不一定是按行读取,可以读多行,json解析等
-
发布时候, 本文例子是直接写console, 其实可以接kafka, redis, 数据库等
-
.... 想不出来了
总结
虽然是一个很简单的功能, 现代主流服务端编程语言基本都能实现, 但为什么用go来实现呢? 一大堆优点和缺点就不列了..这不是软文. 谈谈go初学者的看法
-
代码很简洁, 虽然不支持很多高级语言特性, 但看起来依然那么爽, 除了那些过渡包装的struct以及怪异的取名.
-
写并发(goroutine)是那么的简单,那么的优雅,但也很容易被我这样的菜鸟滥用, 这语言debug目前有点肉痛
-
goroutine通信也是那么的简单, channel设计的很棒, 用着很爽
-
不爽的地方, 多返回值的问题, 写惯了 java 的xinstance.method(yInstance.method()), 当yInstance.method()是多返回值的时候,必须拆分成2行或更多, 每次编译器报错时候就想砸键盘.
参考资料
-
github.com/elastic/bea… filebeat只是其中一个feature
-
github.com/hpcloud/tai… 写到一半发现原来别人也干过一样的事了, 代码基本大同小异, 有兴趣的可以看他的代码, 写的更完善.
网易云免费体验馆,0成本体验20+款云产品!
更多网易技术、产品、运营经验分享请点击。
相关文章:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 关于indexOf的发散思维
- 以案例的形式,来分析用户思维、产品思维和工程思维
- 工程思维与产品思维
- Callback ——从同步思维切换到异步思维
- mybatis思维导图,让mybatis不再难懂(二) - java思维导图
- 百度发布智能城市“ACE王牌计划” 李彦宏要用AI思维吊打互联网思维
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Text Algorithms
Maxime Crochemore、Wojciech Rytter / Oxford University Press / 1994
This much-needed book on the design of algorithms and data structures for text processing emphasizes both theoretical foundations and practical applications. It is intended to serve both as a textbook......一起来看看 《Text Algorithms》 这本书的介绍吧!