udp编程的那些事与golang udp的实践

sheepbao 发表了文章 • 0 个评论 • 1235 次浏览 • 2017-06-26 10:30 • 来自相关话题

udp编程的那些事与golang udp的实践

tcp/ip大协议中,tcp编程大家应该比较熟,应用的场景也很多,但是udp在现实中,应用也不少,而在大部分博文中,都很少对udp的编程进行研究,最近研究了一下udp编程,正好做个记录。<... 查看全部

udp编程的那些事与golang udp的实践


tcp/ip大协议中,tcp编程大家应该比较熟,应用的场景也很多,但是udp在现实中,应用也不少,而在大部分博文中,都很少对udp的编程进行研究,最近研究了一下udp编程,正好做个记录。

sheepbao 2017.06.15


tcp Vs udp


tcp和udp都是著名的传输协议,他们都是基于ip协议,都在OSI模型中传输层。tcp我们都很清楚,它提供了可靠的数据传输,而udp我们也知道,它不提供数据传输的可靠性,只是尽力传输。
他们的特性决定了它们很大的不同,tcp提供可靠性传输,有三次握手,4次分手,相当于具有逻辑上的连接,可以知道这个tcp连接的状态,所以我们都说tcp是面向连接的socket,而udp没有握手,没有分手,也不存在逻辑上的连接,所以我们也都说udp是非面向连接的socket。

我们都畏惧不知道状态的东西,所以即使tcp的协议比udp复杂很多,但对于系统应用层的编程来说,tcp编程其实比udp编程容易。而udp相对比较灵活,所以对于udp编程反而没那么容易,但其实掌握后udp编程也并不难。


udp协议


udp的首部


        2               2       (byte)
+---+---+---+---+---+---+---+---+ -
| src port | dst port | |
+---+---+---+---+---+---+---+---+ 8(bytes)
| length | check sum | |
+---+---+---+---+---+---+---+---+ -
| |
+ data +
| |
+---+---+---+---+---+---+---+---+

udp的首部真的很简单,头2个字节表示的是原端口,后2个字节表示的是目的端口,端口是系统层区分进程的标识。接着是udp长度,最后就是校验和,这个其实很重要,现在的系统都是默认开启udp校验和的,所以我们才能确保udp消息传输的完整性。如果这个校验和关闭了,那会让我们绝对会很忧伤,因为udp不仅不能保证数据一定到达,还不能保证即使数据到了,这个数据是否是正确的。比如:我在发送端发送了“hello”,而接收端却接收到了“hell”。如果真的是这样,我们就必须自己去校验数据的正确性。还好udp默认开发了校验,我们可以保证udp的数据完整性。


udp数据的封装


                                    +---------+
| 应用数据 |
+---------+
| |
v v
+---------+---------+
| udp首部 | 应用数据 |
+---------+---------+
| |
v UDP数据报 v
+---------+---------+---------+
| ip首部 | udp首部 | 应用数据 |
+---------+---------+---------+
| |
v IP数据报 v
+---------+---------+---------+---------+---------+
|以太网首部 | ip首部 | udp首部 | 应用数据 |以太网尾部 |
+---------+---------+---------+---------+---------+
| 14 20 8 4 |
| -> 以太网帧 <- |

数据的封装和tcp是一样,应用层的数据加上udp首部,构成udp数据报,再加上ip首部构成ip数据报,最后加上以太网首部和尾部构成以太网帧,经过网卡发送出去。


Golang udp实践


实践出真知,编程就需要多实践,才能体会其中的奥妙。


echo客户端和服务端


echo服务,实现数据包的回显,这是很多人网络编程起点,因为这个服务足够简单,但又把网络的数据流都过了一遍,这里也用go udp实现一个echo服务。

实现客户端发送一个“hello”,服务端接收消息并原封不动的返回给客户度。


server.go


package main

import (
"flag"
"fmt"
"log"
"net"
)

var addr = flag.String("addr", ":10000", "udp server bing address")

func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
flag.Parse()
}

func main() {
//Resolving address
udpAddr, err := net.ResolveUDPAddr("udp", *addr)
if err != nil {
log.Fatalln("Error: ", err)
}

// Build listining connections
conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
log.Fatalln("Error: ", err)
}
defer conn.Close()

// Interacting with one client at a time
recvBuff := make([]byte, 1024)
for {
log.Println("Ready to receive packets!")
// Receiving a message
rn, rmAddr, err := conn.ReadFromUDP(recvBuff)
if err != nil {
log.Println("Error:", err)
return
}

fmt.Printf("<<< Packet received from: %s, data: %s\n", rmAddr.String(), string(recvBuff[:rn]))
// Sending the same message back to current client
_, err = conn.WriteToUDP(recvBuff[:rn], rmAddr)
if err != nil {
log.Println("Error:", err)
return
}
fmt.Println(">>> Sent packet to: ", rmAddr.String())
}
}

client1.go


package main

import (
"flag"
"fmt"
"log"
"net"
)

var raddr = flag.String("raddr", "127.0.0.1:10000", "remote server address")

func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
flag.Parse()
}

func main() {
// Resolving Address
remoteAddr, err := net.ResolveUDPAddr("udp", *raddr)
if err != nil {
log.Fatalln("Error: ", err)
}

// Make a connection
tmpAddr := &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 0,
}

conn, err := net.DialUDP("udp", tmpAddr, remoteAddr)
// Exit if some error occured
if err != nil {
log.Fatalln("Error: ", err)
}
defer conn.Close()

// write a message to server
_, err = conn.Write([]byte("hello"))
if err != nil {
log.Println(err)
} else {
fmt.Println(">>> Packet sent to: ", *raddr)
}

