码迷,mamicode.com
首页 > 编程语言 > 详细

生成CDR的相关报告(by python3)

时间:2014-09-03 10:57:36      阅读:425      评论:0      收藏:0      [点我收藏+]

标签:style   blog   color   os   io   ar   for   art   div   

#!/usr/bin/python3
import time
import json
import re
import datetime
import os
import uuid
import zipfile
import sys
from configparser import RawConfigParser
import io
import copy

import psycopg2
import psycopg2.extras
import redis

def zipdir(path, zipf):
    for root, dirs, files in os.walk(path):
        for file in files:
            zipf.write(os.path.join(root, file), os.path.basename(file))

def main():
    if len(sys.argv) < 2:
        print("You must provide config file!", file=sys.stderr)
        sys.exit(-1)
    ini_str = [root]\n + open(sys.argv[1], r).read()
    ini_fp  = io.StringIO(ini_str)
    config = RawConfigParser(strict=False,allow_no_value=True)
    config.readfp(ini_fp)

    if len(sys.argv) > 2:
        cdr_down_key = sys.argv[2]
    else:
        cdr_down_key = cdr_down

    pattern = re.compile(r"""time\s*between\s*‘(.*?)‘\s*and\s*‘(.*?)‘""", re.I | re.M)
    while True:
        r = redis.StrictRedis(host=config.get(root,redis_host), port=config.getint(root,redis_port), db=0)
        received = r.lpop(cdr_down_key)
        if received:
            data = json.loads(received.decode(utf-8))
            sql = data[sql]
            DB_EXPORT_PATH = data[db_export_path]
            DB_WEB_PATH = data[db_web_path]
            print(DB_EXPORT_PATH)
            LOG_ID = data[log_id]
            #print("SQL:", sql)
            match = pattern.search(sql)
            if match:
                canceled = False
                #connect db
                conn = psycopg2.connect(host=data[db_info][host], port=data[db_info][port], 
                            database=data[db_info][database],
                             user=data[db_info][login], password=data[db_info][password])
                conn.autocommit = True
                backend_pid = conn.get_backend_pid()
                cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
                cursor.execute("SELECT * FROM cdr_export_log WHERE id = %s", (LOG_ID, ))
                job = cursor.fetchone()
                if job[status] == 5:
                    canceled = True
                    continue

                cursor.execute("UPDATE cdr_export_log SET status = 1, backend_pid = %s  WHERE id = %s", (backend_pid, LOG_ID))

                start_time = match.group(1)
                end_time   = match.group(2)
                start_time2 = start_time[:19]
                end_time2   = end_time[:19]
                timezone   = start_time[20:]
                start_dt   = datetime.datetime.strptime(start_time2, "%Y-%m-%d %H:%M:%S")
                end_dt     = datetime.datetime.strptime(end_time2, "%Y-%m-%d %H:%M:%S")
                days       = (end_dt - start_dt).days + 1

                current_day = 0
                print("Start:", start_dt, "End:", end_dt, "Time Zone:", timezone, "Days: ", days)
                loop_dt = copy.copy(start_dt)
                delta = datetime.timedelta(days=1)

                cdr_down_path = os.path.join(DB_EXPORT_PATH, cdr_download)
                cdr_web_path = os.path.join(DB_WEB_PATH, cdr_download)
                if not os.path.exists(cdr_web_path):
                    os.mkdir(cdr_web_path, 511)

                # create temp dir
                unique_path = str(uuid.uuid4())
                tempDir = os.path.join(cdr_down_path, unique_path)
                tempWebDir = os.path.join(cdr_web_path, unique_path)
                os.mkdir(tempWebDir)
                os.chmod(tempWebDir, 511)
                cursor.execute("UPDATE cdr_export_log SET status = %s, total_days = %s,  completed_days = %s, file_dir = %s WHERE id = %s", (2, days, current_day, tempWebDir, LOG_ID))
                while loop_dt.date() <= end_dt.date():
                    if loop_dt.date() == start_dt.date():
                        current_start_datetime = loop_dt.strftime("%Y-%m-%d %H:%M:%S")
                    else:
                        current_start_datetime = loop_dt.strftime("%Y-%m-%d 00:00:00")

                    if loop_dt.date() == end_dt.date():
                        current_end_datetime   = end_dt.strftime("%Y-%m-%d %H:%M:%S")
                    else:
                        current_end_datetime   = loop_dt.strftime("%Y-%m-%d 23:59:59")

                    print("%s ~ %s" % (current_start_datetime, current_end_datetime))

                    replaced_sql = pattern.sub("time between ‘%s %s‘ and ‘%s %s‘" % (current_start_datetime, 
                                    timezone, current_end_datetime, timezone), sql)
                    #print("Replaced SQL:", replaced_sql)
                    filename = "%s %s_%s %s.csv" % (current_start_datetime, 
                                    timezone, current_end_datetime, timezone)
                    filepath = os.path.join(tempDir, filename)
                    webfilepath = os.path.join(tempWebDir, filename)
                
                    copy_sql  = "COPY (%s) TO ‘%s‘ DELIMITER ‘,‘  CSV HEADER " % (replaced_sql, filepath)
                    try:
                        cursor.execute(copy_sql)
                    except (psycopg2.extensions.QueryCanceledError, psycopg2.OperationalError):
                        print("Canceled!")
                        canceled = True
                        break
                    print("SQL:", copy_sql)


                    loop_dt += delta
                    current_day += 1
                    cursor.execute("UPDATE cdr_export_log SET completed_days = %s WHERE id = %s", (current_day, LOG_ID))


                if canceled:
                    cursor.close()
                    conn.close()
                    continue
                cursor.execute("UPDATE cdr_export_log SET status = 3 WHERE id = %s", (LOG_ID, ))
                zipfile_path = os.path.join(cdr_web_path, "%s.zip" % (str(uuid.uuid4())))
                zipf = zipfile.ZipFile(zipfile_path, w, allowZip64=True)
                zipdir(tempWebDir, zipf)
                zipf.close()
                print("ZIP File:", zipfile_path)
                cursor.execute("UPDATE cdr_export_log SET status = 4, file_path = %s WHERE id = %s", (zipfile_path, LOG_ID))

                cursor.close()
                conn.close()

        time.sleep(1)

if __name__ == "__main__":
    main()

 

生成CDR的相关报告(by python3)

标签:style   blog   color   os   io   ar   for   art   div   

原文地址:http://www.cnblogs.com/alexkn/p/3952980.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!