技术讨论

技术讨论

GOLANG使用Context管理关联goroutine

技术讨论winlin 发表了文章 • 6 个评论 • 161 次浏览 • 4 天前 • 来自相关话题

一般一个业务很少不用到goroutine的,因为很多方法是需要等待的,例如http.Server.ListenAndServe这个就是等待的,除非关闭了Server或Listener,否则是不会返回的。除非是一个API服务器,否... 查看全部

一般一个业务很少不用到goroutine的,因为很多方法是需要等待的,例如http.Server.ListenAndServe这个就是等待的,除非关闭了Server或Listener,否则是不会返回的。除非是一个API服务器,否则肯定需要另外起goroutine发起其他的服务,而且对于API服务器来说,在http.Handler的处理函数中一般也需要起goroutine,如何管理这些goroutine,在GOLANG1.7提供context.Context


先看一个简单的,如果启动两个goroutine,一个是HTTP,还有个信号处理的收到退出信号做清理:


wg := sync.WaitGroup{}
defer wg.Wait()

wg.Add(1)
go func() {
defer wg.Done()

ss := make(os.Signal, 0)
signal.Notify(ss, syscall.SIGINT, syscall.SIGTERM)
for s := ss {
fmt.Println("Got signal", s)
break
}
}()

wg.Add(1)
go func() {
defer wg.Done()

svr := &http.Server{ Addr:":8080", Handler:nil, }
fmt.Println(svr.ListenAndServe())
}

很清楚,起了两个goroutine,然后用WaitGroup等待它们退出。如果它们之间没有交互,不互相影响,那真的是蛮简单的,可惜这样是不行的,因为信号的goroutine收到退出信号后,应该通知server退出。暴力一点的是直接调用svr.Close(),但是如果有些请求还需要取消怎么办呢?最好用Context了:


wg := sync.WaitGroup{}
defer wg.Wait()

ctx,cancel := context.WithCancel(context.Background())

wg.Add(1)
go func() {
defer wg.Done()

ss := make(chan os.Signal, 0)
signal.Notify(ss, syscall.SIGINT, syscall.SIGTERM)
select {
case <- ctx.Done():
return
case s := <- ss:
fmt.Println("Got signal", s)
cancel() // 取消请求,通知用到ctx的所有goroutine
return
}
}()

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

svr := &http.Server{ Addr:":8080", Handler:nil, }

go func(){
select {
case <- ctx.Done():
svr.Close()
}
}

fmt.Println(svr.ListenAndServe())
}

这个方式可以在新开goroutine时继续使用,譬如新加一个goroutine,里面读写了UDPConn:


wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

var conn *net.UDPConn
if conn,err = net.Dial("udp", "127.0.0.1:1935"); err != nil {
fmt.Println("Dial UDP server failed, err is", err)
return
}

fmt.Println(UDPRead(ctx, conn))
}()

UDPRead = func(ctx context.Context, conn *net.UDPConn) (err error) {
wg := sync.WaitGroup{}
defer wg.Wait()

ctx, cancel := context.WithCancel(ctx)

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

for {
b := make([]byte, core.MTUSize)
size, _, err := conn.ReadFromUDP(b)
// 处理UDP包 b[:size]
}
}()

select {
case <-ctx.Done():
conn.Close()
}
return
}

如果只是用到HTTP Server,可以这么写:


func run(ctx contex.Context) {
server := &http.Server{Addr: addr, Handler: nil}
go func() {
select {
case <-ctx.Done():
server.Close()
}
}()

http.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) {
})

fmt.Println(server.ListenAndServe())
}

如果需要提供一个API来让服务器退出,可以这么写:


func run(ctx contex.Context) {
server := &http.Server{Addr: addr, Handler: nil}

ctx, cancel := context.WithCancel(ctx)
http.HandleFunc("/quit", func(w http.ResponseWriter, r *http.Request) {
cancel() // 使用局部的ctx和cancel
})

go func() {
select {
case <-ctx.Done():
server.Close()
}
}()

fmt.Println(server.ListenAndServe())
}

使用局部的ctx和cancel,可以避免cancel传入的ctx,只是影响当前的ctx。

GOLANG使用嵌入结构实现接口

技术讨论winlin 发表了文章 • 0 个评论 • 102 次浏览 • 4 天前 • 来自相关话题

考虑一个Packet接口,一般会返回一个Header,例如:

type PacketHeader struct {
    ID uint32
    Timesta... 			查看全部
					

考虑一个Packet接口,一般会返回一个Header,例如:


type PacketHeader struct {
ID uint32
Timestamp uint64
}

type Packet interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
Header() *PacketHeader
}

如果是OO的语言,一般会有一个基类,里面包含了Header和实现这个Header:


class BasePacket : public Packet {
protected:
PacketHeader h;
public:
virtual Header() *PacketHeader;
};

class HandshakePacket : public BasePacket {
};

在子类中就都实现了这个Header()方法了,在GOLANG同样可以做到,通过在Header中定义方法,在Packet中包含Header就可以。


func (v *PacketHeader) Header() *PakcetHeader {
return v
}

type HandshakePacket struct {
PacketHeader
}

看起来还差不多的,都可以实现,golang只是代码少一点,清晰一点点而已。考虑要添加一些辅助函数,譬如给Packet添加是否是紧急类型的包,那OO语言得做一次代理:


type Packet interface {
IsErgency() bool
}

class BasePacketHeader {
public:
bool IsErgency() {
return realtime < 3;
}
}

class BasePacket {
public:
bool IsErgency() {
return h.IsErgency();
}
}

而在GOLANG中,只需要在Header实现就好了:


func (v *PacketHeader) IsErgency() bool {
return v.realtime < 3
}

更高级的可以直接嵌入接口。譬如context.Context的实现,cancelCtx直接嵌入了一个接口:


type cancelCtx struct {
Context

通过指定类型,或者初始化的顺序初始化struct


func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{
Context: parent,
done: make(chan struct{}),
}
}

结构嵌套的方式,让组合实现起来非常便捷,避免频繁的代理。

使用两个context实现CLOSE包的超时等待

技术讨论winlin 发表了文章 • 0 个评论 • 92 次浏览 • 4 天前 • 来自相关话题

在UDP中,一般发送者发送包后,如果一定的时间对方没有收到,就需要重传。例如UDP实现握手的过程,如果握手的包,比如RTMFP协议的IHELLO,发送给对方后,如果一定1秒没有收到,就应该重发一次,然后等3秒、6秒、9秒,如果最后没有收到就是超时了。查看全部

在UDP中,一般发送者发送包后,如果一定的时间对方没有收到,就需要重传。例如UDP实现握手的过程,如果握手的包,比如RTMFP协议的IHELLO,发送给对方后,如果一定1秒没有收到,就应该重发一次,然后等3秒、6秒、9秒,如果最后没有收到就是超时了。


最后一个Close包,发送者不能等待这么长的时间,所以需要设置一个较短的时间做超时退出。一般收发都是一个context,在最后这个Close包时,收到ctx.Done也不能立刻退出,因为还需要稍微等待,譬如600毫秒如果没有收到响应才能退出。


一个可能的实现是这样:


in := make(chan []byte)

func Close(ctx context.Context) (err error) {
timeous := ... // 1s,3s,6s,9s...
for _, to := range timeouts {
// 发送给对方WriteToUDP("CLOSE", peer)
// 另外一个goroutine读取UDP包到in

select {
case <- time.After(to):
case <- in:
fmt.Println("Close ok")
return
case <- ctx.Done():
fmt.Println("Program quit")
return
}
}
return
}

但是这个问题在于,在程序退出时,一般都会cancel ctx然后调用Close方法,这个地方就不会等待任何的超时,就打印"Program quit"然后返回了。解决方案是用另外一个context。但是如何处理之前的ctx的done呢?可以再起一个goroutine做同步:


in := make(chan []byte)

func Close(ctx context.Context) (err error) {
ctxRead,cancelRead := context.WithCancel(context.Background())
go func(){ // sync ctx with ctxRead
select {
case <-ctxRead.Done():
case <-ctx.Done():
select {
case <-ctxRead.Done():
case <-time.After(600*time.Milliseconds):
cancelRead()
}
}
}()

ctx = ctxRead // 下面直接用ctxRead。
timeous := ... // 1s,3s,6s,9s...
for _, to := range timeouts {
// 发送给对方WriteToUDP("CLOSE", peer)
// 另外一个goroutine读取UDP包到in

select {
case <- time.After(to):
case <- in:
fmt.Println("Close ok")
return
case <- ctx.Done():
fmt.Println("Program quit")
return
}
}
return
}

这样在主要的逻辑中,还是只需要处理ctx,但是这个ctx已经是新的context了。不过在实际的过程中,这个sync的goroutine需要确定起来后,才能继续,否则会造成执行顺序不确定:


