码迷,mamicode.com
首页 > 编程语言 > 详细

python学习第十二课

时间:2016-01-23 10:31:59      阅读:291      评论:0      收藏:0      [点我收藏+]

标签:memcache   redis   rabbitmq   

memcache 介绍

Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。Memcached基于一个存储键/值对的hashmap。其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信。

emcached缺乏认证以及安全管制,这代表应该将memcached服务器放置在防火墙后。

由于Redis只使用单核,而Memcached可以使用多核,所以在比较上,平均每一个核上Redis在存储小数据时比Memcached性能更高。而在100k以上的数据中,Memcached性能要高于Redis,虽然Redis最近也在存储大数据的性能上进行优化,但是比起Memcached,还是稍有逊色。

如果要说内存使用效率,使用简单的key-value存储的话,Memcached的内存利用率更高,而如果Redis采用hash结构来做key-value存储,由于其组合式的压缩,其内存利用率会高于Memcached.

如果对数据持久化和数据同步有所要求,那么推荐你选择Redis,因为这两个特性Memcached都不具备。即使只是希望在升级或者重启系统后缓存数据不会丢失,选择Redis也是明智的

Redis相比Memcached来说,拥有更多的数据结构和并支持更丰富的数据操作,通常在 Memcached里,你需要将数据拿到客户端来进行类似的修改再set回去。这大大增加了网络IO的次数和数据体积。在Redis中,这些复杂的操作通常和一般的GET/SET一样高效。所以,如果你需要缓存能够支持更复杂的结构和操作,那么Redis会是不错的选择.

 

memcached使用

启动Memcached

memcached -d -m 10    -u root -l 10.211.55.4 -p 12000 -c 256 -P /tmp/memcached.pid

 

参数说明:

    -d 是启动一个守护进程

    -m 是分配给Memcache使用的内存数量,单位是MB

    -u 是运行Memcache的用户

    -l 是监听的服务器IP地址

    -p 是设置Memcache监听的端口,最好是1024以上的端口

    -c 选项是最大运行的并发连接数,默认是1024,按照你服务器的负载量来设定

    -P 是设置保存Memcachepid文件

Memcached命令

存储命令: set/add/replace/append/prepend/cas

获取命令: get/gets

其他命令: delete/stats..

 

Python操作Memcached

常用操作

import memcache

 

mc = memcache.Client([‘10.211.55.4:12000‘], debug=True) #上线后debug=False

mc.set("foo", "bar")

ret = mc.get(‘foo‘)

print ret

debug: whether to display error messages when a server can‘t be contacted.

 

 

redis 使用

redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)list(链表)set(集合)zset(sorted set --有序集合)hash(哈希类型)。这些数据类型都支持push/popadd/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。Redis支持主从同步。数据可以从主服务器向任意数量的从服务器上同步,从服务器可以是关联其他从服务器的主服务器。这使得Redis可执行单层树复制。存盘可以有意无意的对数据进行写操作。由于完全实现了发布/订阅机制,使得从数据库在任何地方同步树时,可订阅一个频道并接收主服务器完整的消息发布记录。同步对读取操作的可扩展性和数据冗余很有帮助。

 

redis提供五种数据类型:stringhashlistsetzset(sorted set)

 

string(字符串)

string是最简单的类型,你可以理解成与Memcached一模一样的类型,一个key对应一个value,其上支持的操作与Memcached的操作类似。但它的功能更丰富。

redis采用结构sdshdrsds封装了字符串,字符串相关的操作实现在源文件sds.h/sds.c中。

 

list(双向链表)

list是一个链表结构,主要功能是pushpop、获取一个范围的所有值等等。操作中key理解为链表的名字。

list的定义和实现在源文件adlist.h/adlist.c

 

dict(hash)

set是集合,和我们数学中的集合概念相似,对集合的操作有添加删除元素,有对多个集合求交并差等操作。操作中key理解为集合的名字。

在源文件dict.h/dict.c中实现了hashtable的操作

