任务队列:gearman

介绍

gearman是一个通用的应用程序框架,用来在机器或进程之间分发任务。它支持并发的执行任务,负载均衡处理,和多语言系统集成。gearman能够应用的领域非常广泛,比如:高可用的网站,数据库复制事件的传输等。


原理

gearman的请求处理过程涉及三个角色:client->jobserver->worker:

  • client
    请求的发起者,可以是C,PHP,Perl,MySQL UDF等等。
  • jobserver
    请求的调度者,负责协调把client发出的请求转发给合适的worker。
  • worker
    (可以注册处理函数的、)请求的处理者,可以是C,PHP,Perl等等。

gearman的工作原理图如下:
gearman原理图 从上图可以看出,client api,jobserver,worker api都是gearman本身提供的,并且client api和worker api都很丰富。开发者只需要实现client应用程序和worker应用程序。


典型的部署架构

gearman部署架构


特性

  • 高可用
    可以启动多个jobserver,它们都是独立的服务进程(无法感知彼此的存在),有各自的内存队列,这样当某个jobserver出现故障时,其他的jobserver可以正常调度,由worker api和client api来自动的完成jobserver的fail over(在建立到jobserver的连接的时候,client api和worker api都支持传入一个jobserver的列表,其中client api会随机的选择一个可用的jobserver;worker api会连接到所有可用的jobserver,并且在jobserver重启之后,worker api会自动重新注册)
    gearman不会让正在被执行的任务丢失,由于worker和jobserver之间是长连接的,所以一旦worker发生异常,jobserver能够迅速感知,并重新派发这个异常的worker刚才执行的任务。
  • 负载均衡
    jobserver不会主动给worker分派任务。而是在worker从空闲状态被唤醒之后,到jobserver主动抓取工作任务。
  • 高性能
    作为gearman的核心,jobserver是由c/c++实现的,由于只做简单的任务派发,因此系统的瓶颈不会出现在jobserver上。
  • 分布式
    gearman是分布式的任务分发框架,worker与jobserver,client与jobserver的连接都是基于tcp的socket连接。
  • 可扩展
    因为松耦合的接口和无状态的job,所以只需要启动一个worker,注册到jobserver集群即可。新加入的worker不会对现有集群有任何的影响。甚至可以通过增加更多的worker,很方便的实现分布式负载均衡架构。
  • 多语言系统集成
    gearman支持的语言种类非常丰富,让我们能够用一种语言来编写Worker程序,但是用另外一种语言编写Client程序。所以有利于多语言系统之间的集成。
  • 队列机制
    gearman是把任务存储在本地的内存队列(哈希表)中的。默认情况,这个哈希表的桶的数量是991(可以通过gearmand的--hashtable-buckets arg (=991)选项来修改哈希桶的数量),最多可存储3M个任务,如果队列中任务的数量超过了300万,那么应该使用更多的哈希桶(991 * job的数量 / 3M),比如说为了容纳2^32个任务,那么应该将--hashtable-buckets arg (=991)设置为1733003,这会额外消耗大约26M的内存。gearman无法支持在队列中的任务数超过2^32个。

后台任务 和 一般工作任务

后台任务时序图:
后台任务时序图 client提交完任务,jobserver成功接收并返回给client JOB_CREATED消息之后,client就断开与jobserver之间的连接了。在后台任务中,client是无法从jobserver中获取到任务的执行结果的。

一般工作任务时序图:
一般任务时序图 client在job执行的整个过程中,与jobserver都是保持连接的,因此,在一般的工作任务中,client是可以从jobserver中获取到任务的执行结果的。
同时,client端还可以发起job status的查询。当然,这需要worker端的支持。(后面会有一个python的例子)


worker的工作流程

  • worker通过CAN_DO消息,注册到jobserver上。
  • worker发起GRAB_JOB,主动要求分派任务。
  • jobserver如果没有job可分配,则返回NO_JOB
  • worker收到NO_JOB后,进入空闲状态,并给jobserver返回PRE_SLEEP消息,告诉jobserver:“如果有工作来的话,先用NOOP请求我。”
  • jobserver收到worker的PRE_SLEEP消息后,知道了发送这条消息的worker已经进入了空闲态。
  • 如果有job提交上来,jobserver会先给worker发送一个NOOP消息。
  • worker收到NOOP消息后,发送GRAB_JOB向jobserver请求任务。
  • jobserver把工作派发给worker。
  • worker执行任务,完成之后返回WORK_COMPLETE给jobserver。