sc := make(chan bool, 1)
go func(){ // sync ctx with ctxRead
sc <- true
select {
......
}
<- sc

使用context,来控制多个goroutine的执行和取消,是非常好用的,关键可以完全关注业务的逻辑,而不会引入因为ctx取消或者超时机制而造成的特殊逻辑。

GOLANG实现超时对象检测的最好理解的方式

技术讨论winlin 发表了文章 • 0 个评论 • 143 次浏览 • 6 天前 • 来自相关话题

依赖于心跳的系统,都需要超时检测。比如P2P系统中客户端每隔120秒向数据服务器发送一次数据汇总,服务器就需要维护一个超时时间。比如一个UDP服务器,在和客户端之间创建Session之后,如果没有数据包,一般会有Ping包,说明这个Session是存活的... 查看全部

依赖于心跳的系统,都需要超时检测。比如P2P系统中客户端每隔120秒向数据服务器发送一次数据汇总,服务器就需要维护一个超时时间。比如一个UDP服务器,在和客户端之间创建Session之后,如果没有数据包,一般会有Ping包,说明这个Session是存活的,服务器在发现Session超时后也需要清理。


首先,服务器一般需要维护一个列表,以Peer为例:


type Peer struct {
id uint64
heartbeat time.Time
}

type Server struct {
peers map[uint64]*Peer
lock sync.Mutex
}

创建Peer,同时在收到Ping消息后,更新Peer的心跳时间:


func (v *Server) Create(id uint64) *Peer {
v.lock.Lock()
defer v.lock.UnLock()

p = &Peer { id:id, heartbeat: time.Now(), }
v.peers[id] = p
return p
}

func (v *Server) OnPing(id uint64) {
v.lock.Lock()
defer v.lock.UnLock()

if p,ok := v.peers[id]; ok {
p.heatbeat = time.Now()
}
}

当然,需要起一个goroutine定期扫描这个列表, 假设300秒超时:


go func(v *Server) {
for {
func(){
v.lock.Lock()
defer v.lock.UnLock()

now := time.Now()
for id,p := range v.peers {
if p.heartbeat.Add(300 * time.Second).Before(now) {
delete(v.peers, id)
}
}
}()
time.Sleep(30 * time.Second)
}
}(server)

如果Peers的数目非常多,那么扫描时每次都需要锁定v.peers,会导致其他的业务都无法进行。特别是清理Peer这个过程如果比较复杂,譬如需要发起io请求,是一个费时的操作时,就会造成系统的等待。


一般来说,超时的Peer不会很多,因此可以用chan放一个超时的peer,每个peer专门起一个goroutine来看什么时候超时,这样就可以在检测超时时避免用锁了:


timeout := make(chan *Peer)

func (v *Server) Create(id uint64) *Peer {
v.lock.Lock()
defer v.lock.UnLock()

p = &Peer { id:id, heartbeat: time.Now(), }
v.peers[id] = p
return p

go func(p *Peer) {
for {
tm := p.heartbeat
<- time.After(300 * time.Second)
if tm.Equal(p.heartbeat) {
timeout <- p
break
}
}
}(p)
}

go func(v *Server){
for gw := range timeout {
func(){
lgateways.Lock()
defer lgateways.Unlock()

delete(gateways, gw.port)
}()

// Do something cleanup about the gateway.
}
}(server)

这样就只有在有Peer超时时,才真正锁住Server.peers

GOLANG接口适配,组合方式的灵活接口演化

技术讨论winlin 发表了文章 • 0 个评论 • 174 次浏览 • 2017-05-15 20:49 • 来自相关话题

在OO(Object Oriented)原则中,有一条叫做:优先使用组合,而不是继承。虽然GOLANG并不是OO的语言(没有继承和多态),但是不妨碍GOLANG使用这条原则,而GOLANG的作者就强调过这一点,在GOLANG中是使用组合而非继承来扩展。<... 查看全部

在OO(Object Oriented)原则中,有一条叫做:优先使用组合,而不是继承。虽然GOLANG并不是OO的语言(没有继承和多态),但是不妨碍GOLANG使用这条原则,而GOLANG的作者就强调过这一点,在GOLANG中是使用组合而非继承来扩展。


装逼的说来,继承是一种名词化的语言体系,先进行业务抽象然后设计类体系和继承关系。而组合,强制使用接口,因为组合中使用的总是另外一个对象的接口,通过动词的组合,实现目标,比如不管是什么只要有Write([]byte)(int,error)这个动作,就实现了这个接口,其他对象组合这个接口后,对外也看起来就是个io.Writer的接口。


比如,GOALNG1.8支持了writev,一般在面向对象会这么的搞:


class Socket {
int Write(void*, int);
int Writev(const iovec*, int);
};

对的吧?一个Socket可以写数据,也可以用writev写iovec向量,就是一次性写入多个内存块。



Note: 有时候内存块是不连续的,比如一个Video帧,发送给不同的客户端时,Header是需要修改的,但是Payload都一样,那么可以针对每个客户端只创建一个header,然后公用payload,但是这时候两个内存指针是不连续的,特别是需要同时写入多个视频帧时,writev就很神奇的避免了内存拷贝writev(header+payload),具体参考下writev的资料哈。



这样有个问题,并非所有系统都支持Writev的,并非所有Socket都支持Writev的,如果是自己写个代码,当然是可以随便这么搞的,但是作为标准库,GOLANG当然是不能这么做的。GOLANG就加了一个接口(一个新动作)叫做net.buffersWriter,如果实现了这个接口就用writev。先看用法:


    conn,err := net.Dial("tcp", "127.0.0.1:1935")

buffers := Buffers{
[]byte("once upon a time in "),
[]byte("Gopherland ... "),
}

buffers.WriteTo(conn)

在Buffers的WriteTo方法会判断是否是writev的接口,如果是则用writev写,否则就一个个的写:


func (v *Buffers) WriteTo(w io.Writer) (n int64, err error) {
if wv, ok := w.(buffersWriter); ok {
return wv.writeBuffers(v)
}

实际上conn是net.TcpConn,里面有个fd *net.netFD,它实现了net.buffersWriter接口,所以最后调用的就是(fd *netFD) writeBuffers(v *Buffers)


func (c *conn) writeBuffers(v *Buffers) (int64, error) {
n, err := c.fd.writeBuffers(v)

func (fd *netFD) writeBuffers(v *Buffers) (n int64, err error) {
iovecs = append(iovecs, syscall.Iovec{Base: &chunk[0]})
wrote, _, e0 := syscall.Syscall(syscall.SYS_WRITEV,
uintptr(fd.sysfd),
uintptr(unsafe.Pointer(&iovecs[0])),
uintptr(len(iovecs)))

对于其他没有实现这个接口的对象,就每个向量循环的写。


在看一个例子http.Get(url string),客户端发起一个HTTP请求:


http.Get("http://localhost:1985/api/v1/versions")
// 实际上调用的是:
func (c *Client) Get(url string)
// 然后调用:
(c *Client) Do(req *Request)

在GOLANG1.7中引入了context的概念,用来支持cancel,怎么用的:


ctx,cancel := context.WithCancel(context.Background())

select {
case <- ctx.Done():
// Cancelled.
case <- time.After(...):
// Timeout
case <- other events:
// Other events.
}

如何支持取消的HTTP请求呢?给http.Get加个ctx参数?例如http.Get(ctx, url)这样?那改动得多大啊,而且还不能兼容之前的API,泪奔~看看GOLANG的解决:


ctx,cancel := context.WithCancel(context.Background())
go func(){
req,err := http.NewRequest("http://...")
res,err := http.DefaultClient.Do(req.WithContext(ctx))
defer res.Body.Close()
// 读取res响应结果。
}()

select {
case <- ctx.Done():
case <- time.After(3 * time.Second):
cancel() // Timeout to cancel all requests.
}

使用组合,通过req.WithContext再返回一个*http.Request,实现同样的目的。

GOLANG使用简单类型,在协议解析的妙用

技术讨论winlin 发表了文章 • 0 个评论 • 646 次浏览 • 2017-05-11 15:41 • 来自相关话题

在协议解析中,经常需要用到转换不同的含义,比如声音的采样率,在FLV中定义和AAC中定义是不同的。在FLV中只有4中采样率5512, 11025, 22050, 44100。而在AAC中有16种采样率96000, 8... 查看全部

在协议解析中,经常需要用到转换不同的含义,比如声音的采样率,在FLV中定义和AAC中定义是不同的。在FLV中只有4中采样率5512, 11025, 22050, 44100。而在AAC中有16种采样率96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350(还有4个是保留的)。也就是说,1在FLV中标识11025Hz,而在AAC中表示的是88200Hz。如何实现这个转换呢?


C++当然先得定义枚举:


enum SrsAudioSampleRate
{
SrsAudioSampleRate5512 = 0,
SrsAudioSampleRate11025,
SrsAudioSampleRate22050,
SrsAudioSampleRate44100,
SrsAudioSampleRateForbidden,
};

C++当然是用函数了:


SrsAudioSampleRate aac_to_flv(int v) {
if (v >= 0 && v <=5) {
return SrsAudioSampleRate44100;
} else if (v >=6 && v <= 8) {
return SrsAudioSampleRate22050;
} else if (v >= 9 && v <= 11) {
return SrsAudioSampleRate11025;
} else if (v == 12) {
return SrsAudioSampleRate5512;
} else {
return SrsAudioSampleRateForbidden;
}
}

看起来还是挺简单的。慢着,还有的时候需要打印出采样率来,所以还得搞个函数:


string srs_audio_sample_rate2str(SrsAudioSampleRate v)
{
switch (v) {
case SrsAudioSampleRate5512: return "5512";
case SrsAudioSampleRate11025: return "11025";
case SrsAudioSampleRate22050: return "22050";
case SrsAudioSampleRate44100: return "44100";
default: return "Forbidden";
}
}

拿到一个AAC的采样率,然后转换成FLV的,并打印出来,是这么使用的:


// 从文件或者流中读取出AAC的采样率的值。
int samplingFrequencyIndex = ...;
// 转换成FLV的采样率。
SrsAudioSampleRate sampleRate = aac_to_flv(samplingFrequencyIndex);
// 转换成字符串格式。
string sSampleRate = srs_audio_sample_rate2str(sampleRate);
// 打印采样率。
printf("SampleRate=%d/%sHz\n", sampleRate, sSampleRate);

有什么麻烦的呢?



  1. 函数和类型之间没有关系,每次使用的时候都得去翻手册啊翻手册。

  2. 如果定义成一个struct,那转换的时候又太麻烦了。


还能不能愉快的玩耍呢?用GOLANG吧!先看用法:


var sampleRate AudioSamplingRate
sampleRate.From(samplingFrequencyIndex)
fmt.Printf("SampleRate=%d/%v\n", sampleRate, sampleRate)

就是这么简单(此处应该有掌声)~


其实实现起来也非常自然:


type AudioSamplingRate uint8

const (
AudioSamplingRate5kHz AudioSamplingRate = iota // 0 = 5.5 kHz
AudioSamplingRate11kHz // 1 = 11 kHz
AudioSamplingRate22kHz // 2 = 22 kHz
AudioSamplingRate44kHz // 3 = 44 kHz
AudioSamplingRateForbidden
)

func (v AudioSamplingRate) String() string {
switch v {
case AudioSamplingRate5kHz:
return "5.5kHz"
case AudioSamplingRate11kHz:
return "11kHz"
case AudioSamplingRate22kHz:
return "22kHz"
case AudioSamplingRate44kHz:
return "44kHz"
default:
return "Forbidden"
}
}

func (v *AudioSamplingRate) From(a int) {
switch a {
case 0, 1, 2, 3, 4, 5:
*v = AudioSamplingRate44kHz
case 6, 7, 8:
*v = AudioSamplingRate22kHz
case 9, 10, 11:
*v = AudioSamplingRate11kHz
case 12:
*v = AudioSamplingRate5kHz
default:
*v = AudioSamplingRateForbidden
}
}


Remark: 代码参考go-oryx-lib flv.



有几个地方非常不同:



  1. 虽然GOLANG只是在uint8上面加了函数,但是使用起来方便很多了,以前在C++中用这两个枚举,每次都要跳到枚举的定义来看对应的函数是什么。

  2. GOLANG的switch比较强大,可以case好几个值,和C++的if有点想,但是GOLANG的case更直观,知道这几个值会被转换成另外的值,而if读起来像是将一个范围的值转换,不好懂。

  3. GOLANG的枚举使用const实现,也可以带类型,而且有个iota很强大,特别是在定义那些移位的枚举时就很好用。


好吧,这只是几个小的改进,虽然用起来很方便。来看看在AMF0中基本类型的妙用,AMF0是一种传输格式,和JSON很像,不过JSON是文本的,而AMF0是字节的,都是用来在网络中传输对象的。因此,AMF0定义了几个基本的类型:String, Number, Boolean, Object,其中Object的属性定义为String的属性名和值,值可以是其他的类型。


先看看C++的实现,首先定义一个AMF0Any对象,可以转换成具体的String或者Object等对象:


class SrsAmf0Any {
// 提供转换的函数,获取实际的值。
virtual std::string to_str();
virtual bool to_boolean();
virtual double to_number();
virtual SrsAmf0Object* to_object();
// 当然还得提供判断的函数,得知道是什么类型才能转。
virtual bool is_string();
virtual bool is_boolean();
virtual bool is_number();
virtual bool is_object();
// 提供创建基本类型的函数。
static SrsAmf0Any* str(const char* value = NULL);
static SrsAmf0Any* boolean(bool value = false);
static SrsAmf0Any* number(double value = 0.0);
static SrsAmf0Object* object();
};

在实现时,String和Number等基本类型可以隐藏起来(在cpp中实现):


namespace _srs_internal {
class SrsAmf0String : public SrsAmf0Any {
public:
std::string value;
// 当然它必须实现编码和解码的函数。
virtual int total_size();
virtual int read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream);
};
}

AMF0Object当然得暴露出来的:


class SrsAmf0Object : public SrsAmf0Any {
public:
virtual int total_size();
virtual int read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream);
// 提供设置和读取属性的方法。
virtual void set(std::string key, SrsAmf0Any* value);
virtual SrsAmf0Any* get_property(std::string name);
};

用起来是这样:


// 设置Object的属性,并发送给服务器。
SrsConnectAppPacket* pkt = NULL;
pkt->command_object->set("app", SrsAmf0Any::str(app.c_str()));
pkt->command_object->set("tcUrl", SrsAmf0Any::str(tcUrl.c_str()));

// 读取服务器的响应,取出服务器的IP等信息。
SrsConnectAppResPacket* pkt = NULL;
SrsAmf0Any* data = pkt->info->get_property("data");
if (si && data && data->is_object()) {
SrsAmf0Object* obj = data->to_objet();

SrsAmf0Any* prop = obj->get_property("srs_server_ip");
if (prop && prop->is_string()) {
printf("Server IP: %s\n", prop->to_str().c_str());
}

prop = obj->get_property("srs_pid");
if (prop && prop->is_number()) {
printf("Server PID: %d\n, prop->to_number());
}
}

看起来巨繁琐吧?快用GOLANG,如果换成GOLANG,可以用基本类型定义AMF0的基本类型,这样使用起来是这样:


pkt := or.NewConnectAppPacket()
pkt.CommandObject.Set("tcUrl", amf0.NewString(tcUrl))
pkt.CommandObject.Set("app", amf0.NewString(app))

var res *or.ConnectAppResPacket
if data, ok := res.Args.Get("data").(*amf0.Object); ok {
if data, ok := data.Get("srs_server_ip").(*amf0.String); ok {
fmt.Printf("Server IP: %s\n", string(*data))
}
if data, ok := data.Get("srs_pid").(*amf0.Number); ok {
fmt.Printf("Server PID: %d\n, int(*data))
}
}

区别在于:



  1. C++由于不能在基本类型上定义方法,导致必须创建struct或者class类型,有比较繁琐的类型转换和判断。

  2. GOLANG的类型判断,提供了ok的方式,一句话就能把类型转换弄好,而且接口和实现struct的对象可以重用变量名。

  3. 不必加很多类型判断,没有多余的变量,干净利索,需要维护的信息比较少。


实现起来更舒服,基本类型不用定义struct:


type String string
func (v *String) Size() int {}
func (v *String) UnmarshalBinary(data []byte) (err error) {}
func (v *String) MarshalBinary() (data []byte, err error) {}

type Object struct {}
func (v *Object) Size() int {}
func (v *Object) UnmarshalBinary(data []byte) (err error) {}
func (v *Object) MarshalBinary() (data []byte, err error) {}


Remark:代码参考go-oryx-lib amf0.



更神奇的是,因为Object、EcmaArray和StrictArray都是类似的结构,但是有些细微的差异,因此使用GOLANG的结构体嵌套可以很直接的解决问题:


type Object struct {
objectBase
eof objectEOF
}
type EcmaArray struct {
objectBase
count uint32
eof objectEOF
}
type StrictArray struct {
objectBase
count uint32
}

可以对比下SRS的实现,C++可以采用继承,而GOLANG直接组合那些基本的单元。


爱生活,爱够浪(此处可以响起掌声了)~

GOLANG将类型作为参数,用反射设置指针的指针,实现类似模板功能

技术讨论winlin 发表了文章 • 0 个评论 • 152 次浏览 • 2017-05-09 20:13 • 来自相关话题

在协议解析中,C++的模板有比较大的作用,有时候我们希望丢弃所有的包,只留下特定类型的包。参考SRS的代码查看全部

在协议解析中,C++的模板有比较大的作用,有时候我们希望丢弃所有的包,只留下特定类型的包。参考SRS的代码SrsRtmpClient::connect_app2


类型系统的设计, SrsConnectAppResPacket继承自SrsPacket


class SrsPacket;
class SrsConnectAppResPacket : public SrsPacket

协议栈提供了expect_message模板函数,接收特定类型的包:


SrsCommonMessage* msg = NULL;
SrsConnectAppResPacket* pkt = NULL;
if ((ret = protocol.expect_message<SrsConnectAppResPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
return ret;
}

SrsAmf0Any* data = pkt->info->get_property("data");
SrsAmf0EcmaArray* arr = data->to_ecma_array();
SrsAmf0Any* prop = arr->ensure_property_string("srs_server_ip");
string srs_server_ip = prop->to_str();

在向服务器发送了ConnectApp后,就等待ConnectAppRes响应包,丢弃所有的其他的。这个时候,类型SrsConnectAppResPacket就作为了一个参数,也就是C++的模板。如果是GOLANG怎么实现呢?没有直接的办法的,因为没有泛型。


在GOLANG中,也需要定义个interface,参考Packet,当然也是有ConnectAppResPacket实现了这个接口(Message是原始消息,它的Payload可以Unmarshal为Packet):


type Message struct { Payload []byte }
type Packet interface {} // Message.Payload = Packet.Marshal()
type ConnectAppResPacket struct { Args amf0.Amf0 }

第一种方法,协议栈只需要收取Message,然后解析Message为Packet,收到packet后使用类型转换,判断不是自己需要的包就丢弃:


func (v *Protocol) ReadMessage() (m *Message, err error)
func (v *Protocol) DecodeMessage(m *Message) (pkt Packet, err error)

不过这两个基础的API,User在使用时,比较麻烦些,每次都得写一个for循环:


var protocol *Protocol

for {
var m *Message
m,_ = protocol.ReadMessage()

var p Packet
p,_ = protocol.DecodeMessage(m)

if res,ok := p.(*ConnectAppResPacket); ok {
if data, ok := res.Args.Get("data").(*amf0.EcmaArray); ok {
if data, ok := data.Get("srs_server_ip").(*amf0.String); ok {
srs_server_ip = string(*data)
}
}
}
}

比较方便的做法,就是用回调函数,协议栈需要提供个ExpectPacket方法:


func (v *Protocol) ExpectPacket(filter func(m *Message, p Packet)(ok bool)) (err error)

这样可以利用回调函数可以访问上面函数的作用域,直接转换类型和设置目标类型的包:


var protocol *Protocol

var res *ConnectAppResPacket
_ = protocol.ExpectPacket(func(m *Message, p Packet) (ok bool){
res,ok = p.(*ConnectAppResPacket)
})

if data, ok := res.Args.Get("data").(*amf0.EcmaArray); ok {
if data, ok := data.Get("srs_server_ip").(*amf0.String); ok {
srs_server_ip = string(*data)
}
}

这样已经比较方便了,不过还是需要每次都给个回调函数。要是能直接这样用就好了:


var protocol *Protocol

var res *ConnectAppResPacket
_ = protocol.ExpectPacket(&res)

if data, ok := res.Args.Get("data").(*amf0.EcmaArray); ok {
if data, ok := data.Get("srs_server_ip").(*amf0.String); ok {
srs_server_ip = string(*data)
}
}

这样也是可以做到的,不过协议栈函数要定义为:


func (v *Protocol) ExpectPacket(ppkt interface{}) (err error)

在函数内部,使用reflect判断类型是否符合要求,设置返回值。代码参考ExpectPacket,下面是一个简要说明:


func (v *Protocol) ExpectPacket(ppkt interface{}) (m *Message, err error) {
// 由于ppkt是**ptr, 所以取类型后取Elem(),就是*ptr,用来判断是否实现了Packet接口。
ppktt := reflect.TypeOf(ppkt).Elem()
// ppktv是发现匹配的包后,设置值的。
ppktv := reflect.ValueOf(ppkt)

// 要求参数必须是实现了Packet,避免传递错误的值进来。
if required := reflect.TypeOf((*Packet)(nil)).Elem(); !ppktt.Implements(required) {
return nil,fmt.Errorf("Type mismatch")
}

for {
m, err = v.ReadMessage()
pkt, err = v.DecodeMessage(m)

// 判断包是否是匹配的那个类型,如果不是就丢弃这个包。
if pktt = reflect.TypeOf(pkt); !pktt.AssignableTo(ppktt) {
continue
}

// 相当于 *ppkt = pkt,类似C++中对指针的指针赋值。
ppktv.Elem().Set(reflect.ValueOf(pkt))
break
}
return
}

遗憾的就是这个参数ppkt类型不能是Packet,因为会有类型不匹配;也不能是*Packet,因为在GOLANG中传递接口的指针也是不可以的,会导致类型错误(**ConnectAppResPacket并不能匹配*Packet);这个参数只能是interface{}。不过用法也很简单,只是需要注意参数的传递。


var res *ConnectAppResPacket
// 这是正确的做法,传递res指针的地址,相当于指针的指针。
_ = protocol.ExpectPacket(&res)
// 这是错误的做法,会在ExpectPacket检查返回错误,没有实现Packet接口
_ = protocol.ExpectPacket(res)

用起来还不错。

用docker-machine创建Docker Swarm集群

文章分享myml 发表了文章 • 0 个评论 • 200 次浏览 • 2017-03-02 17:57 • 来自相关话题

参考文档 Install and Create a Docker Swarm


安装


要先安装virtualboxDocker Machine,Docker Machine 是一个简化Docker安装的命令行工具,在非linux系统用docker的同学应该用过。


加速


dockerHub访问比较慢,docker-machine执行create时加上--engine-registry-mirror参数来进行加速,例如docker-machine create -d virtualbox --engine-registry-mirror=https://3cd767jz.mirror.aliyuncs.com local


获取Token


已有docker环境
执行docker run swarm create来从dockerHub获取一个全球唯一的token


没有docker环境
执行docker-machine create -d virtualbox local创建一个docker环境


执行eval $(docker-machine env local) 进入刚创建的local,


再执行docker run swarm create获取token,
很简单吧。machine还有其它一些实用功能,可以自行查看文档


创建master


执行docker-machine create -d virtualbox --swarm --swarm-master --swarm-discovery token://$token swarm-master
$token请替换成上一步骤拿到的token


创建节点


创建节点和创建master类似,只是把--swarm-master参数去掉,名字改下。


执行docker-machine create -d virtualbox --swarm --swarm-discovery token://$token swarm-node-0


再创建一个docker-machine create -d virtualbox --swarm --swarm-discovery token://$token swarm-node-1
执行docker-machine ls可以看到


swarm-master   * (swarm)   virtualbox   Running   tcp://192.168.99.100:2376   swarm-master (master)   v17.03.0-ce   
swarm-node-0 - virtualbox Running tcp://192.168.99.101:2376 swarm-master v17.03.0-ce
swarm-node-0 - virtualbox Running tcp://192.168.99.102:2376 swarm-master v17.03.0-ce

执行eval $(docker-machine env --swarm swarm-master) !注意这里加上了--swarm参数,进入master,执行docker info可以看到集群信息

使用什么docker image来运行Go程序

有问必答adolphlwq 回复了问题 • 16 人关注 • 16 个回复 • 1571 次浏览 • 2017-02-28 11:20 • 来自相关话题

dockerSSH用ssh连接到docker

开源程序myml 发表了文章 • 5 个评论 • 325 次浏览 • 2017-02-11 21:59 • 来自相关话题

dockerSSH

用golang.org/x/crypto/ssh 实现的ssh服务器,能直接让你通过ss... 查看全部

dockerSSH


用golang.org/x/crypto/ssh 实现的ssh服务器,能直接让你通过ssh登陆到docker容器内,比如ssh ff756b3ea527@127.0.0.1 会登陆到ID为ff756b3ea527的容器,刚实现的尚不成熟,还有一些功能想去实现,比如scp,sftp

在docker swarm中使用grpc如何做负载均衡

有问必答jmzwcn 回复了问题 • 1 人关注 • 1 个回复 • 600 次浏览 • 2017-02-07 19:40 • 来自相关话题

goroutine leak

回复

有问必答adolphlwq 发起了问题 • 3 人关注 • 0 个回复 • 377 次浏览 • 2017-01-05 22:28 • 来自相关话题

Docker 1.13最实用命令行:终于可以愉快地打扫房间了

文章分享数人云 发表了文章 • 0 个评论 • 548 次浏览 • 2016-12-22 11:09 • 来自相关话题

Docker 1.13出来已经有一段时间了,新版本添加了许多有用的命令,本文作者从处女座的洁癖(此处有雾)出发,告诉大家一些整理环境的小技巧。打扫房间... 查看全部


Docker 1.13出来已经有一段时间了,新版本添加了许多有用的命令,本文作者从处女座的洁癖(此处有雾)出发,告诉大家一些整理环境的小技巧。打扫房间再也不需费时又费力了,简单的命令,就可以轻松地把物品分门别类(容器、镜像、网络、存储卷……)地整理好^_^



在1.13版本中,Docker向CLI添加了一些有用的命令,让环境更加整洁。你可能已经体验了很长时间乱糟糟的开发环境——无用的容器,挂起的Docker镜像,弃置的volume,被遗忘的网络……所有这些过时的事物占据了宝贵的资源,最终导致环境无法使用。在之前的文章中曾经提到用各种各样的命令保持环境的整洁,例如:


docker rm -f $(docker ps -aq)


强制地删除所有正在运行的、暂停的以及终止的容器。同样地,也有命令可以删除挂起的镜像、网络和volume。


尽管上述命令解决了问题,但是它们要么专有,要么冗长或者难用。而新加入的命令直截了当又简单好用,现在就开始一一介绍吧。


管理命令


为了整理CLI,Docker 1.13引进了新的管理命令,如下:



  • system

  • container

  • image

  • plugin

  • secret


Docker的老版本中已经有了 network, node, service, swarm 和 volume 。这些新命令组子命令过去作为root命令直接实现。举个例子:


docker exec -it [container-name] [some-command]


exec 命令现在是 container 下面的一个子命令,这个命令相当于:


docker container exec -it [container-name] [some-command]


个人猜测为了兼容性的考虑,旧语句眼下还会使用一段时间。


Docker系统


现在有一个新管理命令 system 。它有4个子命令分别是 df, events, info 和 prune 。命令 docker system df 提供Docker整体磁盘使用率的概况,包括镜像、容器和(本地)volume。所以我们现在随时都可以查看Docker使用了多少资源。


如果之前的命令展示出 docker 已经占用了太多空间,我们会开始清理。有一个包办一切的命令:


docker system prune


这个命令会删除当前没有被使用的一切项目,它按照一种正确的序列进行清理,所以会达到最大化的输出结果。首先删除没有被使用的容器,然后是volume和网络,最后是挂起的镜像。通过使用 y 回复来确认操作。如果想在脚本中使用这个命令,可以使用参数 --force 或者 -f 告诉Docker不要发来确认请求。


Docker容器


我们已经知道许多 docker container 的子命令。它们过去(现在也是)是 docker 的直接子命令。可以通过下面的命令得到完整的子命令列表:


docker container --help


在列表中会看到一个 prune 命令。如果使用它,那么只会删除无用的容器。因此这条命令比 docker system prune 命令更局限。使用 --force 或者 -f 同意可以让CLI不再进行确认请求。


Docker网络


这里也有一个 prune 命令:


docker network prune


删除所有孤立的网络。


Docker Volume


volume也有新的 prune 命令了:


docker volume prune


删除所有(本地)没有被容器使用的volume。


Docker镜像


新的镜像命令也是 prune 子命令。--force 用法如上面一样, --all 可以删除所有不用的镜像,不只挂起的镜像。


docker image prune --force --all


这个命令可以删除所有不使用的镜像并且不再请求确认。


总结


Docker 1.13不仅通过引入admin command添加了一些需要的命令,也让我们找到了一些非常有用的清理环境的命令。笔者最爱的命令莫过于 docker system prune,让环境一直保持干净整齐。


本文作者:Gabriel Schenker
原文链接:https://lostechies.com/gabrielschenker/2016/12/12/containers-cleanup-your-house-revisited/

数人云Meetup|分布式架构的开源组件大选

线下活动数人云 发表了文章 • 0 个评论 • 292 次浏览 • 2016-11-29 11:30 • 来自相关话题

开源组件与工具方便快捷,

在分布式架构中扮演着重要的角色,

企业按照实际需求也会对开源组件做更多优化,查看全部


开源组件与工具方便快捷,


在分布式架构中扮演着重要的角色,


企业按照实际需求也会对开源组件做更多优化,


无论是开源使用者还是开源贡献者,


面对各种各样的开源组件——容器、编排、大数据、数据库、监控、持续集成等等都会有苦恼,


选择哪一个?


不如,


我们通过「分布式架构的开源组件大选」来看看微软、阿里、PingCAP、数人云这些企业是如何用开源组件来做分布式架构,


并为开源做出了哪些贡献:)


活动中不仅可以了解各种开源组件实践,


还有嘉宾的开源成长分享以及程序员的职业规划建议,


愿来到现场的你可以有更多的收获~


嘉宾及议题介绍



刘鹏,微软云计算事业部 -高级软件开发工程师


曾在Bing新闻团队担任核心技术开发,擅长海量数据分析和系统设计,目前任职微软云计算事业部,主导开发基于Azure的融媒体解决方案


Topic1:《使用visual studio code的docker插件开发和调试nodejs应用程序》


1.VSCode简介
2.搭建nodejs + Docker开发环境
3.利用VSCode远程debug nodejs in Docker



杨成虎,阿里花名(叶翔),分布式和数据库高级技术专家


11年加入淘宝研发开源分布式缓存系统TAIR,此后一直在NoSQL领域摸爬滚打,利用NoSQL技术支撑了淘宝的天文交易量。当前专注于NoSQL的云服务化,正带领团队在MongoDB云服务上深耕细作。


Topic2:《阿里云ApsaraDB云数据库管理之道》


1.云数据库生命周期—生产,维护,下线
2.云数据库监控调度—大数据监控分析



黄东旭,PingCAP 联合创始人兼 CTO
曾就职与微软亚洲研究院,网易有道及豌豆荚,资深基础软件工程师,架构师。


擅长分布式系统以及数据库开发,在分布式存储领域有丰富的经验和独到的见解。狂热的开源爱好者以及开源软件作者,代表作品分布式 Redis 缓存方案 Codis,以及分布式关系型数据库 TiDB。


Topic3:《无痛的 MySQL 扩展方案 - TiDB 运维实践》


分享 TiDB 的用户在实际线上场景中使用 TiDB 来解决 MySQL 扩展性问题中得到的最佳实践和一些运维 tips,为 MySQL 的扩展提供一种新的思路。



春明,数人云资深架构师


8年互联网游戏后台开发经验, 先后就职于OpenFeint, GREE,Kabam等国际知名互联网游戏公司, Docker爱好者。数人云开源容器管理工具 Crane 和开源 Mesos 调度器 Swan 的主要研发者


Topic4:《一款分布式系统应用开发手记》


1.gPRC使用手记
2.将Raft集成到Mesos调度器Swan中
3.Actor模式和事件驱动


议程介绍


13:30 - 14:00 签到


14:00 - 14:40《使用visual studio code的docker插件开发和调试nodejs应用程序》刘鹏@微软


14:40 - 15:20《一款分布式系统应用开发手记》春明@数人云


15:20 - 16:00《阿里云ApsaraDB云数据库管理之道》杨成虎@阿里云


16:00 - 16:40《无痛的 MySQL 扩展方案 - TiDB 运维实践》黄东旭@PingCAP


16:40 - 17:00 自由交流


联合主办:DBAplus社群


时间:12月17日 14:00 - 17:00


地点:北京市海淀区丹棱街5号微软大厦1号楼1层故宫会议室



点击报名

来自沪江、滴滴、蘑菇街架构师的 Docker 实践分享

文章分享lalala 发表了文章 • 1 个评论 • 367 次浏览 • 2016-11-14 18:05 • 来自相关话题


架构师小组交流会是由国内知名公司架构师参与的技术交流会,每期选择一个时下最热门的技术话题进行实践经验分享。


Docker 作为当前最具颠覆性的开源技术之一,其轻量虚拟化、可移植性是
CI/CD、DevOps、微服务的重要实现技术。但目前技术还不够成熟,在生产实践中还存在很多问题。对此,沪江黄凯、滴滴田智伟、蘑菇街张振华、蘑菇街向靖、扇贝丁彦以及七牛云袁晓沛在本期交流会上分享了各自的经验。本文是对此次交流的整理,欢迎探讨。





自由交流


沪江黄凯


大家好,我是来自沪江的 Java 架构师,我叫黄凯。在加入沪江之前,曾在 HP 和 IBM 的云计算部门担任核心开发和架构职位。对 IaaS、PaaS、SaaS,尤其是云存储有较深入的了解。2015 年加入沪江,担任架构师职位,主导的产品有:课件云存储,云转码等等。在这些项目中,我们使用 Mesos 和 Marathon 做 Docker 的编排工具,并开发了一个 Mesos Framework 做云转码的核心框架。


那么我们为什么要使用 Docker,也是机缘巧合。由于我们的服务开始的时候不是特别多,采用的就是一种普通的架构,后来随着服务的增多,发现部署和运维花的时间太长,我们想使用一些新的方式。开始的时候研究过 Openstack,后来觉得 Openstack 慢慢没落,于是我们就选中现在使用的 Docker。我们并不把 Docker 当成 VM 在用,而是使用它的原生的,在 Baremetal 上直接安装 Docker,这样运行效率比在 VM 运行 Docker 要来的快。课件云是由很多微服务组成,不光是一些存储,这种微服务是使用 Docker 部署,就相当于编排,把这些微服务部署上去。转码这一块是使用了 Mesos 框架,和 Docker 没有特别大的关系,但是转码的应用程序,比如说我们现在应用 FFmpeg,这个程序是运行在 Docker 里面的。


为什么要选择 Marathon?第一,我觉得 Mesos+Marathon 非常的容易理解。我们也研究过 Kubernetes 和其他的一些方法,发现从运维和研究的方面来说的话,Kubernetes 实在是太重而且太复杂,后来选择了Marathon。我们现在是内部服务使用,两个部门在使用转码集群,大概是 Baremetal 有 20 台的物理机。除去我们 API 的一些服务,还有一些第三方组件的服务的话,大概是有 400 多个 Docker 容器在跑。


滴滴田智伟


大家好,我是滴滴代驾事业部架构师,代驾事业部是公司最早尝试 Docker 虚拟化的事业部。目前主要方向是业务系统及部分中间件的 Docker 化,我们做的时间也不太长,半年多的时间。线上是因为我们有老的一套发布系统,集成涉及的部门比较多,所以我们基于原来的发布系统完成了预发布环境 Docker 的部署。线下环境基于 Docker+K8s 开发内部的自动化持续交付系统及开发测试环境管理。我们在做开发和测试环境的自动化,另一方面也是做环境管理的,两套环境。对于项目并行的时候发现原来很多不够用,原来很多配置是基于端口绑死的情况。现在基于开发 Kubernetes 的话,网络隔离用了一部分,然后主要是用环境变量这一部分,主要考虑是解决一个配置可以应用到在多个环境的情况,基于这个需求才用它。开发 Kubernetes 基于 Namespace,同一个服务在不同的 Namespace 下,它其实环境变量名可以是相同的,但是IP不同,而这一部分 IP 其实是由开发 Kubernetes 自己去管理的。基于环境变量获取一些配置的话,比如 IP 地址这种,就可以做到拿一份配置可以打出多套环境。


考虑业务的安全性和稳定性,线上基于纯 Docker 的方式在做。我们是基于裸的 Docker 来工作,主要是用资源隔离,没有借助调度框架,也没有自动伸缩。我们是两步走,一步是验证 Docker,其次是做开发 Kubernetes 线下使用和预研。为什么没有考虑 Mesos?刚才跟沪江的同学,我们的考虑是相反的。Mesos 侧重点更专一一点,首先不会有模块的划分,比如 Kubernetes 有 Replication controller ,Namespace 这种概念,而 Mesos 下几乎没有这种概念。我们拿 Kubernetes 主要是做一些编排的功能,而正好开发 Kubernetes 在整个发布和编排上,体系更全面一点。Mesos 最早是做资源管理,基于 Docker 做一个 Framework 接进来的话,它不是专门为编排而生。Kubernetes 首先解决我们的问题是,我们可能不需要加多份配置就可以搭多套不同的环境,它就是基于 Namespace 做一个多租户的概念,会对 Service 做一层隔离,对于动态配置,扩容这一部分暂时我们没用到,确实用到的一些场景比较少。主要是做不同环境的隔离,并没有太多使用编排细节上的东西,动态伸缩之类的目前线下没有太大必要,线上可能会用到。


蘑菇街向靖


大家好,我是向靖,来自蘑菇街的运维架构师。我们接下来会做一个 PaaS 平台,想做 Docker 和结合虚拟机以及我们用到公有云产品,做成一个混合云的架构平台。我们现在 Docker 也在用,更多的是当虚拟机用,后面我们想基于 Docker 原生的方式去用,可能会涉及资源调度,服务发现的问题。除了 Docker,我们还会用到公有云,公有云更多是虚拟机的方式提供。出于混合云,想在资源层做一个抽象,对于上层业务来讲它没有关系,它是跑在 Docker 上,还是云主机上,还是 KVM 虚拟机上,那么我想在这上面做一个抽象。另外还有,刚才我也是提问滴滴架构师的问题,配置怎样和代码做隔离,这个也是我考虑的问题。因为我看 Docker 用了环境变量,通过环境变量做一些配置的参数的传递,但是在虚拟机上,特别是在物理机上,通过环境变量的方式,我还在考虑有没有安全的风险,Docker 可能是一个只读的,不会被修改的,但是对于虚拟机以及物理机来说,可能会存在被修改的风险。


蘑菇街张振华


大家好,我叫张振华,花名郭嘉,我是 14 年从思科加入蘑菇街。我们算是国内用 Docker 比较早的,我们一开始用 Docker 是 1.3.2 的版本,当时我们采用集群管理工具还是 Openstack,因为当时 Kubernetes 还不是很成熟。当时也走了一些弯路,比如我们把 Docker 当成虚拟机来用,曾经在线上的规模也达到几百台虚拟机几千个容器,但是我们逐步发现不能把 Docker 当成虚拟机来使用,因此我们做了一个转型,从去年开始研究 Kubernetes,现在 Kubernetes 加 Docker 的版本开发完成了,准备逐步上线。


我们为什么选用 Kubernetes?编排工具的选择我们也是做过一番调研的,它们没有谁好谁不好这一说,只能说谁更贴切你的需求。对于我们蘑菇街来说,我们需要解决是资源利用率的问题,和运维的对接,我们需要有预发和线上环境的持续集成持续部署的过程,还有我们需要有对资源的隔离,对部署的快速迭代,包括集群管理,这些方面,我们觉得 Kubernetes 更加适合于我们。


在网络方面,我们研究过现在在开源界比较常用的一些方案,但是我们都觉得不太适合,比较 Fannel,Caico 等等,他们一般用的技术都是 VXLAN,或者是用 BGP。因为我们之前对 Openstack 的网络是比较有经验的,然后我们发现有一个项目,具体名字不记得,Neutron 和 Kubernetes 做一个对接,我们在这个项目的基础上做了 VLAN 的方案,我们的网络没有用 VXLAN 来做,而是选择 VLAN 来做,这样的话一个 Docker 它可以获得跟一个物理理同一个网络平面的 IP,我们的应用程序可以直接对外访问,因为我们内部业务有这个需求选择这个方案。虽然 Docker 内部网络和外部网络是通的,但 Docker 还是独立的一个网段,不需要一层 NAT 的转换。我们直接走二层的,是在交换机走 Chunk,本来物理机交换机的 Access 口,这样的话,一台物理机上面允许跑多个 VLAN 的容器,比如说 A 业务和 B 业务要走隔离的话,通过网络的 VLAN 走隔离,它们的数据之间不会有干扰。


Load Balance 我们还没有涉及到这一块,Load Balance 我们应该会在 nginx 上做一层。因为据我了解,现在 Kubernetes 这一块 Proxy 还不是很成熟,这上面还存在一些问题,因此还不敢用 Kubernetes 现有提供的服务。服务发现和注册这一块我们还在做开发,这块会和配置管理中心打通。我们内部也有其他团队在做这些功能,所以我们会和内部的中间件团队合作。


七牛云袁晓沛


大家好,我是七牛云数据处理技术总监袁晓沛。我们的数据处理业务包括了图片和视频的实时在线及异步处理。数据处理的业务量比较大,日均请求量达到百亿级。平台采用容器技术的原因是借助容器技术快速部署,启动的特性,数据处理程序可以根据数据处理量快速地弹性伸缩。借助容器技术内核级别的资源隔离和访问控制,每个数据处理程序可以运行在一个私有的环境,不被其它程序所干扰,保证其上运行数据是安全可靠的。而且容器技术是轻量的,它以最小的资源损耗提供资源隔离和访问控制,而资源特别是计算资源在数据处理中是非常宝贵的。


我们在资源调度上采用的是 Mesos,而二层的业务调度框架则是自己自研的。七牛自身拥有近千台的物理机,容器是直接运行的物理机上,可以减少虚拟层对资源的消耗,提高资源的利用率。


在网络上,对于七牛的自定义数据处理服务直接使用的是 Host 模式,而对第三方数据处理服务则使用的是 Bridge 模式,因为这些程序是用户自己部署运行的,并不知道用户是否有开启其他的端口使用,所以使用的是 Bridge 模式,需要对外使用端口的都需要通过 NAT 进行暴露,这样服务内部使用了什么端口并不会对外界环境造成影响,对平台环境做了非常好的安全隔离。我们是使用 Consul 做注册中心,支持跨数据中心的服务发现。我们为什么自研的调度框架,而不用 Marathon。因为 Marathon 不支持跨数据中心的内部服务或外部服务的发现,而七牛有多个数据中心,影响整体的调度,其次如果选用 Marathon 的话,根据我们业务的特点,还是要再做一层对 Marathon 的包装才能作为 Dora 的调度服务,这样模块就会变多,部署运维会复杂。


扇贝丁彦


大家好,我是扇贝的技术总监丁彦,之前在暴走漫画,先后在暴走漫画和扇贝设计和主导了基于 Docker 的微服务架构系统,以及数据收集和分析系统。去年来到扇贝,这里是 Python 的开发环境。后来发现业务增长快,水平扩展一些机器,出现问题需要换个机器等等,都需要非常熟悉业务的少数开发去做。另外公司对预算控制严格,机器基本都是满负荷运作,平时也不可能多开空置的机器,已有的机器还要根据负载情况调整服务分布情况,所以这种切换服务,增删服务的操作还是比较频繁的。因此,我们用了 2-3 个月的时间将所有的运行环境都切换到 Docker上,这大大提高了我们的运维能力。


Docker 包装有几个好处。


第一个好处是,环境升级非常方便。因为只要pull 一下最新的镜像,启动一个 Container,环境就升级了。而如果直接基于公有云的镜像升级的话就很难,因为一台机器上跑哪些服务其实不一定是固定的,并且之前做的镜像只要有一台机器是还基于它的话,就删除不掉的,镜像数量又有上限。所以 Docker 非常好地解决了我们的问题。


其次是环境的颗粒度会更小,一台机器上配好几个应用的话,往往配着配着,到最后你就不太能精确地记得上面装的程序或者库是给哪个应用服务的,应用之间如果依赖有版本的冲突也很难调和。你想做些服务的迁移,把负载比较小的放一起,把负载比较大的抽出来,这个时候就非常痛苦,但你如果用 Docker 包装后就非常简单,只要在不同的机器上起不同的 Container,就可以实现这一点。


第三,我们不光用了 Docker,还加入了服务发现,刚刚讨论配置管理这些,我们一并做了。Docker 启动时候,我们自己写了一些工具,可以自定义 Docker 启动参数,包括配置参数,比如说,一些程序要运行的参数,我们主要用两种方式,一种方式是通过环境变量灌进去,还有一种方式让程序的启动脚本支持参数,然后拼接不同的参数灌进去,最终都是落实到 Docker 的启动命令上。服务发现是基于 Consul,Docker 的启动命令是从 Consul 里取的。首先 Consul有 HTTP 的 API,我们是自己写的 pip 包,只要 Include 一下这个包就可以了,Docker 的服务启动后会自动注册到 Consul。比如要在负载后加一个服务,只需要找到一台机器,启动对应的 container,剩下的事情它自己会到 Consul,注册它的参数地址一系列东西,自动把它加进去。所以这些都是自动化的,如果检测那台机器/服务挂了,Health Check 也会到 Consul 里面更新。该增加机器就增加机器,该下线就下线。总体来说,我们的生产环境全部跑在 Docker 上面的,然后区分有状态和无状态两种,有状态的定死在机器上,无状态的灵活的自由切换。还有一点,如果是有状态的容器要定死在机器上的时候,我们一般来说都会采取冗余的结构,至少保证有两个在运行,一个挂了,保证整体的服务在运行。其次基于 Docker,我们还做了一套数据搜集以及分析的机制。数据搜集是基于日志来搜集的,利用 Docker 的 Log driver,把日志打到 Filter,把结果存在存储服务上。同时监控也是基于日志做的。第三部分非生产环境,比如开发环境跟测试环境都是 Docker 做的,因为我们每一个服务都做了 Image、镜像,用容器方式跑的。通过参数来决定启动方式的,我们可以在开发环境以及测试环境采用不同的参数来启动容器。 通过 Consul 来隔离的,因为 Consul 的服务发现,开发、生产、测试环境在不同的自动发现框架里不会相互影响到。目前机器在 120 台左右,基于云服务。有些基础的东西不需要依赖于 Docker,比如说申请云主机,申请的时候就可以指定它的 CPU 和内存这些服务器资源的配置。所以这部分东西还是属于 Human schedule,不是完全让编排的系统自己决定该怎么样。


编排工具我们现在在研究进一步,我刚来这工作的时候,所有的服务没有一个跑在 Docker 上面的,我现在把它迁进来。现在数据增长,已经有一些编排的瓶颈,现在在做调研,可能基于 Swarm,做自动编排的设计。




指定话题交流



主持人:容器多的情况下 Kubernetes 存在性能问题,各位在这方面有没有好的经验?



扇贝丁彦:我们其实也遇到了这个问题,找不到办法所以放弃了 Kubernetes。我们也是用公有云,网络直接依赖公有云的网络,有可能是因为公有云造成的,我没有试过在祼机上试过。


沪江黄凯: Kuberneters 的 Fannel 有一种模式是 VXLAN,它的封装折包是做内核里做的,效率会高一点。容器多就会效率会低是因为,在 Kubernetes 1.2 的时候,走这样的一种模式,数据先到内核态中,然后把数据拉回到用户态,用 Proxy 的方式分发给各个容器当中的。其实在 Kubernetes 1.3 以后,它直接在iptables里设规则,相当于用户数据不用跑到用户态,在内核直接分发出去了,这种效率会非常高。所以可以研究一下 Kubernetes 新版本。


扇贝丁彦:我们碰到过网络方面的问题。默认的 Docker engine 的启动参数里面有个 iptables,不知道大家有没有定制化过,如果不定制化这个参数,它默认会帮你建 iptables 的转发规则,并会开启内核的网络追踪的模块。一开始我们没有注意这件事情,当我们的 Nginx 迁到 Docker 的时候,Nginx 服务瞬间会挂。后来查原因,是因为这些参数会开启网络追踪模块。因为我们的 Nginx 流量非常大,当时只有 3 台 Linux 云主机,分发 http 请求的,然后会导致 3 台Linux宿主机,内存会被刷破,网络会出现堵塞。所以我们关掉了 iptables 参数,并采用 Host 的网络模型。所以它的容器拿到的 IP 就是 Host 的 IP。我们一开始也想上一些 Kubernetes 这些东西,然后发现简单跑个模型根本跑不起来,所以一开始就放弃了这一套东西,直接搞了个裸的 Docker。



主持人:关于跨数据中心容器集群的使用,大家有经验么?



沪江黄凯:我们跨数据中心主要是IP分配上的问题,我们现在也在尝试使用 Calico,如果 Host 网络是通的话,那么它的内部网络也就通了,可以自由划 VLAN,这样你就可以解决跨 Data center 的问题。还有一个问题就在跨 Data center 时,服务注册与发现的问题。这个问题也困扰我们很久了,我们现在使用 Consul 做服务注册与发现。虽然 Consul 它是官方支持跨 Data center,但是我们在使用当中的话会发现注册的 IP,在另外一个注册中心,它会发现的比较慢,甚至有时候出现 IP 冲突的时候。


我们的做法是把 Host 的 IP 地址直接用 Environment 的形式注到 Docker 镜像内部,接下 来 Docker 镜像要注册,它就会读取 App 的 IP,然后发送给 Consul,只要保证 Host 的 IP 和 Docker内部容器的 IP 能够互通的就行了。如果不能通的话,比如说完全和 Host IP 隔离,那么起码有几台机器要暴露出去,又比如说,Consul 它本身自己要暴露出去才能访问到。Host 的 IP 是容器启动之后注进去的,启动命令中把 Host 的 IP 地址加在 -e 的后面,容器在启动之后,它的环境就会有这么一个 IP。我们用 Mesos 就没这个问题,但是用 Kubernetes 就有这个问题。Mesos 会自动帮你把这些东西注入容器中去。


滴滴田智伟:其实 Kubernetes 本身也是可以解决这个问题,我们现在在做线下持续交付的时候。定义完 Service 之后,容器会同一个 Namespace 默认加一个系统环境变量。


沪江黄凯:我们试过,在 Pod 启动之后,Pod 里容器想访问 host 的 IP 地址,是没有办法做到的。


蘑菇街张振华:因为我们之前也遇到这个问题,然后我们业务方,他们可能有一些程序会获取本机 IP 地址,如果是内部的 IP 地址,他们程序可能会出现问题,于是我们当时没有用 Docker 默认的网络,而是采用 VLAN。



主持人:我们提到好多 Mesos、Kubernetes、网络,发现没有提自动伸缩,有没有项目涉及到容器的自动伸缩?



沪江黄凯:我们沪江是基于 Mesos+Marathon 做了自己的一个服务,它这个服务是干嘛的呢,就是监测,不停的监测每一个 Docker 的 CPU 和内存的利用率,一旦超过百分之多少以后,就向 Marathon 发一个命令,说我要扩容,它还可以支持时间点,比如 15 分钟监测一次,如果在 15 分钟发现它超过阈值了,就马上扩容出来,但是缩的话,不是适用于频繁监控,如果小于 20% 的话就会缩,一旦缩的话会影响线上用户的请求。怎么办呢?我们在缩的时候可以规定它的时间点,比如半夜里 2-3 点,访问量少于多少点时候把它缩掉。我们监测的是 Docker 内部的 CPU 的使用率。就是监测一个服务,它可以监控所有同一服务的 Container,比如一个服务有 100 个容器,那么这一百多个 CPU 利用率加起来除于一百,相当于平均的利用率。如果平均利用率超过 80%了,那说明这个集群到了扩展程度了,它会以一种比例来扩展。针对单个容器,可以设置内存的限制。我们给每一个容器呢,比如它只能用 4 个 CPU,只能用 8G 的内存,或者更小一点的内存,这些都设好,设好之后它自动扩展相同规格的容器。这么做是因为 Cgroup 有个问题,当利用率到达了启动的限制,Cgroup 会把这个容器 kill 掉。这个是不可理喻的问题,所以我们想到用 Load scale 来扩容,不让他直接死掉。


滴滴田志伟:关于自动扩容,我们线下的时候遇到一个问题,我们最早的时候是用腾讯的公有云,它限制了 NET 的模块,导致我们最初用 Cgroup 的方案去做,绑定端口。内部使用所有应用,端口是要做分配的,要不然出现端口冲突。然后遇到问题是,在这种情况下,如果要做动态扩容的话,它每次先创建一个,再杀掉一个,导致每次起来的时候就起不来了,因为端口的问题。服务启动的时候端口是随机,会出现冲突问题,因为用的是 Host 的模式。



主持人:关于自动伸缩为什么没有考虑到请求数?因为如果内存占用率如果超过一定预支,那么请求数也可能超过一定预支了。把单个容器所处理的请求数给限定了,那么它内存自然不会超,然后也不会被干掉。



沪江黄凯:我个人认为,第一,请求数很难测,你不知道请求数到多少时要扩容,还不如根据 CPU 到 80%,或者 90% 来的直观。我们的 API 也是根据 CPU 来算的。你真正是高并发的 API 的话,我也测过,最后我们能够监测到的,其实还是 CPU 和内存。


扇贝丁彦:我们扩容是根据响应时间,跟请求数类似,请求数定指标不太好定,我们是根据响应时间,比如平时的响应时间是 50 毫秒,当响应时间是 300 毫秒的时候就要扩容了。



主持人:关于自动伸缩为什么没有考虑到请求数?因为如果内存占用率如果超过一定预支,那么请求数也可能超过一定预支了。把单个容器所处理的请求数给限定了,那么它内存自然不会超,然后也不会被干掉。



沪江黄凯:关于存储,我们是有一些研究的。现在容器存储问题分为两种,Kubernetes 官方支持一种理念,任何一种存储都是一个 Volume。Volume 先于 Docker 存在的,而不是 Docker 启动之后再挂载 Volume。不管是网络存储还是本地存储,全部以卷的形式,挂载在 Pod 里面或者是宿主机上,以 Driver mapper 来驱动这个 Volume,来读到你所要的内容。


还有一种情况,就是 Docker 公司主导的存储模型,任何的存储都是一种驱动。如果你想用 NFS 或者如 Ceph 这样分布式存储的话,让 Ceph 开发 Docker 的驱动,Docker run 的时候指定存储的驱动,Docker storage driver 这种方式,外部的存储在容器内部它展现形式可以是目录,也可以是挂载卷、块的形式。如果用块挂载到容器中,这个容器自己格式化它,或直接读取它都是可以的。它只不过它是相当于用了一个 Driver 的形式,把你的容器和分布式存储建立一个连接而已。对于容器,如果原本绑定块或 Volume,容器出现故障的话,直接把容器杀掉,再启动挂在同样一个 块或Volume 就解决了。优点是直接读取,而不是通过再转一层,效率比较高一点。所有存储都是 Volume 的形式理解度比较高一点,所以我们还是赞同于用 Volume 的形式。


有状态的容器。我知道 k8s 的新的计划,如果你没有用 Kubernetes 最新版本的话,一般来说我们都是容器启动在固定 Host 上,下次启动还是在这台 Host 上,它的存储它的内存,包括一些 log,全部是在这台 Host 上。还有一种是用最新的版本,有个 PetSet 的新 kind,Kubernetes 它自己会记录 Pod 在什么 Host 上启动过,不用自己去指定一定要在某一台 Host 上启动,这种方法比较智能化,但是不是特别稳定的一种方法,因为它是刚刚开发出来的新功能。



主持人:关于自动伸缩为什么没有考虑到请求数?因为如果内存占用率如果超过一定预支,那么请求数也可能超过一定预支了。把单个容器所处理的请求数给限定了,那么它内存自然不会超,然后也不会被干掉。



沪江黄凯:我个人认为还是在同一台机器上起一个新的实例,不要让它做数据迁移,因为数据迁移会占用很多资源。而且如果你的想法是说,所有的分布式的存储只是以 Volume 的形式挂载在宿主同上,这也就没什么问题了。因为存储和 Docker 是完全分开来的。如果只有一个 Volume,存储的可靠性会得不到保障,所以在 Kubernetes 新版本当中,它会建立一个 Volume 的 kind,也相当于建立 RC kind一样,是一个 Pod,那这样由 Kubernetes 来保障这个 Volume 的高可用。

GOLANG使用Context管理关联goroutine

技术讨论winlin 发表了文章 • 6 个评论 • 161 次浏览 • 4 天前 • 来自相关话题

一般一个业务很少不用到goroutine的,因为很多方法是需要等待的,例如http.Server.ListenAndServe这个就是等待的,除非关闭了Server或Listener,否则是不会返回的。除非是一个API服务器,否... 查看全部

一般一个业务很少不用到goroutine的,因为很多方法是需要等待的,例如http.Server.ListenAndServe这个就是等待的,除非关闭了Server或Listener,否则是不会返回的。除非是一个API服务器,否则肯定需要另外起goroutine发起其他的服务,而且对于API服务器来说,在http.Handler的处理函数中一般也需要起goroutine,如何管理这些goroutine,在GOLANG1.7提供context.Context


先看一个简单的,如果启动两个goroutine,一个是HTTP,还有个信号处理的收到退出信号做清理:


wg := sync.WaitGroup{}
defer wg.Wait()

wg.Add(1)
go func() {
defer wg.Done()

ss := make(os.Signal, 0)
signal.Notify(ss, syscall.SIGINT, syscall.SIGTERM)
for s := ss {
fmt.Println("Got signal", s)
break
}
}()

wg.Add(1)
go func() {
defer wg.Done()

svr := &http.Server{ Addr:":8080", Handler:nil, }
fmt.Println(svr.ListenAndServe())
}

很清楚,起了两个goroutine,然后用WaitGroup等待它们退出。如果它们之间没有交互,不互相影响,那真的是蛮简单的,可惜这样是不行的,因为信号的goroutine收到退出信号后,应该通知server退出。暴力一点的是直接调用svr.Close(),但是如果有些请求还需要取消怎么办呢?最好用Context了:


wg := sync.WaitGroup{}
defer wg.Wait()

ctx,cancel := context.WithCancel(context.Background())

wg.Add(1)
go func() {
defer wg.Done()

ss := make(chan os.Signal, 0)
signal.Notify(ss, syscall.SIGINT, syscall.SIGTERM)
select {
case <- ctx.Done():
return
case s := <- ss:
fmt.Println("Got signal", s)
cancel() // 取消请求,通知用到ctx的所有goroutine
return
}
}()

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

svr := &http.Server{ Addr:":8080", Handler:nil, }

go func(){
select {
case <- ctx.Done():
svr.Close()
}
}

fmt.Println(svr.ListenAndServe())
}

这个方式可以在新开goroutine时继续使用,譬如新加一个goroutine,里面读写了UDPConn:


wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

var conn *net.UDPConn
if conn,err = net.Dial("udp", "127.0.0.1:1935"); err != nil {
fmt.Println("Dial UDP server failed, err is", err)
return
}

fmt.Println(UDPRead(ctx, conn))
}()

UDPRead = func(ctx context.Context, conn *net.UDPConn) (err error) {
wg := sync.WaitGroup{}
defer wg.Wait()

ctx, cancel := context.WithCancel(ctx)

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

for {
b := make([]byte, core.MTUSize)
size, _, err := conn.ReadFromUDP(b)
// 处理UDP包 b[:size]
}
}()

select {
case <-ctx.Done():
conn.Close()
}
return
}

如果只是用到HTTP Server,可以这么写:


func run(ctx contex.Context) {
server := &http.Server{Addr: addr, Handler: nil}
go func() {
select {
case <-ctx.Done():
server.Close()
}
}()

http.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) {
})

