[译]go styleguide

回复

Xargin 发起了问题 • 1 人关注 • 0 个回复 • 294 次浏览 • 2017-08-04 17:39 • 来自相关话题

NoPoint Docker — 云时代的程序分发方式

Julyqi 发表了文章 • 0 个评论 • 195 次浏览 • 2017-07-28 16:17 • 来自相关话题

要说最近一年云计算业界有什么大事件?Google Compute Engine 的正式发布?Azure入华?还是AWS落地中国?留在每个人大脑中的印象可能各不相同,但要是让笔者来排名的话那么Docker绝对应该算是第一位的。如果你之前听说过它的话,那么也... 查看全部

要说最近一年云计算业界有什么大事件?Google Compute Engine 的正式发布?Azure入华?还是AWS落地中国?留在每个人大脑中的印象可能各不相同,但要是让笔者来排名的话那么Docker绝对应该算是第一位的。如果你之前听说过它的话,那么也许你会说“没错,就是它”,因为几乎世界各地的开发、运维都在谈论着Docker;如果你还没听说过Docker,那么我真的建议你花上5分钟来阅读本文。


https://community.clouderwork.com/article/view/59633469d009d.html


总之笔者认为Docker还是非常有趣的一个东西,值得大家花些时间体验一下,相信在各位的工作中多多少少都能用的上Docker。

GOLANG中DEFER, PANIC, RECOVER用法

回复

haohongfan 发起了问题 • 1 人关注 • 0 个回复 • 286 次浏览 • 2017-07-21 11:37 • 来自相关话题

beego & bee 1.9.0 released

astaxie 发表了文章 • 0 个评论 • 474 次浏览 • 2017-07-19 01:07 • 来自相关话题

beego:

  1. Fix the new repo address for casbin #2654
  2. Fix cache/memory fatal error: concurrent map iteration a... 查看全部

beego:



  1. Fix the new repo address for casbin #2654

  2. Fix cache/memory fatal error: concurrent map iteration and map write #2726

  3. AddAPPStartHook func modify #2724

  4. Fix panic: sync: negative WaitGroup counter #2717

  5. incorrect error rendering (wrong status) #2712

  6. validation: support int64 int32 int16 and int8 type #2728

  7. validation: support required option for some struct tag valids #2741

  8. Fix big form parse issue #2725

  9. File log add RotatePerm #2683

  10. Fix Oracle placehold #2749

  11. Supported gzip for req.Header has Content-Encoding: gzip #2754

  12. Add new Database Migrations #2744

  13. Beego auto generate sort ControllerComments #2766

  14. added statusCode and pattern to FilterMonitorFunc #2692

  15. fix the bugs in the "ParseBool" function in the file of config.go #2740


Bee



  1. Added MySQL year data type #443

  2. support multiple http methods #445

  3. The DDL migration can now be generated by adding a -ddl and a proper "alter" or "create" as argument value. #455

  4. Fix: docs generator skips everything containing 'vendor' #454

  5. get these tables information in custom the option #441

  6. read ref(pk) #444

  7. Add command bee server to server static folder.


https://github.com/astaxie/beego/pull/2771

给httprouter添加pprof功能

narutoinfo 发表了文章 • 0 个评论 • 187 次浏览 • 2017-07-18 19:40 • 来自相关话题

给httprouter添加pprof

1:获取包

go get github.com/feixiao/htt... 			查看全部
					

给httprouter添加pprof


1:获取包


go get github.com/feixiao/httpprof

2:进入所在目录,获取依赖


 govendor sync

3:编译运行example


go build

4:外部项目添加使用,只需要参考example的使用即可


func Index(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
fmt.Fprint(w, "Welcome!\n")
}

func Hello(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
fmt.Fprintf(w, "hello, %s!\n", ps.ByName("name"))
}

func main() {
router := httprouter.New()

// 在原来的httprouter的使用基础只是添加了这一句代码
router = httpprof.WrapRouter(router)

router.GET("/", Index)
router.GET("/hello/:name", Hello)

log.Fatal(http.ListenAndServe(":8080", router))
}

udp编程的那些事与golang udp的实践

sheepbao 发表了文章 • 0 个评论 • 1039 次浏览 • 2017-06-26 10:30 • 来自相关话题

udp编程的那些事与golang udp的实践

tcp/ip大协议中,tcp编程大家应该比较熟,应用的场景也很多,但是udp在现实中,应用也不少,而在大部分博文中,都很少对udp的编程进行研究,最近研究了一下udp编程,正好做个记录。<... 查看全部

