Contents

Python学习笔记 - 旧笔记

  • 默认参数必须指向不变对象!(如若可变多次调用会产生不确定的结果)
  • 在调用函数时普通参数和默认参数都可以用:

python3

fun(参数名=参数值,参数名=参数值....) // 顺序可以调换.

python3

def fun(*args): // 定义,args is tuple
    pass

fun(1,2,3) // 直接传参调用
fun(*l) // l is list or tuple

python3

def fun(**kw): // 定义,kw is dict
    pass
    
fun(key_name1=value1,ke_name2=value2)
fun(**dic) // dic is dict,传入的参数只是dic的拷贝

python3

def fun(arg1,*,city,job): 
    pass

fun('arg1',city='beijing',job='无') // 必须传入参数名,顺序可以调换

def fun2(arg1,*args,city,job) // 此时不需要*,原因可想而知.

必选参数、默认参数、可变参数、命名关键字参数和关键字参数。

在函数返回的时候,调用自身本身,并且,return语句不能包含表达式。这样,编译器或者解释器就可以把尾递归做优化,使递归本身无论调用多少次,都只占用一个栈帧,不会出现栈溢出的情况。(Python不优化…)

  1. 判断一个对象是否可迭代

    python3

    from collections import Iterable
    isinstance('abc', Iterable) # str是否可迭代
  2. 迭代索引+value

    python3

    for i, value in enumerate(['A', 'B', 'C']):
        print(i, value)
  3. 列表生成器

    python3

    l1 = list(range(1, 11))
    l2 = [x * x for x in range(1, 11)]
    l3 = [x * x for x in range(1, 11) if x % 2 == 0]
    l4 = [m + n for m in 'ABC' for n in 'XYZ']
  4. generator

    通过算法生成下一个迭代值,省内存.

    python3

    g = (x * x for x in range(10)) # 生成generator
    next(g) # 通过next函数获取g的下一个元素,当没有元素时会抛出异常

    如果一个函数定义中包含yield关键字,那么这个函数就不再是一个普通函数,而是一个generator.每次调用next()函数执行,遇到yield返回,再次执行从上一次yield之后执行.

  5. Iterator:可以调用next()的对象成为Iterator对象.

    python3

    i = iter([1, 2, 3, 4, 5]) # 取得Iterator对象.
  1. map

    python3

    r = map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9]) # r is Iterator
    l = list(r) # 转换为list
  2. reduce

    效果:

    python3

    reduce(f, [x1, x2, x3, x4]) = f(f(f(x1, x2), x3), x4)

    应用:

    python3

    from functools import reduce 
    def fn(x, y): 
        return x * 10 + y
    
    reduce(fn, [1, 3, 5, 7, 9]) # 生成13579
  3. filter

  4. sorted

    python3

    sorted([36, 5, -12, 9, -21], key=abs) # abs函数作用于每个元素
  5. 闭包:返回函数不要引用任何循环变量,或者后续会发生变化的变量。

    python3

    def count():
        fs = []
        for i in range(1, 4):
            def f():
                return i*i
            fs.append(f)
        return fs
        f1, f2, f3 = count() #9,9,9
    
    def count():
        def f(j):
            def g():
                return j*j
            return g
        fs = []
        for i in range(1, 4):
            fs.append(f(i)) # f(i)立刻被执行,因此i的当前值被传入f()
        return fs    
  6. lambda 参数名,参数名 : 表达式返回值

python3

# 两层调用
def log(func):
    def wrapper(*args, **kw):
        print('call %s():' % func.__name__)
        return func(*args, **kw)
    return wrapper
    
@log                   #相当于now = log(now)   
def now():
    print('2015-3-25') 

# 三层调用
def log(text):
    def decorator(func):
        def wrapper(*args, **kw):
            print('%s %s():' % (text, func.__name__))
            return func(*args, **kw)
        return wrapper
    return decorator
    
    @log('execute')        #相当于now = log('execute')(now)
    def now():
        print('2015-3-25')

python3