安装

  • 安装依赖:
    • 安装gperf:
      wget http://ftp.gnu.org/gnu/gperf/gperf-3.0.4.tar.gz tar zxvf gperf-3.0.4.tar.gz cd gperf-3.0.4 ./configure && make && sudo make install
    • 安装libevent
      sudo yum install -y libevent-devel
    • 安装libuuid
      sudo yum install -y libuuid-devel
  • https://launchpad.net/gearmand/+download 下载最新的稳定版本的gearman jobserver(也就是gearmand)目前是1.1.12 release
  • tar zxvf gearmand-1.1.12.tar.gz
  • cd gearmand-1.1.12
  • ./configure
  • make && sudo make install

常见问题:

  • 提示找不到boost>=1.39,可能是没安装gcc-c++,有的机器可能安装了gcc,但是没有安装gcc-c++,可使用命令:yum install gcc-c++来安装。
  • 安装后启动不成功,在日志文件看到000000 [ main ] socket()(Address family not supported by protocol) -> libgearman-server/gearmand.cc:470错误,解决办法是启动时,添加参数-L <your ip>,限定只绑定ipv4地址,忽略ipv6。

使用gearman的例子

  • 启动jobserver:
    gearmand -L 127.0.0.1 -p 4730 -r http --http-port 8080 --verbose DEBUG -d -l gearman.log -P gearman.pid -R -w 10
  • 编写worker应用程序(python版):
"""
gm_worker.py  
"""
import time  
import gearman

gm_worker = gearman.GearmanWorker(['127.0.0.1:4730'])

def task_listener_reverse_inflight(gearman_worker, gearman_job):  
    reversed_data = list(reversed(gearman_job.data))
    total_chars = len(reversed_data)

    for idx, character in enumerate(reversed_data):
        gearman_worker.send_job_data(gearman_job, str(character))
        gearman_worker.send_job_status(gearman_job, idx + 1, total_chars)
        time.sleep(0.1)

    return 'successul'

gm_worker.register_task('reverse', task_listener_reverse_inflight)  
gm_worker.work()
  • 编写client应用程序(python版本):
"""
gm_client.py  
"""
import time  
from gearman import GearmanClient

gearman_client = GearmanClient(['127.0.0.1:4730'])  
gearman_request = gearman_client.submit_job('reverse', 'test gearman', wait_until_complete=False, background=False)

while gearman_client.get_job_status(gearman_request).state != 'COMPLETE':  
    time.sleep(0.2)
    print 'status:', gearman_request.status
    print 'data:', gearman_request.data_updates
    print '\n'
  • 启动worker和client:
python gm_worker.py &  
python gm_client.py  

client应用程序会在标准输出打印类似的信息:

$python gm_client.py 
status: {'handle': 'H:e010125026018.bja:14', 'denominator': 0, 'numerator': 0, 'running': False, 'known': True, 'time_received': 1447068562.876595}  
data: deque([])


status: {'handle': 'H:e010125026018.bja:14', 'denominator': 12, 'numerator': 2, 'running': True, 'known': True, 'time_received': 1447068563.077448}  
data: deque(['n', 'a'])


status: {'handle': 'H:e010125026018.bja:14', 'denominator': 12, 'numerator': 4, 'running': True, 'known': True, 'time_received': 1447068563.278444}  
data: deque(['n', 'a', 'm', 'r'])


status: {'handle': 'H:e010125026018.bja:14', 'denominator': 12, 'numerator': 6, 'running': True, 'known': True, 'time_received': 1447068563.479357}  
data: deque(['n', 'a', 'm', 'r', 'a', 'e'])


status: {'handle': 'H:e010125026018.bja:14', 'denominator': 12, 'numerator': 8, 'running': True, 'known': True, 'time_received': 1447068563.680194}  
data: deque(['n', 'a', 'm', 'r', 'a', 'e', 'g', ' '])


status: {'handle': 'H:e010125026018.bja:14', 'denominator': 12, 'numerator': 10, 'running': True, 'known': True, 'time_received': 1447068563.881131}  
data: deque(['n', 'a', 'm', 'r', 'a', 'e', 'g', ' ', 't', 's'])


status: {'handle': 'H:e010125026018.bja:14', 'denominator': 12, 'numerator': 12, 'running': True, 'known': True, 'time_received': 1447068564.082074}  
data: deque(['n', 'a', 'm', 'r', 'a', 'e', 'g', ' ', 't', 's', 'e', 't'])


