Python学习笔记 - 旧笔记
函数
默认参数
- 默认参数必须指向不变对象!(如若可变多次调用会产生不确定的结果)
- 在调用函数时普通参数和默认参数都可以用:
fun(参数名=参数值,参数名=参数值....) // 顺序可以调换.
可变参数
def fun(*args): // 定义,args is tuple
pass
fun(1,2,3) // 直接传参调用
fun(*l) // l is list or tuple
关键字参数
def fun(**kw): // 定义,kw is dict
pass
fun(key_name1=value1,ke_name2=value2)
fun(**dic) // dic is dict,传入的参数只是dic的拷贝
命名关键词参数
def fun(arg1,*,city,job):
pass
fun('arg1',city='beijing',job='无') // 必须传入参数名,顺序可以调换
def fun2(arg1,*args,city,job) // 此时不需要*了,原因可想而知.
参数定义顺序
必选参数、默认参数、可变参数、命名关键字参数和关键字参数。
尾递归
在函数返回的时候,调用自身本身,并且,return语句不能包含表达式。这样,编译器或者解释器就可以把尾递归做优化,使递归本身无论调用多少次,都只占用一个栈帧,不会出现栈溢出的情况。(Python不优化…)
循环
判断一个对象是否可迭代
from collections import Iterable isinstance('abc', Iterable) # str是否可迭代
迭代索引+value
for i, value in enumerate(['A', 'B', 'C']): print(i, value)
列表生成器
l1 = list(range(1, 11)) l2 = [x * x for x in range(1, 11)] l3 = [x * x for x in range(1, 11) if x % 2 == 0] l4 = [m + n for m in 'ABC' for n in 'XYZ']
generator
通过算法生成下一个迭代值,省内存.
g = (x * x for x in range(10)) # 生成generator next(g) # 通过next函数获取g的下一个元素,当没有元素时会抛出异常
如果一个函数定义中包含yield关键字,那么这个函数就不再是一个普通函数,而是一个generator.每次调用next()函数执行,遇到yield返回,再次执行从上一次yield之后执行.
Iterator:可以调用next()的对象成为Iterator对象.
i = iter([1, 2, 3, 4, 5]) # 取得Iterator对象.
函数式编程
map
r = map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9]) # r is Iterator l = list(r) # 转换为list
reduce
效果:
reduce(f, [x1, x2, x3, x4]) = f(f(f(x1, x2), x3), x4)
应用:
from functools import reduce def fn(x, y): return x * 10 + y reduce(fn, [1, 3, 5, 7, 9]) # 生成13579
filter
sorted
sorted([36, 5, -12, 9, -21], key=abs) # abs函数作用于每个元素
闭包:返回函数不要引用任何循环变量,或者后续会发生变化的变量。
def count(): fs = [] for i in range(1, 4): def f(): return i*i fs.append(f) return fs f1, f2, f3 = count() #9,9,9 def count(): def f(j): def g(): return j*j return g fs = [] for i in range(1, 4): fs.append(f(i)) # f(i)立刻被执行,因此i的当前值被传入f() return fs
lambda 参数名,参数名 : 表达式返回值
decorator
# 两层调用
def log(func):
def wrapper(*args, **kw):
print('call %s():' % func.__name__)
return func(*args, **kw)
return wrapper
@log #相当于now = log(now)
def now():
print('2015-3-25')
# 三层调用
def log(text):
def decorator(func):
def wrapper(*args, **kw):
print('%s %s():' % (text, func.__name__))
return func(*args, **kw)
return wrapper
return decorator
@log('execute') #相当于now = log('execute')(now)
def now():
print('2015-3-25')
偏函数
import functools
int2 = functools.partial(int, base=2) # 返回的函数base=2
模块
1. 一个py文件就是一个模块.
2. 一个目录里面包含__init__.py,这个目录就是一个包(package).
3. 类似_xxx和__xxx这样的函数或变量就是非公开的(private),不应该被直接引用,比如_abc,__abc等;
class
实例的变量名如果以__开头,就变成了一个私有变量(private),只有内部可以访问,外部不能访问.
实的变量名如果以_开头,外部可以访问,但是视为private.
鸭子类型
type()
>>> import types >>> def fn(): ... pass ... >>> type(fn)==types.FunctionType True >>> type(abs)==types.BuiltinFunctionType True >>> type(lambda x: x)==types.LambdaType True >>> type((x for x in range(10)))==types.GeneratorType True
isinstance()
>>> isinstance([1, 2, 3], (list, tuple)) True >>> isinstance((1, 2, 3), (list, tuple)) True
测试对象属性和方法
dir('ABC') #获取对象的属性和方法 >>> hasattr(obj, 'x') # 有属性'x'吗? True >>> obj.x 9 >>> hasattr(obj, 'y') # 有属性'y'吗? False >>> setattr(obj, 'y', 19) # 设置一个属性'y' >>> hasattr(obj, 'y') # 有属性'y'吗? True >>> getattr(obj, 'y') # 获取属性'y' 19 >>> obj.y # 获取属性'y' 19 >>> getattr(obj, 'z', 404) # 获取属性'z',如果不存在,返回默认值404 404
__slots__:限制属性
class Student(object): __slots__ = ('name', 'age') # 用tuple定义允许绑定的属性名称
@property:把一个方法变成属性的调用
class Student(object): @property def score(self): return self._score @score.setter def score(self, value): if not isinstance(value, int): raise ValueError('score must be an integer!') if value < 0 or value > 100: raise ValueError('score must between 0 ~ 100!') self._score = value
__XXX__
__len__():调用len()时调用
__str__():类似toString
__repr__():为调试服务的toString
__iter__():该方法返回一个迭代对象,然后,Python的for循环就会不断调用该迭代对象的__next__()方法拿到循环的下一个值
class Fib(object): def __init__(self): self.a, self.b = 0, 1 # 初始化两个计数器a,b def __iter__(self): return self # 实例本身就是迭代对象,故返回自己 def __next__(self): self.a, self.b = self.b, self.a + self.b # 计算下一个值 if self.a > 100000: # 退出循环的条件 raise StopIteration() return self.a # 返回下一个值
__getitem__(n):
# 实现切片 class Fib(object): def __getitem__(self, n): if isinstance(n, int): # n是索引 a, b = 1, 1 for x in range(n): a, b = b, a + b return a if isinstance(n, slice): # n是切片 start = n.start stop = n.stop if start is None: start = 0 a, b = 1, 1 L = [] for x in range(stop): if x >= start: L.append(a) a, b = b, a + b return L
此外,如果把对象看成dict,getitem()的参数也可能是一个可以作key的object,例如str。与之对应的是__setitem__()方法,把对象视作list或dict来对集合赋值。最后,还有一个__delitem__()方法,用于删除某个元素。
__getattr__(self, attr):当获取属性时没有找到,就调用它.
__call__():可以实现实例本身调用.
# 判断对象是否可以实例调用 >>> callable(Student()) True >>> callable(max) True >>> callable([1, 2, 3]) False >>> callable(None) False >>> callable('str') False
枚举类
定义
from enum import Enum Month = Enum('Month', ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'))
遍历
for name, member in Month.__members__.items(): print(name, '=>', member, ',', member.value) # value属性则是自动赋给成员的int常量,默认从1开始计数。
自定义枚举类
from enum import Enum, unique @unique class Weekday(Enum): Sun = 0 # Sun的value被设定为0 Mon = 1 Tue = 2 Wed = 3 Thu = 4 Fri = 5 Sat = 6
访问枚举类型
>>> day1 = Weekday.Mon
>>> print(day1)
Weekday.Mon
>>> print(Weekday.Tue)
Weekday.Tue
>>> print(Weekday['Tue'])
Weekday.Tue
>>> print(Weekday.Tue.value)
2
>>> print(day1 == Weekday.Mon)
True
>>> print(day1 == Weekday.Tue)
False
>>> print(Weekday(1))
Weekday.Mon
>>> print(day1 == Weekday(1))
True
>>> Weekday(7)
Traceback (most recent call last):
...
ValueError: 7 is not a valid Weekday
>>> for name, member in Weekday.__members__.items():
... print(name, '=>', member)
...
Sun => Weekday.Sun
Mon => Weekday.Mon
Tue => Weekday.Tue
Wed => Weekday.Wed
Thu => Weekday.Thu
Fri => Weekday.Fri
Sat => Weekday.Sat
type():动态创建类
>>> def fn(self, name='world'): # 先定义函数
... print('Hello, %s.' % name)
...
>>> Hello = type('Hello', (object,), dict(hello=fn)) # 创建Hello class
>>> h = Hello()
>>> h.hello()
Hello, world.
>>> print(type(Hello))
<class 'type'>
>>> print(type(h))
<class '__main__.Hello'>
要创建一个class对象,type()函数依次传入3个参数:
1. class的名称;
2. 继承的父类集合,注意Python支持多重继承,如果只有一个父类,别忘了tuple的单元素写法;
3. class的方法名称与函数绑定,这里我们把函数fn绑定到方法名hello上。
metaclass
metaclass的类名总是以Metaclass结尾,以便清楚地表示这是一个metaclass.
示例:
# metaclass是类的模板,所以必须从`type`类型派生: class ListMetaclass(type): def __new__(cls, name, bases, attrs): attrs['add'] = lambda self, value: self.append(value) return type.__new__(cls, name, bases, attrs) class MyList(list, metaclass=ListMetaclass): pass
__new__()方法接收到的参数依次是:
1. 当前准备创建的类的对象; 2. 类的名字; 3. 类继承的父类集合; 4. 类的方法集合;
调试
异常处理:
try: print('try...') r = 10 / int('2') print('result:', r) except ValueError as e: print('ValueError:', e) except ZeroDivisionError as e: print('ZeroDivisionError:', e) else: print('no error!') finally: print('finally...') print('END')
assert(启动Python解释器时可以用-O参数来关闭assert):
def foo(s): n = int(s) assert n != 0, 'n is zero!' return 10 / n def main(): foo('0')
unittest
import unittest from mydict import Dict class TestDict(unittest.TestCase): def test_init(self): d = Dict(a=1, b='test') self.assertEqual(d.a, 1) self.assertEqual(d.b, 'test') self.assertTrue(isinstance(d, dict)) def test_key(self): d = Dict() d['key'] = 'value' self.assertEqual(d.key, 'value') def test_attr(self): d = Dict() d.key = 'value' self.assertTrue('key' in d) self.assertEqual(d['key'], 'value') def test_keyerror(self): d = Dict() with self.assertRaises(KeyError): value = d['empty'] def test_attrerror(self): d = Dict() with self.assertRaises(AttributeError): value = d.empty
setUp()和tearDown()方法会分别在每调用一个测试方法的前后分别被执行。
IO
简单示例:
# 编码和忽略错误 f = open('/Users/michael/gbk.txt', 'r', encoding='gbk', errors='ignore')
StringIO(在内存中读写str):
>>> from io import StringIO >>> f = StringIO('Hello!\nHi!\nGoodbye!') >>> while True: ... s = f.readline() ... if s == '': ... break ... print(s.strip()) ... Hello! Hi! Goodbye!
BytesIO
>> from io import BytesIO >>> f = BytesIO(b'\xe4\xb8\xad\xe6\x96\x87') >>> f.read() b'\xe4\xb8\xad\xe6\x96\x87'
操作文件和目录
目录
# 查看当前目录的绝对路径: >>> os.path.abspath('.') '/Users/michael' # 在某个目录下创建一个新目录,首先把新目录的完整路径表示出来: >>> os.path.join('/Users/michael', 'testdir') '/Users/michael/testdir' # 然后创建一个目录: >>> os.mkdir('/Users/michael/testdir') # 删掉一个目录: >>> os.rmdir('/Users/michael/testdir') # 拆分路径 >>> os.path.split('/Users/michael/testdir/file.txt') ('/Users/michael/testdir', 'file.txt') # 文件扩展名 >>> os.path.splitext('/path/to/file.txt') ('/path/to/file', '.txt')
文件
# 对文件重命名: >>> os.rename('test.txt', 'test.py') # 删掉文件: >>> os.remove('test.py') ''' 制文件的函数居然在os模块中不存在!原因是复制文件并非由操作系统提供的系统调用。理论上讲,我们通过上一节的读写文件可以完成文件复制,只不过要多写很多代码。 幸运的是shutil模块提供了copyfile()的函数,你还可以在shutil模块中找到很多实用函数,它们可以看做是os模块的补充。 ''' # 列出文件 >>> [x for x in os.listdir('.') if os.path.isfile(x) and os.path.splitext(x)[1]=='.py'] ['apis.py', 'config.py', 'models.py', 'pymonitor.py', 'test_db.py', 'urls.py', 'wsgiapp.py']
序列化
# 序列化 >>> import pickle >>> d = dict(name='Bob', age=20, score=88) >>> pickle.dumps(d) b'\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K\x14X\x05\x00\x00\x00scoreq\x02KXX\x04\x00\x00\x00nameq\x03X\x03\x00\x00\x00Bobq\x04u.' # 保存到文件 >>> f = open('dump.txt', 'wb') >>> pickle.dump(d, f) >>> f.close() # 从文件读取 >>> f = open('dump.txt', 'rb') >>> d = pickle.load(f) >>> f.close() >>> d {'age': 20, 'score': 88, 'name': 'Bob'} # JSON print(json.dumps(s, default=lambda obj: obj.__dict__)) # __dict__用来存储实例变量
多进程
linux
import os print('Process (%s) start...' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
通用
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.')
进程池
from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.')
子进程
import subprocess print('$ nslookup www.python.org') r = subprocess.call(['nslookup', 'www.python.org']) print('Exit code:', r)
多线程
创建线程:
import time, threading # 新线程执行的代码: def loop(): print('thread %s is running...' % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print('thread %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1) print('thread %s ended.' % threading.current_thread().name) print('thread %s is running...' % threading.current_thread().name) t = threading.Thread(target=loop, name='LoopThread') t.start() t.join() print('thread %s ended.' % threading.current_thread().name)
互斥锁
balance = 0 lock = threading.Lock() def run_thread(n): for i in range(100000): # 先要获取锁: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了一定要释放锁: lock.release()
Global Interpreter Lock:
任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
ThreadLocal:线程之间传递参数,每个线程都有一副本,互不干扰.
import threading # 创建全局ThreadLocal对象: local_school = threading.local() def process_student(): # 获取当前线程关联的student: std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name): # 绑定ThreadLocal的student: local_school.student = name process_student() t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join()
常用模块
collections
namedtuple
>>> from collections import namedtuple >>> Point = namedtuple('Point', ['x', 'y']) >>> p = Point(1, 2) >>> p.x 1 >>> p.y 2
deque
# 双向列表 >>> from collections import deque >>> q = deque(['a', 'b', 'c']) >>> q.append('x') >>> q.appendleft('y') >>> q deque(['y', 'a', 'b', 'c', 'x'])
defaultdict
>>> from collections import defaultdict >>> dd = defaultdict(lambda: 'N/A') >>> dd['key1'] = 'abc' >>> dd['key1'] # key1存在 'abc' >>> dd['key2'] # key2不存在,返回默认值 'N/A'
OrderedDict
>>> from collections import OrderedDict >>> d = dict([('a', 1), ('b', 2), ('c', 3)]) >>> d # dict的Key是无序的 {'a': 1, 'c': 3, 'b': 2} >>> od = OrderedDict([('a', 1), ('b', 2), ('c', 3)]) >>> od # OrderedDict的Key是有序的 OrderedDict([('a', 1), ('b', 2), ('c', 3)]) # 根据插入顺序
Counter
>>> from collections import Counter >>> c = Counter() >>> for ch in 'programming': ... c[ch] = c[ch] + 1 ... >>> c Counter({'g': 2, 'm': 2, 'r': 2, 'a': 1, 'i': 1, 'o': 1, 'n': 1, 'p': 1})
base64:将3个字节的二进制编码成四字节对应的字符
>>> import base64 >>> base64.b64encode(b'binary\x00string') b'YmluYXJ5AHN0cmluZw==' >>> base64.b64decode(b'YmluYXJ5AHN0cmluZw==') b'binary\x00string' # url safe >>> base64.b64encode(b'i\xb7\x1d\xfb\xef\xff') b'abcd++//' >>> base64.urlsafe_b64encode(b'i\xb7\x1d\xfb\xef\xff') b'abcd--__' >>> base64.urlsafe_b64decode('abcd--__') b'i\xb7\x1d\xfb\xef\xff'
struct
>>> import struct >>> struct.pack('>I', 10240099) # >表示字节顺序是big-endian,也就是网络序,I表示4字节无符号整数。 b'\x00\x9c@c' >>> struct.unpack('>IH', b'\xf0\xf0\xf0\xf0\x80\x80') # I:4字节无符号整数和H:2字节无符号整数。 (4042322160, 32896)
hashlib
import hashlib md5 = hashlib.md5() md5.update('how to use md5 in python hashlib?'.encode('utf-8')) print(md5.hexdigest()) # 如果数据量过大,可以分块调用update import hashlib md5 = hashlib.md5() md5.update('how to use md5 in '.encode('utf-8')) md5.update('python hashlib?'.encode('utf-8')) print(md5.hexdigest())
itertools
>>> import itertools # 无限迭代 >>> natuals = itertools.count(1) # 重复无限迭代 >>> cs = itertools.cycle('ABC') # 注意字符串也是序列的一种 # 单一元素迭代,可以指定次数 ns = itertools.repeat('A', 3) # 合并迭代 >>> for c in itertools.chain('ABC', 'XYZ'): ... print(c) # groupby()把迭代器中相邻的重复元素挑出来放在一起 >>> for key, group in itertools.groupby('AAABBBCCAAA'): ... print(key, list(group)) ... A ['A', 'A', 'A'] B ['B', 'B', 'B'] C ['C', 'C'] A ['A', 'A', 'A']
contextlib:任何对象,只要正确实现了上下文管理,就可以用于with语句。实现上下文管理是通过__enter__和__exit__这两个方法实现的。
from contextlib import contextmanager class Query(object): def __init__(self, name): self.name = name def query(self): print('Query info about %s...' % self.name) @contextmanager def create_query(name): print('Begin') q = Query(name) yield q print('End') # 用contextmanager实现前后自动执行代码 @contextmanager def tag(name): print("<%s>" % name) yield print("</%s>" % name) with tag("h1"): print("hello") print("world") # 如果一个对象没有实现上下文,我们就不能把它用于with语句。这个时候,可以用closing()来把该对象变为上下文对象。 from contextlib import closing from urllib.request import urlopen with closing(urlopen('https://www.python.org')) as page: for line in page: print(line)
urllib
from urllib import request with request.urlopen('https://api.douban.com/v2/book/2129650') as f: data = f.read() print('Status:', f.status, f.reason) for k, v in f.getheaders(): print('%s: %s' % (k, v)) print('Data:', data.decode('utf-8')) # 加请求头 from urllib import request req = request.Request('http://www.douban.com/') req.add_header('User-Agent', 'Mozilla/6.0 (iPhone; CPU iPhone OS 8_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/8.0 Mobile/10A5376e Safari/8536.25') with request.urlopen(req) as f: print('Status:', f.status, f.reason) for k, v in f.getheaders(): print('%s: %s' % (k, v)) print('Data:', f.read().decode('utf-8')) # post登录 from urllib import request, parse print('Login to weibo.cn...') email = input('Email: ') passwd = input('Password: ') login_data = parse.urlencode([ ('username', email), ('password', passwd), ('entry', 'mweibo'), ('client_id', ''), ('savestate', '1'), ('ec', ''), ('pagerefer', 'https://passport.weibo.cn/signin/welcome?entry=mweibo&r=http%3A%2F%2Fm.weibo.cn%2F') ]) req = request.Request('https://passport.weibo.cn/sso/login') req.add_header('Origin', 'https://passport.weibo.cn') req.add_header('User-Agent', 'Mozilla/6.0 (iPhone; CPU iPhone OS 8_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/8.0 Mobile/10A5376e Safari/8536.25') req.add_header('Referer', 'https://passport.weibo.cn/signin/login?entry=mweibo&res=wel&wm=3349&r=http%3A%2F%2Fm.weibo.cn%2F') with request.urlopen(req, data=login_data.encode('utf-8')) as f: print('Status:', f.status, f.reason) for k, v in f.getheaders(): print('%s: %s' % (k, v)) print('Data:', f.read().decode('utf-8')) # 代理 proxy_handler = urllib.request.ProxyHandler({'http': 'http://www.example.com:3128/'}) proxy_auth_handler = urllib.request.ProxyBasicAuthHandler() proxy_auth_handler.add_password('realm', 'host', 'username', 'password') opener = urllib.request.build_opener(proxy_handler, proxy_auth_handler) with opener.open('http://www.example.com/login.html') as f: pass
PIL(Python Imaging Library)
# 操作图片 from PIL import Image # 打开一个jpg图像文件,注意是当前路径: im = Image.open('test.jpg') # 获得图像尺寸: w, h = im.size print('Original image size: %sx%s' % (w, h)) # 缩放到50%: im.thumbnail((w//2, h//2)) print('Resize image to: %sx%s' % (w//2, h//2)) # 把缩放后的图像用jpeg格式保存: im.save('thumbnail.jpg', 'jpeg') #模糊效果 from PIL import Image, ImageFilter # 打开一个jpg图像文件,注意是当前路径: im = Image.open('test.jpg') # 应用模糊滤镜: im2 = im.filter(ImageFilter.BLUR) im2.save('blur.jpg', 'jpeg') # 生成验证码 from PIL import Image, ImageDraw, ImageFont, ImageFilter import random # 随机字母: def rndChar(): return chr(random.randint(65, 90)) # 随机颜色1: def rndColor(): return (random.randint(64, 255), random.randint(64, 255), random.randint(64, 255)) # 随机颜色2: def rndColor2(): return (random.randint(32, 127), random.randint(32, 127), random.randint(32, 127)) # 240 x 60: width = 60 * 4 height = 60 image = Image.new('RGB', (width, height), (255, 255, 255)) # 创建Font对象: font = ImageFont.truetype('Arial.ttf', 36) # 创建Draw对象: draw = ImageDraw.Draw(image) # 填充每个像素: for x in range(width): for y in range(height): draw.point((x, y), fill=rndColor()) # 输出文字: for t in range(4): draw.text((60 * t + 10, 10), rndChar(), font=font, fill=rndColor2()) # 模糊: image = image.filter(ImageFilter.BLUR) image.save('code.jpg', 'jpeg')
异步IO
协程(Coroutine):
def consumer(): r = '' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c)
- 首先调用c.send(None)启动生成器;
- 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
- consumer通过yield拿到消息,处理,又通过yield把结果传回;
- produce拿到consumer处理的结果,继续生产下一条消息;
- produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
asyncio
import asyncio @asyncio.coroutine def wget(host): print('wget %s...' % host) connect = asyncio.open_connection(host, 80) reader, writer = yield from connect header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) yield from writer.drain() while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close() loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
Python从3.5版开始为asyncio提供了async和await的新语法;
aiohttp:
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(0.5) return web.Response(body=b'<h1>Index</h1>') async def hello(request): await asyncio.sleep(0.5) text = '<h1>hello, %s!</h1>' % request.match_info['name'] return web.Response(body=text.encode('utf-8')) async def init(loop): app = web.Application(loop=loop) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/hello/{name}', hello) srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv loop = asyncio.get_event_loop() loop.run_until_complete(init(loop)) loop.run_forever()
廖雪峰python教程 https://www.liaoxuefeng.com/