import functools
int2 = functools.partial(int, base=2) # 返回的函数base=2
1. 一个py文件就是一个模块.
2. 一个目录里面包含__init__.py,这个目录就是一个包(package).
3. 类似_xxx和__xxx这样的函数或变量就是非公开的(private),不应该被直接引用,比如_abc,__abc等;
  1. 实例的变量名如果以__开头,就变成了一个私有变量(private),只有内部可以访问,外部不能访问.

  2. 实的变量名如果以_开头,外部可以访问,但是视为private.

  3. 鸭子类型

  4. type()

    python3

    >>> import types
    >>> def fn():
    ...     pass
    ...
    >>> type(fn)==types.FunctionType
    True
    >>> type(abs)==types.BuiltinFunctionType
    True
    >>> type(lambda x: x)==types.LambdaType
    True
    >>> type((x for x in range(10)))==types.GeneratorType
    True
  5. isinstance()

    python3

    >>> isinstance([1, 2, 3], (list, tuple))
    True
    >>> isinstance((1, 2, 3), (list, tuple))
    True
  6. 测试对象属性和方法

    python3

        dir('ABC') #获取对象的属性和方法
    
    >>> hasattr(obj, 'x') # 有属性'x'吗?
    True
    >>> obj.x
    9
    >>> hasattr(obj, 'y') # 有属性'y'吗?
    False
    >>> setattr(obj, 'y', 19) # 设置一个属性'y'
    >>> hasattr(obj, 'y') # 有属性'y'吗?
    True
    >>> getattr(obj, 'y') # 获取属性'y'
    19
    >>> obj.y # 获取属性'y'
    19
    
    >>> getattr(obj, 'z', 404) # 获取属性'z',如果不存在,返回默认值404
    404
  7. __slots__:限制属性

    python3

    class Student(object):
        __slots__ = ('name', 'age') # 用tuple定义允许绑定的属性名称
  8. @property:把一个方法变成属性的调用

    python3

    class Student(object):
    
    @property
    def score(self):
        return self._score
    
    @score.setter
    def score(self, value):
        if not isinstance(value, int):
            raise ValueError('score must be an integer!')
        if value < 0 or value > 100:
            raise ValueError('score must between 0 ~ 100!')
        self._score = value
  9. __XXX__

    1. __len__():调用len()时调用

    2. __str__():类似toString

    3. __repr__():为调试服务的toString

    4. __iter__():该方法返回一个迭代对象,然后,Python的for循环就会不断调用该迭代对象的__next__()方法拿到循环的下一个值

      python3

      class Fib(object):
      def __init__(self):
          self.a, self.b = 0, 1 # 初始化两个计数器a,b
      
      def __iter__(self):
          return self # 实例本身就是迭代对象,故返回自己
      
      def __next__(self):
          self.a, self.b = self.b, self.a + self.b # 计算下一个值
          if self.a > 100000: # 退出循环的条件
              raise StopIteration()
          return self.a # 返回下一个值
    5. __getitem__(n):

      python3

      # 实现切片
      class Fib(object):
      def __getitem__(self, n):
          if isinstance(n, int): # n是索引
              a, b = 1, 1
              for x in range(n):
                  a, b = b, a + b
              return a
          if isinstance(n, slice): # n是切片
              start = n.start
              stop = n.stop
              if start is None:
                  start = 0
              a, b = 1, 1
              L = []
              for x in range(stop):
                  if x >= start:
                      L.append(a)
                  a, b = b, a + b
              return L

      此外,如果把对象看成dict,getitem()的参数也可能是一个可以作key的object,例如str。与之对应的是__setitem__()方法,把对象视作list或dict来对集合赋值。最后,还有一个__delitem__()方法,用于删除某个元素。

    6. __getattr__(self, attr):当获取属性时没有找到,就调用它.

    7. __call__():可以实现实例本身调用.

      python3

      # 判断对象是否可以实例调用
      >>> callable(Student())
      True
      >>> callable(max)
      True
      >>> callable([1, 2, 3])
      False
      >>> callable(None)
      False
      >>> callable('str')
      False
  1. 定义

    python3

    from enum import Enum
    
    Month = Enum('Month', ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'))
  2. 遍历

    python3

    for name, member in Month.__members__.items():
        print(name, '=>', member, ',', member.value)
    # value属性则是自动赋给成员的int常量,默认从1开始计数。
  3. 自定义枚举类

    python3

    from enum import Enum, unique
    
    @unique
    class Weekday(Enum):
        Sun = 0 # Sun的value被设定为0
        Mon = 1
        Tue = 2
        Wed = 3
        Thu = 4
        Fri = 5
        Sat = 6
  4. 访问枚举类型

python3

>>> day1 = Weekday.Mon
>>> print(day1)
Weekday.Mon
>>> print(Weekday.Tue)
Weekday.Tue
>>> print(Weekday['Tue'])
Weekday.Tue
>>> print(Weekday.Tue.value)
2
>>> print(day1 == Weekday.Mon)
True
>>> print(day1 == Weekday.Tue)
False
>>> print(Weekday(1))
Weekday.Mon
>>> print(day1 == Weekday(1))
True
>>> Weekday(7)
Traceback (most recent call last):
...
ValueError: 7 is not a valid Weekday
>>> for name, member in Weekday.__members__.items():
...     print(name, '=>', member)
...
Sun => Weekday.Sun
Mon => Weekday.Mon
Tue => Weekday.Tue
Wed => Weekday.Wed
Thu => Weekday.Thu
Fri => Weekday.Fri
Sat => Weekday.Sat

python3

>>> def fn(self, name='world'): # 先定义函数
...     print('Hello, %s.' % name)
...
>>> Hello = type('Hello', (object,), dict(hello=fn)) # 创建Hello class
>>> h = Hello()
>>> h.hello()
Hello, world.
>>> print(type(Hello))
<class 'type'>
>>> print(type(h))
<class '__main__.Hello'>

要创建一个class对象,type()函数依次传入3个参数:

1. class的名称;
2. 继承的父类集合,注意Python支持多重继承,如果只有一个父类,别忘了tuple的单元素写法;
3. class的方法名称与函数绑定,这里我们把函数fn绑定到方法名hello上。
  1. metaclass的类名总是以Metaclass结尾,以便清楚地表示这是一个metaclass.

  2. 示例:

    python3

    # metaclass是类的模板,所以必须从`type`类型派生:
    class ListMetaclass(type):
        def __new__(cls, name, bases, attrs):
            attrs['add'] = lambda self, value: self.append(value)
            return type.__new__(cls, name, bases, attrs)
    
    class MyList(list, metaclass=ListMetaclass):
        pass

    __new__()方法接收到的参数依次是:

     1. 当前准备创建的类的对象;
    
     2. 类的名字;
    
     3. 类继承的父类集合;
    
     4. 类的方法集合;    
    
  1. 异常处理:

    python3

    try:
        print('try...')
        r = 10 / int('2')
        print('result:', r)
    except ValueError as e:
        print('ValueError:', e)
    except ZeroDivisionError as e:
        print('ZeroDivisionError:', e)
    else:
        print('no error!')
    finally:
        print('finally...')
    print('END')
  2. assert(启动Python解释器时可以用-O参数来关闭assert):

    python3

    def foo(s):
        n = int(s)
        assert n != 0, 'n is zero!'
        return 10 / n
    
    def main():
        foo('0')
  3. unittest

    python3

    import unittest
    
    from mydict import Dict
    
    class TestDict(unittest.TestCase):
    
        def test_init(self):
            d = Dict(a=1, b='test')
            self.assertEqual(d.a, 1)
            self.assertEqual(d.b, 'test')
            self.assertTrue(isinstance(d, dict))
    
        def test_key(self):
            d = Dict()
            d['key'] = 'value'
            self.assertEqual(d.key, 'value')
    
        def test_attr(self):
            d = Dict()
            d.key = 'value'
            self.assertTrue('key' in d)
            self.assertEqual(d['key'], 'value')
    
        def test_keyerror(self):
            d = Dict()
            with self.assertRaises(KeyError):
                value = d['empty']
    
        def test_attrerror(self):
            d = Dict()
            with self.assertRaises(AttributeError):
                value = d.empty

    setUp()和tearDown()方法会分别在每调用一个测试方法的前后分别被执行。

  1. 简单示例:

    python3

    # 编码和忽略错误
    f = open('/Users/michael/gbk.txt', 'r', encoding='gbk', errors='ignore')
  2. StringIO(在内存中读写str):

    python3

    >>> from io import StringIO
    >>> f = StringIO('Hello!\nHi!\nGoodbye!')
    >>> while True:
    ...     s = f.readline()
    ...     if s == '':
    ...         break
    ...     print(s.strip())
    ...
    Hello!
    Hi!
    Goodbye!
  3. BytesIO

    python3

    >> from io import BytesIO
    >>> f = BytesIO(b'\xe4\xb8\xad\xe6\x96\x87')
    >>> f.read()
    b'\xe4\xb8\xad\xe6\x96\x87'
  4. 操作文件和目录

    1. 目录

      python3

      # 查看当前目录的绝对路径:
      >>> os.path.abspath('.')
      '/Users/michael'
      # 在某个目录下创建一个新目录,首先把新目录的完整路径表示出来:
      >>> os.path.join('/Users/michael', 'testdir')
      '/Users/michael/testdir'
      # 然后创建一个目录:
      >>> os.mkdir('/Users/michael/testdir')
      # 删掉一个目录:
      >>> os.rmdir('/Users/michael/testdir')
      
      # 拆分路径
      >>> os.path.split('/Users/michael/testdir/file.txt')
      ('/Users/michael/testdir', 'file.txt')
      # 文件扩展名
      >>> os.path.splitext('/path/to/file.txt')
      ('/path/to/file', '.txt')
    2. 文件

      python3

      # 对文件重命名:
      >>> os.rename('test.txt', 'test.py')
      # 删掉文件:
      >>> os.remove('test.py')
      
      '''
          制文件的函数居然在os模块中不存在!原因是复制文件并非由操作系统提供的系统调用。理论上讲,我们通过上一节的读写文件可以完成文件复制,只不过要多写很多代码。
      
      幸运的是shutil模块提供了copyfile()的函数,你还可以在shutil模块中找到很多实用函数,它们可以看做是os模块的补充。
      '''
      # 列出文件
      >>> [x for x in os.listdir('.') if os.path.isfile(x) and os.path.splitext(x)[1]=='.py']
      ['apis.py', 'config.py', 'models.py', 'pymonitor.py', 'test_db.py', 'urls.py', 'wsgiapp.py']
  5. 序列化

    python3

    # 序列化
    >>> import pickle
    >>> d = dict(name='Bob', age=20, score=88)
    >>> pickle.dumps(d)
    b'\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K\x14X\x05\x00\x00\x00scoreq\x02KXX\x04\x00\x00\x00nameq\x03X\x03\x00\x00\x00Bobq\x04u.'
    
    # 保存到文件
    >>> f = open('dump.txt', 'wb')
    >>> pickle.dump(d, f)
    >>> f.close()
    
    # 从文件读取
    >>> f = open('dump.txt', 'rb')
    >>> d = pickle.load(f)
    >>> f.close()
    >>> d
    {'age': 20, 'score': 88, 'name': 'Bob'}
    
    # JSON
    print(json.dumps(s, default=lambda obj: obj.__dict__)) # __dict__用来存储实例变量
  1. linux

    python3

    import os
    
    print('Process (%s) start...' % os.getpid())
    # Only works on Unix/Linux/Mac:
    pid = os.fork()
    if pid == 0:
        print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
    else:
        print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
  2. 通用

    python3

    from multiprocessing import Process
    import os
    
    # 子进程要执行的代码
    def run_proc(name):
        print('Run child process %s (%s)...' % (name, os.getpid()))
    
    if __name__=='__main__':
        print('Parent process %s.' % os.getpid())
        p = Process(target=run_proc, args=('test',))
        print('Child process will start.')
        p.start()
        p.join()
        print('Child process end.')
  3. 进程池

    python3

    from multiprocessing import Pool
    import os, time, random
    
    def long_time_task(name):
        print('Run task %s (%s)...' % (name, os.getpid()))
        start = time.time()
        time.sleep(random.random() * 3)
        end = time.time()
        print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    
    if __name__=='__main__':
        print('Parent process %s.' % os.getpid())
        p = Pool(4)
        for i in range(5):
            p.apply_async(long_time_task, args=(i,))
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done.')
  4. 子进程

    python3

    import subprocess
    
    print('$ nslookup www.python.org')
    r = subprocess.call(['nslookup', 'www.python.org'])
    print('Exit code:', r)
  1. 创建线程:

    python3

    import time, threading
    
    # 新线程执行的代码:
    def loop():
        print('thread %s is running...' % threading.current_thread().name)
        n = 0
        while n < 5:
            n = n + 1
            print('thread %s >>> %s' % (threading.current_thread().name, n))
            time.sleep(1)
        print('thread %s ended.' % threading.current_thread().name)
    
    print('thread %s is running...' % threading.current_thread().name)
    t = threading.Thread(target=loop, name='LoopThread')
    t.start()
    t.join()
    print('thread %s ended.' % threading.current_thread().name)
  2. 互斥锁

    python3

    balance = 0
    lock = threading.Lock()
    
    def run_thread(n):
        for i in range(100000):
            # 先要获取锁:
            lock.acquire()
            try:
                # 放心地改吧:
                change_it(n)
            finally:
                # 改完了一定要释放锁:
                lock.release()
  3. Global Interpreter Lock:

    任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

  4. ThreadLocal:线程之间传递参数,每个线程都有一副本,互不干扰.

    python3

    import threading
    
    # 创建全局ThreadLocal对象:
    local_school = threading.local()
    
    def process_student():
        # 获取当前线程关联的student:
        std = local_school.student
        print('Hello, %s (in %s)' % (std, threading.current_thread().name))
    
    def process_thread(name):
        # 绑定ThreadLocal的student:
        local_school.student = name
        process_student()
    
    t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
    t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
    t1.start()
    t2.start()
    t1.join()
    t2.join()
  1. collections

    1. namedtuple

      python3

      >>> from collections import namedtuple
      >>> Point = namedtuple('Point', ['x', 'y'])
      >>> p = Point(1, 2)
      >>> p.x
      1
      >>> p.y
      2
    2. deque

      python3

      # 双向列表
      >>> from collections import deque
      >>> q = deque(['a', 'b', 'c'])
      >>> q.append('x')
      >>> q.appendleft('y')
      >>> q
      deque(['y', 'a', 'b', 'c', 'x'])
    3. defaultdict

      python3

      >>> from collections import defaultdict
      >>> dd = defaultdict(lambda: 'N/A')
      >>> dd['key1'] = 'abc'
      >>> dd['key1'] # key1存在
      'abc'
      >>> dd['key2'] # key2不存在,返回默认值
      'N/A'
    4. OrderedDict

      python3

      >>> from collections import OrderedDict
      >>> d = dict([('a', 1), ('b', 2), ('c', 3)])
      >>> d # dict的Key是无序的
      {'a': 1, 'c': 3, 'b': 2}
      >>> od = OrderedDict([('a', 1), ('b', 2), ('c', 3)])
      >>> od # OrderedDict的Key是有序的
      OrderedDict([('a', 1), ('b', 2), ('c', 3)]) # 根据插入顺序
    5. Counter

      python3

      >>> from collections import Counter
      >>> c = Counter()
      >>> for ch in 'programming':
      ...     c[ch] = c[ch] + 1
      ...
      >>> c
      Counter({'g': 2, 'm': 2, 'r': 2, 'a': 1, 'i': 1, 'o': 1, 'n': 1, 'p': 1})
  2. base64:将3个字节的二进制编码成四字节对应的字符

    python3

    >>> import base64
    >>> base64.b64encode(b'binary\x00string')
    b'YmluYXJ5AHN0cmluZw=='
    >>> base64.b64decode(b'YmluYXJ5AHN0cmluZw==')
    b'binary\x00string'
    
    # url safe
    >>> base64.b64encode(b'i\xb7\x1d\xfb\xef\xff')
    b'abcd++//'
    >>> base64.urlsafe_b64encode(b'i\xb7\x1d\xfb\xef\xff')
    b'abcd--__'
    >>> base64.urlsafe_b64decode('abcd--__')
    b'i\xb7\x1d\xfb\xef\xff'
  3. struct

    python3

    >>> import struct
    >>> struct.pack('>I', 10240099) # >表示字节顺序是big-endian,也就是网络序,I表示4字节无符号整数。
    b'\x00\x9c@c'
    
    >>> struct.unpack('>IH', b'\xf0\xf0\xf0\xf0\x80\x80') # I:4字节无符号整数和H:2字节无符号整数。
    (4042322160, 32896)
  4. hashlib

    python3

    import hashlib
    md5 = hashlib.md5()
    md5.update('how to use md5 in python hashlib?'.encode('utf-8'))
    print(md5.hexdigest())
    
    # 如果数据量过大,可以分块调用update
    import hashlib
    md5 = hashlib.md5()
    md5.update('how to use md5 in '.encode('utf-8'))
    md5.update('python hashlib?'.encode('utf-8'))
    print(md5.hexdigest())
  5. itertools

    python3

    >>> import itertools
    
    # 无限迭代
    >>> natuals = itertools.count(1)
    
    # 重复无限迭代    
    >>> cs = itertools.cycle('ABC') # 注意字符串也是序列的一种
    
    # 单一元素迭代,可以指定次数
    ns = itertools.repeat('A', 3)
    
    # 合并迭代 
    >>> for c in itertools.chain('ABC', 'XYZ'):
    ...     print(c)
    
    # groupby()把迭代器中相邻的重复元素挑出来放在一起
    >>> for key, group in itertools.groupby('AAABBBCCAAA'):
    ...     print(key, list(group))
    ...
    A ['A', 'A', 'A']
    B ['B', 'B', 'B']
    C ['C', 'C']
    A ['A', 'A', 'A']
  6. contextlib:任何对象,只要正确实现了上下文管理,就可以用于with语句。实现上下文管理是通过__enter__和__exit__这两个方法实现的。

    python3

    from contextlib import contextmanager
    class Query(object):
    
        def __init__(self, name):
            self.name = name
    
        def query(self):
            print('Query info about %s...' % self.name)
    
    @contextmanager
    def create_query(name):
        print('Begin')
        q = Query(name)
        yield q
        print('End')
    
    
    # 用contextmanager实现前后自动执行代码
    @contextmanager
    def tag(name):
        print("<%s>" % name)
        yield
        print("</%s>" % name)
    
    with tag("h1"):
        print("hello")
        print("world")
    
    # 如果一个对象没有实现上下文,我们就不能把它用于with语句。这个时候,可以用closing()来把该对象变为上下文对象。
    from contextlib import closing
    from urllib.request import urlopen
    
    with closing(urlopen('https://www.python.org')) as page:
        for line in page:
            print(line)
  7. urllib

    python3

    from urllib import request
    
    with request.urlopen('https://api.douban.com/v2/book/2129650') as f:
        data = f.read()
        print('Status:', f.status, f.reason)
        for k, v in f.getheaders():
            print('%s: %s' % (k, v))
        print('Data:', data.decode('utf-8'))
    
    # 加请求头
    from urllib import request
    
    req = request.Request('http://www.douban.com/')
    req.add_header('User-Agent', 'Mozilla/6.0 (iPhone; CPU iPhone OS 8_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/8.0 Mobile/10A5376e Safari/8536.25')
    with request.urlopen(req) as f:
        print('Status:', f.status, f.reason)
        for k, v in f.getheaders():
            print('%s: %s' % (k, v))
        print('Data:', f.read().decode('utf-8'))
    
    # post登录
    
    from urllib import request, parse
    
    print('Login to weibo.cn...')
    email = input('Email: ')
    passwd = input('Password: ')
    login_data = parse.urlencode([
        ('username', email),
        ('password', passwd),
        ('entry', 'mweibo'),
        ('client_id', ''),
        ('savestate', '1'),
        ('ec', ''),
        ('pagerefer', 'https://passport.weibo.cn/signin/welcome?entry=mweibo&r=http%3A%2F%2Fm.weibo.cn%2F')
    ])
    
    req = request.Request('https://passport.weibo.cn/sso/login')
    req.add_header('Origin', 'https://passport.weibo.cn')
    req.add_header('User-Agent', 'Mozilla/6.0 (iPhone; CPU iPhone OS 8_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/8.0 Mobile/10A5376e Safari/8536.25')
    req.add_header('Referer', 'https://passport.weibo.cn/signin/login?entry=mweibo&res=wel&wm=3349&r=http%3A%2F%2Fm.weibo.cn%2F')
    
    with request.urlopen(req, data=login_data.encode('utf-8')) as f:
        print('Status:', f.status, f.reason)
        for k, v in f.getheaders():
            print('%s: %s' % (k, v))
        print('Data:', f.read().decode('utf-8'))
    
    # 代理
    proxy_handler = urllib.request.ProxyHandler({'http': 'http://www.example.com:3128/'})
    proxy_auth_handler = urllib.request.ProxyBasicAuthHandler()
    proxy_auth_handler.add_password('realm', 'host', 'username', 'password')
    opener = urllib.request.build_opener(proxy_handler, proxy_auth_handler)
    with opener.open('http://www.example.com/login.html') as f:
        pass
  8. PIL(Python Imaging Library)

    python3

    # 操作图片
    from PIL import Image
    
    # 打开一个jpg图像文件,注意是当前路径:
    im = Image.open('test.jpg')
    # 获得图像尺寸:
    w, h = im.size
    print('Original image size: %sx%s' % (w, h))
    # 缩放到50%:
    im.thumbnail((w//2, h//2))
    print('Resize image to: %sx%s' % (w//2, h//2))
    # 把缩放后的图像用jpeg格式保存:
    im.save('thumbnail.jpg', 'jpeg')
    
    #模糊效果
    from PIL import Image, ImageFilter
    
    # 打开一个jpg图像文件,注意是当前路径:
    im = Image.open('test.jpg')
    # 应用模糊滤镜:
    im2 = im.filter(ImageFilter.BLUR)
    im2.save('blur.jpg', 'jpeg')
    
    # 生成验证码
    from PIL import Image, ImageDraw, ImageFont, ImageFilter
    
    import random
    
    # 随机字母:
    def rndChar():
        return chr(random.randint(65, 90))
    
    # 随机颜色1:
    def rndColor():
        return (random.randint(64, 255), random.randint(64, 255), random.randint(64, 255))
    
    # 随机颜色2:
    def rndColor2():
        return (random.randint(32, 127), random.randint(32, 127), random.randint(32, 127))
    
    # 240 x 60:
    width = 60 * 4
    height = 60
    image = Image.new('RGB', (width, height), (255, 255, 255))
    # 创建Font对象:
    font = ImageFont.truetype('Arial.ttf', 36)
    # 创建Draw对象:
    draw = ImageDraw.Draw(image)
    # 填充每个像素:
    for x in range(width):
        for y in range(height):
            draw.point((x, y), fill=rndColor())
    # 输出文字:
    for t in range(4):
        draw.text((60 * t + 10, 10), rndChar(), font=font, fill=rndColor2())
    # 模糊:
    image = image.filter(ImageFilter.BLUR)
    image.save('code.jpg', 'jpeg')
  1. 协程(Coroutine):

    python3

    def consumer():
        r = ''
        while True:
            n = yield r
            if not n:
                return
            print('[CONSUMER] Consuming %s...' % n)
            r = '200 OK'
    
    def produce(c):
        c.send(None)
        n = 0
        while n < 5:
            n = n + 1
            print('[PRODUCER] Producing %s...' % n)
            r = c.send(n)
            print('[PRODUCER] Consumer return: %s' % r)
        c.close()
    
    c = consumer()
    produce(c)
    1. 首先调用c.send(None)启动生成器;
    2. 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
    3. consumer通过yield拿到消息,处理,又通过yield把结果传回;
    4. produce拿到consumer处理的结果,继续生产下一条消息;
    5. produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
  2. asyncio

    python3

    import asyncio
    
    @asyncio.coroutine
    def wget(host):
        print('wget %s...' % host)
        connect = asyncio.open_connection(host, 80)
        reader, writer = yield from connect
        header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
        writer.write(header.encode('utf-8'))
        yield from writer.drain()
        while True:
            line = yield from reader.readline()
            if line == b'\r\n':
                break
            print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
        # Ignore the body, close the socket
        writer.close()
    
    loop = asyncio.get_event_loop()
    tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

Python从3.5版开始为asyncio提供了async和await的新语法;

  1. aiohttp:

    python3

    import asyncio
    
    from aiohttp import web
    
    async def index(request):
        await asyncio.sleep(0.5)
        return web.Response(body=b'<h1>Index</h1>')
    
    async def hello(request):
        await asyncio.sleep(0.5)
        text = '<h1>hello, %s!</h1>' % request.match_info['name']
        return web.Response(body=text.encode('utf-8'))
    
    async def init(loop):
        app = web.Application(loop=loop)
        app.router.add_route('GET', '/', index)
        app.router.add_route('GET', '/hello/{name}', hello)
        srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
        print('Server started at http://127.0.0.1:8000...')
        return srv
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    loop.run_forever()

    廖雪峰python教程 https://www.liaoxuefeng.com/