dicttabledictEntry指针的数组,数组中每个成员为hash值相同元素的单向链表。set是在dict的基础上实现的,指定了key的比较函数为dictEncObjKeyCompare,若key相等则不再插入。

 

zset(排序set)

zsetset的一个升级版本,他在set的基础上增加了一个顺序属性,这一属性在添加修改元素的时候可以指定,每次指定后,zset会自动重新按新的值调整顺序。可以理解了有两列的mysql表,一列存value,一列存顺序。操作中key理解为zset的名字。

zset利用dict维护key -> value的映射关系,用zsl(zskiplist)保存value的有序关系。zsl实际是叉数

不稳定的多叉树,每条链上的元素从根节点到叶子节点保持升序排序。

 

PYTHON操作redis

1、操作模式

 

redis-py提供两个类RedisStrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,RedisStrictRedis的子类,用于向后兼容旧版本的redis-py

 

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import redis

r = redis.Redis(host=‘10.211.55.4‘, port=6379)

r.set(‘foo‘, ‘Bar‘)

print r.get(‘foo‘)

 

2、连接池

redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import redis

pool = redis.ConnectionPool(host=‘10.211.55.4‘, port=6379)

r = redis.Redis(connection_pool=pool)

r.set(‘foo‘, ‘Bar‘)

print r.get(‘foo‘)

 

3、管道

redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import redis

pool = redis.ConnectionPool(host=‘10.211.55.4‘, port=6379)

r = redis.Redis(connection_pool=pool)

# pipe = r.pipeline(transaction=False)

pipe = r.pipeline(transaction=True)

r.set(‘name‘, ‘alex‘)

r.set(‘role‘, ‘sb‘)

pipe.execute()

 

4、发布订阅

RedisHelper

import redis

class RedisHelper:

    def __init__(self):

        self.__conn = redis.Redis(host=‘10.211.55.4‘)

        self.chan_sub = ‘fm104.5‘

        self.chan_pub = ‘fm104.5‘

    def publis(self, msg):

        self.__conn.publish(self.chan_pub, msg)

        return True

    def subscribe(self):

        pub = self.__conn.pubsub()

        pub.subscribe(self.chan_sub)

        pub.parse_response()

        return pub

订阅者:

#!/usr/bin/env python

# -*- coding:utf-8 -*-

from monitor.RedisHelper import RedisHelper

obj = RedisHelper()

redis_sub = obj.subscribe()

while True:

    msg= redis_sub.parse_response()

    print msg

 

发布者:

#!/usr/bin/env python

# -*- coding:utf-8 -*-

from monitor.RedisHelper import RedisHelper

obj = RedisHelper()

obj.public(‘hello‘)

 

更详细的:https://github.com/andymccurdy/redis-py/

超强超详细的REDIS数据库入库教程:http://www.jb51.net/article/56448.htm

 

上例中,发布者与订阅者都使用同一个class RedisHelper, 现实场景中,发布者和订阅者不在同一台服务器上,试着分拆为:

发布者:

import redis

obj = redis.Redis(host=‘192.168.137.8‘)

msg = ‘short message‘

obj.publish(‘fm104.5‘, msg)        --向频道fm104.5发送msg

订阅者:

import redis

obj = redis.Redis(‘192.168.137.8‘)

redis_sub = obj.pubsub()          --返回一个发布订阅对象,有了这个对象,就可以订阅到指定频道并且接收消息

redis_sub.subscribe(‘fm104.5‘)   --订阅频道

redis_sub.parse_response()     --解析从redis server得到的消息,订阅成功后会默认收到一条 [‘subscribe‘, ‘fm104.5‘, 1L]

while True:

    msg = redis_sub.parse_response()   --接收消息

    print m

可以多发布者多订阅者

 

使用到的知识点 map iter

 

 

RabbitMQ

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

 

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQJMS类似,但不同的是JMSSUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

RabbitMQ的结构图如下

