码迷,mamicode.com
首页 > 系统相关 > 详细

以多进程读取 OSS 中符合条件的数据为例，综合演示多进程间的通信以及如何获取各进程的处理结果

时间:2018-07-11 14:42:50      阅读:180      评论:0      收藏:0      [点我收藏+]

标签:for   object   utf-8   pytho   task   prefix   pam   ram   created   

import datetime
import sys
import oss2
from itertools import islice
import pandas as pd
import re
import json
from pandas.tseries.offsets import Day
from multiprocessing import Process, JoinableQueue, cpu_count, Manager
import time


def mkbuck(bk):
	"""Return an ``oss2.Bucket`` handle for the bucket named *bk*.

	The credentials and endpoint are taken from the module-level names
	``username``, ``password`` and ``address``. They are not defined in the code
	shown here and must be provided elsewhere before this is called.
	"""
	credentials = oss2.Auth(username, password)
	return oss2.Bucket(credentials, address, bk)

# Find the key prefix of the last hour of the day before yesterday.
def getbflastpt(bucket, bfyespattern):
	"""Return the key prefix of the highest hour found under *bfyespattern*.

	Every object under the prefix *bfyespattern* (built by the main script as
	``'<bk>/YYYY/MM/DD'``) is listed. Keys ending in '/' are directory
	placeholders and are skipped. For each remaining key, the fifth
	'/'-separated component is parsed as an int. That component is presumably
	the hour; TODO confirm against the bucket layout. The result is
	``bfyespattern + '/NN'`` for the largest value, zero-padded to two digits.

	Raises:
		ValueError: if there are no non-directory objects under the prefix.
	"""
	hours = set()
	for obj in oss2.ObjectIterator(bucket, prefix=bfyespattern):
		key = obj.key
		if not key.endswith('/'):
			hours.add(int(key.split('/')[4]))
	if not hours:
		raise ValueError('no objects found under prefix %r' % (bfyespattern,))
	# '%02d' yields the same "prepend 0 when < 10" padding as before.
	return '%s/%02d' % (bfyespattern, max(hours))

# Find the key prefix of the first hour of the current day.
def getnowfirstpt(bucket, nowpattern):
	"""Return the key prefix of the lowest hour found under *nowpattern*.

	Every object under the prefix *nowpattern* (built by the main script as
	``'<bk>/YYYY/MM/DD'`` for today) is listed. Keys ending in '/' are directory
	placeholders and are skipped. For each remaining key, the fifth
	'/'-separated component is parsed as an int. That component is presumably
	the hour; TODO confirm against the bucket layout. The result is
	``nowpattern + '/NN'`` for the smallest value, zero-padded to two digits.

	Raises:
		ValueError: if there are no non-directory objects under the prefix.
	"""
	hours = set()
	for obj in oss2.ObjectIterator(bucket, prefix=nowpattern):
		key = obj.key
		if not key.endswith('/'):
			hours.add(int(key.split('/')[4]))
	if not hours:
		raise ValueError('no objects found under prefix %r' % (nowpattern,))
	# '%02d' yields the same "prepend 0 when < 10" padding as before.
	return '%s/%02d' % (nowpattern, min(hours))

# Gather all object keys that may hold yesterday's data; return them with their count.
def getfullnum(bk, bfyespattern, nowpattern, yespattern):
	"""Return ``(keys, len(keys))`` for every object under three prefixes.

	The prefixes are listed in this order:
	  1. the last-hour prefix under *bfyespattern* (via ``getbflastpt``),
	  2. *yespattern* as given,
	  3. the first-hour prefix under *nowpattern* (via ``getnowfirstpt``).
	Directory placeholder keys (ending in '/') are included unchanged.
	"""
	bucket = mkbuck(bk)
	prefixes = (
		getbflastpt(bucket, bfyespattern),
		yespattern,
		getnowfirstpt(bucket, nowpattern),
	)
	keys = [
		obj.key
		for prefix in prefixes
		for obj in oss2.ObjectIterator(bucket, prefix=prefix)
	]
	return keys, len(keys)

# Inter-process communication below: a producer/consumer model.
def getfull(bk, bfyespattern, nowpattern, yespattern, q):
	"""Producer: put every object key on *q*, then wait until all are handled.

	``q.join()`` blocks until ``task_done()`` has been called once for each key
	that was put. So when this function (and its process) returns, every key
	has been marked done by a consumer.
	"""
	keys, _count = getfullnum(bk, bfyespattern, nowpattern, yespattern)
	for key in keys:
		q.put(key)
	q.join()