// Receive response from server
buf := make([]byte, 1024)
rn, rmAddr, err := conn.ReadFromUDP(buf)
if err != nil {
log.Println(err)
} else {
fmt.Printf("<<< %d bytes received from: %v, data: %s\n", rn, rmAddr, string(buf[:rn]))
}
}

client2.go


package main

import (
"flag"
"fmt"
"log"
"net"
)

var (
laddr = flag.String("laddr", "127.0.0.1:9000", "local server address")
raddr = flag.String("raddr", "127.0.0.1:10000", "remote server address")
)

func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
flag.Parse()
}

func main() {
// Resolving Address
localAddr, err := net.ResolveUDPAddr("udp", *laddr)
if err != nil {
log.Fatalln("Error: ", err)
}

remoteAddr, err := net.ResolveUDPAddr("udp", *raddr)
if err != nil {
log.Fatalln("Error: ", err)
}

// Build listening connections
conn, err := net.ListenUDP("udp", localAddr)
// Exit if some error occured
if err != nil {
log.Fatalln("Error: ", err)
}
defer conn.Close()

// write a message to server
_, err = conn.WriteToUDP([]byte("hello"), remoteAddr)
if err != nil {
log.Println(err)
} else {
fmt.Println(">>> Packet sent to: ", *raddr)
}

// Receive response from server
buf := make([]byte, 1024)
rn, remAddr, err := conn.ReadFromUDP(buf)
if err != nil {
log.Println(err)
} else {
fmt.Printf("<<< %d bytes received from: %v, data: %s\n", rn, remAddr, string(buf[:rn]))
}
}

这里实现echo的服务端和客户端,和tcp的差不多,但是有一些小细节需要注意。

对于server端,先net.ListenUDP建立udp一个监听,返回一个udp连接,这里需要注意udp不像tcp,建立tcp监听后返回的是一个Listener,然后阻塞等待接收一个新的连接,这样区别是因为udp一个非面向连接的协议,它没有会话管理。同时也因为udp是非面向连接的协议,当接收到消息后,想把消息返回给当前的客户端时,是不能像tcp一样,直接往conn里写的,而是需要指定远端地址。

对于client端,类似tcp先Dial,返回一个连接,对于发送消息用Write,接收消息用Read,当然udp也可以用ReadFromUDP,这样可以知道从哪得到的消息。但其实client也可以用另一种方式写,如client2.go程序,先建立一个监听,返回一个连接,用这个连接发送消息给服务端和从服务器接收消息,这种方式和tcp倒是有很大的不同。


参考


golang pkg

给大家推荐一款新的编辑神器

winlin 回复了问题 • 10 人关注 • 8 个回复 • 1644 次浏览 • 2017-06-23 17:33 • 来自相关话题

golang实现基于redis和consul的可水平扩展的排行榜服务范例

changjixiong 发表了文章 • 1 个评论 • 443 次浏览 • 2017-06-14 22:15 • 来自相关话题

  本文的完整代码见https://github... 查看全部

  本文的完整代码见https://github.com/changjixion ... snotehttps://github.com/changjixiong/goNotes/tree/master/utilshttps://github.com/changjixion ... nvoke如果文中没有显示链接说明链接在被转发的时候被干掉了,请搜索找到原文阅读。


概述


  排行榜在各种互联网应用中广泛存在。本文将用一个范例说明如何利用redis和consul实现可水平扩展的等级排行榜服务。


redis的使用


  实现排行榜有2个地方需要用到redis:


  1.存储玩家的排行信息,这里使用的是Sorted Sets,代码如下


err := Rds.ZAdd(
PlayerLvRankKey,
redis.Z{
Score: lvScoreWithTime(playerInfo.Lv, time.Now().Unix()),
Member: playerInfo.PlayerID,
},
).Err()

  其中lvScoreWithTime根据玩家等级及到达的时间计算score用于排名,等级相同的情况下,先到达等级的计算分值大于后达到的。


  2.存储玩家自身的信息(名字,ID等),用于在排行榜中显示,毕竟仅仅只有排行的ID是不够的。这里采用hashset,代码如下


// ma的类型为map[string]string
err := Rds.HMSet(fmt.Sprintf("playerInfo:%d", playerID), ma).Err()

服务器端


  先初始化redis连接


rdsClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", "127.0.0.1", 6379),
Password: "123456",
DB: 0,
})
playercache.Rds = rdsClient
rankservice.Rds = rdsClient

  增加初始玩家信息(略)。


  注册服务器接口,此部分详细说明请参考《golang通过反射使用json字符串调用struct的指定方法及返回json结果》http://changjixiong.com/reflect-invoke-method-of-struct-and-get-json-format-result/


reflectinvoke.RegisterMethod(rankservice.DefaultRankService)

  将服务注册到consul,此部分详细说明请参考《golang使用服务发现系统consul》http://changjixiong.com/use-consul-in-golang/


go registerServer()

  在端口9528上开启服务用于结构client请求并返回结果


ln, err := net.Listen("tcp", "0.0.0.0:9528")

if nil != err {
panic("Error: " + err.Error())
}

for {
conn, err := ln.Accept()
// 对Accept()产生的临时错误的处理,可以参考net/http/server.go中的func (srv *Server) Serve(l net.Listener)
if err != nil {
panic("Error: " + err.Error())
}

go RankServer(conn)
}

  增加玩家经验及设置玩家的排行榜数据的接口如下


func (rankService *RankService) AddPlayerExp(playerID, exp int) bool {

player := playercache.GetPlayerInfo(playerID)
if nil == player {
return false
}

player.Exp += exp
// 固定经验升级,可以按需要修改
if player.Exp >= playercache.LvUpExp {
player.Lv += 1
player.Exp = player.Exp - playercache.LvUpExp
rankService.SetPlayerLvRank(player)
}

playercache.SetPlayerInfo(player)

return true
}