技术分享

       Broker:简单来说就是消息队列服务器实体。

  Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

  Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

  Binding:绑定,它的作用就是把exchangequeue按照路由规则绑定起来。

  Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

  vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

  producer:消息生产者,就是投递消息的程序。

  consumer:消息消费者,就是接受消息的程序。

  channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程大概如下:

        1)客户端连接到消息队列服务器,打开一个channel

  (2)客户端声明一个exchange,并设置相关属性。

  (3)客户端声明一个queue,并设置相关属性。

  (4)客户端使用routing key,在exchangequeue之间建立好绑定关系。

  (5)客户端投递消息到exchange

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:

  (1exchange持久化,在声明时指定durable => 1

  (2queue持久化,在声明时指定durable => 1

  (3)消息持久化,在投递时指定delivery_mode => 21是非持久化)

如果exchangequeue都是持久化的,那么它们之间的binding也是持久化的。如果exchangequeue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

 

RabbitMQ的生产和消费是某台服务器上的RabbitMQ Server实现的消息队列

生产者:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘192.168.137.8‘
))
channel = connection.channel()
channel.queue_declare(
queue=‘hello‘
)
channel.basic_publish(
exchange=‘‘
,
                      
routing_key=‘hello‘
,
                     
body=‘Hello World!‘
)
print(" [x] Sent ‘Hello World!‘"
)
connection.close()

 

消费者

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘192.168.137.8‘
))
channel = connection.channel()
channel.queue_declare(
queue=‘hello‘)
 #此处可以不声明,有什么影响?
def
callback(ch, method, properties, body):
   
print(" [x] Received %r"
% body)
channel.basic_consume(callback,
queue=‘hello‘, no_ack=True
)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘
)
channel.start_consuming()

当有多个消费者时,它们会排队领取生产者的消息,先来先取,后来后取

 

1acknowledgment 消息不丢失

 

no-ack False,如果生产者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘))

channel = connection.channel()

channel.queue_declare(queue=‘hello‘)

def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print ‘ok‘

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback, queue=‘hello‘, no_ack=False)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)

channel.start_consuming()

如果设置 no_ackFalse,是否就得加上ch.basic_ack(delivery_tag = method.delivery_tag)?

        ---必须加,如果不加,

        1.如果此消费者没退出,那它不会再接收其它任务(队列认为此消费者有问题或还在等它的是否成功的返回),如果有新的任务,会被其它消费者拿走

        2.如果此消费者消费完后退出,生产者仍会把这条任务添加到队列中

 

2durable   消息不丢失

 

生产者

#!/usr/bin/env python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘))

channel = connection.channel()

# make message persistent

channel.queue_declare(queue=‘hello‘, durable=True)

channel.basic_publish(exchange=‘‘,

                      routing_key=‘hello‘,

                      body=‘Hello World!‘,

                      properties=pika.BasicProperties(

                          delivery_mode=2, # make message persistent

                      ))

print(" [x] Sent ‘Hello World!‘")

connection.close()

 

消费者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘))

channel = connection.channel()

# make message persistent

channel.queue_declare(queue=‘hello‘, durable=True)

def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print ‘ok‘

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,

                      queue=‘hello‘,

                      no_ack=False)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)

channel.start_consuming()

 

3、消息获取顺序

 

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取奇数序列的任务,消费者2去队列中获取偶数序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

 

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘))

channel = connection.channel()

# make message persistent

channel.queue_declare(queue=‘hello‘)

def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print ‘ok‘

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback, queue=‘hello‘,no_ack=False)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)

channel.start_consuming()

 

4.发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

exchange type = fanout   广播发送

发布者

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘))

channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,type=‘fanout‘)

message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"

channel.basic_publish(exchange=‘logs‘, routing_key=‘‘,body=message)

print(" [x] Sent %r" % message)

connection.close()

 

消费者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))

channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,type=‘fanout‘)

result = channel.queue_declare(exclusive=True)

queue_name = result.method.queue

channel.queue_bind(exchange=‘logs‘,queue=queue_name)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):

    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name,

channel.start_consuming()

python学习第十二课

标签:memcache   redis   rabbitmq   

原文地址:http://120662.blog.51cto.com/110662/1737810

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!