标签:style blog color os io ar for art div
#!/usr/bin/python3
"""Queue worker that exports query results to per-day CSV files and zips them.

Jobs are popped from a Redis list as JSON documents.  For every job whose SQL
contains a ``time between '<start>' and '<end>'`` clause, the worker runs one
PostgreSQL ``COPY ... TO`` per calendar day, records progress in the
``cdr_export_log`` table and finally bundles the CSV files into a zip archive.

Usage: script.py CONFIG_FILE [REDIS_LIST_KEY]
"""

import contextlib
import copy
import datetime
import io
import json
import os
import re
import sys
import time
import uuid
import zipfile
from configparser import RawConfigParser

import psycopg2
import psycopg2.extras
import redis

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def zipdir(path, zipf):
    """Add every file found under *path* to the open archive *zipf*.

    Files are stored under their base name only, so the archive is flat.
    NOTE: files with the same name in different sub-directories would
    therefore end up as duplicate entries in the archive.
    """
    for root, _dirs, files in os.walk(path):
        for name in files:
            zipf.write(os.path.join(root, name), os.path.basename(name))


def _load_config(path):
    """Parse a section-less ``key = value`` file into a RawConfigParser.

    A synthetic ``[root]`` header is prepended because configparser refuses
    input whose options are not inside a section.
    """
    with open(path, 'r') as config_file:
        ini_str = '[root]\n' + config_file.read()
    config = RawConfigParser(strict=False, allow_no_value=True)
    # read_file() replaces readfp(), which was removed in Python 3.12.
    config.read_file(io.StringIO(ini_str))
    return config


def _day_windows(start_dt, end_dt):
    """Yield one ``(start_str, end_str)`` pair per calendar day in the range.

    The first window starts at the exact start time and the last one ends at
    the exact end time; every other boundary is 00:00:00 / 23:59:59.
    """
    one_day = datetime.timedelta(days=1)
    loop_dt = start_dt
    while loop_dt.date() <= end_dt.date():
        if loop_dt.date() == start_dt.date():
            window_start = loop_dt.strftime(TIMESTAMP_FORMAT)
        else:
            window_start = loop_dt.strftime("%Y-%m-%d 00:00:00")
        if loop_dt.date() == end_dt.date():
            window_end = end_dt.strftime(TIMESTAMP_FORMAT)
        else:
            window_end = loop_dt.strftime("%Y-%m-%d 23:59:59")
        yield window_start, window_end
        loop_dt += one_day