udp编程的那些事与golang udp的实践


tcp/ip大协议中,tcp编程大家应该比较熟,应用的场景也很多,但是udp在现实中,应用也不少,而在大部分博文中,都很少对udp的编程进行研究,最近研究了一下udp编程,正好做个记录。

sheepbao 2017.06.15


tcp Vs udp


tcp和udp都是著名的传输协议,他们都是基于ip协议,都在OSI模型中传输层。tcp我们都很清楚,它提供了可靠的数据传输,而udp我们也知道,它不提供数据传输的可靠性,只是尽力传输。
他们的特性决定了它们很大的不同,tcp提供可靠性传输,有三次握手,4次分手,相当于具有逻辑上的连接,可以知道这个tcp连接的状态,所以我们都说tcp是面向连接的socket,而udp没有握手,没有分手,也不存在逻辑上的连接,所以我们也都说udp是非面向连接的socket。

我们都畏惧不知道状态的东西,所以即使tcp的协议比udp复杂很多,但对于系统应用层的编程来说,tcp编程其实比udp编程容易。而udp相对比较灵活,所以对于udp编程反而没那么容易,但其实掌握后udp编程也并不难。


udp协议


udp的首部


        2               2       (byte)
+---+---+---+---+---+---+---+---+ -
| src port | dst port | |
+---+---+---+---+---+---+---+---+ 8(bytes)
| length | check sum | |
+---+---+---+---+---+---+---+---+ -
| |
+ data +
| |
+---+---+---+---+---+---+---+---+

udp的首部真的很简单,头2个字节表示的是原端口,后2个字节表示的是目的端口,端口是系统层区分进程的标识。接着是udp长度,最后就是校验和,这个其实很重要,现在的系统都是默认开启udp校验和的,所以我们才能确保udp消息传输的完整性。如果这个校验和关闭了,那会让我们绝对会很忧伤,因为udp不仅不能保证数据一定到达,还不能保证即使数据到了,这个数据是否是正确的。比如:我在发送端发送了“hello”,而接收端却接收到了“hell”。如果真的是这样,我们就必须自己去校验数据的正确性。还好udp默认开发了校验,我们可以保证udp的数据完整性。


udp数据的封装


                                    +---------+
| 应用数据 |
+---------+
| |
v v
+---------+---------+
| udp首部 | 应用数据 |
+---------+---------+
| |
v UDP数据报 v
+---------+---------+---------+
| ip首部 | udp首部 | 应用数据 |
+---------+---------+---------+
| |
v IP数据报 v
+---------+---------+---------+---------+---------+
|以太网首部 | ip首部 | udp首部 | 应用数据 |以太网尾部 |
+---------+---------+---------+---------+---------+
| 14 20 8 4 |
| -> 以太网帧 <- |

数据的封装和tcp是一样,应用层的数据加上udp首部,构成udp数据报,再加上ip首部构成ip数据报,最后加上以太网首部和尾部构成以太网帧,经过网卡发送出去。


Golang udp实践


实践出真知,编程就需要多实践,才能体会其中的奥妙。


echo客户端和服务端


echo服务,实现数据包的回显,这是很多人网络编程起点,因为这个服务足够简单,但又把网络的数据流都过了一遍,这里也用go udp实现一个echo服务。

实现客户端发送一个“hello”,服务端接收消息并原封不动的返回给客户度。


server.go


package main

import (
"flag"
"fmt"
"log"
"net"
)

var addr = flag.String("addr", ":10000", "udp server bing address")

func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
flag.Parse()
}

func main() {
//Resolving address
udpAddr, err := net.ResolveUDPAddr("udp", *addr)
if err != nil {
log.Fatalln("Error: ", err)
}

// Build listining connections
conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
log.Fatalln("Error: ", err)
}
defer conn.Close()

// Interacting with one client at a time
recvBuff := make([]byte, 1024)
for {
log.Println("Ready to receive packets!")
// Receiving a message
rn, rmAddr, err := conn.ReadFromUDP(recvBuff)
if err != nil {
log.Println("Error:", err)
return
}

fmt.Printf("<<< Packet received from: %s, data: %s\n", rmAddr.String(), string(recvBuff[:rn]))
// Sending the same message back to current client
_, err = conn.WriteToUDP(recvBuff[:rn], rmAddr)
if err != nil {
log.Println("Error:", err)
return
}
fmt.Println(">>> Sent packet to: ", rmAddr.String())
}
}