在非后台任务中,gearman支持在任务真正完成之前,就向客户端发送数据或更新job status,gearman client使用一些队列来追踪,何时收到了哪些特定的更新。
这个例子就展示了,在非后台任务中,client端如何对job status进行查询。尤其需要注意gm_worker.py的编写,和gm_client.py中发起任务请求时设置的wait_until_completebackground参数。


监控

  • gearadmin
    该工具是在0.33版本以后增加的功能
$gearadmin --help
Options:  
  --help                         Options related to the program.
  -h [ --host ] arg (=localhost) Connect to the host
  -p [ --port ] arg (=4730)      Port number or service to use for connection
  --server-version               Fetch the version number for the server.
  --server-verbose               Fetch the verbose setting for the server.
  --create-function arg          Create the function from the server.
  --cancel-job arg               Remove a given job from the server's queue
  --drop-function arg            Drop the function from the server.
  --show-unique-jobs             Show unique jobs on server.
  --show-jobs                    Show all jobs on the server.
  --getpid                       Get Process ID for the server.
  --status                       Status for the server.
  --workers                      Workers for the server.
  --shutdown                     Shutdown server.
  -S [ --ssl ]                   Enable SSL connections.
  • telnet
    telnet直接连接到gearmand,查看jobserver状态:
[root@iZ23dastruaZ ~]# telnet 127.0.0.1 4730
Trying 127.0.0.1...  
Connected to 127.0.0.1.  
Escape character is '^]'.  
workers    #显示连接的worker及注册的函数  
31 127.0.0.1 - :  
32 127.0.0.1 - : echo echo2  
.
status  #Function Name,Jobs in queue,Jobs running,Workers registered  
echo2    0   0   1  
echo    0   0   1  
.
maxqueue echo 1000    #限制某个函数在队列中的最大任务数,第三个参数是可省的,表示不限制  
OK  
version    #jobserver的版本  
OK 1.1.12  
shutdown graceful    #graceful代表优雅关闭,否则强制关闭。  
OK  
verbose    #log级别  
OK DEBUG  

python_gearman_admin_client.py

import gearman

gm_admin_client = gearman.GearmanAdminClient(['localhost:4730'])

print gm_admin_client.get_status()  
print gm_admin_client.get_version()  
print gm_admin_client.get_workers()  
print gm_admin_client.ping_server()  


输出结果:

({'workers': 1, 'running': 0, 'task': 'echo2', 'queued': 0}, {'workers': 1, 'running': 0, 'task': 'echo', 'queued': 0})
OK 1.1.12  
({'file_descriptor': '31', 'tasks': ('echo', 'echo2'), 'client_id': '-', 'ip': '127.0.0.1'}, {'file_descriptor': '32', 'tasks': (), 'client_id': '-', 'ip': '127.0.0.1'})
0.000275850296021  

持久化队列

在gearman jobserver内部,job是存储在内存的哈希表中的,这意味着一旦jobserver重启或崩溃,未执行的job会丢失而不会被worker执行。
持久化队列是通过在将任务放到(用于存储要被执行的job的)内部队列之前,调用一个模块回调函数,用某种持久化的方式存储将要执行的job,以便在jobserver崩溃或重启的时候,job可以在稍后重新执行。在job通过这个模块被存储之后,它会被放进队列,如果需要的话,也会唤醒可用的worker。
在job被worker成功执行之后,(jobserver)会调用另外一个模块回调函数,通知模块,这个job完成了,可以被删除了。
对于一个job来说,如果在这两个回调函数之间,jobserver崩溃或重启了,当下一个jobserver启动的时候,这个job会被重新加载。
当jobserver启动的时候,它会调用这个模块中的replay回调函数,这个回调函数会提供所有未完成的job的列表。这个列表被用于填充内部的(用于存储要被执行的job的)内存队列。当replay完成的时候,jobserver也就完成了它的初始化,此时所有的job都是可运行的(队列的状态应该是与jobserver崩溃时是一样的)。当这些任务被执行完成的时候,它们会被(从持久化队列中)删除。

TIP:  
持久化队列只对后台job有效,因为前台job依附于客户端,如果jobserver崩溃了,客户端将会从其它地方重新启动这个前台job或者返回错误。而后台job没有依附于客户端,如果要想让它在jobserver崩溃后重新执行,则需要持久化队列。

Note:持久化必然影响高性能。

