标签:不同 sel channel sage 规范 推送 oca red func
最终效果
ASGI 、Django Channels 简介
ASGI 的完整说明我在去年做了一个翻译。
ASGI 由 Django 团队提出,为了解决在一个网络框架里(如 Django)同时处理 HTTP、HTTP2、WebSocket 协议。为此,Django 团队开发了 Django Channels 插件,为 Django 带来了 ASGI 能力。
在 ASGI 中,将一个网络请求划分成三个处理层面,最前面的一层,interface server(协议处理服务器),负责对请求协议进行解析,并将不同的协议分发到不同的 Channel(频道);频道属于第二层,通常可以是一个队列系统。频道绑定了第三层的 Consumer(消费者)。
比如说,HTTP 协议的频道绑定了 HTTP 的消费者,当有新的 HTTP 请求过来时,interface server 将该请求分发到 HTTP 频道,HTTP 频道绑定的 HTTP 消费者对该请求进行处理,将处理结果返回给 HTTP 频道,最终传回给客户端。
下面,我们用一个实例演示下这种能力。
实时博客的实现
第一步:跑起来
本例基于 Python 3.5.1、macOS 10.12.4 beta 5,理论上 Python 2.7、Linux 可以跑起来(测试工作请自行认领)。
安装依赖:pip install -r requirements.txt
requirements.txt 如下:
django channels asgi_redis feedparser
新建 Django 项目:django-admin startproject livelog。
在 settings.py 里面,将 channels 添加到 INSTALLED_APPS,再加上基础的 channels 相关配置:
... INSTALLED_APPS = ( ... ‘channels‘, ) ... # Redis REDIS_OPTIONS = { ‘HOST‘: ‘127.0.0.1‘, ‘PORT‘: 6379, ‘DB‘: 0 } USE_REDIS = True # Channel settings CHANNEL_LAYERS = { "default": { "BACKEND": "asgi_redis.RedisChannelLayer", "CONFIG": { "hosts": [‘redis://{}:{}‘.format(REDIS_OPTIONS[‘HOST‘], REDIS_OPTIONS[‘PORT‘])] }, "ROUTING": "livelog.routing.channel_routing" } }
注意到使用了 Redis 来做为 Channels 的 Backend。使用 Redis 是为了跨进程的消息处理,为了简便(不需要跨进程),也可以使用 In Memory Channel Layer。
接着,我们在 livelog app 目录下添加一个 routing.py 文件,用以配置 channel 路由,现在可以留空:
channel_routing = []
现在,通过以下命令即可跑起来我们的第一个 ASGI Server。
# run following commands in command line separately redis-server /usr/local/etc/redis.conf python manage.py runserver
可以看到和普通的 Django app server 的启动方式没什么两样,这归功于 Django Channels 封装。实际工作中,要拆解为三个单独的启动步骤:
daphne livelog.asgi:channel_layer --port 8080 //daphne 是 asgi interface sever 的一种实现 python manage.py runworker // 启动 ASGI consumer worker python manage.py runserver --noasgi --noworker // 启动原始的 WSGI server,不运行 ASGI interface server、worker
第二步:处理 WebSocket
Channels 将 WebSocket 连接映射到三个不同到频道:
首先,我们需要将我们的处理逻辑绑定到这三个频道。
在 livelog app 目录中,添加 comsumers.py 文件:
... def ws_connect(message): message.reply_channel.send({‘accept‘: True}) Group(const.GROUP_NAME).add(message.reply_channel) def ws_disconnect(message): Group(const.GROUP_NAME).discard(message.reply_channel) def ws_receive(message): pass
在本例中,使用 WSGI 也即原生 Django 提供 http 服务。
在 routing.py 文件中,将逻辑绑定到路由:
... channel_routing = [ route(‘websocket.connect‘, ws_connect), route(‘websocket.disconnect‘, ws_disconnect), route(‘websocket.receive‘, ws_receive) ]
在 ws_connect 频道中,做了两件事:
在 ws_disconnect 频道中,将断开连接的用户移出群组。
在 ws_receive 频道中,暂时什么也不做,因为本例中,我们不通过被动接受新消息的方式来更新博客。
为了较为全面地展示 ASGI 的能力,本例中,我们使用 Background worker(后台进程)来主动更新博客,并给群组用户推送新消息。
第三步:推送消息
更新实时博客的方式有很多种,在这里我们使用一个 RSS Feed 作为内容源,对该内容源定时抓取,将新内容更新到博客中,以此来做一个演示。在此之前,介绍一个 Django 的小特性,这个特性将允许我们这么运行我们自己的后台进程:
python manage.py blogging_worker // blogging_worker 是我们自定义的命令
这个特新就是 Django 的自定义命令
具体做法在此不赘述,看文档或看本例完整源码即可得知。下面我们回到正事儿上:
blogging_worker.py
# -*- coding: utf-8 -*- ... class Command(BaseCommand): """ Command to start blogging worker from command line. """ help = ‘Fetch and parse RSS feed and send over channel‘ def handle(self, *args, **options): rc = redis.Redis(host=settings.REDIS_OPTIONS[‘HOST‘], port=settings.REDIS_OPTIONS[‘PORT‘], db=settings.REDIS_OPTIONS[‘DB‘]) rc.delete(const.GROUP_NAME) # flush live blogs while True: feed = feedparser.parse(const.IFANR_FEED_URL) for entry in feed.get(‘entries‘)[::-1]: if not rc.hexists(const.GROUP_NAME, entry.get(‘id‘)): Group(const.GROUP_NAME).send({‘text‘: json.dumps(entry)}) rc.hset(const.GROUP_NAME, entry.get(‘id‘), json.dumps(entry)) logger.debug(‘send a message %s ‘ % entry.get(‘title‘)) time.sleep(5)
这个文件里的大量写法都是 Django 约束的,所以不必过分追究,我们看 handle 方法,在这里,我们做这么几件事:
第四步:接收新消息并渲染
我们已经有了一个永不停歇的博客更新服务在运行着,一旦有新的内容,这个服务将为用户推送。现在,我们需要一个页面来渲染呈现新消息。
在 templates 目录下,新建 blog.html 文件:
<!DOCTYPE html> <html lang="en"> ... <div id="container"> <h2>Live Blog</h2> <ul id="log"> </ul> </div> <script type="application/javascript"> var ws_scheme = window.location.protocol == "https:" ? "wss" : "ws"; var ws = new WebSocket(ws_scheme + ‘://‘ + window.location.host + window.location.pathname); console.log(ws); ws.onmessage = function (message) { var data = JSON.parse(message.data); var logList = document.querySelector(‘#log‘); var logItem = document.createElement(‘li‘); var itemTmp = ` <h3>${data.title}</h3> <p> <date>${new Date(data.published).toLocaleString()}</date> <span>${data.source.title}</span> </p> `; logItem.innerHTML = itemTmp; logList.insertBefore(logItem, logList.firstChild); } </script> </body> </html>
在这个页面里,有一个 WebSocket Client 将被新建,并向服务器发起建立连接的请求。当有新消息,将在 #log ul 中插入新的消息。
接下来就是传统 Django 的工作:
到此,一个主动更新博客内容并将新消息推送给每位在线的用户的服务就完成了。
References
标签:不同 sel channel sage 规范 推送 oca red func
原文地址:https://www.cnblogs.com/pythongood/p/11183061.html