标签:for object utf-8 pytho task prefix pam ram created
"""Read yesterday's records from Aliyun OSS with a producer/consumer process pool.

Because objects may be written slightly early or late, the script lists the
last hour of the day before yesterday, all of yesterday, and the first hour of
today, then keeps only the records whose ``created_at`` falls inside yesterday.
"""
import datetime
import json
import logging
import re
import sys
import time
from itertools import islice
from multiprocessing import JoinableQueue, Manager, Process, cpu_count

import oss2
import pandas as pd
from pandas.tseries.offsets import Day

# NOTE(review): `username`, `password` and `address` are not defined anywhere
# in this listing. They presumably stand for the OSS access key id, access key
# secret and endpoint, and must be supplied before `mkbuck` can run -- confirm.

# Columns kept in every per-object DataFrame. 'dd' and 'cc' are the original
# placeholder column names; 'created_at' is included because the merged frame
# is sorted by it (sorting a frame without that column raises KeyError).
OUTPUT_COLUMNS = ['dd', 'cc', 'created_at']


def mkbuck(bk):
    """Return an ``oss2.Bucket`` handle for the bucket named *bk*."""
    auth = oss2.Auth(username, password)
    return oss2.Bucket(auth, address, bk)


def _hour_prefix(bucket, day_prefix, pick):
    """Return *day_prefix* narrowed to one hour chosen by *pick* (``min``/``max``).

    The hour is taken from the fifth ``/``-separated component of each object
    key under *day_prefix*; keys ending in ``/`` are skipped. When the prefix
    holds no such objects, *day_prefix* is returned unchanged instead of
    failing on an empty sequence.
    """
    hours = [
        int(obj.key.split('/')[4])
        for obj in oss2.ObjectIterator(bucket, prefix=day_prefix)
        if not obj.key.endswith('/')
    ]
    if not hours:
        return day_prefix
    # Hours are zero-padded to two digits in the key layout.
    return '%s/%02d' % (day_prefix, pick(hours))


def getbflastpt(bucket, bfyespattern):
    """Return the prefix of the last hour present under *bfyespattern*."""
    return _hour_prefix(bucket, bfyespattern, max)


def getnowfirstpt(bucket, nowpattern):
    """Return the prefix of the first hour present under *nowpattern*."""
    return _hour_prefix(bucket, nowpattern, min)


def getfullnum(bk, bfyespattern, nowpattern, yespattern):
    """List every object key to read and return ``(keys, len(keys))``.

    Keys come from three prefixes, in this order: the last hour under
    *bfyespattern*, everything under *yespattern*, and the first hour under
    *nowpattern*.
    """
    bucket = mkbuck(bk)
    prefixes = (
        getbflastpt(bucket, bfyespattern),
        yespattern,
        getnowfirstpt(bucket, nowpattern),
    )
    keys = [
        obj.key
        for prefix in prefixes
        for obj in oss2.ObjectIterator(bucket, prefix=prefix)
    ]
    return keys, len(keys)


def _yesterday_window(now=None):
    """Return ``(start, end)`` strings spanning the day before *now*.

    Both are formatted ``YYYY-MM-DD HH:MM:SS`` (00:00:00 and 23:59:59).
    *now* defaults to the current local time.
    """
    if now is None:
        now = datetime.datetime.now()
    yesterday = now - datetime.timedelta(days=1)
    return (
        yesterday.strftime('%Y-%m-%d 00:00:00'),
        yesterday.strftime('%Y-%m-%d 23:59:59'),
    )


# Inter-process communication below: a producer/consumer model.
def getfull(bk, bfyespattern, nowpattern, yespattern, q):
    """Producer: put every object key on *q*, then wait until all are processed."""
    keys, _ = getfullnum(bk, bfyespattern, nowpattern, yespattern)
    for key in keys:
        q.put(key)
    # Blocks until the consumers have called task_done() once per key.
    q.join()