使用mysql作为gearman的持久化队列

  • 创建数据库和表
create database gearman;

create table `gearman_queue` (  
`unique_key` varchar(64) NOT NULL,
`function_name` varchar(255) NOT NULL,
`priority` int(11) NOT NULL,
`data` LONGBLOB NOT NULL,
`when_to_run` INT, PRIMARY KEY  (`unique_key`)
);
  • 在启动gearmand的时候指定下面的参数:
    • -q [ --queue-type ] arg (=builtin):要使用的持久化队列类型,此时应该设置为mysql
    • --mysql-host arg (=localhost):MySQL服务器监听的主机名或ip地址
    • --mysql-port arg (=3306):MySQL服务器监听的端口,默认是3306
    • --mysql-user arg:MySQL的用户名
    • --mysql-password arg:MySQL的用户的密码
    • --mysql-db arg:存储job的MySQL库名称
    • --mysql-table arg (=gearman_queue):存储job的MySQL表名称

gearman框架中的一个问题

从典型部署结构可以看出,jobserver之间是没有连接的,也就是jobserver间是不共享background job的。如果通过让多个jobserver指向同一个持久化队列,是否可以实现的jobserver的互相备份呢?答案是否定的,因为jobserver只有在启动时才会将持久化队列中的background job转入到内存队列,也就是说,jobserver1如果宕机且永远不启动,jobserver2一直正常运行,那么在jobserver1宕机前被提交到它的未被执行的background job将永远都待在持久化队列中,得不到执行。另外如果多个job server实例指向同一个持久化队列,同时重启多个jobserver实例会导致持久化队列中的工作任务被多次载入,从而导致消息重复处理。

一个建议的部署架构 建议的部署架构 在这种部署架构下,需要对jobserver进行监控,在jobserver异常退出之后,进行重启,最大化的保证了jobserver的高可用。


性能优化

  • 线程模型
    可以通过gearmand的-t选项指定多个I/O线程,在jobserver内部有三种类型的线程:
    • Listening and management thread - 只有一个
    • I/O thread - 可以有多个(性能调优的主要目标就是I/O线程的数量)
    • Processing thread - 只有一个

-t选项的默认值是4,-t 0意味着gearmand尝试猜测它所能够使用的最大I/O线程的数量。

监听和管理线程主要负责接受新的连接,并把这些连接分派给I/O线程。它也会协调jobserver的启动和关闭。这个线程会使用一个libevent的实例来管理socket事件和内部管道上的信号。这个管道用于唤醒线程或协调关闭。
I/O线程负责socket上的读写系统调用和最初的包解析。当包被解析完成的时候,它会被放到一个(给处理线程使用的)内部队列(每个线程都有它自己的队列,因此只有非常少的连接)。每个I/O线程都使用它自己的libevent实例管理socket事件和内部管道上的信号。
处理线程内部应该没有系统调用,它负责管理许多列表和用于追踪唯一的key的哈希表,任务操作,函数,和任务队列。所有需要发送回连接的包都会被放进一个用于I/O线程的异步队列,I/O线程会取出这些数据包,并把他们发回被连接的socket。所有的包都会通过处理线程,因为它包含处理数据包所需要的信息。由于列表和哈希表复杂的“天性”,如果多个线程修改它们,那么过度的锁会导致性能可能比单线程还差(同时也会使代码复杂化)。未来更多的工作会交给I/O线程处理,处理线程只会保留管理哈希表和列表这些极少的功能。到目前为止还没有发现瓶颈,一个16核的Intel机器每秒能处理超过50K个任务。
为了保证在多线程的情况下生成的UUID的唯一性,那么应该运行uuidd守护进程。

sudo yum install -y uuidd  
service uuidd restart  
  • 进程句柄数
    gearmand会为每个注册的worker和client分配一个fd(文件描述符)。而fd的总数是受用户限制的,可以使用ulimit -n来查看当前的限制:
$ulimit -n
4096  


当fd用光之后,worker和client就会出现连接超时或无响应等异常情况。因此当发生类似情况时,应该首先检查/proc/[PID]/fd目录下文件描述符的数量,是否达到了ulimit -n的限制,并根据需要进行调整(请参考Linux increase the maximum number of open files or file descriptors)。
启动gearmand时可以通过-f [ --file-descriptors ] arg选项来设置gearmand进程的最大文件描述符数量,但是非特权用户不能设置soft limit的值。