func (rankService *RankService) SetPlayerLvRank(playerInfo *playercache.PlayerInfo) bool {

if nil == playerInfo {
return false
}

err := Rds.ZAdd(
PlayerLvRankKey,
redis.Z{
Score: lvScoreWithTime(playerInfo.Lv, time.Now().Unix()),
Member: playerInfo.PlayerID,
},
).Err()

if nil != err {
log.Println("RankService: SetPlayerLvRank:", err)
return false
}

return true
}

  获取指定排行的玩家信息的接口


func (rankService *RankService) GetPlayerByLvRank(start, count int64) []*playercache.PlayerInfo {

playerInfos := []*playercache.PlayerInfo{}

ids, err := Rds.ZRevRange(PlayerLvRankKey, start, start+count-1).Result()

if nil != err {
log.Println("RankService: GetPlayerByLvRank:", err)
return playerInfos
}

for _, idstr := range ids {
id, err := strconv.Atoi(idstr)

if nil != err {
log.Println("RankService: GetPlayerByLvRank:", err)
} else {
playerInfo := playercache.LoadPlayerInfo(id)

if nil != playerInfos {
playerInfos = append(playerInfos, playerInfo)
}
}
}

return playerInfos

}

客户端


  连接到consul并查到到排行榜服务的地址,连接并发送请求


func main() {

client, err := consulapi.NewClient(consulapi.DefaultConfig())

if err != nil {
log.Fatal("consul client error : ", err)
}

for {

time.Sleep(time.Second * 3)
var services map[string]*consulapi.AgentService
var err error

services, err = client.Agent().Services()

log.Println("services", strings.Repeat("-", 80))
for _, service := range services {
log.Println(service)
}

if nil != err {
log.Println("in consual list Services:", err)
continue
}

if _, found := services["rankNode_1"]; !found {
log.Println("rankNode_1 not found")
continue
}
log.Println("choose", strings.Repeat("-", 80))
log.Println("rankNode_1", services["rankNode_1"])
sendData(services["rankNode_1"])

}
}

运行情况


  consul上注册了2个自定义的服务,一个是名为serverNode的echo服务(来源 《golang使用服务发现系统consul》),另一个是本文的排行榜服务rankNode。


  服务器接收到的请求片段


get: {"func_name":"AddPlayerExp","params":[4,41]}
get: {"func_name":"AddPlayerExp","params":[2,35]}
get: {"func_name":"AddPlayerExp","params":[5,27]}
get: {"func_name":"GetPlayerByLvRank","params":[0,3]}

  客户端在consul中查找到服务并连接rankNode_1


services ----------------------------------------------------------
&{consul consul [] 8300 false}
&{rankNode_1 rankNode [serverNode] 9528 127.0.0.1 false}
&{serverNode_1 serverNode [serverNode] 9527 127.0.0.1 false}
choose ------------------------------------------------------------
rankNode_1 &{rankNode_1 rankNode [serverNode] 9528 127.0.0.1 false}

  客户端收到的回应片段


get: {"func_name":"AddPlayerExp","data":[true],"errorcode":0}
get: {"func_name":"AddPlayerExp","data":[true],"errorcode":0}
get: {"func_name":"AddPlayerExp","data":[true],"errorcode":0}
get: {"func_name":"GetPlayerByLvRank","data":[[{"player_id":3,"player_name":"玩家3","exp":57,"lv":4,"online":true},{"player_id":2,"player_name":"玩家2","exp":31,"lv":4,"online":true},{"player_id":1,"player_name":"玩家1","exp":69,"lv":3,"online":true}]],"errorcode":0}

一点说明


  为什么说是可水平扩展的排行榜服务呢?文中已经看到,目前有2个自定的服务注册在consul上,client选择了rankNode_1,那么如果注册了多个rankNode,则可以在其中某些节点不可用时,client可以选择其他可用的节点获取服务,而当不可用的节点重新可用时,可以继续注册到consul以提供服务。

GoCN每日新闻(2017-06-14)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 644 次浏览 • 2017-06-14 10:58 • 来自相关话题

GoCN每日新闻(2017-06-13)

stirlingx 回复了问题 • 2 人关注 • 1 个回复 • 600 次浏览 • 2017-06-13 11:18 • 来自相关话题

kcp-go源码解析

sheepbao 发表了文章 • 0 个评论 • 1841 次浏览 • 2017-06-12 15:24 • 来自相关话题

kcp-go源码解析

对kcp-go的源码解析,有错误之处,请一定告之。
sheepbao 2017.0612

概念

