• 14.5 通道、超时和计时器(Ticker)

    14.5 通道、超时和计时器(Ticker)

    time 包中有一些有趣的功能可以和通道组合使用。

    其中就包含了 time.Ticker 结构体,这个对象以指定的时间间隔重复的向通道 C 发送时间值:

    1. type Ticker struct {
    2. C <-chan Time // the channel on which the ticks are delivered.
    3. // contains filtered or unexported fields
    4. ...
    5. }

    时间间隔的单位是 ns(纳秒,int64),在工厂函数 time.NewTicker 中以 Duration 类型的参数传入:func Newticker(dur) *Ticker

    在协程周期性的执行一些事情(打印状态日志,输出,计算等等)的时候非常有用。

    调用 Stop() 使计时器停止,在 defer 语句中使用。这些都很好的适应 select 语句:

    1. ticker := time.NewTicker(updateInterval)
    2. defer ticker.Stop()
    3. ...
    4. select {
    5. case u:= <-ch1:
    6. ...
    7. case v:= <-ch2:
    8. ...
    9. case <-ticker.C:
    10. logState(status) // call some logging function logState
    11. default: // no value ready to be received
    12. ...
    13. }

    time.Tick() 函数声明为 Tick(d Duration) <-chan Time,当你想返回一个通道而不必关闭它的时候这个函数非常有用:它以 d 为周期给返回的通道发送时间,d是纳秒数。如果需要像下边的代码一样,限制处理频率(函数 client.Call() 是一个 RPC 调用,这里暂不赘述(参见第 15.9 节):

    1. import "time"
    2. rate_per_sec := 10
    3. var dur Duration = 1e9 / rate_per_sec
    4. chRate := time.Tick(dur) // a tick every 1/10th of a second
    5. for req := range requests {
    6. <- chRate // rate limit our Service.Method RPC calls
    7. go client.Call("Service.Method", req, ...)
    8. }

    这样只会按照指定频率处理请求:chRate 阻塞了更高的频率。每秒处理的频率可以根据机器负载(和/或)资源的情况而增加或减少。

    问题 14.1:扩展上边的代码,思考如何承载周期请求数的暴增(提示:使用带缓冲通道和计时器对象)。

    定时器(Timer)结构体看上去和计时器(Ticker)结构体的确很像(构造为 NewTimer(d Duration)),但是它只发送一次时间,在 Dration d 之后。

    还有 time.After(d) 函数,声明如下:

    1. func After(d Duration) <-chan Time

    Duration d 之后,当前时间被发到返回的通道;所以它和 NewTimer(d).C 是等价的;它类似 Tick(),但是 After() 只发送一次时间。下边有个很具体的示例,很好的阐明了 selectdefault 的作用:

    示例 14.11:timer_goroutine.go:

    1. package main
    2. import (
    3. "fmt"
    4. "time"
    5. )
    6. func main() {
    7. tick := time.Tick(1e8)
    8. boom := time.After(5e8)
    9. for {
    10. select {
    11. case <-tick:
    12. fmt.Println("tick.")
    13. case <-boom:
    14. fmt.Println("BOOM!")
    15. return
    16. default:
    17. fmt.Println(" .")
    18. time.Sleep(5e7)
    19. }
    20. }
    21. }

    输出:

    1. .
    2. .
    3. tick.
    4. .
    5. .
    6. tick.
    7. .
    8. .
    9. tick.
    10. .
    11. .
    12. tick.
    13. .
    14. .
    15. tick.
    16. BOOM!

    习惯用法:简单超时模式

    要从通道 ch 中接收数据,但是最多等待1秒。先创建一个信号通道,然后启动一个 lambda 协程,协程在给通道发送数据之前是休眠的:

    1. timeout := make(chan bool, 1)
    2. go func() {
    3. time.Sleep(1e9) // one second
    4. timeout <- true
    5. }()

    然后使用 select 语句接收 ch 或者 timeout 的数据:如果 ch 在 1 秒内没有收到数据,就选择到了 time 分支并放弃了 ch 的读取。

    1. select {
    2. case <-ch:
    3. // a read from ch has occured
    4. case <-timeout:
    5. // the read from ch has timed out
    6. break
    7. }

    第二种形式:取消耗时很长的同步调用

    也可以使用 time.After() 函数替换 timeout-channel。可以在 select 中通过 time.After() 发送的超时信号来停止协程的执行。以下代码,在 timeoutNs 纳秒后执行 selecttimeout 分支后,执行client.Call 的协程也随之结束,不会给通道 ch 返回值:

    1. ch := make(chan error, 1)
    2. go func() { ch <- client.Call("Service.Method", args, &reply) } ()
    3. select {
    4. case resp := <-ch
    5. // use resp and reply
    6. case <-time.After(timeoutNs):
    7. // call timed out
    8. break
    9. }

    注意缓冲大小设置为 1 是必要的,可以避免协程死锁以及确保超时的通道可以被垃圾回收。此外,需要注意在有多个 case 符合条件时, selectcase 的选择是伪随机的,如果上面的代码稍作修改如下,则 select 语句可能不会在定时器超时信号到来时立刻选中 time.After(timeoutNs) 对应的 case,因此协程可能不会严格按照定时器设置的时间结束。

    1. ch := make(chan int, 1)
    2. go func() { for { ch <- 1 } } ()
    3. L:
    4. for {
    5. select {
    6. case <-ch:
    7. // do something
    8. case <-time.After(timeoutNs):
    9. // call timed out
    10. break L
    11. }
    12. }

    第三种形式:假设程序从多个复制的数据库同时读取。只需要一个答案,需要接收首先到达的答案,Query 函数获取数据库的连接切片并请求。并行请求每一个数据库并返回收到的第一个响应:

    1. func Query(conns []conn, query string) Result {
    2. ch := make(chan Result, 1)
    3. for _, conn := range conns {
    4. go func(c Conn) {
    5. select {
    6. case ch <- c.DoQuery(query):
    7. default:
    8. }
    9. }(conn)
    10. }
    11. return <- ch
    12. }

    再次声明,结果通道 ch 必须是带缓冲的:以保证第一个发送进来的数据有地方可以存放,确保放入的首个数据总会成功,所以第一个到达的值会被获取而与执行的顺序无关。正在执行的协程可以总是可以使用 runtime.Goexit() 来停止。

    在应用中缓存数据:

    应用程序中用到了来自数据库(或者常见的数据存储)的数据时,经常会把数据缓存到内存中,因为从数据库中获取数据的操作代价很高;如果数据库中的值不发生变化就没有问题。但是如果值有变化,我们需要一个机制来周期性的从数据库重新读取这些值:缓存的值就不可用(过期)了,而且我们也不希望用户看到陈旧的数据。