-f [ --file-descriptors ] arg
Number of file descriptors to allow for the process (total connections will be slightly less). Default is max allowed for user.  
  • 轮询调度
    默认情况下,gearmand按照函数被worker注册的顺序,给每个worker连接分配任务。可以通过开启-R [ --round-robin ]选项,使gearmand按照round-robin 模式给worker连接分派任务,避免工作过于集中在某些设备上。

  • 受限唤醒
    通过-w [ --worker-wakeup ] arg (=0)选项,可以指定gearmand在收到任务时,唤醒多少个worker来处理,避免在worker数量非常大时,发送大量不必要的NOOP报文,试图唤醒所有的worker。
    根据gearman协议设计,worker 如果发现队列中没有任务需要处理,可以通过发送PRE_SLEEP命令给gearmand,告知gearmand自己将进入睡眠状态。在这个状态下,worker不会再去主动抓取任务,只有服务器发送NOOP命令唤醒后,才会恢复正常的任务抓取和处理流程。因此gearmand在收到任务时,会去尝试唤醒足够的worker来抓取任务;此时如果worker的总数超过可能的任务数,则有可能产生惊群效应。


HTTP协议插件

HTTP协议插件可以把HTTP请求映射成gearman任务,当前它只提供了客户端任务提交,但是将来它可能会被扩展成支持其他的请求类型。这个插件既能处理get请求,也能处理post请求,后者可以用来给jobserver发送一个workload,被请求的URL会被转换成被调用的函数。
比如,下面的这个请求:

POST /reverse HTTP/1.1  
Content-Length: 12

Hello world!  


它会被转换成一个任务提交请求:调用的函数是reverse,workload是Hello world!,响应是:

HTTP/1.0 200 OK  
X-Gearman-Job-Handle: H:lap:4  
Content-Length: 12  
Server: Gearman/0.8

!dlrow olleH


可以传递下面的请求头,来改变任务的行为:

* X-Gearman-Unique: <unique key>
* X-Gearman-Background: true
* X-Gearman-Priority: <high|low>


比如说,想要运行一个低优先级的后台任务,可以发送下面的请求:

POST /reverse HTTP/1.1  
Content-Length: 12  
X-Gearman-Background: true  
X-Gearman-Priority: low

Hello world!  


这个请求的响应,不会有任何相关的数据,因为它是一个后台任务:

HTTP/1.0 200 OK  
X-Gearman-Job-Handle: H:lap:6  
Content-Length: 0  
Server: Gearman/0.8  


!!!http协议应该被认为是实验性的!!!

TIPS:  
我在最新的release版本1.1.12中,对http协议插件进行测试的结果是:
1,该插件确实既支持get请求,也支持post请求,并且post请求可以给jobserver传递workload。  
2,当传递`X-Gearman-Background: true`请求头的时候,客户端会阻塞,无法获取到响应。换言之:在该版本中,并不支持后台任务!  
3,当调用前台任务的时候,并不能通过响应的body中获取到任务的执行结果,因为响应的body是空的,同时`Content-Length`的值是`0`。但是`X-Gearman-Command: WORK_COMPLETE`表明任务已经成功执行完成了。

之后在0.8版本中,对http协议插件进行测试,测试的效果与官网描述的一致。

重试失败的job

gearman不支持把失败的job重新放回队列,当任务在执行过程中出现异常,它只是将这个任务标记为“失败”,不会重新派发给其他的worker。这在worker遇到某些可恢复的错误,需要重试的情况下,是非常不友好的。下面是我总结的两个解决方案,可供参考:

  • 在worker执行job期间,如果worker异常退出,gearman会将这个job重新派发,并且jobserver在启动的时候,支持使用-j [ --job-retries ] arg (=0)选项设置:在jobserver移除这个job之前,允许的最大重试次数。所以使用这种方式重试的话,那么需要做的是:
    • 使用进程管理工具,比如supervisord,来重启异常退出的worker。
    • 启动gearmand的时候,给-j [ --job-retries ] arg (=0)选项设置一个合理的值。
  • 在job执行失败,worker向jobserver发送job status的时候,会调用相关的回调函数(比如在python的client中这个回调函数叫on_job_exception(self, current_job, exc_info)),可以在这个回调函数中,重新发送相同的任务。但是需要自己控制重试次数。

参考资料

感谢浏览tim chow的作品!

如果您喜欢,可以分享到: 更多

如果您有任何疑问或想要与tim chow进行交流

可点此给tim chow发信

如有问题,也可在下面留言: