码迷,mamicode.com
首页 > Web开发 > 详细

newWebMethod

时间:2019-01-30 12:56:34      阅读:262      评论:0      收藏:0      [点我收藏+]

标签:网络   gre   lines   net   play   elements   localtime   elm   edr   

#!/bin/env python
#-*- encoding=utf8 -*-
import traceback
import os,ssl,random
import string,zipfile
import PublicMethod as PM
from time import *
from _weakref import proxy
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.action_chains import  ActionChains

# Accepted spellings of each supported browser name. The ``browser`` value
# handed to the helpers below is matched against these lists.
opera_list = ['Opera', 'opera', 'op']
firefox_list = ['Firefox', 'firefox', 'ff']
chrome_list = ['Chrome', 'chrome', 'Google', 'google', 'gg']

# Decorator: run an element operation a second time on a retryable error.
def try_again(func):
    """Wrap a method so that it is retried once after a retryable failure.

    The decorated method must belong to an object exposing
    ``retry_error(error_msg)``. When the first call raises, the formatted
    traceback is passed to ``retry_error``; a truthy answer triggers exactly
    one more call, otherwise a plain ``Exception`` carrying the traceback text
    is raised. After a successful call the wrapper pauses via
    ``PM.random_sleep(1, 3)`` and returns the method's result.
    """
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            result = func(self, *args, **kwargs)
        except Exception:
            # Only ordinary errors are retried; KeyboardInterrupt and
            # SystemExit must propagate untouched.
            error_msg = traceback.format_exc()
            if not self.retry_error(error_msg):
                # Callers match on the traceback text, so keep it as the
                # exception message.
                raise Exception(error_msg)
            result = func(self, *args, **kwargs)
        PM.random_sleep(1, 3)
        return result
    return wrapper

# Web: decorator that records diagnostics when a task method fails.
def save_screen_shot(func):
    """Wrap a task method so that a failure is logged and the browser closed.

    The decorated method's object must provide ``task_path``, ``web``,
    ``driver``, ``python_pid``, ``driver_pid`` and ``mozilla_pid``.

    On success the wrapped method's return value is passed through.
    On failure the wrapper:
      1. prints the traceback and writes it to ``log_<time>.txt`` under
         ``task_path``;
      2. lets ``self.web.rerun_error`` classify the error and writes the
         result file through ``PM.output_result``;
      3. stores URL / HTML / screenshot via ``self.web.get_error_page_info``;
      4. sleeps 1000 seconds, quits the driver and kills leftover processes;
      5. raises ``Exception('Error thrown by modifier !')``.
    """
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception:
            # Print and persist the error information.
            traceback.print_exc()
            error_msg = traceback.format_exc()
            stamp = PM.get_time('%m%d_%H%M%S')
            log_path = self.task_path + '\\log_%s.txt' % stamp
            # Context manager so the log handle is closed before
            # get_error_page_info() appends to the same file.
            with open(log_path, 'w+') as log_file:
                log_file.write(error_msg)
            # Decide from the error type whether the task may be re-run.
            self.web.rerun_error(error_msg)
            PM.output_result(self.task_path + '\\output.txt')
            print("PM.fail['reason']:\n%s" % PM.fail['reason'])
            # Capture URL, HTML and a screenshot of the current page.
            self.web.get_error_page_info(log_path)
            # NOTE(review): the long pause looks like a debugging aid that
            # keeps the browser open for inspection - confirm it is wanted.
            print('sleep 1000s')
            sleep(1000)
            # Close the browser and clean up the related processes.
            try:
                self.driver.quit()
                print('Close Window by modifier!')
            except Exception:
                print(u'浏览器已关闭(E)!')
            close_browser_by_pid(self.python_pid, self.driver_pid, self.mozilla_pid)
            raise Exception('Error thrown by modifier !')
    return wrapper

# Collect the PIDs of the processes that belong to the current task.
def _find_child_pid(parent_pid, name):
    """Return the PID (as a string) of a child of *parent_pid* named *name*.

    Runs ``wmic`` and scans its ``Caption,ProcessId`` table. When several
    rows contain *name* the last one wins. Returns ``None`` when either
    argument is ``None``, the query fails, or no row matches.
    """
    if parent_pid is None or name is None:
        return None
    try:
        pipe = os.popen('wmic process where (ParentProcessId=%s) get Caption,ProcessId' % parent_pid)
        try:
            output = pipe.read()
        finally:
            pipe.close()
    except Exception:
        return None
    found = None
    for line in output.splitlines():
        fields = line.split()
        # ProcessId is the last column; splitting on any whitespace is
        # independent of how wide wmic pads the Caption column.
        if name in line and len(fields) >= 2 and fields[-1].isdigit():
            found = fields[-1]
    return found


def get_all_pid(browser):
    """Return ``(python_pid, driver_pid, mozilla_pid)`` for this task.

    *browser* is looked up in ``chrome_list`` / ``firefox_list`` /
    ``opera_list`` to pick the driver and browser process names. The driver
    is searched among the children of this Python process and the browser
    among the children of the driver. ``python_pid`` is an int; the other two
    are strings taken from the ``wmic`` output, or ``None`` when not found.
    """
    if browser in chrome_list:
        driver_name, browser_name = 'chromedriver', 'chrome'
    elif browser in firefox_list:
        driver_name, browser_name = 'geckodriver', 'firefox'
    elif browser in opera_list:
        driver_name, browser_name = 'operadriver', 'opera'
    else:
        # Unknown browser: nothing to look for, report both PIDs as None.
        driver_name, browser_name = None, None
    python_pid = os.getpid()
    driver_pid = _find_child_pid(python_pid, driver_name)
    mozilla_pid = _find_child_pid(driver_pid, browser_name)
    print('%s %s %s' % (python_pid, driver_pid, mozilla_pid))
    return python_pid, driver_pid, mozilla_pid

# Close the browser together with its related processes.
def _kill_process_tree(pid, label):
    """Force-kill *pid* and its children with ``taskkill``; skip ``None``."""
    if pid is None:
        # No PID was found for this process, so there is nothing to kill.
        return
    try:
        pipe = os.popen('taskkill /F /pid:%s /T' % pid)
        try:
            # Drain the pipe so the command has finished before returning.
            pipe.read()
        finally:
            pipe.close()
        print('kill %s by pid !' % label)
    except Exception:
        print('kill %s by pid failed !' % label)


def close_browser_by_pid(python_pid, driver_pid, mozilla_pid):
    """Kill the browser process tree, then the driver process tree.

    ``python_pid`` is only printed; the Python process itself is not killed.
    A PID of ``None`` is skipped.
    """
    _kill_process_tree(mozilla_pid, 'mozilla')
    _kill_process_tree(driver_pid, 'driver')
    print('python PID:%s' % python_pid)

# Configure a browser and start it.
class StartBrowser():
    """Build browser options (proxy, headless, cookies) and open a URL.

    ``param_dict`` must contain the keys ``browser``, ``cookies``, ``ip``,
    ``port``, ``ip_username``, ``ip_passwd`` and ``binary_location``.
    ``task_path`` is read from ``param_dict`` and falls back to the
    ``task_path`` argument when the key is absent. An optional ``headless``
    key (default ``False``) enables headless mode.
    """

    def __init__(self, url, param_dict, task_path=None):
        self.url = url
        self.headless = param_dict.get('headless', False)
        self.browser = param_dict['browser']
        self.cookies = param_dict['cookies']
        self.task_path = param_dict.get('task_path', task_path)
        self.ip = param_dict['ip']
        self.port = param_dict['port']
        self.ip_username = param_dict['ip_username']
        self.ip_passwd = param_dict['ip_passwd']
        self.binary_location = param_dict['binary_location']

    # Build the Chrome extension that answers proxy authentication.
    def create_proxyauth_extension(self, scheme='http', plugin_path=None):
        """Write a Chrome extension zip configuring an authenticated proxy.

        The extension routes traffic through ``self.ip:self.port`` and
        answers auth challenges with ``self.ip_username`` /
        ``self.ip_passwd``. Returns the path of the written zip file. When
        *plugin_path* is ``None`` the zip is placed under ``self.task_path``.
        """
        if plugin_path is None:
            plugin_path = '%s/vimm_chrome_proxyauth_plugin.zip' % self.task_path

        manifest_json = """
        {
            "version": "1.0.0",
            "manifest_version": 2,
            "name": "Chrome Proxy",
            "permissions": [
                "proxy",
                "tabs",
                "unlimitedStorage",
                "storage",
                "<all_urls>",
                "webRequest",
                "webRequestBlocking"
            ],
            "background": {
                "scripts": ["background.js"]
            },
            "minimum_chrome_version":"22.0.0"
        }
        """

        # NOTE(review): values are substituted verbatim into JS string
        # literals; credentials containing '"' or '\' would break the script.
        background_js = string.Template(
        """
        var config = {
                mode: "fixed_servers",
                rules: {
                  singleProxy: {
                    scheme: "${scheme}",
                    host: "${host}",
                    port: parseInt(${port})
                  },
                  bypassList: ["foobar.com"]
                }
              };

        chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});

        function callbackFn(details) {
            return {
                authCredentials: {
                    username: "${username}",
                    password: "${password}"
                }
            };
        }

        chrome.webRequest.onAuthRequired.addListener(
                    callbackFn,
                    {urls: ["<all_urls>"]},
                    ['blocking']
        );
        """
        ).substitute(host=self.ip, port=self.port, username=self.ip_username,
                     password=self.ip_passwd, scheme=scheme)
        with zipfile.ZipFile(plugin_path, 'w') as zp:
            zp.writestr("manifest.json", manifest_json)
            zp.writestr("background.js", background_js)

        return plugin_path

    # Chrome / Opera options, including the proxy.
    def chrome_proxy_options(self):
        """Return ``ChromeOptions`` configured from this instance."""
        chrome_options = webdriver.ChromeOptions()
        # Hide the "controlled by automated software" info bar.
        chrome_options.add_argument('disable-infobars')
        # Start with a maximized window.
        chrome_options.add_argument("--start-maximized")
        chrome_options.add_argument('--ignore-certificate-errors')
        # Headless mode.
        if self.headless:
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--disable-gpu')
        # Proxy address.
        if self.ip is not None:
            chrome_options.add_argument('--proxy-server=http://%s:%s' % (self.ip, self.port))
        # Opera additionally needs the browser binary location.
        if self.browser in opera_list:
            chrome_options.binary_location = self.binary_location
        # Proxy that requires user name / password authentication.
        if self.ip_username is not None:
            print(u'  创建一个代理IP插件!')
            path = '%s/vimm_chrome_proxyauth_plugin.zip' % self.task_path
            proxyauth_plugin_path = self.create_proxyauth_extension(plugin_path=path)
            chrome_options.add_extension(proxyauth_plugin_path)
        return chrome_options

    # Firefox profile, including the proxy.
    def firefox_proxy_options(self):
        """Return a ``FirefoxProfile`` configured from this instance."""
        profile = webdriver.FirefoxProfile()
        if self.headless:
            # Headless is not a profile preference (set_preference needs a
            # key and a value). Firefox honours the MOZ_HEADLESS environment
            # variable, which the spawned driver/browser processes inherit.
            os.environ['MOZ_HEADLESS'] = '1'
        # Proxy address.
        if self.ip is not None:
            # Port preferences are integer-typed in Firefox.
            port = int(self.port)
            profile.set_preference('network.proxy.type', 1)
            profile.set_preference('network.proxy.http', self.ip)
            profile.set_preference('network.proxy.http_port', port)
            profile.set_preference('network.proxy.ssl', self.ip)
            profile.set_preference('network.proxy.ssl_port', port)
        profile.update_preferences()
        return profile

    # Verify that traffic really leaves through the proxy IP.
    def check_IP(self, driver):
        """Return ``True`` when the browser must be restarted with another port.

        Opens an IP echo page and looks for ``self.ip`` in its source. On a
        mismatch the driver is quit and ``self.port`` is toggled between 3128
        and 58378 (other values are left unchanged).
        """
        try:
            driver.get('http://pv.sohu.com/cityjson?ie=utf-8')
            sleep(5)
            page_source = driver.page_source
        except Exception:
            page_source = u'打开检查IP地址的URL失败!'
        print('page_source:\n%s' % page_source)
        # Compare the reported address with the configured proxy IP.
        if self.ip in page_source:
            print(u'  代理IP设置正确!')
            return False
        driver.quit()
        if self.port == 3128:
            self.port = 58378
        elif self.port == 58378:
            self.port = 3128
        return True

    def _create_driver(self):
        """Instantiate the WebDriver matching ``self.browser``."""
        if self.browser in opera_list:
            return webdriver.Opera(options=self.chrome_proxy_options())
        if self.browser in firefox_list:
            return webdriver.Firefox(firefox_profile=self.firefox_proxy_options())
        if self.browser in chrome_list:
            return webdriver.Chrome(chrome_options=self.chrome_proxy_options())
        raise ValueError('unsupported browser: %r' % (self.browser,))

    # Open the browser.
    def _open_browser(self):
        """Create the driver, validate the proxy, apply cookies, open the URL.

        The driver is created at most twice: a failed proxy check on the
        first attempt restarts it with the alternate port, a second failure
        raises ``Exception``.
        """
        driver = None
        for attempt in range(2):
            driver = self._create_driver()
            if self.ip is None:
                break
            if not self.check_IP(driver):
                # Proxy verified, no restart needed.
                break
            # check_IP() already quit the driver on a mismatch.
            if attempt == 1:
                PM.fail['reason'] = u'代理IP设置错误,退出浏览器!'
                raise Exception('proxy IP check failed, browser closed')
        # Log in through cookies when they were supplied.
        if self.cookies is not None:
            print(u'使用cookie认证登录')
            # A first visit is required before cookies can be modified.
            driver.get(self.url)
            # Drop the cookies created by that first visit.
            driver.delete_all_cookies()
            for cookie in self.cookies:
                if self.browser in firefox_list:
                    # Work on a copy so the caller's cookie dicts stay intact;
                    # the domain entry is removed for Firefox.
                    cookie = dict(cookie)
                    cookie.pop('domain', None)
                driver.add_cookie(cookie)
        driver.maximize_window()
        sleep(3)
        # With cookies set, visiting the page again gives a logged-in session.
        driver.get(self.url)
        return driver

    # Open the browser; report the reason when that fails.
    def open_browser(self):
        """Start the browser and return the driver.

        On failure the reason is stored in ``PM.fail['reason']``, the result
        file is written, leftover driver/browser processes are killed by PID
        and an ``Exception`` carrying the traceback text is raised.
        """
        try:
            return self._open_browser()
        except Exception:
            error_msg = traceback.format_exc()
            PM.fail['reason'] = u'浏览器启动失败!\n原因:\n%s' % error_msg
            print(PM.fail['reason'])
            PM.output_result(self.task_path + '\\output.txt')
            # No driver object is available here (its creation or setup
            # failed), so clean up through the process IDs instead.
            python_pid, driver_pid, mozilla_pid = get_all_pid(self.browser)
            close_browser_by_pid(python_pid, driver_pid, mozilla_pid)
            raise Exception(error_msg)

# Methods for operating on page elements.
class WebMethod():
    """Element-level helpers built on a Selenium WebDriver instance.

    Locators are strings: one starting with ``/`` or ``(`` is treated as
    XPath, anything else as a CSS selector. ``PM.return_loc_list`` turns a
    locator argument into the list of candidate locators that are tried.
    """

    # Value of PM.fail['reason'] while no specific cause has been recorded.
    _DEFAULT_REASON = 'unknown reasons,please contact IT'

    # Traceback fragments after which the same operation is attempted again.
    _RETRYABLE_COMMANDS = (
        'Command.GO_BACK',
        'Command.CLICK_ELEMENT',
        'Command.GET_CURRENT_URL',
        'Command.GET_PAGE_SOURCE',
    )

    # Traceback fragments after which the whole task may be re-run, e.g.
    #   TimeoutException: Message: Timeout loading page after 300000ms
    #   WebDriverException: Message: Failed to decode response from marionette
    #   WebDriverException: Message: Reached error page: about:neterror?e=netTimeout
    #   WebDriverException: Message: Reached error page: about:neterror?e=nssFailure
    _RERUN_MARKERS = (
        'WebDriverException:',
        'unknown session',
        'unknown sessionId',
        'invalid session id',
        'Message: no such session',
        'session deleted because of page crash',
        'error: [Errno',
        'MaxRetryError',
    )

    def __init__(self, driver, task_path=None):
        self.driver = driver
        self.task_path = task_path

    def c_time(self):
        """Return the local time formatted as ``YYYY-mm-dd HH:MM:SS``."""
        return strftime('%Y-%m-%d %H:%M:%S', localtime())

    # **
    # Error handling
    # *
    def _probe(self, locator, label=None):
        """Return ``is_exists(locator)``, or ``False`` if the lookup raises."""
        try:
            found = self.is_exists(locator)
        except Exception:
            return False
        if label is not None:
            print('%s:%s' % (label, found))
        return found

    # Inspect the page to work out why the task failed.
    def get_page_error(self):
        """Record a failure reason derived from the page, if one is found.

        Returns ``True`` when ``PM.fail['reason']`` differs from the default
        text afterwards (the caller then writes the reason to the output).
        """
        # Signs of a network error page.
        reload_button = self._probe('//button[@id="reload-button"]', 'reload_button')
        error_page = self._probe('//div[@id="errorPageContainer"]', 'error_page')
        network_error = self._probe(
            '//div[@id="main-message"]/h1[@jsselect="heading"]', 'network_error')
        tryAgain_button = self._probe('//button[@id="errorTryAgain"]', 'tryAgain_button')
        # Other failure causes.
        is_relogin = self._probe('//span[@id="GREET-SIGN-IN-TO-EBAY"]')
        # tryAgain_button is collected with the other network-error signs, so
        # it takes part in the same decision.
        if reload_button or network_error or error_page or tryAgain_button:
            PM.fail['reason'] = u'网络连接错误,请重新运行!'
        elif is_relogin:
            PM.fail['reason'] = u'eBay需要反复登录,请检查账号状态!'
        return PM.fail['reason'] != self._DEFAULT_REASON

    # Errors after which the element operation can be repeated once.
    def retry_error(self, error_msg):
        """Return ``True`` when *error_msg* describes a retryable failure."""
        retry = any(marker in error_msg for marker in self._RETRYABLE_COMMANDS)
        if not retry and 'ElementClickInterceptedException' in error_msg:
            retry = True
            # Give the overlay that intercepted the click time to disappear.
            PM.random_sleep(5, 10)
        if retry:
            print(u'  **再次执行当前操作···')
        return retry

    # Errors after which the whole program can be run again.
    def rerun_error(self, error_msg):
        """Return ``True`` when the task may be re-run after *error_msg*.

        When re-running is possible, ``PM.fail['reason']`` is replaced by a
        generic message that embeds *error_msg*.
        """
        rerun = bool(
            self.get_page_error()
            or self.retry_error(error_msg)
            or any(marker in error_msg for marker in self._RERUN_MARKERS)
            or ('in check_response' in error_msg and 'Message: timeout' in error_msg)
        )
        if rerun:
            # NOTE(review): this overwrites a specific reason that
            # get_page_error() may just have recorded - confirm it is wanted.
            PM.fail['reason'] = u'未知原因失败,可以尝试重新运行一次。如再次相同原因失败,请联系开发人员!\n错误原因:\n%s' % error_msg
        return rerun

    # **
    # Locator handling
    # *
    # Decide which locating strategy a locator uses.
    def return_loc_type(self, locator):
        """Return ``By.XPATH`` for XPath locators, else ``By.CSS_SELECTOR``."""
        # "(" covers grouped XPath such as (//div)[1]; no CSS selector can
        # start with it.
        if locator.startswith(('/', '(')):
            return By.XPATH
        return By.CSS_SELECTOR

    # Check whether an element exists.
    def is_exists(self, locator, timeout=1):
        """Look for an element, polling about once per second.

        With ``timeout <= 1`` this is a pure existence check returning ``1``
        or ``0``. With ``timeout > 1`` the caller intends to operate on the
        element: the first matching element is returned, and if none shows up
        a final ``find_element`` call raises Selenium's own lookup error.
        """
        loc_list = PM.return_loc_list(locator)
        last_query = None
        for _ in range(timeout):
            for loc in loc_list:
                loc_type = self.return_loc_type(loc)
                last_query = (loc_type, loc)
                matches = self.driver.find_elements(by=loc_type, value=loc)
                if matches:
                    print(loc)
                    return matches[0] if timeout > 1 else 1
            if timeout > 1:
                sleep(1)
        if timeout > 1:
            if last_query is None:
                raise ValueError('no locator to search for: %r' % (locator,))
            # Not found in time: let the driver raise its standard error.
            return self.driver.find_element(by=last_query[0], value=last_query[1])
        return 0

    # Check whether an element is visible.
    def is_displayed(self, locator, timeout=1):
        """Return ``1`` if the first element found is displayed, else ``0``.

        Polls for existence like ``is_exists``; once an element exists the
        visibility answer is returned immediately, without further waiting.
        """
        loc_list = PM.return_loc_list(locator)
        for _ in range(timeout):
            for loc in loc_list:
                matches = self.driver.find_elements(
                    by=self.return_loc_type(loc), value=loc)
                if matches:
                    if matches[0].is_displayed():
                        print('This element is visible:%s' % loc)
                        return 1
                    return 0
            if timeout > 1:
                sleep(1)
        return 0

    # **
    # Element operations
    # *
    # Wait for an element to appear.
    def wait_element_OK(self, locator, timeout=20):
        """Wait until the element exists, then pause briefly."""
        print('[%s] waiting element is visible...By locator:' % self.c_time())
        self.is_exists(locator, timeout)
        PM.random_sleep(1, 3)

    # Click an element.
    @try_again
    def click_element(self, locator, timeout=20):
        """Click the element found by *locator*."""
        print('[%s] clicking element...By locator:' % self.c_time())
        element_obj = self.is_exists(locator, timeout)
        element_obj.click()

    # Type text.
    @try_again
    def input_text(self, text, locator, timeout=20):
        """Clear the element and type *text* one character at a time."""
        print('[%s] entering text...By locator:' % self.c_time())
        element_obj = self.is_exists(locator, timeout)
        element_obj.clear()
        sleep(1)
        for char in text:
            element_obj.send_keys(char)
            PM.random_sleep(0.2, 1)

    # Read an attribute of an element.
    @try_again
    def get_attr(self, attr, locator, timeout=20):
        """Return the value of attribute *attr* of the element."""
        print('[%s] getting element attribute values...By locator:' % self.c_time())
        element_obj = self.is_exists(locator, timeout)
        return element_obj.get_attribute(attr)

    # Read the text between an element's tags.
    @try_again
    def get_text(self, locator, timeout=20):
        """Return the text of the element."""
        print('[%s] getting element text...By locator:' % self.c_time())
        element_obj = self.is_exists(locator, timeout)
        return element_obj.text

    # Hover the mouse over an element.
    @try_again
    def mouse_over(self, locator, timeout=20):
        """Move the mouse pointer onto the element."""
        print('[%s] mouse is hovering...By locator:' % self.c_time())
        element_obj = self.is_exists(locator, timeout)
        ActionChains(self.driver).move_to_element(element_obj).perform()

    # Choose an option of a drop-down box.
    @try_again
    def select_by_attr(self, locator, attr, attr_type='value', timeout=20):
        """Select an option by ``'text'``, ``'index'`` or (default) value."""
        print('[%s] selecting drop-down box elements by %s...By locator:' % (self.c_time(), attr_type))
        select = Select(self.is_exists(locator, timeout))
        if attr_type == 'text':
            select.select_by_visible_text(attr)
        elif attr_type == 'index':
            select.select_by_index(attr)
        else:
            select.select_by_value(attr)

    # Go back.
    @try_again
    def _go_back(self):
        """Trigger the browser's back navigation."""
        print('[%s] returning to the previous page...By locator:' % self.c_time())
        self.driver.back()

    def _wait_for_url_change(self, old_url, checks=5, interval=2):
        """Poll until the current URL differs from *old_url*.

        Sleeps *interval* seconds before each of up to *checks* comparisons
        and returns whether a change was seen.
        """
        for _ in range(checks):
            sleep(interval)
            if self.driver.current_url != old_url:
                return True
        return False

    # Return to the previous page.
    def go_back(self, url=None, desc=u'上一个页面'):
        """Navigate back, trying twice; return ``True`` if the URL changed.

        When both attempts fail and *url* is given, that URL is opened
        directly; the return value is still ``False`` in that case.
        """
        start_url = self.driver.current_url
        for _ in range(2):
            self._go_back()
            if self._wait_for_url_change(start_url):
                print(u'  点击go back,成功返回至%s' % desc)
                return True
        if url is None:
            print(u'  返回%s时,页面跳转失败!' % desc)
        else:
            self.driver.get(url)
            print(u'  返回至%s,URL:%s' % (desc, url))
        return False

    # Click and verify that the page navigated; click once more if not.
    def click_goTo(self, locator, timeout=20):
        """Click the element, up to twice; return ``True`` if the URL changed."""
        print('[%s] clicking to jump the page...By locator:' % self.c_time())
        start_url = self.driver.current_url
        for _ in range(2):
            self.click_element(locator, timeout)
            if self._wait_for_url_change(start_url):
                print(u'  页面跳转成功!Locator:%s' % locator)
                return True
        print(u'  页面跳转失败!Locator:%s' % locator)
        return False

    # **
    # Others
    # *
    def _resolve_y(self, loc, timeout):
        """Return *loc* itself if it is an int, else the element's Y position."""
        if isinstance(loc, int):
            return loc
        return self.is_exists(loc, timeout).location['y']

    # Scroll from one element position to another.
    def adjust_element(self, start_loc, end_loc, timeout=20):
        """Scroll in random steps from *start_loc* towards *end_loc*.

        Each argument is either a Y coordinate (int) or a locator. The final
        scroll position is 200 pixels above the end coordinate.
        """
        print('[%s] adjusting element position...By locator:' % self.c_time())
        start_Y = self._resolve_y(start_loc, timeout)
        end_Y = self._resolve_y(end_loc, timeout)
        if start_Y != end_Y:
            # +1 scrolls down, -1 scrolls up.
            direction = 1 if start_Y < end_Y else -1
            next_Y = start_Y + direction * random.randint(330, 590)
            for _ in range(20):
                PM.random_sleep(1, 3)
                if (next_Y - end_Y) * direction > 0:
                    # The next step would pass the target.
                    break
                self.driver.execute_script('window.scrollTo(0,%d)' % next_Y)
                next_Y += direction * random.randint(330, 590)
        # Finally jump straight to the end position.
        self.driver.execute_script('window.scrollTo(0,%d)' % (end_Y - 200))

    # Scroll around the page at random, then return to the top.
    def random_scroll_page(self, min, max):
        """Scroll down a random number of steps (*min*..*max*) and back up."""
        print('[%s] random sliding of pages...By locator:' % self.c_time())
        max_count = random.randint(min, max)
        # Y coordinate reached by the first scroll.
        y_position = random.randint(310, 380)
        for _ in range(max_count):
            self.driver.execute_script('window.scrollTo(0,%d)' % y_position)
            y_position += random.randint(330, 390)
            PM.random_sleep(1, 3)
        for _ in range(max_count + 1):
            self.driver.execute_script('window.scrollTo(0,%d)' % y_position)
            y_position -= random.randint(430, 590)
            PM.random_sleep(0.5, 2)
        # Back to the top of the page.
        self.driver.execute_script('window.scrollTo(0,0)')
        PM.random_sleep(1, 3)

    # Switch to the newly opened window.
    def select_new_window(self):
        """Switch to a window other than the current one.

        Waits up to 10 x 2 seconds for a second window handle. Returns
        ``True`` after switching, ``False`` when no other window appeared.
        With several other windows the last handle in the list is selected.
        """
        print('[%s] selecting the newly opened browser window...By locator:' % self.c_time())
        all_handles = []
        for _ in range(10):
            sleep(2)
            all_handles = self.driver.window_handles
            if len(all_handles) > 1:
                break
        else:
            print(u'新窗口打开失败,一共有%s个窗口!' % len(all_handles))
            return False
        current_handle = self.driver.current_window_handle
        others = [handle for handle in all_handles if handle != current_handle]
        if not others:
            return False
        self.driver.switch_to.window(others[-1])
        return True

    # Collect information about the failing page and append it to the log.
    def get_error_page_info(self, log_path):
        """Save URL, HTML source and a screenshot of the current page.

        The URL is appended to *log_path*; the HTML goes to
        ``error_page_source.html`` and the screenshot to
        ``error_page<time>.png`` under ``self.task_path``. If any step fails,
        a notice is appended to the log instead of raising.
        """
        import io

        print('[%s] Error occurred. Getting the current page information...By locator:' % self.c_time())
        # io.open with an explicit encoding accepts the non-ASCII text
        # written below on both Python 2 and Python 3.
        with io.open(log_path, 'a+', encoding='utf-8') as fp:
            try:
                # Current page URL.
                fp.write(u'%s\n' % self.driver.current_url)
                # Current page HTML.
                with io.open("%s\\error_page_source.html" % self.task_path, "w",
                             encoding='utf-8') as f:
                    f.write(u'%s' % self.driver.page_source)
                # Screenshot of the current page.
                stamp = PM.get_time('%m%d_%H%M%S')
                file_path = '%s\\error_page%s.png' % (self.task_path, stamp)
                self.driver.get_screenshot_as_file(file_path)
                print(u'\n页面截图保存路径:\n%s' % file_path)
            except Exception:
                fp.write(u'无法执行webdriver提供的方法,请检查session是否丢失!')

 

未完成***

newWebMethod

标签:网络   gre   lines   net   play   elements   localtime   elm   edr   

原文地址:https://www.cnblogs.com/liuyun66535309/p/10337275.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!