ghub:一个基于grpc的rpc hub


之前的一篇文章中介绍了grpc的用法,在这篇文章中,我将使用grpc做一个有些实际意义的小项目,一个可以使用的rpc hub。

rpc hub

在多服务器架构中,服务器之间经常需要进行通信。使用message queue构建一个pub-sub的消息传送机制,可以获得较低的耦合度、较高的灵活性,同时容易实现容灾备份。

但是,对于游戏服务器这种多台同构服务器的情况,直接使用rpc进行远程函数调用,会更加方便,开发效率更高。

以我最熟悉的游戏服务器集群为例,普通游戏服需要经常和跨服游戏服进行通信(跨服PVP玩法、全服排行榜等),这是一个多对多的架构,如果直接在普通游戏服/跨服游戏服上使用grpc进行通信的话,需要在每个服务器上启动多个grpc通道,并进行维护,这无形中增加了游戏服务器的负担。

“在软件开发中,没有什么是增加一个中间层解决不了的”。我们要做的这个rpc hub就是这样的一个中间层。每个服务器均只与此hub进行通信,hub负责收集所有服务器的注册信息,并将消息转发到对应的服务器。这样在每个服务器上,就只用维护一对连接,并且是对于hub这种静态服务的连接,而不用去考虑动态变化的服务器集群中的其他服务器。

接口设计

对于hub来说,只需向外提供两个接口即可。

  1. Register 供新上线的服务器注册
  2. RemoteCall 供服务器远程调用函数

而每个服务器上也需要通过grpc提供一个接口,供hub进行调用:

  1. ForwardCall 供hub调用

所以,整个proto文件所描述的rpc接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
syntax = "proto3";

package ghub;

service GHubServer
{
rpc Register (ClientInfo) returns (ReturnState) {}
rpc RemoteCall (CallInfo) returns (ReturnState) {}
}

service GHubClient
{
rpc ForwardCall (CallInfo) returns (ReturnState) {}
}

message ClientInfo
{
string ip = 1;
int32 port = 2;
string name = 3;
}

message CallInfo
{
string dst = 1;
int32 typ = 2;
string entity = 3;
string method = 4;
bytes args = 5;
}

message ReturnState
{
int32 ret = 1;
}

GHubServer

在GHubServer中将要实现上述proto文件中定义的Register和RemoteCall两个rpc方法。

Register:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class GHubServer(ghub_pb2.GHubServerServicer):
def __init__(self):
self.clients = {}

def Register(self, request, context):
now = time.time()
if request.name not in self.clients:
addr = '{}:{}'.format(request.ip, request.port)
channel = grpc.insecure_channel(addr)
stub = ghub_pb2.GHubClientStub(channel)
self.clients[request.name] = Channel(stub, now)
logger.info('client {} from {} registered.'.format(
request.name, addr))
else:
self.clients[request.name].timestamp = now
return ghub_pb2.ReturnState(ret=0)

Client通过GHubServer.Register方法向hub注册自己。GHubServer收到Client的ip/port信息后,建立一个rpc channel并保存下来,同时更新时间戳用于判断Client是否在线。

这样,GHubServer就像一个hub(集线器)一样,维持了通向各个Client的通道。

那么接下来,RemoteCall方法的实现就很直接了,直接按照目的地Client的名字,找到对应的rpc channel,将函数调用信息转发过去。

1
2
3
4
5
6
7
8
9
10
11
class GHubServer(ghub_pb2.GHubServerServicer):
# see above...
def RemoteCall(self, request, context):
dst_name = request.dst
if dst_name not in self.clients:
return ghub_pb2.ReturnState(ret=-1)
channel = self.clients[dst_name]
stub = channel.stub
ret = stub.ForwardCall.future(request)
ret.result()
return ghub_pb2.ReturnState(ret=0)

另外,GHubServer还需要定期的根据Client的注册时间戳信息,判断是否有Client掉线。

1
2
3
4
5
6
7
8
9
10
11
12
class GHubServer(ghub_pb2.GHubServerServicer):
# see above...
def CheckChannels(self):
now = time.time()
rm_channels = []
for name, channel in self.clients.iteritems():
if now - channel.timestamp >= 60:
rm_channels.append(name)

for name in rm_channels:
self.clients.pop(name, None)
logger.info('client {} disconnected.'.format(name))

接下来,就是正常的启动rpc server的流程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def serve():
doc = """Usage:
ghub.py -p <port>
ghub.py (-h | --help)
Options:
-h --help Show this screen
-p Specify the listening port
"""

args = docopt(doc, version="ghub ver1.0")
hub_port = int(args['<port>'])
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
ghub_server = GHubServer()
ghub_pb2.add_GHubServerServicer_to_server(ghub_server, server)
server.add_insecure_port('[::]:{}'.format(hub_port))
server.start()
try:
while True:
time.sleep(10)
ghub_server.CheckChannels()
except KeyboardInterrupt:
server.stop(0)

if __name__ == "__main__":
serve()

GHubClient

Client需要完成两个功能。首先它需要作为一个Proxy,将所有的远程调用通过rpc RemoteCall转发给GHubServer,并定期向GHubServer注册自己;另外,它也需要作为一个rpc调用的接受者,接受GHubServer转发来的rpc ForwardCall调用。

