Contents
  1. Some learning resources:
  2. Simple usage
  3. Using gevent to query data in parallel
  4. Afterword

I spent some time learning gevent. So far I only understand its basic usage, not how it works internally.

Some learning resources:

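Simple usage

The code below comes from the Gevent Tutorial. It first defines two functions, foo() and bar(), that do the actual work. gevent.spawn() then creates a Greenlet object to run each of them, and gevent.joinall() waits for both greenlets to finish. Inside foo() and bar(), the call gevent.sleep(0) explicitly switches to the other greenlet.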

```python
import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
```
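The following sketch is an analogy only. It uses asyncio, not gevent. The same explicit switching can be reproduced with the standard library so that it runs without gevent installed. Inside a coroutine, `await asyncio.sleep(0)` hands control back to the event loop, much as gevent.sleep(0) hands it to the gevent hub. `asyncio.gather()` plays the role of gevent.joinall() over spawned greenlets. The sketch appends to a list instead of printing, which makes the interleaving easy to check.

```python
import asyncio

# Records the order in which each step runs.
events = []

async def foo():
    events.append('Running in foo')
    await asyncio.sleep(0)  # yield to the event loop, like gevent.sleep(0)
    events.append('Explicit context switch to foo again')

async def bar():
    events.append('Explicit context to bar')
    await asyncio.sleep(0)
    events.append('Implicit context switch back to bar')

async def main():
    # gather() schedules both coroutines as tasks and waits for them,
    # similar to gevent.joinall([gevent.spawn(foo), gevent.spawn(bar)]).
    await asyncio.gather(foo(), bar())

asyncio.run(main())
for e in events:
    print(e)
```

foo and bar alternate at every sleep(0): foo's first step, then bar's first step, then foo's second step, then bar's second step.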

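Key points:

  • gevent.spawn() creates a Greenlet directly, that is, a new unit of execution.
  • gevent.joinall() waits for all the units of execution to finish. A single greenlet can be waited on with its own join() method.
  • Whenever a running function has to wait, it can switch to another greenlet. Examples are gevent.sleep(0) or waiting on network events.
  • Monkey patching makes standard-library modules cooperate with gevent, for example:
    >>> from gevent import monkey; monkey.patch_socket()
    >>> import urllib2 # it is usable from multiple greenlets now
  • gevent provides data structures for communication between greenlets, such as gevent.queue.Queue.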

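Using gevent to query data in parallel

As an exercise, the program looks up the home area (carrier and region of registration) of mobile phone numbers. The code is shared for learning and discussion only, so the query URL has been hidden. Do not use it for any other purpose. I accept no responsibility for any consequences of its use.

  • tasks = Queue() holds phone numbers. leadWorker() generates numbers and puts them into tasks, and queryWorker() takes numbers from tasks one at a time and queries them.
  • Query results go into resultQue = Queue(). writeWorker() reads from this queue and writes each result to a file.
  • One leadWorker() and ten queryWorker()s run at the same time.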
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import gevent
# Patch socket/ssl before urllib2 is imported so its network I/O yields to other greenlets
from gevent import monkey; monkey.patch_socket(); monkey.patch_ssl()
from gevent.queue import Queue, Empty
import urllib2
import json
import random

REQ_URL = 'https://....xxx.com/callback?...&phone={0}&...'

tasks = Queue()       # phone numbers waiting to be queried
resultQue = Queue()   # formatted result lines waiting to be written
runningQue = Queue()  # one entry per query worker that is still running
completed = False     # set by leadWorker() once every number has been queued

def queryWorker(n):
    runningQue.put(n)
    try:
        # Keep going while work is queued or the producer may still add more
        while not tasks.empty() or not completed:
            try:
                phn = tasks.get(timeout = 30)
            except Empty:
                continue
            print n, ' - ', phn
            req = urllib2.Request(REQ_URL.format(phn), headers = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) Chrome/39.0.2171.71'})
            resp = urllib2.urlopen(req)
            resp_content = resp.read()
            # JSONP-style body, callback(...): parse the text between the first '(' and the last ')'
            if len(resp_content) > 0:
                i = resp_content.find('(')
                if i > 0:
                    j = resp_content.rfind(')')
                    jsonData = json.loads(resp_content[i+1:j])
                    if jsonData['data']:
                        line = '{0}|{1}|{2}|{3}'.format(phn, jsonData[u'data'][u'operator'].encode('utf-8'), jsonData[u'data'][u'area'].encode('utf-8'), jsonData[u'data'][u'area_operator'].encode('utf-8'))
                        resultQue.put(line)
            gevent.sleep(0.2)  # throttle requests slightly
    finally:
        runningQue.get(timeout = 1)
        print n, ' Completed'

def leadWorker():
    global completed
    n = 0
    # Generate 10 x 10000 numbers of the form 13<i><jjjj><4 random digits>,
    # pausing for 30 seconds after every 100 numbers
    for i in xrange(10):
        for j in xrange(10000):
            t = ''.join([str(random.randint(0,9)) for _ in range(4)])
            phn = '13%d%04d%s'%(i, j, t)
            tasks.put(phn)
            n += 1
            if n == 100:
                gevent.sleep(30)
                n = 0
    completed = True
    print 'Lead Worker Completed'

def writeWorker():
    global resultQue, runningQue
    fd = open('phn_area.txt', 'w')
    try:
        # Stop once no results are pending and no query worker is still running
        while not resultQue.empty() or not runningQue.empty():
            try:
                line = resultQue.get(timeout = 1)
            except Empty:
                continue
            fd.write(line)
            fd.write('\n')
    finally:
        fd.close()

workers = [gevent.spawn(queryWorker, i) for i in range(10)]
workers.append(gevent.spawn(leadWorker))
workers.append(gevent.spawn(writeWorker))
gevent.joinall(workers)
#fix: gevent.hub.LoopExit: This operation would block forever
#resultQue.put(StopIteration)
#fd = open('phn_area.txt', 'w')
#for line in resultQue:
#    fd.write(line)
#    fd.write('\n')
#fd.close()
print 'All Completed'
```
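The shutdown logic above depends on three things: the global completed flag, runningQue acting as a count of live query workers, and get() timeouts that poll both. A common alternative is the sentinel ("poison pill") pattern, sketched below under stated assumptions. The producer puts one STOP sentinel per query worker after the last real number. Each worker exits when it takes a STOP, and in a finally block it puts a DONE marker on the result queue. The writer stops after it has seen one DONE per worker. This sketch is not the original author's code. It uses the standard library's asyncio, with asyncio.Queue and asyncio.gather standing in for gevent.queue.Queue and gevent.spawn/joinall, so that it runs without gevent or network access. The names fake_query, STOP and DONE are hypothetical, and results go to a list instead of phn_area.txt.

```python
import asyncio

NUM_WORKERS = 10
STOP = object()  # sentinel: no more phone numbers
DONE = object()  # sentinel: one query worker has finished

async def fake_query(phone):
    # Hypothetical stand-in for the HTTP lookup done in queryWorker().
    await asyncio.sleep(0)  # pretend to wait on the network
    return '{0}|operator|area'.format(phone)

async def lead_worker(tasks, phones):
    for phone in phones:
        await tasks.put(phone)
    for _ in range(NUM_WORKERS):
        await tasks.put(STOP)  # one sentinel per query worker

async def query_worker(tasks, results):
    try:
        while True:
            phone = await tasks.get()  # waits without a polling timeout
            if phone is STOP:
                break
            await results.put(await fake_query(phone))
    finally:
        await results.put(DONE)  # tell the writer, even if an error ended the loop

async def write_worker(results, lines):
    finished = 0
    while finished < NUM_WORKERS:
        item = await results.get()
        if item is DONE:
            finished += 1
        else:
            lines.append(item)

async def main(phones):
    tasks, results = asyncio.Queue(), asyncio.Queue()
    lines = []
    await asyncio.gather(
        lead_worker(tasks, phones),
        write_worker(results, lines),
        *[query_worker(tasks, results) for _ in range(NUM_WORKERS)],
    )
    return lines

phones = ['1380000{0:04d}'.format(i) for i in range(50)]
lines = asyncio.run(main(phones))
print(len(lines))  # 50
```

Because every worker reports DONE from its finally block, the writer can finish even if a worker's loop is ended by an exception. No worker needs a timeout to notice that the producer is finished.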

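Afterword

Despite the try blocks, I still ran into problems when looking up the areas of 10,000 numbers. The run started with 10 workers, but at around the halfway point some workers stopped printing log lines. At the end of the first run only 5 workers printed their completion message, and in the second run only one did. Both runs were probably stuck waiting on something and never exited normally, so I had to kill them with Ctrl+C. This problem still needs to be solved.

Later I found a "Connection timed out" error in the output. The loop is wrapped in try/finally, but finally only runs cleanup code and does not catch the exception. As a result, the URLError propagated out of queryWorker() and ended that greenlet: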

```
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "phn.py", line 29, in queryWorker
    resp = urllib2.urlopen(req)
  File "/usr/lib/python2.7/urllib2.py", line 127, in urlopen
    return _opener.open(url, data, timeout)
  File "/usr/lib/python2.7/urllib2.py", line 404, in open
    response = self._open(req, data)
  File "/usr/lib/python2.7/urllib2.py", line 422, in _open
    '_open', req)
  File "/usr/lib/python2.7/urllib2.py", line 382, in _call_chain
    result = func(*args)
  File "/usr/lib/python2.7/urllib2.py", line 1222, in https_open
    return self.do_open(httplib.HTTPSConnection, req)
  File "/usr/lib/python2.7/urllib2.py", line 1184, in do_open
    raise URLError(err)
URLError: 
 failed with URLError
```
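The traceback shows that urllib2.urlopen() raised URLError. Nothing in queryWorker() catches it. Also, urlopen() is called there without its optional timeout argument (in seconds), so a connection that never answers can keep a worker waiting for a long time. A more defensive query step would wrap each request in try/except, so that one failed number is recorded instead of ending the worker. The sketch below shows this under stated assumptions. It is written for Python 3's urllib.request rather than urllib2, and it takes a `get` callable so it can be tested without the network. The retry count, the 10-second timeout, and the names query_one, query_all and flaky_get are illustrative choices, not part of the original code.

```python
import socket
import urllib.error
import urllib.request


def fetch(url, timeout=10):
    # Real request with an explicit timeout in seconds (not called in the demo below).
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read()


def query_one(get, phone, retries=2):
    """Call get(phone), retrying on network errors.

    Returns (phone, body) on success, or (phone, None) once retries + 1
    attempts have failed. Errors are caught here, so they never escape
    into the worker loop and end the worker.
    """
    for attempt in range(retries + 1):
        try:
            return phone, get(phone)
        except (urllib.error.URLError, socket.timeout) as exc:
            print('attempt', attempt + 1, 'failed for', phone, '-', exc)
    return phone, None


def query_all(get, phones):
    ok, failed = {}, []
    for phone in phones:
        phone, body = query_one(get, phone)
        if body is None:
            failed.append(phone)  # keep going; these can be retried later
        else:
            ok[phone] = body
    return ok, failed


def flaky_get(phone):
    # Hypothetical fetcher for the demo: numbers ending in 7 always time out.
    if phone.endswith('7'):
        raise urllib.error.URLError('Connection timed out')
    return 'area-for-' + phone


ok, failed = query_all(flaky_get, ['13800000001', '13800000007', '13800000002'])
print(sorted(ok), failed)  # ['13800000001', '13800000002'] ['13800000007']
```

With a real endpoint, `get` could be something like `lambda phone: fetch(REQ_URL.format(phone))`. Inside a gevent worker, the same try/except keeps the greenlet alive after a timeout.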