抓取数据的方法,前面的课程该讲的都已经讲了,爬取下来数据只是第一步,第二步就是要先存起来。我们最容易想到的就是存文件里喽,python写文件之前的课程也已经讲过了。存到文件里当然是可以的,但是你是否想过,每次使用都要把整个文件打开,然后读取,实在是有点不geek啊。
所以我们通常会选择存进数据库,方便写入和读取数据,并且对于大部分情况而言,python数据结构中的dict足够我们去结构化抓取的数据,那么能把两者发挥到极致的神器就是——mongodb!
mongodb
- 分布式
- 松散数据结构(json)
- 查询语言强大
文档
你可以看做是一个dict,dict里面还可以嵌套dict,例如:
{"name": "alan", score_list: {"chinese": 90, "english": 80}}
集合
一组文档,就是一堆dict。
数据库
多个集合组成数据库
这么理解:你可以把mongodb看做一个图书馆,图书馆中每本书就是文档,一个书架上的书是个集合,每个图书室的书架加起来就是个数据库。
安装
官方安装方法
学我教程的同学应该都知道,我不会给出具体步骤,鼓励大家按照官方文档去摸索,屏蔽伸手党。
该如何把抓取到的数据存入mongodb
- 把抓到的数据写成你想要的dict形式
- insert到指定的书架上
- 没了。。。
增删查改例子 python2版本
需要安装pymongo
pip install pymongo
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don‘t forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
import pymongo
import sys
import unittest
reload(sys)
sys.setdefaultencoding(‘utf-8‘)
class MongoAPI(object):
def __init__(self, db_ip, db_port, db_name, table_name):
self.db_ip = db_ip
self.db_port = db_port
self.db_name = db_name
self.table_name = table_name
self.conn = pymongo.MongoClient(host=self.db_ip, port=self.db_port)
self.db = self.conn[self.db_name]
self.table = self.db[self.table_name]
def get_one(self, query):
return self.table.find_one(query, projection={"_id": False})
def get_all(self, query):
return self.table.find(query)
def add(self, kv_dict):
return self.table.insert(kv_dict)
def delete(self, query):
return self.table.delete_many(query)
def check_exist(self, query):
ret = self.get(query)
return len(ret) > 0
# 如果没有 会新建
def update(self, query, kv_dict):
ret = self.table.update_many(
query,
{
"$set": kv_dict,
}
)
if not ret.matched_count or ret.matched_count == 0:
self.add(kv_dict)
elif ret.matched_count and ret.matched_count > 1:
self.delete(query)
self.add(kv_dict)
class DBAPITest(unittest.TestCase):
def setUp(self):
self.db_api = MongoAPI("127.0.0.1", # 图书馆大楼地址
27017, # 图书馆门牌号
"test", # 一号图书室
"test_table") # 第一排书架
def test(self):
db_api = self.db_api
db_api.add({"url": "test_url", "k": "v"})
self.assertEqual(db_api.get_one({"url": "test_url"})["k"], "v")
db_api.update({"url": "test_url"}, {"url_update": "url_update"})
ob = db_api.get_one({"url": "test_url"})
self.assertEqual(ob["url_update"], "url_update")
db_api.delete({"url": "test_url"})
self.assertEqual(db_api.get_one({"url": "test_url"}), None)
if __name__ == ‘__main__‘:
unittest.main()