client1.go


package main

import (
"flag"
"fmt"
"log"
"net"
)

var raddr = flag.String("raddr", "127.0.0.1:10000", "remote server address")

func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
flag.Parse()
}

func main() {
// Resolving Address
remoteAddr, err := net.ResolveUDPAddr("udp", *raddr)
if err != nil {
log.Fatalln("Error: ", err)
}

// Make a connection
tmpAddr := &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 0,
}

conn, err := net.DialUDP("udp", tmpAddr, remoteAddr)
// Exit if some error occured
if err != nil {
log.Fatalln("Error: ", err)
}
defer conn.Close()

// write a message to server
_, err = conn.Write([]byte("hello"))
if err != nil {
log.Println(err)
} else {
fmt.Println(">>> Packet sent to: ", *raddr)
}

// Receive response from server
buf := make([]byte, 1024)
rn, rmAddr, err := conn.ReadFromUDP(buf)
if err != nil {
log.Println(err)
} else {
fmt.Printf("<<< %d bytes received from: %v, data: %s\n", rn, rmAddr, string(buf[:rn]))
}
}

client2.go


package main

import (
"flag"
"fmt"
"log"
"net"
)

var (
laddr = flag.String("laddr", "127.0.0.1:9000", "local server address")
raddr = flag.String("raddr", "127.0.0.1:10000", "remote server address")
)

func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
flag.Parse()
}

func main() {
// Resolving Address
localAddr, err := net.ResolveUDPAddr("udp", *laddr)
if err != nil {
log.Fatalln("Error: ", err)
}

remoteAddr, err := net.ResolveUDPAddr("udp", *raddr)
if err != nil {
log.Fatalln("Error: ", err)
}

// Build listening connections
conn, err := net.ListenUDP("udp", localAddr)
// Exit if some error occured
if err != nil {
log.Fatalln("Error: ", err)
}
defer conn.Close()

// write a message to server
_, err = conn.WriteToUDP([]byte("hello"), remoteAddr)
if err != nil {
log.Println(err)
} else {
fmt.Println(">>> Packet sent to: ", *raddr)
}

// Receive response from server
buf := make([]byte, 1024)
rn, remAddr, err := conn.ReadFromUDP(buf)
if err != nil {
log.Println(err)
} else {
fmt.Printf("<<< %d bytes received from: %v, data: %s\n", rn, remAddr, string(buf[:rn]))
}
}

这里实现echo的服务端和客户端,和tcp的差不多,但是有一些小细节需要注意。

对于server端,先net.ListenUDP建立udp一个监听,返回一个udp连接,这里需要注意udp不像tcp,建立tcp监听后返回的是一个Listener,然后阻塞等待接收一个新的连接,这样区别是因为udp一个非面向连接的协议,它没有会话管理。同时也因为udp是非面向连接的协议,当接收到消息后,想把消息返回给当前的客户端时,是不能像tcp一样,直接往conn里写的,而是需要指定远端地址。

对于client端,类似tcp先Dial,返回一个连接,对于发送消息用Write,接收消息用Read,当然udp也可以用ReadFromUDP,这样可以知道从哪得到的消息。但其实client也可以用另一种方式写,如client2.go程序,先建立一个监听,返回一个连接,用这个连接发送消息给服务端和从服务器接收消息,这种方式和tcp倒是有很大的不同。


参考


golang pkg

GoCN每日新闻(2017-06-14)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 497 次浏览 • 2017-06-14 10:58 • 来自相关话题

kcp-go源码解析

sheepbao 发表了文章 • 0 个评论 • 1521 次浏览 • 2017-06-12 15:24 • 来自相关话题

kcp-go源码解析

对kcp-go的源码解析,有错误之处,请一定告之。
sheepbao 2017.0612

概念

