2014-04-07
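Gearman is a task-distribution framework with support for many programming languages; using it feels much like making a remote procedure call (RPC).
[Figure: Gearman's basic design. Image from the web.]
The figure above comes from reference [1] and shows the basic design of Gearman. A worker is a node that executes tasks, and it registers itself with the job server when it starts. A client node submits tasks to the job server, and the job server assigns them to workers. Acting as the middleman, the job server carries the task arguments, results, and other information between client and worker. For more about Gearman, see reference [2].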
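The flow described above can be mimicked inside a single process. The following is only a toy sketch: ToyJobServer and its methods are hypothetical names invented for illustration, they are not part of Gearman, and no networking is involved. It shows a worker function being registered under a task name, and a client request being routed through the server to that function and back.

```python
class ToyJobServer(object):
    """Hypothetical in-process stand-in for a job server (not Gearman's API)."""

    def __init__(self):
        # Maps a task name to the worker function registered for it.
        self._workers = {}

    def register(self, task, func):
        # A worker announces which task it can handle.
        self._workers[task] = func

    def submit(self, task, data):
        # A client submits a job; the server forwards the data to the worker
        # and passes the worker's result back to the client.
        if task not in self._workers:
            raise KeyError("no worker registered for task %r" % task)
        return self._workers[task](data)


def str2upper(data):
    return data.upper()


server = ToyJobServer()
server.register("str2upper", str2upper)              # worker side
result = server.submit("str2upper", "hello world")   # client side
print(result)
```

In the real system the three roles run as separate processes, possibly on different machines, and the job server relays the data over the network.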
Install on Ubuntu:
sudo apt-get install gearman
Start the job server:
service gearman-job-server start
Or start it by running gearmand directly.
Install the corresponding Python library:
sudo pip install gearman
For details about this library, see reference [3].
Starting with an example
Create the file worker.py with the following contents:
import gearman
import pickle

gm_worker = gearman.GearmanWorker(['localhost:4730'])

def sum_worker(gearman_worker, gearman_job):
    # The job data arrives as a pickled str: unpickle it, sum, pickle the result.
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return pickle.dumps(sum(pickle.loads(gearman_job.data)))

def str2upper_worker(gearman_worker, gearman_job):
    # The job data is already a plain str.
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return gearman_job.data.upper()

# Map each task name to the function that handles it, then wait for jobs.
gm_worker.register_task('sum', sum_worker)
gm_worker.register_task('str2upper', str2upper_worker)
gm_worker.work()
Create the file client.py with the following contents:
import gearman
import pickle
gm_client = gearman.GearmanClient(['localhost:4730'])
request = gm_client.submit_job("str2upper", "hello world")
print request
print request.result
request = gm_client.submit_job("sum", pickle.dumps([1,2,3]))
print request
print pickle.loads(request.result)
Open terminal A and start gearmand:
sudo gearmand
Open another terminal, B, and run worker.py:
python worker.py
Open a third terminal, C, and run client.py:
python client.py
Terminal B now prints the following:
<GearmanJob connection/handle=(<GearmanConnection localhost:4730 connected=True>, 'H:myhost:5'), task=str2upper, unique=7f3bc96ba032288de7e55570fe5ea9b7, data='hello world'>
hello world
<type 'str'>
<GearmanJob connection/handle=(<GearmanConnection localhost:4730 connected=True>, 'H:myhost:6'), task=sum, unique=9195ef08761f921c322a716aa1db61a5, data='(lp0\nI1\naI2\naI3\na.'>
(lp0
I1
aI2
aI3
a.
<type 'str'>
Terminal C prints the following:
<GearmanJobRequest task='str2upper', unique='7f3bc96ba032288de7e55570fe5ea9b7', priority=None, background=False, state='COMPLETE', timed_out=False>
HELLO WORLD
<GearmanJobRequest task='sum', unique='9195ef08761f921c322a716aa1db61a5', priority=None, background=False, state='COMPLETE', timed_out=False>
6
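If you open a terminal D and run worker.py there as well, two workers are available to handle the sum and str2upper tasks. When the job server receives a task, it picks one of the registered workers and sends the job to it over the network (IP and port).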
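To make the idea of several workers sharing one task concrete, here is a toy sketch. ToyDispatcher, its methods, and the worker labels are hypothetical, and the round-robin rotation is purely an assumption chosen for simplicity; the text above does not say how gearmand actually selects a worker, and its real policy may differ.

```python
import itertools


class ToyDispatcher(object):
    """Hypothetical round-robin dispatcher; gearmand's real policy may differ."""

    def __init__(self):
        self._workers = {}   # task name -> list of (worker_id, function)
        self._cycles = {}    # task name -> endless iterator over that list

    def register(self, task, worker_id, func):
        self._workers.setdefault(task, []).append((worker_id, func))
        # Rebuild the rotation whenever a new worker joins.
        self._cycles[task] = itertools.cycle(self._workers[task])

    def submit(self, task, data):
        # Hand the job to the next worker in the rotation.
        worker_id, func = next(self._cycles[task])
        return worker_id, func(data)


dispatcher = ToyDispatcher()
dispatcher.register("sum", "worker-B", sum)   # started in terminal B
dispatcher.register("sum", "worker-D", sum)   # started in terminal D
handled_by = [dispatcher.submit("sum", [1, 2, 3]) for _ in range(4)]
print(handled_by)
```

Each submitted job is handled by exactly one worker, so adding workers spreads the load without any change on the client side.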
Analyzing worker.py
The default port of gearmand is 4730. In worker.py, gm_worker.register_task() registers the worker as a handler for the named task. For example:
gm_worker.register_task('sum', sum_worker)
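This means that the task sum is handled by the function sum_worker(). sum_worker() takes two arguments and reads the data passed by the client from the second one, gearman_job, as gearman_job.data, which is a string. Task data has to be passed as a str, so anything that is not a str must be converted to one first. sum_worker() does this with the pickle library, converting between list and str.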
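The conversion that sum_worker() relies on can be tried without a job server. This is a minimal sketch with illustrative variable names: the client side turns a list into a pickled string, the worker side restores the list, sums it, and pickles the result for the trip back. Note that on Python 3, pickle.dumps() returns bytes rather than str.

```python
import pickle

# Client side: serialize the list before handing it to submit_job().
payload = pickle.dumps([1, 2, 3])

# Worker side: restore the list, compute, and serialize the result.
numbers = pickle.loads(payload)
reply = pickle.dumps(sum(numbers))

# Client side again: decode the worker's reply.
total = pickle.loads(reply)
print(total)  # 6
```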
gm_worker.work() puts the worker to work: it enters a loop that waits for jobs and runs the registered functions.
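Analyzing client.py
In client.py, gm_client is an instance of the class gearman.GearmanClient. The function gm_client.submit_job() sends the task name and the task data to the job server.
When the task data is not a string
As mentioned above, if the task data is not a str, it has to be converted to one somehow, for example with pickle. Another approach is to define an encoder/decoder for gearman.GearmanClient() and gearman.GearmanWorker().
Adding the encoder/decoder to worker.py: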
import gearman
import pickle

class PickleDataEncoder(gearman.DataEncoder):
    @classmethod
    def encode(cls, encodable_object):
        return pickle.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        return pickle.loads(decodable_string)

class MyGearmanWorker(gearman.GearmanWorker):
    # Use the pickle encoder instead of the default one.
    data_encoder = PickleDataEncoder

gm_worker = MyGearmanWorker(['localhost:4730'])

def sum_worker(gearman_worker, gearman_job):
    # gearman_job.data is already decoded, so it is a list here.
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return sum(gearman_job.data)

def str2upper_worker(gearman_worker, gearman_job):
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return gearman_job.data.upper()

gm_worker.register_task('sum', sum_worker)
gm_worker.register_task('str2upper', str2upper_worker)
gm_worker.work()
Adding the encoder/decoder to client.py:
import gearman
import pickle
class PickleDataEncoder(gearman.DataEncoder):
    @classmethod
    def encode(cls, encodable_object):
        return pickle.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        return pickle.loads(decodable_string)

class MyGearmanClient(gearman.GearmanClient):
    data_encoder = PickleDataEncoder
gm_client = MyGearmanClient(['localhost:4730'])
request = gm_client.submit_job("str2upper", "hello world")
print request
print request.result
request = gm_client.submit_job("sum", [1,2,3])
print request
print request.result
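One caveat about the pickle-based encoder: pickle.loads() can execute arbitrary code while decoding, so it is only appropriate when every client and worker is trusted. As a hedged alternative, the sketch below has the same encode()/decode() shape as PickleDataEncoder but uses JSON. JsonDataEncoder is a hypothetical name; to keep the example runnable without the gearman package it does not inherit from gearman.DataEncoder, which a real encoder would presumably do in the same way as the pickle version.

```python
import json


class JsonDataEncoder(object):
    """Hypothetical JSON counterpart of PickleDataEncoder (illustration only)."""

    @classmethod
    def encode(cls, encodable_object):
        # Turn a JSON-compatible object (list, dict, str, number) into a string.
        return json.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        # Restore the object from its JSON string.
        return json.loads(decodable_string)


encoded = JsonDataEncoder.encode([1, 2, 3])
decoded = JsonDataEncoder.decode(encoded)
print(encoded)
print(sum(decoded))
```

JSON only covers basic types such as lists, dicts, strings, and numbers, which is enough for both tasks in this article.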
References
[1] Gearman, a task scheduler http://www.oschina.net/p/gearman
[2] Gearman official website http://gearman.org/
[3] python-gearman http://pythonhosted.org//gearman/