fmt.Println(server.ListenAndServe())
}

如果需要提供一个API来让服务器退出,可以这么写:


func run(ctx contex.Context) {
server := &http.Server{Addr: addr, Handler: nil}

ctx, cancel := context.WithCancel(ctx)
http.HandleFunc("/quit", func(w http.ResponseWriter, r *http.Request) {
cancel() // 使用局部的ctx和cancel
})

go func() {
select {
case <-ctx.Done():
server.Close()
}
}()

fmt.Println(server.ListenAndServe())
}

使用局部的ctx和cancel,可以避免cancel传入的ctx,只是影响当前的ctx。

GOLANG使用嵌入结构实现接口

技术讨论winlin 发表了文章 • 0 个评论 • 102 次浏览 • 4 天前 • 来自相关话题

考虑一个Packet接口,一般会返回一个Header,例如:

type PacketHeader struct {
    ID uint32
    Timesta... 			查看全部
					

考虑一个Packet接口,一般会返回一个Header,例如:


type PacketHeader struct {
ID uint32
Timestamp uint64
}

type Packet interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
Header() *PacketHeader
}

如果是OO的语言,一般会有一个基类,里面包含了Header和实现这个Header:


class BasePacket : public Packet {
protected:
PacketHeader h;
public:
virtual Header() *PacketHeader;
};