Proxy:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class GHubProxy(object):
def __init__(self, hub_ip, hub_port, name, port):
channel = grpc.insecure_channel('{}:{}'.format(hub_ip, hub_port))
self.stub = ghub_pb2.GHubServerStub(channel)
self.port = port
self.name = name

def CallMethod(self, remote, typ, entity, method, args):
byte_args = zlib.compress(cPickle.dumps(args, -1))
call_info = ghub_pb2.CallInfo(
dst=remote, typ=typ,
entity=entity, method=method, args=byte_args)
ret = self.stub.RemoteCall(call_info)
return ret.ret

def Register(self):
self.stub.Register(ghub_pb2.ClientInfo(
ip='localhost',
port=self.port,
name=self.name))

在调用远程方法时,我们通过zlib和cPickle将调用参数转成通用的byte形式传输,可以不受函数参数g个数的影响,如需支持关键字参数,还可以加上kwargs。

ForwardCall:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class GHubClient(ghub_pb2.GHubClientServicer):
def ForwardCall(self, request, context):
args = cPickle.loads(zlib.decompress(request.args))
method = request.method
if request.typ == 1:
# global methods
func = globals().get(method, None)
try:
func(*args)
return ghub_pb2.ReturnState(ret=0)
except:
return ghub_pb2.ReturnState(ret=-1)
elif request.typ == 2:
# entity methods
entity = entities.get(request.entity, None)
try:
func = getattr(entity, method)
func(*args)
return ghub_pb2.ReturnState(ret=0)
except:
return ghub_pb2.ReturnState(ret=-1)

return ghub_pb2.ReturnState(ret=-1)

在这里我们只示意了两种情况。一种是直接调用模块内的全局方法;另一种是假设我们有一个可以索引所有实例的地方,从而通过实例的名字,调用对应实例的相关方法。

下面需要构造一些测试数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
shutdown_event = threading.Event()

def serve(ip, port):
client = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
ghub_client = GHubClient()
ghub_pb2.add_GHubClientServicer_to_server(ghub_client, client)
client.add_insecure_port('{}:{}'.format(ip, port))
logger.info('client listening on {}:{}'.format(ip, port))
client.start()
return client

def HeartBeat(proxy):
while not shutdown_event.is_set():
proxy.Register()
time.sleep(10)

def TestMethod(a, b):
logger.info('TestMethod called with: {}, {}'.format(a, b))


class Entity(object):
def __init__(self, name):
self.name = name

def TestMethod(self, a, b):
logger.info('{}.TestMethod called with: {} {}'.format(self.name, a, b))

if __name__ == "__main__":
doc = """Usage:
ghub_client.py -p <port> -s <hub_port> -n <name>
ghub_client.py (-h | --help)
Options:
-h --help Show this screen
-p Listening port
-s Hub port
-n Client name
"""

args = docopt(doc, version="ghub_client ver1.0")
port = int(args['<port>'])
hub_port = int(args['<hub_port>'])
client_name = args['<name>']

client_stub = serve('[::]', port)
proxy = GHubProxy('localhost', hub_port, client_name, port)
t2 = threading.Thread(target=HeartBeat, args=(proxy,))
t2.deamon = True
t2.start()

user = Entity('user')
entities[user.name] = user
account = Entity('account')
entities[account.name] = account

time.sleep(2)
for a, b in zip(range(1, 3), range(11, 13)):
proxy.CallMethod(client_name, 1, '', 'TestMethod', (a, b))
proxy.CallMethod(client_name, 2, 'user', 'TestMethod', (a, b))
proxy.CallMethod(client_name, 2, 'account', 'TestMethod', (a, b))
time.sleep(1)

try:
while True:
time.sleep(10)
except KeyboardInterrupt:
logger.error('KeyboardInterrupt')
shutdown_event.set()
client_stub.stop(0)

实际测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
$./ghub.py -p 33333
INFO:root:client c1 from localhost:33334 registered.
INFO:root:client c2 from localhost:33335 registered.

$./ghub_client.py -p 33334 -s 33333 -n c1 -t c2
INFO:root:client listening on [::]:33334
INFO:root:TestMethod called with: 1, 11
INFO:root:user.TestMethod called with: 1 11
INFO:root:account.TestMethod called with: 1 11
INFO:root:TestMethod called with: 2, 12
INFO:root:user.TestMethod called with: 2 12
INFO:root:account.TestMethod called with: 2 12

$./ghub_client.py -p 33335 -s 33333 -n c2 -t c1
INFO:root:client listening on [::]:33335
INFO:root:TestMethod called with: 1, 11
INFO:root:user.TestMethod called with: 1 11
INFO:root:account.TestMethod called with: 1 11
INFO:root:TestMethod called with: 2, 12
INFO:root:user.TestMethod called with: 2 12
INFO:root:account.TestMethod called with: 2 12

总结

这样的一个rpc hub架构简明,实现也比较简单,易于维护和调试。但是同时也给系统引入了单点。另外在效率方面,一般的对于GHubServer可以考虑使用C++实现,因为这一模块功能简单,实现完成之后也基本不会做功能上的变动。而Client端为了兼顾开发效率,一般选择直接使用脚本实现。

完整代码详见ghub

转载请注明出处: http://blog.guoyb.com/2016/11/23/ghub/

欢迎使用微信扫描下方二维码,关注我的微信公众号TechTalking,技术·生活·思考:
后端技术小黑屋

Comments