ARQ:自动重传请求(Automatic Repeat-reQuest,... 查看全部

kcp-go源码解析


对kcp-go的源码解析,有错误之处,请一定告之。

sheepbao 2017.0612


概念


ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一.

RTO:Retransmission TimeOut

FEC:Forward Error Correction


kcp简介


kcp是一个基于udp实现快速、可靠、向前纠错的的协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟降低30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发。查看官方文档kcp


kcp-go是用go实现了kcp协议的一个库,其实kcp类似tcp,协议的实现也很多参考tcp协议的实现,滑动窗口,快速重传,选择性重传,慢启动等。

kcp和tcp一样,也分客户端和监听端。


    +-+-+-+-+-+            +-+-+-+-+-+
| Client | | Server |
+-+-+-+-+-+ +-+-+-+-+-+
|------ kcp data ------>|
|<----- kcp data -------|

kcp协议


layer model


+----------------------+
| Session |
+----------------------+
| KCP(ARQ) |
+----------------------+
| FEC(OPTIONAL) |
+----------------------+
| CRYPTO(OPTIONAL)|
+----------------------+
| UDP(Packet) |
+----------------------+

KCP header


KCP Header Format


      4           1   1     2 (Byte)
+---+---+---+---+---+---+---+---+
| conv |cmd|frg| wnd |
+---+---+---+---+---+---+---+---+
| ts | sn |
+---+---+---+---+---+---+---+---+
| una | len |
+---+---+---+---+---+---+---+---+
| |
+ DATA +
| |
+---+---+---+---+---+---+---+---+

代码结构


src/vendor/github.com/xtaci/kcp-go/
├── LICENSE
├── README.md
├── crypt.go 加解密实现
├── crypt_test.go
├── donate.png
├── fec.go 向前纠错实现
├── frame.png
├── kcp-go.png
├── kcp.go kcp协议实现
├── kcp_test.go
├── sess.go 会话管理实现
├── sess_test.go
├── snmp.go 数据统计实现
├── updater.go 任务调度实现
├── xor.go xor封装
└── xor_test.go

着重研究两个文件kcp.gosess.go


kcp浅析


kcp是基于udp实现的,所有udp的实现这里不做介绍,kcp做的事情就是怎么封装udp的数据和怎么解析udp的数据,再加各种处理机制,为了重传,拥塞控制,纠错等。下面介绍kcp客户端和服务端整体实现的流程,只是大概介绍一下函数流,不做详细解析,详细解析看后面数据流的解析。


kcp client整体函数流


和tcp一样,kcp要连接服务端需要先拨号,但是和tcp有个很大的不同是,即使服务端没有启动,客户端一样可以拨号成功,因为实际上这里的拨号没有发送任何信息,而tcp在这里需要三次握手。


DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
V
net.DialUDP("udp", nil, udpaddr)
V
NewConn()
V
newUDPSession() {初始化UDPSession}
V
NewKCP() {初始化kcp}
V
updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
V
go sess.readLoop()
V
go s.receiver(chPacket)
V
s.kcpInput(data)
V
s.fecDecoder.decodeBytes(data)
V
s.kcp.Input(data, true, s.ackNoDelay)
V
kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
V
notifyReadEvent()

客户端大体的流程如上面所示,先Dial,建立udp连接,将这个连接封装成一个会话,然后启动一个go程,接收udp的消息。


kcp server整体函数流


ListenWithOptions() 
V
net.ListenUDP()
V
ServerConn()
V
newFECDecoder()
V
go l.monitor() {从chPacket接收udp数据,写入kcp}
V
go l.receiver(chPacket) {从upd接收数据,并入队列}
V
newUDPSession()
V
updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
V
s.kcpInput(data)`
V
s.fecDecoder.decodeBytes(data)
V
s.kcp.Input(data, true, s.ackNoDelay)
V
kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
V
notifyReadEvent()

服务端的大体流程如上图所示,先Listen,启动udp监听,接着用一个go程监控udp的数据包,负责将不同session的数据写入不同的udp连接,然后解析封装将数据交给上层。


kcp 数据流详细解析


不管是kcp的客户端还是服务端,他们都有io行为,就是读与写,我们只分析一个就好了,因为它们读写的实现是一样的,这里分析客户端的读与写。


kcp client 发送消息


s.Write(b []byte) 
V
s.kcp.WaitSnd() {}
V
s.kcp.Send(b) {将数据根据mss分段,并存在kcp.snd_queue}
V
s.kcp.flush(false) [flush data to output] {
if writeDelay==true {
flush
}else{
每隔`interval`时间flush一次
}
}
V
kcp.output(buffer, size)
V
s.output(buf)
V
s.conn.WriteTo(ext, s.remote)
V
s.conn..Conn.WriteTo(buf)

读写都是在sess.go文件中实现的,Write方法:


// Write implements net.Conn
func (s *UDPSession) Write(b []byte) (n int, err error) {
for {
...

// api flow control
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
n = len(b)
for {
if len(b) <= int(s.kcp.mss) {
s.kcp.Send(b)
break
} else {
s.kcp.Send(b[:s.kcp.mss])
b = b[s.kcp.mss:]
}
}

if !s.writeDelay {
s.kcp.flush(false)
}
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
return n, nil
}

...
// wait for write event or timeout
select {
case <-s.chWriteEvent:
case <-c:
case <-s.die:
}

if timeout != nil {
timeout.Stop()
}
}
}

假设发送一个hello消息,Write方法会先判断发送窗口是否已满,满的话该函数阻塞,不满则kcp.Send("hello"),而Send函数实现根据mss的值对数据分段,当然这里的发送的hello,长度太短,只分了一个段,并把它们插入发送的队列里。


func (kcp *KCP) Send(buffer []byte) int {
...
for i := 0; i < count; i++ {
var size int
if len(buffer) > int(kcp.mss) {
size = int(kcp.mss)
} else {
size = len(buffer)
}
seg := kcp.newSegment(size)
copy(seg.data, buffer[:size])
if kcp.stream == 0 { // message mode
seg.frg = uint8(count - i - 1)
} else { // stream mode
seg.frg = 0
}
kcp.snd_queue = append(kcp.snd_queue, seg)
buffer = buffer[size:]
}
return 0
}

接着判断参数writeDelay,如果参数设置为false,则立马发送消息,否则需要任务调度后才会触发发送,发送消息是由flush函数实现的。


// flush pending data
func (kcp *KCP) flush(ackOnly bool) {
var seg Segment
seg.conv = kcp.conv
seg.cmd = IKCP_CMD_ACK
seg.wnd = kcp.wnd_unused()
seg.una = kcp.rcv_nxt

buffer := kcp.buffer
// flush acknowledges
ptr := buffer
for i, ack := range kcp.acklist {
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
// filter jitters caused by bufferbloat
if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
seg.sn, seg.ts = ack.sn, ack.ts
ptr = seg.encode(ptr)

}
}
kcp.acklist = kcp.acklist[0:0]

if ackOnly { // flash remain ack segments
size := len(buffer) - len(ptr)
if size > 0 {
kcp.output(buffer, size)
}
return
}

// probe window size (if remote window size equals zero)
if kcp.rmt_wnd == 0 {
current := currentMs()
if kcp.probe_wait == 0 {
kcp.probe_wait = IKCP_PROBE_INIT
kcp.ts_probe = current + kcp.probe_wait
} else {
if _itimediff(current, kcp.ts_probe) >= 0 {
if kcp.probe_wait < IKCP_PROBE_INIT {
kcp.probe_wait = IKCP_PROBE_INIT
}
kcp.probe_wait += kcp.probe_wait / 2
if kcp.probe_wait > IKCP_PROBE_LIMIT {
kcp.probe_wait = IKCP_PROBE_LIMIT
}
kcp.ts_probe = current + kcp.probe_wait
kcp.probe |= IKCP_ASK_SEND
}
}
} else {
kcp.ts_probe = 0
kcp.probe_wait = 0
}

// flush window probing commands
if (kcp.probe & IKCP_ASK_SEND) != 0 {
seg.cmd = IKCP_CMD_WASK
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = seg.encode(ptr)
}

// flush window probing commands
if (kcp.probe & IKCP_ASK_TELL) != 0 {
seg.cmd = IKCP_CMD_WINS
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = seg.encode(ptr)
}

kcp.probe = 0

// calculate window size
cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
if kcp.nocwnd == 0 {
cwnd = _imin_(kcp.cwnd, cwnd)
}

// sliding window, controlled by snd_nxt && sna_una+cwnd
newSegsCount := 0
for k := range kcp.snd_queue {
if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
break
}
newseg := kcp.snd_queue[k]
newseg.conv = kcp.conv
newseg.cmd = IKCP_CMD_PUSH
newseg.sn = kcp.snd_nxt
kcp.snd_buf = append(kcp.snd_buf, newseg)
kcp.snd_nxt++
newSegsCount++
kcp.snd_queue[k].data = nil
}
if newSegsCount > 0 {
kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
}

// calculate resent
resent := uint32(kcp.fastresend)
if kcp.fastresend <= 0 {
resent = 0xffffffff
}

// check for retransmissions
current := currentMs()
var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
for k := range kcp.snd_buf {
segment := &kcp.snd_buf[k]
needsend := false
if segment.xmit == 0 { // initial transmit
needsend = true
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
} else if _itimediff(current, segment.resendts) >= 0 { // RTO
needsend = true
if kcp.nodelay == 0 {
segment.rto += kcp.rx_rto
} else {
segment.rto += kcp.rx_rto / 2
}
segment.resendts = current + segment.rto
lost++
lostSegs++
} else if segment.fastack >= resent { // fast retransmit
needsend = true
segment.fastack = 0
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
change++
fastRetransSegs++
} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
needsend = true
segment.fastack = 0
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
change++
earlyRetransSegs++
}

if needsend {
segment.xmit++
segment.ts = current
segment.wnd = seg.wnd
segment.una = seg.una

size := len(buffer) - len(ptr)
need := IKCP_OVERHEAD + len(segment.data)

if size+need > int(kcp.mtu) {
kcp.output(buffer, size)
current = currentMs() // time update for a blocking call
ptr = buffer
}

ptr = segment.encode(ptr)
copy(ptr, segment.data)
ptr = ptr[len(segment.data):]

if segment.xmit >= kcp.dead_link {
kcp.state = 0xFFFFFFFF
}
}
}

// flash remain segments
size := len(buffer) - len(ptr)
if size > 0 {
kcp.output(buffer, size)
}

// counter updates
sum := lostSegs
if lostSegs > 0 {
atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
}
if fastRetransSegs > 0 {
atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
sum += fastRetransSegs
}
if earlyRetransSegs > 0 {
atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
sum += earlyRetransSegs
}
if sum > 0 {
atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
}

// update ssthresh
// rate halving, https://tools.ietf.org/html/rfc6937
if change > 0 {
inflight := kcp.snd_nxt - kcp.snd_una
kcp.ssthresh = inflight / 2
if kcp.ssthresh < IKCP_THRESH_MIN {
kcp.ssthresh = IKCP_THRESH_MIN
}
kcp.cwnd = kcp.ssthresh + resent
kcp.incr = kcp.cwnd * kcp.mss
}

// congestion control, https://tools.ietf.org/html/rfc5681
if lost > 0 {
kcp.ssthresh = cwnd / 2
if kcp.ssthresh < IKCP_THRESH_MIN {
kcp.ssthresh = IKCP_THRESH_MIN
}
kcp.cwnd = 1
kcp.incr = kcp.mss
}

if kcp.cwnd < 1 {
kcp.cwnd = 1
kcp.incr = kcp.mss
}
}

flush函数非常的重要,kcp的重要参数都是在调节这个函数的行为,这个函数只有一个参数ackOnly,意思就是只发送ack,如果ackOnly为true的话,该函数只遍历ack列表,然后发送,就完事了。
如果不是,也会发送真实数据。
在发送数据前先进行windSize探测,如果开启了拥塞控制nc=0,则每次发送前检测服务端的winsize,如果服务端的winsize变小了,自身的winsize也要更着变小,来避免拥塞。如果没有开启拥塞控制,就按设置的winsize进行数据发送。

接着循环每个段数据,并判断每个段数据的是否该重发,还有什么时候重发:



  1. 如果这个段数据首次发送,则直接发送数据。

  2. 如果这个段数据的当前时间大于它自身重发的时间,也就是RTO,则重传消息。

  3. 如果这个段数据的ack丢失累计超过resent次数,则重传,也就是快速重传机制。这个resent参数由resend参数决定。

  4. 如果这个段数据的ack有丢失且没有新的数据段,则触发ER,ER相关信息ER


最后通过kcp.output发送消息hello,output是个回调函数,函数的实体是sess.go的:


func (s *UDPSession) output(buf []byte) {
var ecc [][]byte

// extend buf's header space
ext := buf
if s.headerSize > 0 {
ext = s.ext[:s.headerSize+len(buf)]
copy(ext[s.headerSize:], buf)
}

// FEC stage
if s.fecEncoder != nil {
ecc = s.fecEncoder.Encode(ext)
}

// encryption stage
if s.block != nil {
io.ReadFull(rand.Reader, ext[:nonceSize])
checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
s.block.Encrypt(ext, ext)

if ecc != nil {
for k := range ecc {
io.ReadFull(rand.Reader, ecc[k][:nonceSize])
checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
s.block.Encrypt(ecc[k], ecc[k])
}
}
}

// WriteTo kernel
nbytes := 0
npkts := 0
// if mrand.Intn(100) < 50 {
for i := 0; i < s.dup+1; i++ {
if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
nbytes += n
npkts++
}
}
// }

if ecc != nil {
for k := range ecc {
if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
nbytes += n
npkts++
}
}
}
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
}

output函数才是真正的将数据写入内核中,在写入之前先进行了fec编码,fec编码器的实现是用了一个开源库github.com/klauspost/reedsolomon,编码以后的hello就不是和原来的hello一样了,至少多了几个字节。
fec编码器有两个重要的参数reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShardsparityShards,这两个参数决定了fec的冗余度,冗余度越大抗丢包性就越强。


kcp的任务调度器


其实这里任务调度器是一个很简单的实现,用一个全局变量updater来管理session,代码文件为updater.go。其中最主要的函数


func (h *updateHeap) updateTask() {
var timer <-chan time.Time
for {
select {
case <-timer:
case <-h.chWakeUp:
}

h.mu.Lock()
hlen := h.Len()
now := time.Now()
if hlen > 0 && now.After(h.entries[0].ts) {
for i := 0; i < hlen; i++ {
entry := heap.Pop(h).(entry)
if now.After(entry.ts) {
entry.ts = now.Add(entry.s.update())
heap.Push(h, entry)
} else {
heap.Push(h, entry)
break
}
}
}
if hlen > 0 {
timer = time.After(h.entries[0].ts.Sub(now))
}
h.mu.Unlock()
}
}

任务调度器实现了一个堆结构,每当有新的连接,session都会插入到这个堆里,接着for循环每隔interval时间,遍历这个堆,得到entry然后执行entry.s.update()。而entry.s.update()会执行s.kcp.flush(false)来发送数据。


总结


这里简单介绍了kcp的整体流程,详细介绍了发送数据的流程,但未介绍kcp接收数据的流程,其实在客户端发送数据后,服务端是需要返回ack的,而客户端也需要根据返回的ack来判断数据段是否需要重传还是在队列里清除该数据段。处理返回来的ack是在函数kcp.Input()函数实现的。具体详细流程下次再介绍。

GoCN每日新闻(2017-06-12)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 478 次浏览 • 2017-06-12 10:25 • 来自相关话题

Golang逃逸分析

sheepbao 发表了文章 • 0 个评论 • 2294 次浏览 • 2017-06-11 11:56 • 来自相关话题

Golang逃逸分析

介绍逃逸分析的概念,go怎么开启逃逸分析的log。
以下资料来自互联网,有错误之处,请一定告之。
sheepbao 2017.06.10

什么是逃逸分析

w... 查看全部

Golang逃逸分析


介绍逃逸分析的概念,go怎么开启逃逸分析的log。

以下资料来自互联网,有错误之处,请一定告之。

sheepbao 2017.06.10


什么是逃逸分析


wiki上的定义


In compiler optimization, escape analysis is a method for determining the dynamic scope of pointers - where in the program a pointer can be accessed. It is related to pointer analysis and shape analysis.


When a variable (or an object) is allocated in a subroutine, a pointer to the variable can escape to other threads of execution, or to calling subroutines. If an implementation uses tail call optimization (usually required for functional languages), objects may also be seen as escaping to called subroutines. If a language supports first-class continuations (as do Scheme and Standard ML of New Jersey), portions of the call stack may also escape.


If a subroutine allocates an object and returns a pointer to it, the object can be accessed from undetermined places in the program — the pointer has "escaped". Pointers can also escape if they are stored in global variables or other data structures that, in turn, escape the current procedure.


Escape analysis determines all the places where a pointer can be stored and whether the lifetime of the pointer can be proven to be restricted only to the current procedure and/or threa


大概的意思是在编译程序优化理论中,逃逸分析是一种确定指针动态范围的方法,可以分析在程序的哪些地方可以访问到指针。它涉及到指针分析和形状分析。
当一个变量(或对象)在子程序中被分配时,一个指向变量的指针可能逃逸到其它执行线程中,或者去调用子程序。如果使用尾递归优化(通常在函数编程语言中是需要的),对象也可能逃逸到被调用的子程序中。
如果一个子程序分配一个对象并返回一个该对象的指针,该对象可能在程序中的任何一个地方被访问到——这样指针就成功“逃逸”了。如果指针存储在全局变量或者其它数据结构中,它们也可能发生逃逸,这种情况是当前程序中的指针逃逸。
逃逸分析需要确定指针所有可以存储的地方,保证指针的生命周期只在当前进程或线程中。


逃逸分析的用处(为了性能)



  • 最大的好处应该是减少gc的压力,不逃逸的对象分配在栈上,当函数返回时就回收了资源,不需要gc标记清除。

  • 因为逃逸分析完后可以确定哪些变量可以分配在栈上,栈的分配比堆快,性能好

  • 同步消除,如果你定义的对象的方法上有同步锁,但在运行时,却只有一个线程在访问,此时逃逸分析后的机器码,会去掉同步锁运行。


go消除了堆和栈的区别


go在一定程度消除了堆和栈的区别,因为go在编译的时候进行逃逸分析,来决定一个对象放栈上还是放堆上,不逃逸的对象放栈上,可能逃逸的放堆上。


开启go编译时的逃逸分析日志


开启逃逸分析日志很简单,只要在编译的时候加上-gcflags '-m',但是我们为了不让编译时自动内连函数,一般会加-l参数,最终为-gcflags '-m -l'


Example:


package main

import (
"fmt"
)

func main() {
s := "hello"
fmt.Println(s)
}

go run -gcflags '-m -l' escape.go

Output:


# command-line-arguments
escape_analysis/main.go:9: s escapes to heap
escape_analysis/main.go:9: main ... argument does not escape
hello

什么时候逃逸,什么时候不逃逸


Example1:


package main

type S struct{}

func main() {
var x S
y := &x
_ = *identity(y)
}

func identity(z *S) *S {
return z
}

Output:


# command-line-arguments
escape_analysis/main.go:11: leaking param: z to result ~r1 level=0
escape_analysis/main.go:7: main &x does not escape

这里的第一行表示z变量是“流式”,因为identity这个函数仅仅输入一个变量,又将这个变量作为返回输出,但identity并没有引用z,所以这个变量没有逃逸,而x没有被引用,且生命周期也在mian里,x没有逃逸,分配在栈上。


Example2:


package main

type S struct{}

func main() {
var x S
_ = *ref(x)
}

func ref(z S) *S {
return &z
}

Output:


# command-line-arguments
escape_analysis/main.go:11: &z escapes to heap
escape_analysis/main.go:10: moved to heap: z

这里的z是逃逸了,原因很简单,go都是值传递,ref函数copy了x的值,传给z,返回z的指针,然后在函数外被引用,说明z这个变量在函数內声明,可能会被函数外的其他程序访问。所以z逃逸了,分配在堆上


对象里的变量会怎么样呢?看下面


Example3:


package main

type S struct {
M *int
}

func main() {
var i int
refStruct(i)
}

func refStruct(y int) (z S) {
z.M = &y
return z
}

Output:


# command-line-arguments
escape_analysis/main.go:13: &y escapes to heap
escape_analysis/main.go:12: moved to heap: y

看日志的输出,这里的y是逃逸了,看来在struct里好像并没有区别,有可能被函数外的程序访问就会逃逸


Example4:


package main

type S struct {
M *int
}

func main() {
var i int
refStruct(&i)
}

func refStruct(y *int) (z S) {
z.M = y
return z
}

Output:


# command-line-arguments
escape_analysis/main.go:12: leaking param: y to result z level=0
escape_analysis/main.go:9: main &i does not escape

这里的y没有逃逸,分配在栈上,原因和Example1是一样的。


Example5:


package main

type S struct {
M *int
}

func main() {
var x S
var i int
ref(&i, &x)
}

func ref(y *int, z *S) {
z.M = y
}

Output:


# command-line-arguments
escape_analysis/main.go:13: leaking param: y
escape_analysis/main.go:13: ref z does not escape
escape_analysis/main.go:10: &i escapes to heap
escape_analysis/main.go:9: moved to heap: i
escape_analysis/main.go:10: main &x does not escape

这里的z没有逃逸,而i却逃逸了,这是因为go的逃逸分析不知道z和i的关系,逃逸分析不知道参数y是z的一个成员,所以只能把它分配给堆。


参考


Go Escape Analysis Flaws

go-escape-analysis

GoCN每日新闻(2017-06-11)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 456 次浏览 • 2017-06-11 11:16 • 来自相关话题

GoCN每日新闻(2017-06-10)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 459 次浏览 • 2017-06-10 10:50 • 来自相关话题

GoCN每日新闻(2017-06-09)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 547 次浏览 • 2017-06-09 10:16 • 来自相关话题

GoCN每日新闻(2017-06-08)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 546 次浏览 • 2017-06-08 09:36 • 来自相关话题

GoCN每日新闻 (2017-06-07)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 498 次浏览 • 2017-06-07 10:02 • 来自相关话题