RabbitMQ
RabbitMQ生产消费者
最简单的通信队列: P为生产者,C为消费者

消费者可以是多个,这时队列会采用轮询的方式分发消息给消费者

生产者:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.queue_declare(queue="hello", durable=True) # durable参数声明队列为持久化队列 data = "" while not data == "bye": data = input("Something you want to publish: ") channel.basic_publish(exchange="", routing_key="hello", body=data, properties=pika.BasicProperties(delivery_mode=2)) # delivery_mode指定消息持久化 print("[INFO] - Message sent.") connection.close() |
消费者:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika import time """ 如果RabbitMQ服务器需要验证,需要指定用户密码 credential = pika.PlainCredentials("user", "password") connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10", credentials=credential)) """ connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.queue_declare(queue="hello", durable=True) # durable参数声明队列为持久化队列 def callback(ch, method, properties, body): print("Received message:") print("""ch: %s method: %s properties: %s body: %s""" % (ch, method, properties, body)) time.sleep(5) # 模拟处理消息所需时间 ch.basic_ack(delivery_tag=method.delivery_tag) # 如果没有自动确认则需手工调用确认方法 print("Jod done") """ basic_consume参数: queue: 要从哪个队列里接收消息 on_message_callback: 收到消息后的执行的回调函数,会传入四个参数channel,method,properties,body auto_ack: 是否自定确认消息,如果不确认,消息会一直存在队列里不删除 """ channel.basic_qos(prefetch_count=5) # 指定缓存窗口大小,只有当缓存窗口未满时才会分发消息给消费者 channel.basic_consume(queue="hello", on_message_callback=callback, auto_ack=False) print("Waiting for message.") channel.start_consuming() connection.close() |
队列和消息持久化
默认队列和消息在RabbitMQ服务器重启后会失效,如果如需是队列一直有效,并且未消费的消息得到保留,需要进行声明
队列持久化
|
1 2 3 |
connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.queue_declare(queue="hello", durable=True) # durable参数声明队列为持久化队列 |
消息持久化
|
1 2 |
channel.basic_publish(exchange="", routing_key="hello", body=data, properties=pika.BasicProperties(delivery_mode=2)) # delivery_mode指定消息持久化 |
公平分发
默认模式下,队列会把消息轮流发送给每个消费者,不管消费者当前是否繁忙,如果需要更加公平有效的分配,需要指定消费者缓存窗口大小

|
1 |
channel.basic_qos(prefetch_count=5) # 指定缓存窗口大小,只有当缓存窗口未满时才会分发消息给消费者 |
订阅发布
默认的分发模式一条消息只能发送给一个消费者,如果需要实现广播的效果,需要用到exchange,excahnge会负责把满足条件的消息分发到绑定到该exchange的所有队列中,消费者再从队列中消费
一般分为以下三种分发(绑定)模式:
fanout
队列直接与exchange绑定来获取消息

生产者
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) # 创建到RabbitMQ服务器的连接 channel = connection.channel() # 创建通道 channel.exchange_declare(exchange="msg", exchange_type="fanout") # 声明exchange名称和类型(fanout为广播模式) message = "" while message != "bye": message = input("Message to publish:>>") channel.basic_publish(exchange="msg", routing_key="", body=message) # 发送消息到exchange进行分发 print("Message sent") connection.close() |
消费者
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.exchange_declare(exchange="msg", exchange_type="fanout") # 声明一个exchange并指定类型为fonout result = channel.queue_declare(queue="", exclusive=True) # 通过指定空白的队列名称来生成一个随机的queue,exclusive声明为唯一 queue_name = result.method.queue # 获取队列名称 print("Queue name:", queue_name) channel.queue_bind(exchange="msg", queue=queue_name) # 将queue绑定到exchange def callback(ch, method, properties, body): print("Received message:") print("""ch: %s method: %s properties: %s body: %s""" % (ch, method, properties, body)) ch.basic_ack(delivery_tag=method.delivery_tag) # 如果没有自动确认则需手工调用确认方法 print("Jod done") channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False) print("Waiting for message.") channel.start_consuming() connection.close() |
direct
direct模式会根据绑定到exchange时所声明的关键字(routing_key)来分发消息到指定队列

