(3) 需求:MongDB和redis由于需求的不同,一样平常都是合营利用。须要高性能的地方利用Redis,不须要高性能的地方利用MongDB。存储数据在MongDB和Redis之间做同步。以是一样平常环境下,利用MongDB作为持久化存储数据库存储数据,利用redis作为缓存提升读取速率。
二、数据同步实现方案把二手房小区价格数据,持久化存储在MongDB数据库中,然后利用redis作为缓存数据库,实现数据的快速读取。这样就须要保持redis和MongDB数据库的数据同等性,接下来,紧张讲解查询和数据更新过程的数据库同等性实现。
查询数据时,由于redis作为缓存实现快速读取数据,以是首先查询redis中是否存在数据,若存在则返回查询结果,若不存在,则向MongDB数据库要求查询数据,然后由MongDB数据库返回结果。查询流程如下如所示。而且,由于本文中redis作为缓存利用,以是须要添加过期韶光,也便是为redis的每条数据记录添加过期韶光,若过期韶光数据没有被查询则打消,若此韶光内,数据被查询,则过期韶光重置,这样可以定时打消查询不频繁的数据存在redis中,增加数据读取速率。

redis作为MongoDB的缓存在线实时去重,可以办理在多进程、多线程、异步爬虫时的数据实时重复问题。什么叫数据实时重复呢?
数据重复紧张表示在,爬虫一但须要全体库全部数据的实时更新,但并不知道对方网站数据只有一部分数据更新。但要全体库全部数据不能有重复,没法监控对方网页,就须要重新爬全体网站。用案例最能解释问题。
案例一:二手房价格更新8月数据如下9月数据如下如何判断是数据库有,但真实新数据变革的?
如果有数据,看新爬取的价格韶光是否在数据库价格韶光列表中,如果priceMonth不在个中则插入,如果在个中则不插入。这里例如第一次爬取在8月,数据大多展示7月数据。本月9月须要把数据更新到8月,要担保数据成功更新,又不重复。就须要快速判断什么页面须要更新,什么页面不须要更新。从而减少要求次数,提高爬虫效率,并且减少对方做事器压力。
明确重复数据的特色首先明确什么样的数据是重复的。
由于有一万五相同小区名字,但在不同城市的数据。如果拿小区+月份重复,这一万五就丧失落了。就和一个"春天花园"一样,六七个省都有这个小区名字 。那就须要思考省市区(县)+小区+月份更准确 还是 城市+小区+月份就可以。会不会有同一个城市不同的区县有相同的小区名字。
由于中国没有相同市、区的名字,不加“省”也是可以的。终极拿市+区(县)+小区名字+年月作为主键,担保数据绝对的不重复。因此这里redis表的设计采取hash类型的数据,这样可以存在多个key-value对,以用户ID作为hash表的名称。
keyend=item["city"]+item["region"]+item["projectName"]+item["priceMonth"]
基于Redis的数据库性子,查询插入hkeys,hmset的效率比用pandas去重,mongoDB去重,效率高很多,这是大费周章用redis的缘故原由,不然为啥不用mongoDB呢?这里读者创造我不对,或者这种方法效率更低,我乐意有偿听你见地(我只是redis的菜鸟)
import redisfrom multiprocessing.dummy import Pool as ThreadPoolimport copypool =redis.ConnectionPool(host='localhost',port=6379,db=2)connection = redis.Redis(connection_pool=pool)# connection.flushall()#清空数据库list_=copy.deepcopy(mycol2_list)#list(mycol1.find())def pross_Redis(item): item["_id"]=str(item['_id']) item["center"]=str(item["center"]) keyend=item["city"]+item["region"]+item["projectName"]+item["priceMonth"]# print(len(connection.hkeys(keyend))) if len(connection.hkeys(keyend))!=10:#如果没有 print(1) connection.hmset(keyend, mapping=item) # 批量插入 elif len(connection.hkeys(keyend))==10:#如果有 if item["priceMonth"] not in [connection.hget(keyend,"priceMonth").decode('utf-8')]: print(2) connection.hmset(keyend, mapping=item) # 批量插入 else: print("重复数据")# referencePrice=connection.hget(keyend,"referencePrice") else: print("其他数据")# mycol1.insert_one(item)#.update({'_id':id_}, {'$rename': {'updateDate': 'priceMonth'}}, False, True)pool = ThreadPool(10)pool.map(pross_Redis,list_[:30])pool.close()pool.join()fauture=mycol2_list[36]item=fauturekeyend=item["city"]+item["region"]+item["projectName"]+item["priceMonth"]isExists = connection.hexists(keyend,"projectName")if isExists!=True:#如果新数据不存在于数据库,插入 print(1)
终极效果如下
详细设置方法
方案1 缓存方案
爬"上海市"的时候,把mongoDB上海市的数据缓存到redis,然后设置30分钟或者爬取完开释缓存。
方案2 Redis过滤器快速去重
(1)刚开始爬虫 把MongDB数据库所有数据缓存到redis
(2)把新爬的数据按照keyend和redis主键比拟
(3)如果keyend不在redis主键中,插入redis,再插入MongDB
(4)如果keyend在redis主键中,不插入redis
(5) connection.flushall()开释redis数据的缓存
这个拿字典也能实现,只是效率没有redis高,等到一百万数据的时候这个会很明显
终极pipeline的写法如下
from itemadapter import ItemAdapterimport pymongo as pymongofrom .items import ShellItemimport pandas as pdimport redisfrom multiprocessing.dummy import Pool as ThreadPoolimport copyclass ShellPipeline:"省略数据库部分" def process_item(self, items, spider): items1= ItemAdapter(items).asdict() global connection # 用redis作为缓存来去重数据 pool = redis.ConnectionPool(host='localhost', port=6379, db=2) connection = redis.Redis(connection_pool=pool)#这里必须每次插入都导入最新redis def pross_Redis(item): # item["_id"] = str(item['_id']) item = { "province": item["province"], "city": item["city"], "spiderDate": item["spiderDate"], "projectName": item["projectName"], "referencePrice": item["referencePrice"], "region": item["region"], "priceMonth": item["priceMonth"], "deliveryDate": item["deliveryDate"], "center": item["center"]} item["center"] = str(item["center"]) keyend = item["city"] + item["region"] + item["projectName"] + item["priceMonth"] # print(len(connection.hkeys(keyend))) if len(connection.hkeys(keyend)) != 9: # 如果没有数据,9是我数据item有9个key print(1) connection.hmset(keyend, mapping=item) # 批量插入 return True elif len(connection.hkeys(keyend)) == 9: # 如果有 if item["priceMonth"] not in [connection.hget(keyend, "priceMonth").decode('utf-8')]: print(2) connection.hmset(keyend, mapping=item) # 批量插入 else: print("重复数据") else: print("其他数据") postItem = dict(items1) keyend1= items1["city"] + items1["region"] + items1["projectName"] + items1["priceMonth"] isExists = connection.hexists(keyend1, "projectName") if isExists != True: # 如果新数据不存在于数据库,插入 # 把item转化成字典形式 print(postItem) judje=pross_Redis(postItem) if judje==True: print(postItem) self.coll.insert_one(postItem)#self.coll为MongoDB return items1
启动函数main如下
from twisted.internet import reactor, deferfrom scrapy.crawler import CrawlerRunnerfrom scrapy.utils.log import configure_loggingimport timeimport loggingfrom scrapy.utils.project import get_project_settingsimport multiprocessing# import psycopg2import time# # 在掌握台打印日志# configure_logging()# # CrawlerRunner获取settings.py里的设置信息# runner = CrawlerRunner(get_project_settings())import redisfrom multiprocessing.dummy import Pool as ThreadPoolimport copyglobal connectionimport pymongomyclient = pymongo.MongoClient("mongodb://localhost:27017/") #,username='root',password='18091471364@ch'利用MongoClient工具,连接数据库collist = myclient.list_database_names() # 获取所有数据库mydb = myclient["companyln"] # 数据库名 esfcomunicatedatalist=[]#for i in ["test_815_Night","test_815_Night_end","test_esf815"]:mycol2 = mydb["company_second_hand_house_price"]# collection凑集(类似SQL的表)# datalist=datalist+list(mycol.find())# 用redis作为缓存来去重数据pool1 = redis.ConnectionPool(host='localhost', port=6379, db=2)connection = redis.Redis(connection_pool=pool1)# 更新取消注释,把底库导入redis## def pross_Redis(item):# # item["_id"] = str(item['_id'])# item = {"province": item["province"],# "city": item["city"],# "spiderDate": item["spiderDate"],# "projectName": item["projectName"],# "referencePrice": item["referencePrice"],# "region": item["region"],# "priceMonth": item["priceMonth"],# "deliveryDate": item["deliveryDate"],# "center": item["center"]}## item["center"] = str(item["center"])# keyend = item["city"] + item["region"] + item["projectName"] + item["priceMonth"]# # print(len(connection.hkeys(keyend)))# if len(connection.hkeys(keyend)) != 9: # 如果没有# print(1)# connection.hmset(keyend, mapping=item) # 批量插入# return True# elif len(connection.hkeys(keyend)) == 9: # 如果有# if item["priceMonth"] not in [connection.hget(keyend, "priceMonth").decode('utf-8')]:# print(2)# connection.hmset(keyend, mapping=item) # 批量插入# else:# print("重复数据")# # referencePrice=connection.hget(keyend,"referencePrice")# else:# print("其他数据")# connection.flushall()#清空数据库# 如何判断是数据库有,但真实新数据变革的# [connection.hget(keyend,"priceMonth").decode('utf-8')]#==xinshuji.encode('utf-8')# 把不同月份的合并起来成一个列表,但只显示最新数据# 如果有数据,看新爬取的价格韶光是否在数据库价格韶光列表中,如果priceMonth不在个中则插入,如果在个中则不插入# list_ = copy.deepcopy(list(mycol2.find()))# from multiprocessing import Pool# multiprocessing = Pool(processes=8)## def ThreadPool1(list_):# pool = ThreadPool(10)# pool.daemon=True# pool.map(pross_Redis, list_)# pool.close()# pool.join()# multiprocessing.map(pross_Redis, list_)# multiprocessing.close()# multiprocessing.join()#第1行代码导入CMDLINE模块来实行命令行指令。第2行代码用split()函数根据空格拆分指令字符串,再用execute()函数输入到命令行中实行,相称于直接在终端中实行指令“scapy crawl爬虫名”。from scrapy.crawler import CrawlerProcessfrom scrapy.utils.project import get_project_settingsfrom scrapy.spiderloader import SpiderLoaderimport timefrom multiprocessing import Poollist_all=[['End_gansu','End_guangdong','End_guangxi','End_guizhou'],['End_hainan','End_hebei','End_heilongjiang','End_henan'],['End_hubei','End_jiangsu','End_jiangxi','End_shanghai'],['End_shanxi','End_sichuan','End_tianjing','End_xinjiang'],['End_fujian','End_shandong','End_yunnan','End_zhejiang'],['End_anhui', 'End_beijing','End_chongqing',]]#['End_liaoning','End_jilin','End_neimenggu','End_ningxia'],# 根据项目配置获取 CrawlerProcess 实例def process1(name): # try: process = CrawlerProcess(settings=get_project_settings()) process.crawl(name) process.start() # except: # pass# print(process)# # 获取 spiderloader 工具,以进一步获取项眼前所有爬虫名称spider_loader = list_all#SpiderLoader(list_all)if __name__ == '__main__': for P in list_all: # LIST1=P start_3=time.time() pool = Pool(processes=4) pool.daemon = True pool.map(process1, P)#LIST1 pool.close() pool.join() end_3=time.time() print('四个进程',end_3-start_3)
案例 二 小程序 显示受限数据实时更新
碰着每次展示150条,但每次要求返回不同的数据,这种数据库看似深不见底,由于不知道到底有多少数据,就须要尽可能多地爬取。
确定enterpriseName公司名称为主键,作为去重keyend详细设置方法
方案1 缓存方案
爬"上海市"的时候,把mongoDB的数据缓存到redis,然后设置30分钟或者爬取完开释缓存。
方案2 Redis过滤器快速去重
(1)刚开始爬虫 把MongDB数据库所有数据缓存到redis
(2)把新爬的数据按照keyend和redis主键比拟
(3)如果keyend不在redis主键中,插入redis,再插入MongDB
(4)如果keyend在redis主键中,不插入redis
(5) connection.flushall()开释redis数据的缓存
这个拿字典也能实现,只是效率没有redis高,等到一百万数据的时候这个会很明显
在这里插入代码片