def _run_export(cursor, backend_pid, data, pattern, match):
    """Run the day-by-day export for one job on an open cursor.

    Progress is written to ``cdr_export_log`` through the status values
    1 (picked up), 2 (exporting), 3 (zipping) and 4 (finished, with the zip
    path).  A job whose row already has status 5 is skipped, and the export
    stops early if a COPY statement is canceled.
    """
    sql = data['sql']
    log_id = data['log_id']

    cursor.execute("SELECT * FROM cdr_export_log WHERE id = %s", (log_id, ))
    job = cursor.fetchone()
    if job is None:
        # Without this guard a missing log row would crash the whole worker.
        print("No cdr_export_log row with id %s, job skipped" % (log_id, ),
              file=sys.stderr)
        return
    if job['status'] == 5:
        # The code treats status 5 as "already canceled": nothing to do.
        return

    # Store the backend pid in the log row.
    # NOTE(review): presumably so another process can cancel this backend; confirm.
    cursor.execute("UPDATE cdr_export_log SET status = 1, backend_pid = %s WHERE id = %s",
                   (backend_pid, log_id))

    start_time = match.group(1)
    end_time = match.group(2)
    # The first 19 characters are "YYYY-MM-DD HH:MM:SS"; whatever follows the
    # single separator character after them is kept as the time zone suffix.
    timezone = start_time[20:]
    start_dt = datetime.datetime.strptime(start_time[:19], TIMESTAMP_FORMAT)
    end_dt = datetime.datetime.strptime(end_time[:19], TIMESTAMP_FORMAT)
    # Count calendar days, because that is what the export loop iterates
    # over.  (end_dt - start_dt).days under-counts when the range crosses
    # midnight but is shorter than 24 hours.
    days = (end_dt.date() - start_dt.date()).days + 1
    current_day = 0
    print("Start:", start_dt, "End:", end_dt, "Time Zone:", timezone, "Days: ", days)

    cdr_down_path = os.path.join(data['db_export_path'], 'cdr_download')
    cdr_web_path = os.path.join(data['db_web_path'], 'cdr_download')
    try:
        os.mkdir(cdr_web_path, 0o777)
    except FileExistsError:
        pass

    # Create a unique working directory for this job.
    # NOTE(review): COPY writes below the export path, but only the web-path
    # directory is created here; presumably both paths refer to the same
    # storage. Confirm.
    unique_path = str(uuid.uuid4())
    temp_dir = os.path.join(cdr_down_path, unique_path)
    temp_web_dir = os.path.join(cdr_web_path, unique_path)
    os.mkdir(temp_web_dir)
    os.chmod(temp_web_dir, 0o777)

    cursor.execute("UPDATE cdr_export_log SET status = %s, total_days = %s, completed_days = %s, file_dir = %s WHERE id = %s",
                   (2, days, current_day, temp_web_dir, log_id))

    for window_start, window_end in _day_windows(start_dt, end_dt):
        print("%s ~ %s" % (window_start, window_end))
        time_clause = "time between '%s %s' and '%s %s'" % (
            window_start, timezone, window_end, timezone)
        # A callable replacement keeps re.sub from interpreting backslashes
        # in the substituted text.
        replaced_sql = pattern.sub(lambda _m: time_clause, sql)
        filename = "%s %s_%s %s.csv" % (window_start, timezone, window_end, timezone)
        filepath = os.path.join(temp_dir, filename)
        # NOTE: COPY cannot take bind parameters for the sub-query, so the
        # job's SQL is interpolated verbatim.  The queue must only ever
        # contain SQL from a trusted producer.
        copy_sql = "COPY (%s) TO '%s' DELIMITER ',' CSV HEADER " % (replaced_sql, filepath)
        try:
            cursor.execute(copy_sql)
        except (psycopg2.extensions.QueryCanceledError, psycopg2.OperationalError):
            print("Canceled!")
            return
        print("SQL:", copy_sql)
        current_day += 1
        cursor.execute("UPDATE cdr_export_log SET completed_days = %s WHERE id = %s",
                       (current_day, log_id))

    cursor.execute("UPDATE cdr_export_log SET status = 3 WHERE id = %s", (log_id, ))
    zipfile_path = os.path.join(cdr_web_path, "%s.zip" % (str(uuid.uuid4())))
    with zipfile.ZipFile(zipfile_path, 'w', allowZip64=True) as zipf:
        zipdir(temp_web_dir, zipf)
    print("ZIP File:", zipfile_path)
    cursor.execute("UPDATE cdr_export_log SET status = 4, file_path = %s WHERE id = %s",
                   (zipfile_path, log_id))


def _process_job(data, pattern):
    """Handle one decoded queue message; jobs without a time range are ignored."""
    sql = data['sql']
    db_export_path = data['db_export_path']
    data['db_web_path']  # accessed up front so a malformed job fails before connecting
    print(db_export_path)
    data['log_id']

    match = pattern.search(sql)
    if not match:
        return

    db_info = data['db_info']
    # closing() guarantees the connection and cursor are released on every
    # exit path (psycopg2's own "with conn" only ends the transaction).
    with contextlib.closing(psycopg2.connect(host=db_info['host'],
                                             port=db_info['port'],
                                             database=db_info['database'],
                                             user=db_info['login'],
                                             password=db_info['password'])) as conn:
        conn.autocommit = True
        backend_pid = conn.get_backend_pid()
        with contextlib.closing(
                conn.cursor(cursor_factory=psycopg2.extras.DictCursor)) as cursor:
            _run_export(cursor, backend_pid, data, pattern, match)


def main():
    """Poll the Redis list forever and process each export job found.

    ``sys.argv[1]`` is the config file (must provide ``redis_host`` and
    ``redis_port``); the optional ``sys.argv[2]`` overrides the list key,
    which defaults to ``cdr_down``.
    """
    if len(sys.argv) < 2:
        print("You must provide config file!", file=sys.stderr)
        sys.exit(-1)

    config = _load_config(sys.argv[1])
    cdr_down_key = sys.argv[2] if len(sys.argv) > 2 else 'cdr_down'

    pattern = re.compile(r"""time\s*between\s*'(.*?)'\s*and\s*'(.*?)'""", re.I | re.M)

    # One client is enough for the whole run; it does not need to be rebuilt
    # on every poll.
    client = redis.StrictRedis(host=config.get('root', 'redis_host'),
                               port=config.getint('root', 'redis_port'),
                               db=0)

    while True:
        received = client.lpop(cdr_down_key)
        if received:
            _process_job(json.loads(received.decode('utf-8')), pattern)
        time.sleep(1)


if __name__ == "__main__":
    main()
标签:style blog color os io ar for art div
原文地址:http://www.cnblogs.com/alexkn/p/3952980.html