class HandshakePacket : public BasePacket {
};

在子类中就都实现了这个Header()方法了,在GOLANG同样可以做到,通过在Header中定义方法,在Packet中包含Header就可以。


func (v *PacketHeader) Header() *PakcetHeader {
return v
}

type HandshakePacket struct {
PacketHeader
}

看起来还差不多的,都可以实现,golang只是代码少一点,清晰一点点而已。考虑要添加一些辅助函数,譬如给Packet添加是否是紧急类型的包,那OO语言得做一次代理:


type Packet interface {
IsErgency() bool
}

class BasePacketHeader {
public:
bool IsErgency() {
return realtime < 3;
}
}

class BasePacket {
public:
bool IsErgency() {
return h.IsErgency();
}
}

而在GOLANG中,只需要在Header实现就好了:


func (v *PacketHeader) IsErgency() bool {
return v.realtime < 3
}

更高级的可以直接嵌入接口。譬如context.Context的实现,cancelCtx直接嵌入了一个接口:


type cancelCtx struct {
Context

通过指定类型,或者初始化的顺序初始化struct


func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{
Context: parent,
done: make(chan struct{}),
}
}

结构嵌套的方式,让组合实现起来非常便捷,避免频繁的代理。

使用两个context实现CLOSE包的超时等待