def consumer(bk, q, d, columns=('dd', 'cc', 'created_at')):
	"""Consumer: turn each object key taken from *q* into a DataFrame stored in *d*.

	Loops forever (the main script runs it as a daemon process). For each key:
	  * keys ending in '/' (directory placeholders) produce an empty frame;
	  * otherwise the object is downloaded and decoded as UTF-8. Every match of
	    the regex ``{.*"adadji",.*}`` is parsed with ``json.loads``. Records
	    whose ``created_at`` lies in the inclusive range
	    ``[starttime, endtime]`` are kept.
	The kept records become ``pd.DataFrame(records, columns=list(columns))``,
	stored as ``d[key]``.

	``q.task_done()`` is always called, even when processing fails, so the
	producer's ``q.join()`` cannot hang on one bad object. A failure is printed
	to stderr with its traceback, and that key gets no entry in *d*.

	Args:
		bk: bucket name passed to ``mkbuck``.
		q: ``JoinableQueue`` of object keys.
		d: shared (Manager) dict collecting ``key -> DataFrame``.
		columns: columns to keep in each frame. ``'created_at'`` is included by
			default because the main script sorts the combined result on it.

	NOTE(review): ``starttime`` and ``endtime`` are read as module globals that
	are only assigned under ``if __name__ == '__main__'``. This works with the
	``fork`` start method but not with ``spawn`` (the default on Windows and
	macOS). Confirm the target platform.
	"""
	import traceback

	bucket = mkbuck(bk)
	record_pattern = re.compile('{.*"adadji",.*}')
	while True:
		key = q.get()
		try:
			records = []
			if not key.endswith('/'):
				remote_data = bucket.get_object(key).read().decode('utf-8')
				for text in record_pattern.findall(remote_data):
					record = json.loads(text)
					# Plain string comparison; this assumes created_at uses the same
					# 'YYYY-MM-DD HH:MM:SS' layout as the bounds. TODO confirm.
					if starttime <= record['created_at'] <= endtime:
						records.append(record)
			d[key] = pd.DataFrame(records, columns=list(columns))
		except Exception:
			print('consumer failed on key %r' % (key,), file=sys.stderr)
			traceback.print_exc()
		finally:
			# Tells the producer's q.join() that this item has been handled.
			q.task_done()


if __name__ == '__main__':
	s1 = time.time()
	now_time = datetime.datetime.now()  # current local time
	# Dates rendered as 'YYYY/MM/DD' to build object-key prefixes '<bk>/YYYY/MM/DD'.
	bfyes_time = (now_time - 2 * Day()).strftime('%Y/%m/%d')
	yes_time = (now_time - 1 * Day()).strftime('%Y/%m/%d')
	nowdate = now_time.strftime('%Y/%m/%d')
	# Inclusive created_at window covering all of yesterday. consumer() reads
	# these as module globals, so they must be assigned before workers start.
	endtime = (now_time - 1 * Day()).strftime('%Y-%m-%d 23:59:59')
	starttime = (now_time - 1 * Day()).strftime('%Y-%m-%d 00:00:00')

	bk = 'xxx'
	bfyespattern = '%s/%s' % (bk, bfyes_time)
	yespattern = '%s/%s' % (bk, yes_time)
	nowpattern = '%s/%s' % (bk, nowdate)

	# Bounded queue: the producer blocks while cpu_count() keys are waiting.
	q = JoinableQueue(cpu_count())
	m = Manager()
	d = m.dict()  # shared dict the consumers fill with key -> DataFrame
	# NOTE(review): the bucket name 'xx' differs from the key prefix bk = 'xxx'.
	# Both look like placeholders; confirm they are meant to differ.
	p1 = Process(target=getfull, args=('xx', bfyespattern, nowpattern, yespattern, q))
	# Start at least one consumer. With cpu_count() == 1, cpu_count() - 1 would
	# start none and the producer would block forever.
	workers = [
		Process(target=consumer, args=('xx', q, d), daemon=True)
		for _ in range(max(cpu_count() - 1, 1))
	]

	for p in [p1] + workers:
		p.start()
	# The producer returns only after q.join(), i.e. once every key is task_done().
	p1.join()
	frames = d.values()
	if frames:
		df1 = pd.concat(frames, ignore_index=True)
	else:
		# pd.concat([]) raises ValueError; report "no data" as an empty frame.
		df1 = pd.DataFrame()
	# Frames only contain the columns consumer() selects, which may not include
	# 'created_at'; sort only when the column is present.
	if 'created_at' in df1.columns:
		df1.sort_values('created_at', inplace=True)
	print(time.time() - s1)
	print('=' * 20)
	print(df1)

  说明：需求只是获取昨日的数据。由于 OSS 中实时数据的存储时间可能提前或延迟，因此这里读取前天的最后一小时、昨日全部以及当天最开始一小时的数据，读者可根据自身情况进行修改。

以多进程读取 OSS 中符合条件的数据为例，综合演示多进程间的通信以及如何获取各进程的处理结果

标签:for   object   utf-8   pytho   task   prefix   pam   ram   created   

原文地址:https://www.cnblogs.com/mahailuo/p/9293825.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!