码迷,mamicode.com
首页 > 数据库 > 详细

Python读取纯真IP数据库

时间:2014-12-13 06:26:06      阅读:472      评论:0      收藏:0      [点我收藏+]

标签:python

一、获取最新版IP地址数据库qqwry.dat

    纯真IP地址数据库下载地址:http://update.cz88.net/soft/setup.zip

    在windows机器上下载解压,点击setup.exe安装,在安装目录下的qqwry.dat即是最新版ip数据库。

    也可从51CTO下载(不是最新版,可用于测试):http://down.51cto.com/data/1888530

二、IPLocator.py

    网上找到别人用Python写的纯真IP数据库的查询程序,原文地址:http://blog.chinaunix.net/uid-20758462-id-1876988.html,本文对代码做了一些修改,解决中文乱码问题。

    建立IPLocator.py文件(见本文附件),内容如下:

#! /usr/bin/env python
# -*- coding: utf-8 -*-
""" IPLocator: locate IP in the QQWry.dat.
    Usage:
        python IPLocator.py <ip>
"""

import socket,string,struct,sys

class IPLocator :
    def __init__( self, ipdbFile ):
        self.ipdb = open( ipdbFile, "rb" )
        str = self.ipdb.read( 8 )
        (self.firstIndex,self.lastIndex) = struct.unpack(‘II‘,str)
        self.indexCount = (self.lastIndex - self.firstIndex)/7+1
        print self.getVersion()," 纪录总数: %d 条 "%(self.indexCount)

    def getVersion(self):
        s = self.getIpAddr(0xffffff00L)
        return s

    def getAreaAddr(self,offset=0):
        if offset :
            self.ipdb.seek( offset )
        str = self.ipdb.read( 1 )
        (byte,) = struct.unpack(‘B‘,str)
        if byte == 0x01 or byte == 0x02:
            p = self.getLong3()
            if p:
                return self.getString( p )
            else:
                return ""
        else:
            self.ipdb.seek(-1,1)
            return self.getString( offset )

    def getAddr(self,offset,ip=0):
        self.ipdb.seek( offset + 4)
        countryAddr = ""
        areaAddr = ""
        str = self.ipdb.read( 1 )
        (byte,) = struct.unpack(‘B‘,str)
        if byte == 0x01:
            countryOffset = self.getLong3()
            self.ipdb.seek( countryOffset )
            str = self.ipdb.read( 1 )
            (b,) = struct.unpack(‘B‘,str)
            if b == 0x02:
                countryAddr = self.getString( self.getLong3() )
                self.ipdb.seek( countryOffset + 4 )
            else:
                countryAddr = self.getString( countryOffset )
            areaAddr = self.getAreaAddr()
        elif byte == 0x02:
            countryAddr = self.getString( self.getLong3() )
            areaAddr = self.getAreaAddr( offset + 8 )
        else:
            countryAddr = self.getString( offset + 4 )
            areaAddr = self.getAreaAddr()
        return countryAddr + " " + areaAddr

    def dump(self, first ,last ):
        if last > self.indexCount :
            last = self.indexCount
        for index in range(first,last):
            offset = self.firstIndex + index * 7
            self.ipdb.seek( offset )
            buf = self.ipdb.read( 7 )
            (ip,of1,of2) = struct.unpack("IHB",buf)
            address = self.getAddr( of1 + (of2 << 16) )
            #把GBK转为utf-8
            address = unicode(address,‘gbk‘).encode("utf-8")
            print "%d\t%s\t%s" %(index, self.ip2str(ip),                 address )

    def setIpRange(self,index):
        offset = self.firstIndex + index * 7
        self.ipdb.seek( offset )
        buf = self.ipdb.read( 7 )
        (self.curStartIp,of1,of2) = struct.unpack("IHB",buf)
        self.curEndIpOffset = of1 + (of2 << 16)
        self.ipdb.seek( self.curEndIpOffset )
        buf = self.ipdb.read( 4 )
        (self.curEndIp,) = struct.unpack("I",buf)

    def getIpAddr(self,ip):
        L = 0
        R = self.indexCount - 1
        while L < R-1:
            M = (L + R) / 2
            self.setIpRange(M)
            if ip == self.curStartIp:
                L = M
                break
            if ip > self.curStartIp:
                L = M
            else:
                R = M
        self.setIpRange( L )
        #version information,255.255.255.X,urgy but useful
        if ip&0xffffff00L == 0xffffff00L:
            self.setIpRange( R )
        if self.curStartIp <= ip <= self.curEndIp:
            address = self.getAddr( self.curEndIpOffset )
            #把GBK转为utf-8
            address = unicode(address,‘gbk‘).encode("utf-8")
        else:
            address = "未找到该IP的地址"
        return address

    def getIpRange(self,ip):
        self.getIpAddr(ip)
        range = self.ip2str(self.curStartIp) + ‘ - ‘             + self.ip2str(self.curEndIp)
        return range

    def getString(self,offset = 0):
        if offset :
            self.ipdb.seek( offset )
        str = ""
        ch = self.ipdb.read( 1 )
        (byte,) = struct.unpack(‘B‘,ch)
        while byte != 0:
            str = str + ch
            ch = self.ipdb.read( 1 )
            (byte,) = struct.unpack(‘B‘,ch)
        return str

    def ip2str(self,ip):
        return str(ip>>24)+‘.‘+str((ip>>16)&0xffL)+‘.‘             +str((ip>>8)&0xffL)+‘.‘+str(ip&0xffL)

    def str2ip(self,s):
        (ip,) = struct.unpack(‘I‘,socket.inet_aton(s))
        return ((ip>>24)&0xffL)|((ip&0xffL)<<24)             |((ip>>8)&0xff00L)|((ip&0xff00L)<<8)

    def getLong3(self,offset = 0):
        if offset :
            self.ipdb.seek( offset )
        str = self.ipdb.read(3)
        (a,b) = struct.unpack(‘HB‘,str)
        return (b << 16) + a

#Demo
def main():
    IPL = IPLocator( "qqwry.dat" )
    ip = ""
    if len(sys.argv) != 2:
        print ‘Usage: python IPLocator.py <IP>‘
        return
    else:
        ip = sys.argv[1]
    address = IPL.getIpAddr( IPL.str2ip(ip) )
    range = IPL.getIpRange( IPL.str2ip(ip) )
    print "此IP %s 属于 %s\n所在网段: %s" % (ip,address, range)


if __name__ == "__main__" :
    main()

    把qqwry.dat放到和IPLocator.py同一目录,使用方法如下(请参考IPLocator.py中的Demo):

bubuko.com,布布扣


本文出自 “启程的Linux博客” 博客,请务必保留此出处http://qicheng0211.blog.51cto.com/3958621/1589442

Python读取纯真IP数据库

标签:python

原文地址:http://qicheng0211.blog.51cto.com/3958621/1589442

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!