技术讨论winlin 发表了文章 • 0 个评论 • 92 次浏览 • 4 天前 • 来自相关话题

在UDP中,一般发送者发送包后,如果一定的时间对方没有收到,就需要重传。例如UDP实现握手的过程,如果握手的包,比如RTMFP协议的IHELLO,发送给对方后,如果一定1秒没有收到,就应该重发一次,然后等3秒、6秒、9秒,如果最后没有收到就是超时了。查看全部

在UDP中,一般发送者发送包后,如果一定的时间对方没有收到,就需要重传。例如UDP实现握手的过程,如果握手的包,比如RTMFP协议的IHELLO,发送给对方后,如果一定1秒没有收到,就应该重发一次,然后等3秒、6秒、9秒,如果最后没有收到就是超时了。


最后一个Close包,发送者不能等待这么长的时间,所以需要设置一个较短的时间做超时退出。一般收发都是一个context,在最后这个Close包时,收到ctx.Done也不能立刻退出,因为还需要稍微等待,譬如600毫秒如果没有收到响应才能退出。


一个可能的实现是这样:


in := make(chan []byte)

func Close(ctx context.Context) (err error) {
timeous := ... // 1s,3s,6s,9s...
for _, to := range timeouts {
// 发送给对方WriteToUDP("CLOSE", peer)
// 另外一个goroutine读取UDP包到in

select {
case <- time.After(to):
case <- in:
fmt.Println("Close ok")
return
case <- ctx.Done():
fmt.Println("Program quit")
return
}
}
return
}