生产者
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type="direct") # 声明一个exchange,类型为direct try: while True: level = input("Level of log: >>").strip() # 消息级别,用作routing_key message = input("Message: >>").strip() channel.basic_publish(exchange="logs", routing_key=level, body=message) print("Message sent") finally: connection.close() |
消费者
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type="direct") # 声明一个exchange,类型为direct result = channel.queue_declare(queue="", exclusive=True) # 通过指定空白的队列名称来生成一个随机的queue,exclusive声明为唯一 queue_name = result.method.queue # 获取队列名称 print("Queue name: ", queue_name) levels = input("Log levels for subscriber: >>").strip().split() # 根据输入的监听级别,循环绑定routing_key到exchange中 for level in levels: channel.queue_bind(queue=queue_name, exchange="logs", routing_key=level) def callback(ch, method, properties, body): print("Received message:") print("""ch: %s method: %s properties: %s body: %s""" % (ch, method, properties, body)) ch.basic_ack(delivery_tag=method.delivery_tag) # 如果没有自动确认则需手工调用确认方法,method.delivery_tag为消息编号 channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False) print("Waiting for message.") channel.start_consuming() connection.close() |
topic
topic模式时更为细致的过滤模式,例如unix系统中的日志来源一般为source.level格式,如sys.info, kernel.warn,这时可以使用topic模式来实现更灵活的配置

生产者
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.exchange_declare(exchange="info", exchange_type="topic") # 声明一个exchange,类型为topic try: while True: key = input("Key of the message: >>").strip() message = input("Something to send: >>").strip() channel.basic_publish(exchange="info", routing_key=key, body=message) print("Message sent") finally: connection.close() |
消费者
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.exchange_declare(exchange="info", exchange_type="topic") # 声明一个exchange,类型为topic result = channel.queue_declare(queue="", exclusive=True) # 通过指定空白的队列名称来生成一个随机的queue,exclusive声明为唯一 queue_name = result.method.queue # 获取队列名称 print("Queue name: ", queue_name) keys = input("Your focus: >>").strip().split() # 输入需要监听的内容,如*.info,、ssh.*、*.utils.*(单独一个#代表所有) # 根据输入的监听内容,循环绑定routing_key到exchange中 for key in keys: channel.queue_bind(queue=queue_name, exchange="info", routing_key=key) def callback(ch, method, properties, body): print("Received message:") print("""ch: %s method: %s properties: %s body: %s""" % (ch, method, properties, body)) ch.basic_ack(delivery_tag=method.delivery_tag) # 如果没有自动确认则需手工调用确认方法,method.delivery_tag为消息编号 channel.basic_consume(queue=queue_name, on_message_callback=callback) print("Waiting for message.") channel.start_consuming() connection.close() |
RabbitMQ RPC
为了说明如何使用RPC服务,我们将创建一个简单的客户机类。它将公开一个名为run的方法,该方法发送RPC请求并阻塞,直到收到答案:

RPC Server
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika def fib(n): """ :param n: 位数 :return: 返回第n位斐波那契数列 """ if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def response(ch, method, pros, body): result = fib(int(body)) # 计算斐波那契数列 # 发送消息到客户端指定的返回队列 ch.basic_publish(exchange="", routing_key=pros.reply_to, properties=pika.BasicProperties(correlation_id=pros.correlation_id), body=str(result)) connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.queue_declare(queue="rpc_queue") channel.basic_qos(prefetch_count=1) # 指定缓存窗口大小,只有当缓存窗口未满时才会分发消息给消费者 channel.basic_consume(queue="rpc_queue", on_message_callback=response, auto_ack=True) # 收到消息后交给response函数处理 print("Server started.") channel.start_consuming() # 接收客户端消息 connection.close() |
RPC Client
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika import uuid import time class RpcClient(object): """ RCP客户端 """ def __init__(self): self.uid = "" self.response = None self.conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) self.channel = self.conn.channel() queue_tmp = self.channel.queue_declare(queue="", exclusive=True) # 通过指定空白的队列名称来生成一个随机的queue self.callback_queue = queue_tmp.method.queue # 获取队列名称 self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.get_result, auto_ack=True) def get_result(self, ch, method, properties, body): """ 获取队列中的消息内容 :param ch: 返回消息的通道名 :param method: :param properties: 消息的附加属性 :param body: 消息体 :return: 返回数字形式的消息体 """ if properties.correlation_id == self.uid: self.response = int(body) def run(self, n): self.response = None # 重置状态 self.uid = str(uuid.uuid4()) # 生成本次消息的uid self.channel.basic_publish(exchange="", routing_key="rpc_queue", properties=pika.BasicProperties(reply_to=self.callback_queue, correlation_id=self.uid), body=str(n)) while self.response is None: time.sleep(0.5) self.conn.process_data_events() # 查询是否有新消息 return self.response Fib_RpcClient = RpcClient() while True: try: num = int(input(">> ")) print("获取第[%s]位斐波那契数列是[%s]" % (num, Fib_RpcClient.run(num))) except ValueError: print("非法输入,请输入一个正整数") |
Redis
redis基本操作(redis-cli)
字符串操作
# 在Redis中设置值,默认,不存在则创建,存在则修改
set key value [expiration EX seconds|PX milliseconds] [NX|XX]
# 参数
key: 键名
value: 键值
ex: 设置过期时间(秒)
px: 过期时间(毫秒)
nx: 如果设置nx,则只有name不存在时,当前set操作才执行(仅新建)
xx: 如果设置xx,则只有name存在时,当前set操作才执行(仅更新)
例:set 1 a ex 60 nx
# 获取键值
get key
# 参数
key: 键名
# 设置值,只有name不存在时,执行设置操作(添加)
setnx key, value
# 更新键值和过期时间,只有当name存在时,才执行设置操作
setex key second value
# 参数
second: 过期时间(秒)
# 以毫秒为过期时间设置值
psetex name milliseconds value
# 参数
milliseconds: 过期时间(毫秒)
# 批量设置值
mset key value [key value...]
例:
mset 1 a 2 b
# 批量获取值
mget key [key ...]
例:
mset 1 2
# 为键赋与新值同时返回旧值
getset key value
# 获取键值的指定位置的文本
getrange key start end
# 参数
start: 开始位置,0表示第一位
end: 结束位置
# 从键值指定位置开始修改值
setrange key offset value
# 参数
offset: 偏移位置
# 设置二进制位置的值
setbit key offset value
# 参数
offset: 要设置的二进制位置(一个字节占8位)
# 获取指定位置的二进制值
getbit key offset
# 返回指定键值中的二进制1的个数
bitcount key [start end]
# 参数
start: 开始计数的二进制位置
end: 结束计数的二进制位置
# 返回指定键值的长度(UTF-8编码字节数)
strlen key
# 在指定键值后面追加字符串
append key value
# 对数字类型的键值进行递增(加1)
incr key
# 对数字类型的键值进行递减(减1)
decr key
# 对数字类型的键值增加指定数值
incrby key increment
# 参数
increment: 增加的值
# 对数字类型的键值减少指定数值
decrby key decrement
# 参数
decrement: 减少的值
# 对指定键值加上浮点数值
incrbyfloat key increment
# 参数
increment: 要增加的值(浮点数)
# 搜索符合条件的键值
scan cursor [MATCH pattern] [COUNT count]
# 参数
cursor: 游标位置,配合count参数使用,当结果数量大于count数量时,返回一个非0的游标位置,
下次scan时可以从返回的游标位置继续往后搜索
pattern: 匹配条件(可选)
count: 返回结果数量(可选)
hash操作(类似于字典)
# 设置一个hash键值对
hset key field value
# 参数
key: 键名
field: 键值中的字段名
value: 键值中的字段值
# 批量设置hash键值对
hmset key field value [field value ...]
# 例子:
hmset info name Bob age 22
# 获取hash键值的指定字段的值
hget key field
# 获取hash键值的所有字段
hgetall key
# 获取hash键值的字段个数
hlen key
# 获取hash键值中的所有字段名
hkyes key
# 获取hash键值中的所有字段值
hvals key
# 检查hash键值中是否存在指定的字段
hexists key field
# 删除hash键值中的指定字段
hdel key field [field ...]
# 对hash键值中的指定字段增加指定数值
hincrby key field increment
# 参数
increment: 要增加的值
# 对hash键值中的指定字段增加指定浮点数
hincrbyfloat key field increment
# 参数
increment: 要增加的值(浮点数)
# 从hash键值中搜索指定的字段名
hscan key cursor [MATCH pattern] [COUNT count]
# 参数
cursor: 游标位置,配合count参数使用,当结果数量大于count数量时,返回一个非0的游标位置,
下次scan时可以从返回的游标位置继续往后搜索
pattern: 匹配条件(可选)
count: 返回结果数量(可选)
# 例: 查找n开头的字段
hscan info 0 n*
List操作
# 存放一个List类型的键值,一次可以放多个value(从右往左放)
lpush key value [value ...]
# 例:
lpush list1 1 2 3 # 列表结果:3 2 1
# 存放一个List类型的键值,一次可以放多个value(从左往右放)
rpush key value [value ...]
# 例:
lpush list2 1 2 3 # 列表结果:1 2 3
# 仅当key存在时,才在左边添加value
lpushx key value
# 仅当key存在时,才在右边添加value
rpushx key value
# 获取List键值的内容
lrange key start stop
# 参数
start: 开始index,0 表示开头
stop: 结束index,-1表是到结尾
# 获取指定list键值的长度(列表元素个数)
llen key
# 在指定值的前面或后面插入值
linsert key BEFORE|AFTER pivot value
# 参数
BEFORE|AFTER: 插入方式
pivot: 参考元素
# 例
linsert list1 after 2 a # 在元素2的后面插入a
# 修改list键值指定位置元素的值
lset key index value
# 参数
index: 元素在列表中的的位置
# 删除list键值中指定内容的元素
lrem key count value
# 参数
count: 要删除的个数(多个相同元素删除),为正数时从左往右删,为负数时从右往左删
# 从list键值中获取最左侧的元素返回并删除
lpop key
# 从list键值中获取最右侧的元素返回并删除
rpop key
# 获取list指定位置的值
lindex key index
# 从list删除指定范围以外的值
ltrim key start stop
# 参数
start: 保留开始的位置
stop: 暴力结束的位置
# 将一个list键值的最右边的元素移到另一个list键值最左边
rpoplpush source destination
# 参数
source: 要从最右边删除元素的list
destination: 要在左边添加元素的list
# 从list键值中取出最左边的元素
blpop key [key ...] timeout
# 参数
timeout: 当没有元素可取时等待超时的时间(秒)
# 从list键值中取出最右边的元素
brpop key [key ...] timeout
# 参数
timeout: 当没有元素可取时等待超时的时间(秒)
# 将一个list键值的最右边的元素移到另一个list键值最左边
brpoplpush source destination timeout
# 参数
source: 要从最右边删除元素的list
destination: 要在左边添加元素的list
timeout: 当源list没有元素可取时等待超时的时间(秒)
set操作(集合)
# 添加一个集合类型的键值,如果集合已经存在,在追加新的值
sadd key member [member ...]
# 参数
key: 键名
member: 集合成员(自动去重)
# 获取集合中改动全部成员
smembers key
# 获取集合键中的成员数量
scard key
# 求差集(在第一个集合中不在第二个集合中的值)
sdiff key [key ...]
# 求差集,并将结果保存到另外一个集合
sdiffstore destination key [key ...]
# 参数
destination: 要保存结果的目标集合
# 求两个集合的交集
sinter key [key ...]
# 求两个集合的交集,并将结果保存到另外一个集合
sinterstore destination key [key ...]
# 求两个集合的并集
sunion key [key ...]
# 求两个集合的并集,并将结果保存到另外一个集合
sunionstore destination key [key ...]
# 判断值是否是集合成员
sismember key member
# 参数
member: 要进行判断的值
# 将集合中的一个成员转移到另外一个集合
smove source destination member
# 参数
source: 源集合
destination: 目标集合
member: 成员值
# 从集合尾部取出值并删除
spop key [count]
# 参数
count: 要取出的值的个数(可选)
# 从集合中随机取出值
srandmember key [count]
# 从集合中删除指定的值
srem key member [member ...]
# 从集合键值中搜索符合条件的值,与string和hash的scan用法一致
sscan key cursor [MATCH pattern] [COUNT count]
有序集合
有序集合是在集合的基础上,为每个元素指定一个分数(权重)以进行排序(分数高的靠右)
# 添加一个有序集合,如果键已经存在,则添加或修改集合键中的值(权重)
zadd key [NX|XX] [CH] [INCR] score member [score member ...]
# 参数
NX: 只有当member不存在时才执行操作(用于添加新成员)
XX: 只有当member存在时才执行操作(用于修改现有成员分数)
CH: 指定返回值为发生变化(包括新增和修改)的成员总数,默认返回值只是新增的成员数
INCR: 在原有的分数基础上增加
score: 分数,即权值
member: 成员值
# 返回集合中成员数量
zcard key
# 统计集合中在指定分数范围内的数字
zcount key min max
# 参数
min: 分数下限
max: 分数上限
# 增加集合中指定成员的分数
zincrby key increment member
# 获取集合中成员
zrange key start stop [WITHSCORES]
# 参数
start: 开始位置(0表示开头)
stop: 结束位置(-1表示末尾)
WITHSCORES: 展示成员的分数(可选)
# 获取集合中指定成员的位置
zrank key member
# 删除集合中指定的值
zrem key member [member ...]
# 根据位置范围删除集合中的值
zremrangebyrank key start stop
# 根据分数范围删除集合中的值
zremrangebyscore key min max
# 查找集合中指定值的分数
zscore key member
# 求两个有序集合的并集
zunionstore destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX]
# 参数
destination: 要保存交集的目标集合
numkeys: 将要计算交集的集合数
weight: 为每个键指定权重,即进行分数合并时前面的key中元素的分数要乘以权重
AGGREGATE: 对两个集合中元素的分数的聚合方式
SUM: 合并所有集合中该元素的分数(默认)
MIN: 取集合中该元素最小的分数
MAX: 取集合中该元素最大的分数
# 例
zunionstore zset3 2 zset1 zset2 WEIGHTS 0.5 1 # 求zset1和zset2的并集,保存到zset3,zset1中元素的分数要乘以0.5
# 求两个有序集合的交集,参数与求并集一样
zinterstore destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX]
# 从有序集合中搜索符合条件的元素
zscan key cursor [MATCH pattern] [COUNT count]
通用操作
# 删除指定键
del key [key ...]
# 判断键是否存在
exists key [key ...]
# 为一个键指定过期时间(过期自动删除)
expire key seconds
# 参数
seconds: 多少秒后过期
# 重命名键
rename key newkey
# 参数
newkey: 新键名
# 将指定的键移动到新的db下
move key db
# 参数
db: 要移动到的目标db编号
# 随机返回一个存在的键名
randomkey
# 查询指定键的类型
type key
redis管道(pipeline)
实现一次请求指定多个命令,默认情况下一次pipline是原子性操作(相当于sql的事务)。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import redis import time pool = redis.ConnectionPool(host="192.168.80.10", port=6379, db=6) # 创建一个redis连接池 r = redis.Redis(connection_pool=pool) # 从连接池中申请连接 p = r.pipeline(transaction=True) # 声明一个管道,可以将多个命令合并执行 p.set("1001", "aaa") # 在管道中加入一个set命令 time.sleep(10) p.set("1002", "bbb") p.execute() # 最终执行,将两次set一起完成,在此之前不会有值 |
redis发布订阅
发布者(publish)
|
1 2 3 4 5 6 7 8 9 10 11 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import redis conn = redis.Redis(host="192.168.80.10") channel = "ch1" # 发布的通道名 while True: msg = input(">> ") conn.publish(channel, msg) # 发布消息到通道 |
订阅者(pubsub)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import redis conn = redis.Redis(host="192.168.80.10") channel = "ch1" # 订阅的通道名 pub = conn.pubsub() # 创建订阅者 pub.subscribe(channel) # 订阅通道 pub.parse_response() # 准备接收(处理第一条系统消息[b'subscribe', b'ch1', 1]) while True: msg = pub.parse_response() # 接收数据 print(msg) |
原文链接:Python 从入门到放弃 - Lesson 11 redis缓存、rabbitMQ队列,转载请注明来源!
