探索skynet(三):消息队列


skynet框架底层使用消息队列作为各个服务之间通信的工具,之前在探索skynet如何启动一个服务时也提到了创建skynet_context时会初始化一个队列,并且也笼统的提到了如何为一个服务设置callback。那么接下来就详细的看看,skynet里有关消息队列的部分是如何设计和实现的。

基础结构

skynet中有关消息队列的实现主要放在skynet_mq.c中。

skynet_mq.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct message_queue {
struct spinlock lock;
uint32_t handle;
int cap;
int head;
int tail;
int release;
int in_global;
int overload;
int overload_threshold;
struct skynet_message *queue;
struct message_queue *next;
};

struct global_queue {
struct message_queue *head;
struct message_queue *tail;
struct spinlock lock;
};


在skynet的设计中,有一个全局队列global_queue,其中保存了非空的各个服务的消息队列message_queue。

message_queue中的handle,就是对应服务的handle,可以通过它找到对应服务的skynet_context。

在两个队列的实现部分,全局队列使用链表,而服务队列则是使用了动态数组模拟一个循环队列,并且在动态数组被填满时(head==tail),自动进行了扩充。所有这些内容,都不会超过大学数据结构课程的范畴,而云风大大的实现,简洁而完整,堪称典范。

自旋锁

这其中,由于skynet底层是一个多线程的架构,所以在对队列进行操作时,使用了自旋锁(SPINLOCK)。

SPINLOCK的实现在spinlock.h中,提供了两种实现。一种利用gcc的__sync_lock_test_and_set原子操作,实现循环检测的自旋锁(spinlock)。而另一种,则直接利用了互斥锁pthread_mutex_t,但是也提供了统一的接口,通过编译条件选择,便于测试性能。说到自旋锁,它和互斥锁的区别主要在于,自旋锁在加锁等待时并不会切换当前线程的上下文,而是忙等待(busy-waiting):

spinlock.h
1
2
3
4
static inline void
spinlock_lock(struct spinlock *lock) {

while (__sync_lock_test_and_set(&lock->lock,1)) {}
}

从实现上也可以看出,自旋锁在等待时,实际上一直在进行循环。

由于自旋锁不会切换线程上下文(睡眠),所以自旋锁的加锁效率要更高,但是也会导致CPU使用率增高。一般多核服务器、每次加锁等待时间不长时,适合使用自旋锁来提高执行效率。

callback与消息队列

skynet的服务之间的调用,都会封装成一个message,通过skynet_mq_push塞到对应服务的消息队列中,等待处理。

服务在处理其消息队列时,会提前注册callback。之前在skynet如何启动一个服务中说到,从lua层面skynet.core.callback设置skynet.dispatch_message这一callback的过程,是在lua-skynet.c中的lcallback函数执行的。

其中,有一些用到了Lua虚拟机的代码:

lua-skynet.c
1
2
3
4
int forward = lua_toboolean(L, 2); 
luaL_checktype(L,1,LUA_TFUNCTION);
lua_settop(L,1);
lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);

这一段代码,取出了skynet.core.callback的第二个参数,作为forward(区别是回调函数的返回值。返回值不同,所以后续在skynet_server.c中对处理过的msg->data的处理也不一样)。然后把指针放到第一个参数上(lua_settop(L,1))。最后,将第一个参数设置到全局注册表(LUA_REGISTRYINDEX)中,并且以函数_cb为key。

之后,

lua-skynet.c
1
2
3
4
5
6
7
8
lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
lua_State *gL = lua_tothread(L,-1);

if (forward) {
skynet_callback(context, gL, forward_cb);
} else {
skynet_callback(context, gL, _cb);
}

取出了当前主线程的lua_State,以此为cb_ud,设置callback。(取主线程的主要意义是和云风使用的skynet热更新机制有关,防止在实际执行时,原来的lua_State已经被释放了)

而上面的skynet_callback函数,其实就是简单的设置了context->cb和context->cb_ud。

实际执行callback的逻辑在skynet_server.c中的dispatch_message中,其实是从服务的消息队列中,取出消息,调用了_cb,并传入了cb_ud,也就是主线程的lua_State。

skynet-server.c
1
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);

而在_cb函数中

lua-skynet.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int
_cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
lua_State *L = ud;
int trace = 1;
int r;
int top = lua_gettop(L);
if (top == 0) {
lua_pushcfunction(L, traceback);
lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
} else {
assert(top == 2);
}
lua_pushvalue(L,2);

lua_pushinteger(L, type);
lua_pushlightuserdata(L, (void *)msg);
lua_pushinteger(L,sz);
lua_pushinteger(L, session);
lua_pushinteger(L, source);

r = lua_pcall(L, 5, 0 , trace);

// other code ...
}

先设置了一个错误处理函数traceback(lua_pushcfunction(L, traceback), index 1),然后以_cb为key,将实际的回调函数从全局注册表LUA_REGISTRYINDEX中取出, 然后依次压入参数,最后调用lua_pcall具体执行真正的回调函数。

这里有个lua_pushvalue(L, 2)好像有点多余,因为目前来看,真正的回调函数就是一定在栈顶位置的。有哪位大侠知道原因,还望不吝赐教。

总结

skynet以消息队列为基础,实现了服务与服务之间的消息转发。在具体实现中,将消息队列mq、服务handle、服务上下文context、回调函数cb、回调的lua_State结合在一起,实现了lua层面的各个服务回调函数的设置。

推荐阅读:
探索skynet(一):从skynet_sample说起
探索skynet(二):skynet如何启动一个服务

转载请注明出处: http://blog.guoyb.com/2017/03/08/skynet-3/

欢迎使用微信扫描下方二维码,关注我的微信公众号TechTalking,技术·生活·思考:
后端技术小黑屋

Comments