但是这个问题在于,在程序退出时,一般都会cancel ctx然后调用Close方法,这个地方就不会等待任何的超时,就打印"Program quit"然后返回了。解决方案是用另外一个context。但是如何处理之前的ctx的done呢?可以再起一个goroutine做同步:


in := make(chan []byte)

func Close(ctx context.Context) (err error) {
ctxRead,cancelRead := context.WithCancel(context.Background())
go func(){ // sync ctx with ctxRead
select {
case <-ctxRead.Done():
case <-ctx.Done():
select {
case <-ctxRead.Done():
case <-time.After(600*time.Milliseconds):
cancelRead()
}
}
}()

ctx = ctxRead // 下面直接用ctxRead。
timeous := ... // 1s,3s,6s,9s...
for _, to := range timeouts {
// 发送给对方WriteToUDP("CLOSE", peer)
// 另外一个goroutine读取UDP包到in

select {
case <- time.After(to):
case <- in:
fmt.Println("Close ok")
return
case <- ctx.Done():
fmt.Println("Program quit")
return
}
}
return
}

这样在主要的逻辑中,还是只需要处理ctx,但是这个ctx已经是新的context了。不过在实际的过程中,这个sync的goroutine需要确定起来后,才能继续,否则会造成执行顺序不确定:


sc := make(chan bool, 1)
go func(){ // sync ctx with ctxRead
sc <- true
select {
......
}
<- sc

使用context,来控制多个goroutine的执行和取消,是非常好用的,关键可以完全关注业务的逻辑,而不会引入因为ctx取消或者超时机制而造成的特殊逻辑。

GOLANG实现超时对象检测的最好理解的方式

技术讨论winlin 发表了文章 • 0 个评论 • 143 次浏览 • 6 天前 • 来自相关话题

依赖于心跳的系统,都需要超时检测。比如P2P系统中客户端每隔120秒向数据服务器发送一次数据汇总,服务器就需要维护一个超时时间。比如一个UDP服务器,在和客户端之间创建Session之后,如果没有数据包,一般会有Ping包,说明这个Session是存活的... 查看全部

依赖于心跳的系统,都需要超时检测。比如P2P系统中客户端每隔120秒向数据服务器发送一次数据汇总,服务器就需要维护一个超时时间。比如一个UDP服务器,在和客户端之间创建Session之后,如果没有数据包,一般会有Ping包,说明这个Session是存活的,服务器在发现Session超时后也需要清理。


首先,服务器一般需要维护一个列表,以Peer为例:


type Peer struct {
id uint64
heartbeat time.Time
}

type Server struct {
peers map[uint64]*Peer
lock sync.Mutex
}

创建Peer,同时在收到Ping消息后,更新Peer的心跳时间:


func (v *Server) Create(id uint64) *Peer {
v.lock.Lock()
defer v.lock.UnLock()

p = &Peer { id:id, heartbeat: time.Now(), }
v.peers[id] = p
return p
}

func (v *Server) OnPing(id uint64) {
v.lock.Lock()
defer v.lock.UnLock()

if p,ok := v.peers[id]; ok {
p.heatbeat = time.Now()
}
}

当然,需要起一个goroutine定期扫描这个列表, 假设300秒超时:


go func(v *Server) {
for {
func(){
v.lock.Lock()
defer v.lock.UnLock()

now := time.Now()
for id,p := range v.peers {
if p.heartbeat.Add(300 * time.Second).Before(now) {
delete(v.peers, id)
}
}
}()
time.Sleep(30 * time.Second)
}
}(server)

如果Peers的数目非常多,那么扫描时每次都需要锁定v.peers,会导致其他的业务都无法进行。特别是清理Peer这个过程如果比较复杂,譬如需要发起io请求,是一个费时的操作时,就会造成系统的等待。


一般来说,超时的Peer不会很多,因此可以用chan放一个超时的peer,每个peer专门起一个goroutine来看什么时候超时,这样就可以在检测超时时避免用锁了:


timeout := make(chan *Peer)

func (v *Server) Create(id uint64) *Peer {
v.lock.Lock()
defer v.lock.UnLock()

p = &Peer { id:id, heartbeat: time.Now(), }
v.peers[id] = p
return p

go func(p *Peer) {
for {
tm := p.heartbeat
<- time.After(300 * time.Second)
if tm.Equal(p.heartbeat) {
timeout <- p
break
}
}
}(p)
}

go func(v *Server){
for gw := range timeout {
func(){
lgateways.Lock()
defer lgateways.Unlock()

delete(gateways, gw.port)
}()

// Do something cleanup about the gateway.
}
}(server)

这样就只有在有Peer超时时,才真正锁住Server.peers

GOLANG接口适配,组合方式的灵活接口演化

技术讨论winlin 发表了文章 • 0 个评论 • 174 次浏览 • 2017-05-15 20:49 • 来自相关话题

在OO(Object Oriented)原则中,有一条叫做:优先使用组合,而不是继承。虽然GOLANG并不是OO的语言(没有继承和多态),但是不妨碍GOLANG使用这条原则,而GOLANG的作者就强调过这一点,在GOLANG中是使用组合而非继承来扩展。<... 查看全部

在OO(Object Oriented)原则中,有一条叫做:优先使用组合,而不是继承。虽然GOLANG并不是OO的语言(没有继承和多态),但是不妨碍GOLANG使用这条原则,而GOLANG的作者就强调过这一点,在GOLANG中是使用组合而非继承来扩展。


装逼的说来,继承是一种名词化的语言体系,先进行业务抽象然后设计类体系和继承关系。而组合,强制使用接口,因为组合中使用的总是另外一个对象的接口,通过动词的组合,实现目标,比如不管是什么只要有Write([]byte)(int,error)这个动作,就实现了这个接口,其他对象组合这个接口后,对外也看起来就是个io.Writer的接口。


比如,GOALNG1.8支持了writev,一般在面向对象会这么的搞:


class Socket {
int Write(void*, int);
int Writev(const iovec*, int);
};

对的吧?一个Socket可以写数据,也可以用writev写iovec向量,就是一次性写入多个内存块。



Note: 有时候内存块是不连续的,比如一个Video帧,发送给不同的客户端时,Header是需要修改的,但是Payload都一样,那么可以针对每个客户端只创建一个header,然后公用payload,但是这时候两个内存指针是不连续的,特别是需要同时写入多个视频帧时,writev就很神奇的避免了内存拷贝writev(header+payload),具体参考下writev的资料哈。



这样有个问题,并非所有系统都支持Writev的,并非所有Socket都支持Writev的,如果是自己写个代码,当然是可以随便这么搞的,但是作为标准库,GOLANG当然是不能这么做的。GOLANG就加了一个接口(一个新动作)叫做net.buffersWriter,如果实现了这个接口就用writev。先看用法:


    conn,err := net.Dial("tcp", "127.0.0.1:1935")

buffers := Buffers{
[]byte("once upon a time in "),
[]byte("Gopherland ... "),
}

buffers.WriteTo(conn)

在Buffers的WriteTo方法会判断是否是writev的接口,如果是则用writev写,否则就一个个的写:


func (v *Buffers) WriteTo(w io.Writer) (n int64, err error) {
if wv, ok := w.(buffersWriter); ok {
return wv.writeBuffers(v)
}

实际上conn是net.TcpConn,里面有个fd *net.netFD,它实现了net.buffersWriter接口,所以最后调用的就是(fd *netFD) writeBuffers(v *Buffers)


func (c *conn) writeBuffers(v *Buffers) (int64, error) {
n, err := c.fd.writeBuffers(v)

func (fd *netFD) writeBuffers(v *Buffers) (n int64, err error) {
iovecs = append(iovecs, syscall.Iovec{Base: &chunk[0]})
wrote, _, e0 := syscall.Syscall(syscall.SYS_WRITEV,
uintptr(fd.sysfd),
uintptr(unsafe.Pointer(&iovecs[0])),
uintptr(len(iovecs)))

对于其他没有实现这个接口的对象,就每个向量循环的写。


在看一个例子http.Get(url string),客户端发起一个HTTP请求:


http.Get("http://localhost:1985/api/v1/versions")
// 实际上调用的是:
func (c *Client) Get(url string)
// 然后调用:
(c *Client) Do(req *Request)

在GOLANG1.7中引入了context的概念,用来支持cancel,怎么用的:


ctx,cancel := context.WithCancel(context.Background())

select {
case <- ctx.Done():
// Cancelled.
case <- time.After(...):
// Timeout
case <- other events:
// Other events.
}

如何支持取消的HTTP请求呢?给http.Get加个ctx参数?例如http.Get(ctx, url)这样?那改动得多大啊,而且还不能兼容之前的API,泪奔~看看GOLANG的解决:


ctx,cancel := context.WithCancel(context.Background())
go func(){
req,err := http.NewRequest("http://...")
res,err := http.DefaultClient.Do(req.WithContext(ctx))
defer res.Body.Close()
// 读取res响应结果。
}()

select {
case <- ctx.Done():
case <- time.After(3 * time.Second):
cancel() // Timeout to cancel all requests.
}

使用组合,通过req.WithContext再返回一个*http.Request,实现同样的目的。

GOLANG使用简单类型,在协议解析的妙用

技术讨论winlin 发表了文章 • 0 个评论 • 646 次浏览 • 2017-05-11 15:41 • 来自相关话题

在协议解析中,经常需要用到转换不同的含义,比如声音的采样率,在FLV中定义和AAC中定义是不同的。在FLV中只有4中采样率5512, 11025, 22050, 44100。而在AAC中有16种采样率96000, 8... 查看全部

在协议解析中,经常需要用到转换不同的含义,比如声音的采样率,在FLV中定义和AAC中定义是不同的。在FLV中只有4中采样率5512, 11025, 22050, 44100。而在AAC中有16种采样率96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350(还有4个是保留的)。也就是说,1在FLV中标识11025Hz,而在AAC中表示的是88200Hz。如何实现这个转换呢?


C++当然先得定义枚举:


enum SrsAudioSampleRate
{
SrsAudioSampleRate5512 = 0,
SrsAudioSampleRate11025,
SrsAudioSampleRate22050,
SrsAudioSampleRate44100,
SrsAudioSampleRateForbidden,
};

C++当然是用函数了:


SrsAudioSampleRate aac_to_flv(int v) {
if (v >= 0 && v <=5) {
return SrsAudioSampleRate44100;
} else if (v >=6 && v <= 8) {
return SrsAudioSampleRate22050;
} else if (v >= 9 && v <= 11) {
return SrsAudioSampleRate11025;
} else if (v == 12) {
return SrsAudioSampleRate5512;
} else {
return SrsAudioSampleRateForbidden;
}
}

看起来还是挺简单的。慢着,还有的时候需要打印出采样率来,所以还得搞个函数:


string srs_audio_sample_rate2str(SrsAudioSampleRate v)
{
switch (v) {
case SrsAudioSampleRate5512: return "5512";
case SrsAudioSampleRate11025: return "11025";
case SrsAudioSampleRate22050: return "22050";
case SrsAudioSampleRate44100: return "44100";
default: return "Forbidden";
}
}

拿到一个AAC的采样率,然后转换成FLV的,并打印出来,是这么使用的:


// 从文件或者流中读取出AAC的采样率的值。
int samplingFrequencyIndex = ...;
// 转换成FLV的采样率。
SrsAudioSampleRate sampleRate = aac_to_flv(samplingFrequencyIndex);
// 转换成字符串格式。
string sSampleRate = srs_audio_sample_rate2str(sampleRate);
// 打印采样率。
printf("SampleRate=%d/%sHz\n", sampleRate, sSampleRate);

有什么麻烦的呢?



  1. 函数和类型之间没有关系,每次使用的时候都得去翻手册啊翻手册。

  2. 如果定义成一个struct,那转换的时候又太麻烦了。


还能不能愉快的玩耍呢?用GOLANG吧!先看用法:


var sampleRate AudioSamplingRate
sampleRate.From(samplingFrequencyIndex)
fmt.Printf("SampleRate=%d/%v\n", sampleRate, sampleRate)

就是这么简单(此处应该有掌声)~


其实实现起来也非常自然:


type AudioSamplingRate uint8

const (
AudioSamplingRate5kHz AudioSamplingRate = iota // 0 = 5.5 kHz
AudioSamplingRate11kHz // 1 = 11 kHz
AudioSamplingRate22kHz // 2 = 22 kHz
AudioSamplingRate44kHz // 3 = 44 kHz
AudioSamplingRateForbidden
)

func (v AudioSamplingRate) String() string {
switch v {
case AudioSamplingRate5kHz:
return "5.5kHz"
case AudioSamplingRate11kHz:
return "11kHz"
case AudioSamplingRate22kHz:
return "22kHz"
case AudioSamplingRate44kHz:
return "44kHz"
default:
return "Forbidden"
}
}

func (v *AudioSamplingRate) From(a int) {
switch a {
case 0, 1, 2, 3, 4, 5:
*v = AudioSamplingRate44kHz
case 6, 7, 8:
*v = AudioSamplingRate22kHz
case 9, 10, 11:
*v = AudioSamplingRate11kHz
case 12:
*v = AudioSamplingRate5kHz
default:
*v = AudioSamplingRateForbidden
}
}


Remark: 代码参考go-oryx-lib flv.



有几个地方非常不同:



  1. 虽然GOLANG只是在uint8上面加了函数,但是使用起来方便很多了,以前在C++中用这两个枚举,每次都要跳到枚举的定义来看对应的函数是什么。

  2. GOLANG的switch比较强大,可以case好几个值,和C++的if有点想,但是GOLANG的case更直观,知道这几个值会被转换成另外的值,而if读起来像是将一个范围的值转换,不好懂。

  3. GOLANG的枚举使用const实现,也可以带类型,而且有个iota很强大,特别是在定义那些移位的枚举时就很好用。


好吧,这只是几个小的改进,虽然用起来很方便。来看看在AMF0中基本类型的妙用,AMF0是一种传输格式,和JSON很像,不过JSON是文本的,而AMF0是字节的,都是用来在网络中传输对象的。因此,AMF0定义了几个基本的类型:String, Number, Boolean, Object,其中Object的属性定义为String的属性名和值,值可以是其他的类型。


先看看C++的实现,首先定义一个AMF0Any对象,可以转换成具体的String或者Object等对象:


class SrsAmf0Any {
// 提供转换的函数,获取实际的值。
virtual std::string to_str();
virtual bool to_boolean();
virtual double to_number();
virtual SrsAmf0Object* to_object();
// 当然还得提供判断的函数,得知道是什么类型才能转。
virtual bool is_string();
virtual bool is_boolean();
virtual bool is_number();
virtual bool is_object();
// 提供创建基本类型的函数。
static SrsAmf0Any* str(const char* value = NULL);
static SrsAmf0Any* boolean(bool value = false);
static SrsAmf0Any* number(double value = 0.0);
static SrsAmf0Object* object();
};

在实现时,String和Number等基本类型可以隐藏起来(在cpp中实现):


namespace _srs_internal {
class SrsAmf0String : public SrsAmf0Any {
public:
std::string value;
// 当然它必须实现编码和解码的函数。
virtual int total_size();
virtual int read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream);
};
}

AMF0Object当然得暴露出来的:


class SrsAmf0Object : public SrsAmf0Any {
public:
virtual int total_size();
virtual int read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream);
// 提供设置和读取属性的方法。
virtual void set(std::string key, SrsAmf0Any* value);
virtual SrsAmf0Any* get_property(std::string name);
};

用起来是这样:


// 设置Object的属性,并发送给服务器。
SrsConnectAppPacket* pkt = NULL;
pkt->command_object->set("app", SrsAmf0Any::str(app.c_str()));
pkt->command_object->set("tcUrl", SrsAmf0Any::str(tcUrl.c_str()));

// 读取服务器的响应,取出服务器的IP等信息。
SrsConnectAppResPacket* pkt = NULL;
SrsAmf0Any* data = pkt->info->get_property("data");
if (si && data && data->is_object()) {
SrsAmf0Object* obj = data->to_objet();

SrsAmf0Any* prop = obj->get_property("srs_server_ip");
if (prop && prop->is_string()) {
printf("Server IP: %s\n", prop->to_str().c_str());
}

prop = obj->get_property("srs_pid");
if (prop && prop->is_number()) {
printf("Server PID: %d\n, prop->to_number());
}
}

看起来巨繁琐吧?快用GOLANG,如果换成GOLANG,可以用基本类型定义AMF0的基本类型,这样使用起来是这样:


pkt := or.NewConnectAppPacket()
pkt.CommandObject.Set("tcUrl", amf0.NewString(tcUrl))
pkt.CommandObject.Set("app", amf0.NewString(app))

var res *or.ConnectAppResPacket
if data, ok := res.Args.Get("data").(*amf0.Object); ok {
if data, ok := data.Get("srs_server_ip").(*amf0.String); ok {
fmt.Printf("Server IP: %s\n", string(*data))
}
if data, ok := data.Get("srs_pid").(*amf0.Number); ok {
fmt.Printf("Server PID: %d\n, int(*data))
}
}

区别在于:



  1. C++由于不能在基本类型上定义方法,导致必须创建struct或者class类型,有比较繁琐的类型转换和判断。

  2. GOLANG的类型判断,提供了ok的方式,一句话就能把类型转换弄好,而且接口和实现struct的对象可以重用变量名。

  3. 不必加很多类型判断,没有多余的变量,干净利索,需要维护的信息比较少。


实现起来更舒服,基本类型不用定义struct:


type String string
func (v *String) Size() int {}
func (v *String) UnmarshalBinary(data []byte) (err error) {}
func (v *String) MarshalBinary() (data []byte, err error) {}

type Object struct {}
func (v *Object) Size() int {}
func (v *Object) UnmarshalBinary(data []byte) (err error) {}
func (v *Object) MarshalBinary() (data []byte, err error) {}


Remark:代码参考go-oryx-lib amf0.



更神奇的是,因为Object、EcmaArray和StrictArray都是类似的结构,但是有些细微的差异,因此使用GOLANG的结构体嵌套可以很直接的解决问题:


type Object struct {
objectBase
eof objectEOF
}
type EcmaArray struct {
objectBase
count uint32
eof objectEOF
}
type StrictArray struct {
objectBase
count uint32
}

可以对比下SRS的实现,C++可以采用继承,而GOLANG直接组合那些基本的单元。


爱生活,爱够浪(此处可以响起掌声了)~

GOLANG将类型作为参数,用反射设置指针的指针,实现类似模板功能

技术讨论winlin 发表了文章 • 0 个评论 • 152 次浏览 • 2017-05-09 20:13 • 来自相关话题

在协议解析中,C++的模板有比较大的作用,有时候我们希望丢弃所有的包,只留下特定类型的包。参考SRS的代码查看全部

在协议解析中,C++的模板有比较大的作用,有时候我们希望丢弃所有的包,只留下特定类型的包。参考SRS的代码SrsRtmpClient::connect_app2


类型系统的设计, SrsConnectAppResPacket继承自SrsPacket


class SrsPacket;
class SrsConnectAppResPacket : public SrsPacket

协议栈提供了expect_message模板函数,接收特定类型的包:


SrsCommonMessage* msg = NULL;
SrsConnectAppResPacket* pkt = NULL;
if ((ret = protocol.expect_message<SrsConnectAppResPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
return ret;
}

SrsAmf0Any* data = pkt->info->get_property("data");
SrsAmf0EcmaArray* arr = data->to_ecma_array();
SrsAmf0Any* prop = arr->ensure_property_string("srs_server_ip");
string srs_server_ip = prop->to_str();

在向服务器发送了ConnectApp后,就等待ConnectAppRes响应包,丢弃所有的其他的。这个时候,类型SrsConnectAppResPacket就作为了一个参数,也就是C++的模板。如果是GOLANG怎么实现呢?没有直接的办法的,因为没有泛型。


在GOLANG中,也需要定义个interface,参考Packet,当然也是有ConnectAppResPacket实现了这个接口(Message是原始消息,它的Payload可以Unmarshal为Packet):


type Message struct { Payload []byte }
type Packet interface {} // Message.Payload = Packet.Marshal()
type ConnectAppResPacket struct { Args amf0.Amf0 }

第一种方法,协议栈只需要收取Message,然后解析Message为Packet,收到packet后使用类型转换,判断不是自己需要的包就丢弃:


func (v *Protocol) ReadMessage() (m *Message, err error)
func (v *Protocol) DecodeMessage(m *Message) (pkt Packet, err error)

不过这两个基础的API,User在使用时,比较麻烦些,每次都得写一个for循环:


var protocol *Protocol

for {
var m *Message
m,_ = protocol.ReadMessage()

var p Packet
p,_ = protocol.DecodeMessage(m)

if res,ok := p.(*ConnectAppResPacket); ok {
if data, ok := res.Args.Get("data").(*amf0.EcmaArray); ok {
if data, ok := data.Get("srs_server_ip").(*amf0.String); ok {
srs_server_ip = string(*data)
}
}
}
}

比较方便的做法,就是用回调函数,协议栈需要提供个ExpectPacket方法:


func (v *Protocol) ExpectPacket(filter func(m *Message, p Packet)(ok bool)) (err error)

这样可以利用回调函数可以访问上面函数的作用域,直接转换类型和设置目标类型的包:


var protocol *Protocol

var res *ConnectAppResPacket
_ = protocol.ExpectPacket(func(m *Message, p Packet) (ok bool){
res,ok = p.(*ConnectAppResPacket)
})

if data, ok := res.Args.Get("data").(*amf0.EcmaArray); ok {
if data, ok := data.Get("srs_server_ip").(*amf0.String); ok {
srs_server_ip = string(*data)
}
}

这样已经比较方便了,不过还是需要每次都给个回调函数。要是能直接这样用就好了:


var protocol *Protocol

var res *ConnectAppResPacket
_ = protocol.ExpectPacket(&res)

if data, ok := res.Args.Get("data").(*amf0.EcmaArray); ok {
if data, ok := data.Get("srs_server_ip").(*amf0.String); ok {
srs_server_ip = string(*data)
}
}

这样也是可以做到的,不过协议栈函数要定义为:


func (v *Protocol) ExpectPacket(ppkt interface{}) (err error)

在函数内部,使用reflect判断类型是否符合要求,设置返回值。代码参考ExpectPacket,下面是一个简要说明:


func (v *Protocol) ExpectPacket(ppkt interface{}) (m *Message, err error) {
// 由于ppkt是**ptr, 所以取类型后取Elem(),就是*ptr,用来判断是否实现了Packet接口。
ppktt := reflect.TypeOf(ppkt).Elem()
// ppktv是发现匹配的包后,设置值的。
ppktv := reflect.ValueOf(ppkt)

// 要求参数必须是实现了Packet,避免传递错误的值进来。
if required := reflect.TypeOf((*Packet)(nil)).Elem(); !ppktt.Implements(required) {
return nil,fmt.Errorf("Type mismatch")
}

for {
m, err = v.ReadMessage()
pkt, err = v.DecodeMessage(m)

// 判断包是否是匹配的那个类型,如果不是就丢弃这个包。
if pktt = reflect.TypeOf(pkt); !pktt.AssignableTo(ppktt) {
continue
}

// 相当于 *ppkt = pkt,类似C++中对指针的指针赋值。
ppktv.Elem().Set(reflect.ValueOf(pkt))
break
}
return
}

遗憾的就是这个参数ppkt类型不能是Packet,因为会有类型不匹配;也不能是*Packet,因为在GOLANG中传递接口的指针也是不可以的,会导致类型错误(**ConnectAppResPacket并不能匹配*Packet);这个参数只能是interface{}。不过用法也很简单,只是需要注意参数的传递。


var res *ConnectAppResPacket
// 这是正确的做法,传递res指针的地址,相当于指针的指针。
_ = protocol.ExpectPacket(&res)
// 这是错误的做法,会在ExpectPacket检查返回错误,没有实现Packet接口
_ = protocol.ExpectPacket(res)

用起来还不错。

讨论关于Go和相关生态的空间