ARQ:自动重传请求(Automatic Repeat-reQuest,... 查看全部

kcp-go源码解析


对kcp-go的源码解析,有错误之处,请一定告之。

sheepbao 2017.0612


概念


ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一.

RTO:Retransmission TimeOut

FEC:Forward Error Correction


kcp简介


kcp是一个基于udp实现快速、可靠、向前纠错的的协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟降低30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发。查看官方文档kcp


kcp-go是用go实现了kcp协议的一个库,其实kcp类似tcp,协议的实现也很多参考tcp协议的实现,滑动窗口,快速重传,选择性重传,慢启动等。

kcp和tcp一样,也分客户端和监听端。


    +-+-+-+-+-+            +-+-+-+-+-+
| Client | | Server |
+-+-+-+-+-+ +-+-+-+-+-+
|------ kcp data ------>|
|<----- kcp data -------|

kcp协议


layer model


+----------------------+
| Session |
+----------------------+
| KCP(ARQ) |
+----------------------+
| FEC(OPTIONAL) |
+----------------------+
| CRYPTO(OPTIONAL)|
+----------------------+
| UDP(Packet) |
+----------------------+

KCP header


KCP Header Format


      4           1   1     2 (Byte)
+---+---+---+---+---+---+---+---+
| conv |cmd|frg| wnd |
+---+---+---+---+---+---+---+---+
| ts | sn |
+---+---+---+---+---+---+---+---+
| una | len |
+---+---+---+---+---+---+---+---+
| |
+ DATA +
| |
+---+---+---+---+---+---+---+---+

代码结构


src/vendor/github.com/xtaci/kcp-go/
├── LICENSE
├── README.md
├── crypt.go 加解密实现
├── crypt_test.go
├── donate.png
├── fec.go 向前纠错实现
├── frame.png
├── kcp-go.png
├── kcp.go kcp协议实现
├── kcp_test.go
├── sess.go 会话管理实现
├── sess_test.go
├── snmp.go 数据统计实现
├── updater.go 任务调度实现
├── xor.go xor封装
└── xor_test.go

着重研究两个文件kcp.gosess.go


kcp浅析


kcp是基于udp实现的,所有udp的实现这里不做介绍,kcp做的事情就是怎么封装udp的数据和怎么解析udp的数据,再加各种处理机制,为了重传,拥塞控制,纠错等。下面介绍kcp客户端和服务端整体实现的流程,只是大概介绍一下函数流,不做详细解析,详细解析看后面数据流的解析。


kcp client整体函数流


和tcp一样,kcp要连接服务端需要先拨号,但是和tcp有个很大的不同是,即使服务端没有启动,客户端一样可以拨号成功,因为实际上这里的拨号没有发送任何信息,而tcp在这里需要三次握手。


DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
V
net.DialUDP("udp", nil, udpaddr)
V
NewConn()
V
newUDPSession() {初始化UDPSession}
V
NewKCP() {初始化kcp}
V
updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
V
go sess.readLoop()
V
go s.receiver(chPacket)
V
s.kcpInput(data)
V
s.fecDecoder.decodeBytes(data)
V
s.kcp.Input(data, true, s.ackNoDelay)
V
kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
V
notifyReadEvent()

客户端大体的流程如上面所示,先Dial,建立udp连接,将这个连接封装成一个会话,然后启动一个go程,接收udp的消息。


kcp server整体函数流


ListenWithOptions() 
V
net.ListenUDP()
V
ServerConn()
V
newFECDecoder()
V
go l.monitor() {从chPacket接收udp数据,写入kcp}
V
go l.receiver(chPacket) {从upd接收数据,并入队列}
V
newUDPSession()
V
updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
V
s.kcpInput(data)`
V
s.fecDecoder.decodeBytes(data)
V
s.kcp.Input(data, true, s.ackNoDelay)
V
kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
V
notifyReadEvent()

服务端的大体流程如上图所示,先Listen,启动udp监听,接着用一个go程监控udp的数据包,负责将不同session的数据写入不同的udp连接,然后解析封装将数据交给上层。


kcp 数据流详细解析


不管是kcp的客户端还是服务端,他们都有io行为,就是读与写,我们只分析一个就好了,因为它们读写的实现是一样的,这里分析客户端的读与写。


kcp client 发送消息


s.Write(b []byte) 
V
s.kcp.WaitSnd() {}
V
s.kcp.Send(b) {将数据根据mss分段,并存在kcp.snd_queue}
V
s.kcp.flush(false) [flush data to output] {
if writeDelay==true {
flush
}else{
每隔`interval`时间flush一次
}
}
V
kcp.output(buffer, size)
V
s.output(buf)
V
s.conn.WriteTo(ext, s.remote)
V
s.conn..Conn.WriteTo(buf)

读写都是在sess.go文件中实现的,Write方法:


// Write implements net.Conn
func (s *UDPSession) Write(b []byte) (n int, err error) {
for {
...

// api flow control
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
n = len(b)
for {
if len(b) <= int(s.kcp.mss) {
s.kcp.Send(b)
break
} else {
s.kcp.Send(b[:s.kcp.mss])
b = b[s.kcp.mss:]
}
}

if !s.writeDelay {
s.kcp.flush(false)
}
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
return n, nil
}

...
// wait for write event or timeout
select {
case <-s.chWriteEvent:
case <-c:
case <-s.die:
}

if timeout != nil {
timeout.Stop()
}
}
}

假设发送一个hello消息,Write方法会先判断发送窗口是否已满,满的话该函数阻塞,不满则kcp.Send("hello"),而Send函数实现根据mss的值对数据分段,当然这里的发送的hello,长度太短,只分了一个段,并把它们插入发送的队列里。


func (kcp *KCP) Send(buffer []byte) int {
...
for i := 0; i < count; i++ {
var size int
if len(buffer) > int(kcp.mss) {
size = int(kcp.mss)
} else {
size = len(buffer)
}
seg := kcp.newSegment(size)
copy(seg.data, buffer[:size])
if kcp.stream == 0 { // message mode
seg.frg = uint8(count - i - 1)
} else { // stream mode
seg.frg = 0
}
kcp.snd_queue = append(kcp.snd_queue, seg)
buffer = buffer[size:]
}
return 0
}

接着判断参数writeDelay,如果参数设置为false,则立马发送消息,否则需要任务调度后才会触发发送,发送消息是由flush函数实现的。


// flush pending data
func (kcp *KCP) flush(ackOnly bool) {
var seg Segment
seg.conv = kcp.conv
seg.cmd = IKCP_CMD_ACK
seg.wnd = kcp.wnd_unused()
seg.una = kcp.rcv_nxt

buffer := kcp.buffer
// flush acknowledges
ptr := buffer
for i, ack := range kcp.acklist {
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
// filter jitters caused by bufferbloat
if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
seg.sn, seg.ts = ack.sn, ack.ts
ptr = seg.encode(ptr)

}
}
kcp.acklist = kcp.acklist[0:0]

if ackOnly { // flash remain ack segments
size := len(buffer) - len(ptr)
if size > 0 {
kcp.output(buffer, size)
}
return
}

// probe window size (if remote window size equals zero)
if kcp.rmt_wnd == 0 {
current := currentMs()
if kcp.probe_wait == 0 {
kcp.probe_wait = IKCP_PROBE_INIT
kcp.ts_probe = current + kcp.probe_wait
} else {
if _itimediff(current, kcp.ts_probe) >= 0 {
if kcp.probe_wait < IKCP_PROBE_INIT {
kcp.probe_wait = IKCP_PROBE_INIT
}
kcp.probe_wait += kcp.probe_wait / 2
if kcp.probe_wait > IKCP_PROBE_LIMIT {
kcp.probe_wait = IKCP_PROBE_LIMIT
}
kcp.ts_probe = current + kcp.probe_wait
kcp.probe |= IKCP_ASK_SEND
}
}
} else {
kcp.ts_probe = 0
kcp.probe_wait = 0
}

// flush window probing commands
if (kcp.probe & IKCP_ASK_SEND) != 0 {
seg.cmd = IKCP_CMD_WASK
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = seg.encode(ptr)
}

// flush window probing commands
if (kcp.probe & IKCP_ASK_TELL) != 0 {
seg.cmd = IKCP_CMD_WINS
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = seg.encode(ptr)
}

kcp.probe = 0

// calculate window size
cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
if kcp.nocwnd == 0 {
cwnd = _imin_(kcp.cwnd, cwnd)
}

// sliding window, controlled by snd_nxt && sna_una+cwnd
newSegsCount := 0
for k := range kcp.snd_queue {
if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
break
}
newseg := kcp.snd_queue[k]
newseg.conv = kcp.conv
newseg.cmd = IKCP_CMD_PUSH
newseg.sn = kcp.snd_nxt
kcp.snd_buf = append(kcp.snd_buf, newseg)
kcp.snd_nxt++
newSegsCount++
kcp.snd_queue[k].data = nil
}
if newSegsCount > 0 {
kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
}

// calculate resent
resent := uint32(kcp.fastresend)
if kcp.fastresend <= 0 {
resent = 0xffffffff
}

// check for retransmissions
current := currentMs()
var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
for k := range kcp.snd_buf {
segment := &kcp.snd_buf[k]
needsend := false
if segment.xmit == 0 { // initial transmit
needsend = true
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
} else if _itimediff(current, segment.resendts) >= 0 { // RTO
needsend = true
if kcp.nodelay == 0 {
segment.rto += kcp.rx_rto
} else {
segment.rto += kcp.rx_rto / 2
}
segment.resendts = current + segment.rto
lost++
lostSegs++
} else if segment.fastack >= resent { // fast retransmit
needsend = true
segment.fastack = 0
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
change++
fastRetransSegs++
} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
needsend = true
segment.fastack = 0
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
change++
earlyRetransSegs++
}

if needsend {
segment.xmit++
segment.ts = current
segment.wnd = seg.wnd
segment.una = seg.una

size := len(buffer) - len(ptr)
need := IKCP_OVERHEAD + len(segment.data)

if size+need > int(kcp.mtu) {
kcp.output(buffer, size)
current = currentMs() // time update for a blocking call
ptr = buffer
}

ptr = segment.encode(ptr)
copy(ptr, segment.data)
ptr = ptr[len(segment.data):]

if segment.xmit >= kcp.dead_link {
kcp.state = 0xFFFFFFFF
}
}
}

// flash remain segments
size := len(buffer) - len(ptr)
if size > 0 {
kcp.output(buffer, size)
}

// counter updates
sum := lostSegs
if lostSegs > 0 {
atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
}
if fastRetransSegs > 0 {
atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
sum += fastRetransSegs
}
if earlyRetransSegs > 0 {
atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
sum += earlyRetransSegs
}
if sum > 0 {
atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
}

// update ssthresh
// rate halving, https://tools.ietf.org/html/rfc6937
if change > 0 {
inflight := kcp.snd_nxt - kcp.snd_una
kcp.ssthresh = inflight / 2
if kcp.ssthresh < IKCP_THRESH_MIN {
kcp.ssthresh = IKCP_THRESH_MIN
}
kcp.cwnd = kcp.ssthresh + resent
kcp.incr = kcp.cwnd * kcp.mss
}

// congestion control, https://tools.ietf.org/html/rfc5681
if lost > 0 {
kcp.ssthresh = cwnd / 2
if kcp.ssthresh < IKCP_THRESH_MIN {
kcp.ssthresh = IKCP_THRESH_MIN
}
kcp.cwnd = 1
kcp.incr = kcp.mss
}

if kcp.cwnd < 1 {
kcp.cwnd = 1
kcp.incr = kcp.mss
}
}

flush函数非常的重要,kcp的重要参数都是在调节这个函数的行为,这个函数只有一个参数ackOnly,意思就是只发送ack,如果ackOnly为true的话,该函数只遍历ack列表,然后发送,就完事了。
如果不是,也会发送真实数据。
在发送数据前先进行windSize探测,如果开启了拥塞控制nc=0,则每次发送前检测服务端的winsize,如果服务端的winsize变小了,自身的winsize也要更着变小,来避免拥塞。如果没有开启拥塞控制,就按设置的winsize进行数据发送。

接着循环每个段数据,并判断每个段数据的是否该重发,还有什么时候重发:



  1. 如果这个段数据首次发送,则直接发送数据。

  2. 如果这个段数据的当前时间大于它自身重发的时间,也就是RTO,则重传消息。

  3. 如果这个段数据的ack丢失累计超过resent次数,则重传,也就是快速重传机制。这个resent参数由resend参数决定。

  4. 如果这个段数据的ack有丢失且没有新的数据段,则触发ER,ER相关信息ER


最后通过kcp.output发送消息hello,output是个回调函数,函数的实体是sess.go的:


func (s *UDPSession) output(buf []byte) {
var ecc [][]byte

// extend buf's header space
ext := buf
if s.headerSize > 0 {
ext = s.ext[:s.headerSize+len(buf)]
copy(ext[s.headerSize:], buf)
}

// FEC stage
if s.fecEncoder != nil {
ecc = s.fecEncoder.Encode(ext)
}

// encryption stage
if s.block != nil {
io.ReadFull(rand.Reader, ext[:nonceSize])
checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
s.block.Encrypt(ext, ext)

if ecc != nil {
for k := range ecc {
io.ReadFull(rand.Reader, ecc[k][:nonceSize])
checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
s.block.Encrypt(ecc[k], ecc[k])
}
}
}

// WriteTo kernel
nbytes := 0
npkts := 0
// if mrand.Intn(100) < 50 {
for i := 0; i < s.dup+1; i++ {
if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
nbytes += n
npkts++
}
}
// }

if ecc != nil {
for k := range ecc {
if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
nbytes += n
npkts++
}
}
}
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
}

output函数才是真正的将数据写入内核中,在写入之前先进行了fec编码,fec编码器的实现是用了一个开源库github.com/klauspost/reedsolomon,编码以后的hello就不是和原来的hello一样了,至少多了几个字节。
fec编码器有两个重要的参数reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShardsparityShards,这两个参数决定了fec的冗余度,冗余度越大抗丢包性就越强。


kcp的任务调度器


其实这里任务调度器是一个很简单的实现,用一个全局变量updater来管理session,代码文件为updater.go。其中最主要的函数


func (h *updateHeap) updateTask() {
var timer <-chan time.Time
for {
select {
case <-timer:
case <-h.chWakeUp:
}

h.mu.Lock()
hlen := h.Len()
now := time.Now()
if hlen > 0 && now.After(h.entries[0].ts) {
for i := 0; i < hlen; i++ {
entry := heap.Pop(h).(entry)
if now.After(entry.ts) {
entry.ts = now.Add(entry.s.update())
heap.Push(h, entry)
} else {
heap.Push(h, entry)
break
}
}
}
if hlen > 0 {
timer = time.After(h.entries[0].ts.Sub(now))
}
h.mu.Unlock()
}
}

任务调度器实现了一个堆结构,每当有新的连接,session都会插入到这个堆里,接着for循环每隔interval时间,遍历这个堆,得到entry然后执行entry.s.update()。而entry.s.update()会执行s.kcp.flush(false)来发送数据。


总结


这里简单介绍了kcp的整体流程,详细介绍了发送数据的流程,但未介绍kcp接收数据的流程,其实在客户端发送数据后,服务端是需要返回ack的,而客户端也需要根据返回的ack来判断数据段是否需要重传还是在队列里清除该数据段。处理返回来的ack是在函数kcp.Input()函数实现的。具体详细流程下次再介绍。

GoCN每日新闻(2017-06-12)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 641 次浏览 • 2017-06-12 10:25 • 来自相关话题

Golang逃逸分析

sheepbao 发表了文章 • 0 个评论 • 3034 次浏览 • 2017-06-11 11:56 • 来自相关话题

Golang逃逸分析

介绍逃逸分析的概念,go怎么开启逃逸分析的log。
以下资料来自互联网,有错误之处,请一定告之。
sheepbao 2017.06.10

什么是逃逸分析

w... 查看全部

Golang逃逸分析


介绍逃逸分析的概念,go怎么开启逃逸分析的log。

以下资料来自互联网,有错误之处,请一定告之。

sheepbao 2017.06.10


什么是逃逸分析


wiki上的定义


In compiler optimization, escape analysis is a method for determining the dynamic scope of pointers - where in the program a pointer can be accessed. It is related to pointer analysis and shape analysis.


When a variable (or an object) is allocated in a subroutine, a pointer to the variable can escape to other threads of execution, or to calling subroutines. If an implementation uses tail call optimization (usually required for functional languages), objects may also be seen as escaping to called subroutines. If a language supports first-class continuations (as do Scheme and Standard ML of New Jersey), portions of the call stack may also escape.


If a subroutine allocates an object and returns a pointer to it, the object can be accessed from undetermined places in the program — the pointer has "escaped". Pointers can also escape if they are stored in global variables or other data structures that, in turn, escape the current procedure.


Escape analysis determines all the places where a pointer can be stored and whether the lifetime of the pointer can be proven to be restricted only to the current procedure and/or threa


大概的意思是在编译程序优化理论中,逃逸分析是一种确定指针动态范围的方法,可以分析在程序的哪些地方可以访问到指针。它涉及到指针分析和形状分析。
当一个变量(或对象)在子程序中被分配时,一个指向变量的指针可能逃逸到其它执行线程中,或者去调用子程序。如果使用尾递归优化(通常在函数编程语言中是需要的),对象也可能逃逸到被调用的子程序中。
如果一个子程序分配一个对象并返回一个该对象的指针,该对象可能在程序中的任何一个地方被访问到——这样指针就成功“逃逸”了。如果指针存储在全局变量或者其它数据结构中,它们也可能发生逃逸,这种情况是当前程序中的指针逃逸。
逃逸分析需要确定指针所有可以存储的地方,保证指针的生命周期只在当前进程或线程中。


逃逸分析的用处(为了性能)



  • 最大的好处应该是减少gc的压力,不逃逸的对象分配在栈上,当函数返回时就回收了资源,不需要gc标记清除。

  • 因为逃逸分析完后可以确定哪些变量可以分配在栈上,栈的分配比堆快,性能好

  • 同步消除,如果你定义的对象的方法上有同步锁,但在运行时,却只有一个线程在访问,此时逃逸分析后的机器码,会去掉同步锁运行。


go消除了堆和栈的区别


go在一定程度消除了堆和栈的区别,因为go在编译的时候进行逃逸分析,来决定一个对象放栈上还是放堆上,不逃逸的对象放栈上,可能逃逸的放堆上。


开启go编译时的逃逸分析日志


开启逃逸分析日志很简单,只要在编译的时候加上-gcflags '-m',但是我们为了不让编译时自动内连函数,一般会加-l参数,最终为-gcflags '-m -l'


Example:


package main

import (
"fmt"
)

func main() {
s := "hello"
fmt.Println(s)
}

go run -gcflags '-m -l' escape.go

Output:


# command-line-arguments
escape_analysis/main.go:9: s escapes to heap
escape_analysis/main.go:9: main ... argument does not escape
hello

什么时候逃逸,什么时候不逃逸


Example1:


package main

type S struct{}

func main() {
var x S
y := &x
_ = *identity(y)
}

func identity(z *S) *S {
return z
}

Output:


# command-line-arguments
escape_analysis/main.go:11: leaking param: z to result ~r1 level=0
escape_analysis/main.go:7: main &x does not escape

这里的第一行表示z变量是“流式”,因为identity这个函数仅仅输入一个变量,又将这个变量作为返回输出,但identity并没有引用z,所以这个变量没有逃逸,而x没有被引用,且生命周期也在mian里,x没有逃逸,分配在栈上。


Example2:


package main

type S struct{}

func main() {
var x S
_ = *ref(x)
}

func ref(z S) *S {
return &z
}

Output:


# command-line-arguments
escape_analysis/main.go:11: &z escapes to heap
escape_analysis/main.go:10: moved to heap: z

这里的z是逃逸了,原因很简单,go都是值传递,ref函数copy了x的值,传给z,返回z的指针,然后在函数外被引用,说明z这个变量在函数內声明,可能会被函数外的其他程序访问。所以z逃逸了,分配在堆上


对象里的变量会怎么样呢?看下面


Example3:


package main

type S struct {
M *int
}

func main() {
var i int
refStruct(i)
}

func refStruct(y int) (z S) {
z.M = &y
return z
}

Output:


# command-line-arguments
escape_analysis/main.go:13: &y escapes to heap
escape_analysis/main.go:12: moved to heap: y

看日志的输出,这里的y是逃逸了,看来在struct里好像并没有区别,有可能被函数外的程序访问就会逃逸


Example4:


package main

type S struct {
M *int
}

func main() {
var i int
refStruct(&i)
}

func refStruct(y *int) (z S) {
z.M = y
return z
}

Output:


# command-line-arguments
escape_analysis/main.go:12: leaking param: y to result z level=0
escape_analysis/main.go:9: main &i does not escape

这里的y没有逃逸,分配在栈上,原因和Example1是一样的。


Example5:


package main

type S struct {
M *int
}

func main() {
var x S
var i int
ref(&i, &x)
}

func ref(y *int, z *S) {
z.M = y
}

Output:


# command-line-arguments
escape_analysis/main.go:13: leaking param: y
escape_analysis/main.go:13: ref z does not escape
escape_analysis/main.go:10: &i escapes to heap
escape_analysis/main.go:9: moved to heap: i
escape_analysis/main.go:10: main &x does not escape

这里的z没有逃逸,而i却逃逸了,这是因为go的逃逸分析不知道z和i的关系,逃逸分析不知道参数y是z的一个成员,所以只能把它分配给堆。


参考


Go Escape Analysis Flaws

go-escape-analysis

GoCN每日新闻(2017-06-11)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 616 次浏览 • 2017-06-11 11:16 • 来自相关话题

GoCN每日新闻(2017-06-10)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 611 次浏览 • 2017-06-10 10:50 • 来自相关话题

GoCN每日新闻(2017-06-09)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 713 次浏览 • 2017-06-09 10:16 • 来自相关话题

GoCN每日新闻(2017-06-08)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 708 次浏览 • 2017-06-08 09:36 • 来自相关话题

GoCN每日新闻 (2017-06-07)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 672 次浏览 • 2017-06-07 10:02 • 来自相关话题

golang使用服务发现系统consul

changjixiong 发表了文章 • 0 个评论 • 677 次浏览 • 2017-06-06 22:15 • 来自相关话题

  本文的完整代码见 https://github.com/changjixiong/goNotes/tree/master/consulnotes ,如果文中没有显示链接说明链接在被转发的时候被干掉了,请搜索找到原文阅读。


consul是什么


"Consul is a distributed, highly available, datacenter-aware, service discovery and configuration system. It can be used to present services and nodes in a flexible and powerful interface that allows clients to always have an up-to-date view of the infrastructure they are a part of."


  引用一段网上对consul文档的翻译(http://consul.la/intro/what-is-consul)


Consul有多个组件,但是整体来看,它是你基础设施中用于发现和配置服务的一个工具。它提供如下几个关键功能:

* 服务发现: Consul的某些客户端可以提供一个服务,例如api或者mysql,其它客户端可以使用Consul去发现这个服务的提供者。使用DNS或者HTTP,应用可以很容易的找到他们所依赖的服务。
* 健康检查: Consul客户端可以提供一些健康检查,这些健康检查可以关联到一个指定的服务(服务是否返回200 OK),也可以关联到本地节点(内存使用率是否在90%以下)。这些信息可以被一个操作员用来监控集群的健康状态,被服务发现组件路由时用来远离不健康的主机。
* 键值存储: 应用可以使用Consul提供的分层键值存储用于一些目的,包括动态配置、特征标记、协作、leader选举等等。通过一个简单的HTTP API可以很容易的使用这个组件。
* 多数据中心: Consul对多数据中心有非常好的支持,这意味着Consul用户不必担心由于创建更多抽象层而产生的多个区域。
Consul被设计为对DevOps群体和应用开发者友好,他非常适合现代的、可伸缩的基础设施。

范例


  网上关于consul的文档及使用说明有很多,然而却缺少关于使用的范例,接下来的内容将用一个范例来演示如何找到可服务的节点。完整的代码见https://github.com/changjixion ... notes


  假设在一个系统中,节点A需要访问某种服务,该服务有N个节点可提供服务,这些节点位于服务集群groupB,节点A只需要连接上groupB中的任一节点即可获得服务。


启动consul


  consul提供开发模式用于启动单节点服务供开发调试用,运行命令consul agent -dev 启动consul,输出的信息中有一行


Client Addr: 127.0.0.1 (HTTP: 8500, HTTPS: -1, DNS: 8600, RPC: 8400)

  显示了consul运行参数,通过网址http://127.0.0.1:8500/ui/#/dc1/nodes可以查看节点与服务


注册服务并添加健康检查


  下面的代码将向consul注册一个服务


import (
"fmt"
"log"

"net/http"

consulapi "github.com/hashicorp/consul/api"
)

func consulCheck(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "consulCheck")
}

func registerServer() {

config := consulapi.DefaultConfig()
client, err := consulapi.NewClient(config)

if err != nil {
log.Fatal("consul client error : ", err)
}

checkPort := 8080

registration := new(consulapi.AgentServiceRegistration)
registration.ID = "serverNode_1"
registration.Name = "serverNode"
registration.Port = 9527
registration.Tags = []string{"serverNode"}
registration.Address = "127.0.0.1"
registration.Check = &consulapi.AgentServiceCheck{
HTTP: fmt.Sprintf("http://%s:%d%s", registration.Address, checkPort, "/check"),
Timeout: "3s",
Interval: "5s",
DeregisterCriticalServiceAfter: "30s", //check失败后30秒删除本服务
}

err = client.Agent().ServiceRegister(registration)

if err != nil {
log.Fatal("register server error : ", err)
}

http.HandleFunc("/check", consulCheck)
http.ListenAndServe(fmt.Sprintf(":%d", checkPort), nil)

}

  consulapi.DefaultConfig()的源代码显示默认采用的是http方式连接"127.0.0.1:8500",前文中显示consul开发模式默认提供的http服务是在127.0.0.1:8500,在实际使用中需要设置为实际的参数。


  consulapi.AgentServiceCheck中的HTTP指定了健康检查的接口地址即127.0.0.1:8080/check,consulCheck函数响应这个接口调用,返回200状态码及一段字符串"consulCheck",健康检查还有其他几种方式,具体可以参考官方文档。


  consulapi.AgentServiceCheck中的DeregisterCriticalServiceAfter指定检查不通过后多长时间注销本服务,这里设置为30秒。


  向consul注册的服务地址为127.0.0.1:9527,以下是在127.0.0.1:9527上提供的echo服务。


ln, err := net.Listen("tcp", "0.0.0.0:9527")

if nil != err {
panic("Error: " + err.Error())
}

for {
conn, err := ln.Accept()

if err != nil {
panic("Error: " + err.Error())
}

go EchoServer(conn)
}

  服务启动后,访问http://127.0.0.1:8500/ui/#/dc1/nodes 会发现 "2 services",点开后会在页面上看到serverNode 127.0.0.1:9527,表明服务信息已经注册。以下信息显示健康检查通过。


HTTP GET http://127.0.0.1:8080/check: 200 OK Output: consulCheck

使用服务


  服务使用方client通过以下代码向consul查询可用的服务(忽略错误处理)


client, err := consulapi.NewClient(consulapi.DefaultConfig())//非默认情况下需要设置实际的参数
...
services, err = client.Agent().Services()
...
if _, found := services["serverNode_1"]; !found {
log.Println("serverNode_1 not found")
continue
} //查找名为serverNode_1的服务

  查找到服务后连接服务并发送数据


conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", service.Address, service.Port))
...

  client先启动,服务后启动,然后服务关闭,运行日志如下:


serverNode_1 not found
get: EchoServerHello World, 001
get: EchoServerHello World, 002
...
get: EchoServerHello World, 008
Read Buffer Error: EOF
dial tcp 127.0.0.1:9527: getsockopt: connection refused
dial tcp 127.0.0.1:9527: getsockopt: connection refused
...
dial tcp 127.0.0.1:9527: getsockopt: connection refused
serverNode_1 not found
serverNode_1 not found

  服务启动前提示serverNode_1没找到,服务启动后数据交互正常,服务关闭后consul尚未注销服务client提示服务无法连接,稍后consul注销了失效的服务,client显示服务没有找到。


使用场景设想


  假设一个网络游戏有N个副本服务节点提供服务,在生产运行期间,有的节点可能故障,有些节点可能负载过高,有些节点可能故障后自行回复需要能重新上线提供服务。通过consul系统可以随时让网关服务器或者逻辑服务器获取可用的副本服务节点并将请求转发到该节点,保持副本服务的高效可用。


  其他类型的服务也可以采用同样的方式进行水平扩展。进一步的,可以在负载高的时候启动新节点,在负载低的时候关闭部分节点,在云服务器上实现这些非常方便,并且由于是按使用计费,通过负载增加或关闭节点也可以避免云服务器资源的浪费从而降低费用。


一点问题


  服务注册时设置检查失败后30秒注销服务,实际运行中大约80秒才注销服务,原因待查。

GoCN每日新闻(2017-06-06)

回复

astaxie 发起了问题 • 1 人关注 • 0 个回复 • 639 次浏览 • 2017-06-06 09:50 • 来自相关话题