def consumer(bk, q, d, starttime=None, endtime=None):
    """Consumer: fetch keys from *q*, parse matching records, store them in *d*.

    For every key not ending in ``/`` the object is downloaded, decoded as
    UTF-8, and each regex match is parsed as JSON. Records whose
    ``created_at`` lies within ``[starttime, endtime]`` are collected into a
    DataFrame stored as ``d[key]``. The loop never returns; the process is
    expected to run as a daemon.

    Args:
        bk: Bucket name passed to ``mkbuck``.
        q: ``JoinableQueue`` of object keys.
        d: Shared mapping (a ``Manager().dict()``) receiving one DataFrame per key.
        starttime: Inclusive lower bound for ``created_at``. Defaults to the
            start of yesterday.
        endtime: Inclusive upper bound for ``created_at``. Defaults to the end
            of yesterday.
    """
    # The bounds are parameters rather than globals: under the "spawn" start
    # method a child process does not execute the parent's __main__ section.
    if starttime is None or endtime is None:
        default_start, default_end = _yesterday_window()
        if starttime is None:
            starttime = default_start
        if endtime is None:
            endtime = default_end

    bucket = mkbuck(bk)
    record_pattern = re.compile(r'{.*"adadji",.*}')
    while True:
        key = q.get()
        try:
            if not key.endswith('/'):
                text = bucket.get_object(key).read().decode('utf-8')
                rows = []
                for raw in record_pattern.findall(text):
                    record = json.loads(raw)
                    # NOTE(review): bounds are compared as strings; assumes
                    # created_at uses the same 'YYYY-MM-DD HH:MM:SS' layout.
                    if starttime <= record['created_at'] <= endtime:
                        rows.append(record)
                # d is the Manager-backed dict used to hand data to the parent.
                d[key] = pd.DataFrame(rows, columns=OUTPUT_COLUMNS)
        except Exception:
            # One bad object must not kill the worker; a dead worker would
            # leave the producer blocked on the queue.
            logging.exception('failed to process %s', key)
        finally:
            # Always acknowledge the item so that q.join() can return.
            q.task_done()


def main():
    """Start one producer and the consumer pool, then merge and print the result."""
    started = time.time()
    now_time = datetime.datetime.now()  # current time
    bfyes_time = (now_time - 2 * Day()).strftime('%Y/%m/%d')
    yes_time = (now_time - 1 * Day()).strftime('%Y/%m/%d')
    nowdate = now_time.strftime('%Y/%m/%d')
    starttime, endtime = _yesterday_window(now_time)

    bk = 'xxx'
    bfyespattern = '%s/%s' % (bk, bfyes_time)
    yespattern = '%s/%s' % (bk, yes_time)
    nowpattern = '%s/%s' % (bk, nowdate)

    q = JoinableQueue(cpu_count())
    manager = Manager()
    d = manager.dict()  # dictionary shared with the worker processes

    producer = Process(
        target=getfull, args=('xx', bfyespattern, nowpattern, yespattern, q)
    )
    # At least one consumer is needed, otherwise the producer blocks forever.
    consumers = [
        Process(target=consumer, args=('xx', q, d, starttime, endtime))
        for _ in range(max(1, cpu_count() - 1))
    ]
    for worker in consumers:
        # Daemons are terminated automatically when the main process exits.
        worker.daemon = True

    for proc in [producer] + consumers:
        proc.start()
    producer.join()

    frames = list(d.values())
    if frames:
        df1 = pd.concat(frames, ignore_index=True)
        df1.sort_values('created_at', inplace=True)
    else:
        # pd.concat raises on an empty sequence; report an empty frame instead.
        df1 = pd.DataFrame(columns=OUTPUT_COLUMNS)

    print(time.time() - started)
    print('=' * 20)
    print(df1)


if __name__ == '__main__':
    main()
说明:需求为获取昨日的数据。因 OSS 实时数据的存储可能存在提前或延迟的情况,所以同时读取前天的最后一小时、昨日全部以及当天最开始一小时的数据,再按 created_at 过滤出昨日的记录;读者可根据自身情况进行修改。
以多进程读取oss符合条件的数据为例,综合使用多进程间的通信、获取多进程的数据
标签:for object utf-8 pytho task prefix pam ram created
原文地址:https://www.cnblogs.com/mahailuo/p/9293825.html