码迷,mamicode.com
首页 > 数据库 > 详细

OpenStreetMap数据清洗(SQL版本)

时间:2017-12-04 16:44:37      阅读:628      评论:0      收藏:0      [点我收藏+]

标签:class   sage   ror   mat   ignore   erp   order   free   form   

目标:从网上下载OpenStreetMap的XML(.osm)数据文件,对文件中的标签进行统计和清洗,导出成CSV格式的文件,最后导入到SQLite中

本案例中所需的包

import csv
import codecs
import pprint
import re
import xml.etree.cElementTree as ET
from collections import defaultdict
import cerberus
import schema

1.统计文件中每一个标签出现的次数

思路:使用ElementTree的iterparse逐个解析xml文件中的元素,将每一个节点的标签名设为字典的key,出现次数为value,

   循环文件,如果字典中已有该key,那么value的值+1,否则将该key加入字典并把value设为1

def count_tags(filename):
    """Count how many elements of each tag name an XML document contains.

    The document is streamed once with ``ET.iterparse``; every "start"
    event (one per element, root included) increments the counter for
    that element's tag name.

    Args:
        filename: Path of the XML/OSM file, or an open binary file object
            (anything ``ET.iterparse`` accepts as its source).

    Returns:
        dict mapping tag name -> number of elements carrying that tag.
    """
    tags_count_dic = {}
    # A single streaming pass is enough: the root element also produces a
    # "start" event, so it does not need to be seeded separately.
    for _, ele in ET.iterparse(filename, events=('start',)):
        tags_count_dic[ele.tag] = tags_count_dic.get(ele.tag, 0) + 1
    return tags_count_dic
def test():
    """Check count_tags against the known tag counts of ``example.osm``."""
    # If the assertion does not fail, the result is correct.
    tags = count_tags('example.osm')
    pprint.pprint(tags)
    assert tags == {'bounds': 1,
                    'member': 3,
                    'nd': 4,
                    'node': 20,
                    'osm': 1,
                    'relation': 1,
                    'tag': 7,
                    'way': 1}


if __name__ == "__main__":
    test()

2.根据正则表达式,确定各种标签类型的数量

思路:根据传入的element判断其是否为tag标签,是则取出k属性的值,再用正则表达式依次进行匹配,将匹配成功的类别在字典中对应的计数加1

# Valid keys consisting only of lowercase letters and underscores.
lower = re.compile(r'^([a-z]|_)*$')
# Otherwise valid keys that contain one colon, e.g. "addr:street".
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
# Keys containing at least one problematic character.
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')


def key_type(element, keys):
    """Classify the ``k`` attribute of a <tag> element and update the counts.

    Args:
        element: An ElementTree element. Only elements whose tag name is
            "tag" are inspected; any other element leaves ``keys`` as is.
        keys: dict with the integer counters "lower", "lower_colon",
            "problemchars" and "other". It is updated in place.

    Returns:
        The same ``keys`` dict, so the call can be chained or reassigned.
    """
    if element.tag == 'tag':
        key = element.attrib['k']
        # The patterns are tried in order; the first one that matches wins.
        if lower.search(key):
            keys['lower'] += 1
        elif lower_colon.search(key):
            keys['lower_colon'] += 1
        elif problemchars.search(key):
            keys['problemchars'] += 1
        else:
            keys['other'] += 1
    return keys


def process_map(filename):
    """Count the key categories of every <tag> element in an OSM file.

    Args:
        filename: Path of the OSM file, or an open binary file object.

    Returns:
        dict with the counters "lower", "lower_colon", "problemchars"
        and "other".
    """
    keys = {"lower": 0, "lower_colon": 0, "problemchars": 0, "other": 0}
    for _, element in ET.iterparse(filename):
        keys = key_type(element, keys)
    return keys
def test():
    """Check process_map against the known key categories of ``example.osm``."""
    # If the assertion does not fail, the code is correct.
    keys = process_map('example.osm')
    pprint.pprint(keys)
    assert keys == {'lower': 5, 'lower_colon': 0, 'other': 1, 'problemchars': 1}


if __name__ == "__main__":
    test()

3.搜索用户,返回一组唯一的用户ID

思路:找到uid所对应的tag,循环xml文件,如果标签存在uid,就加入到set中返回

def get_user(element):
    """Return the value of the element's ``uid`` attribute, or None if absent."""
    return element.attrib.get('uid')


def process_map(filename):
    """Collect the distinct user ids found in an OSM file.

    Args:
        filename: Path of the OSM file, or an open binary file object.

    Returns:
        set of the (non-empty) ``uid`` attribute values of all elements.
    """
    users = set()
    for _, element in ET.iterparse(filename):
        # Look the uid up once per element instead of twice.
        uid = get_user(element)
        if uid:
            users.add(uid)
    return users
def test():
    """Check that ``example.osm`` contains exactly six distinct users."""
    # If the assertion does not fail, the program is correct.
    users = process_map('example.osm')
    pprint.pprint(users)
    assert len(users) == 6


if __name__ == "__main__":
    test()

4.完善街道名,将街道中的一些不合法的值去除

思路:循环街道的字典,名称在mapping,则进行替换,返回替换后的字符串

OSMFILE = "example1.osm"
# Matches the last whitespace-delimited word of a street name (its type).
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)


expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road",
            "Trail", "Parkway", "Commons"]

# UPDATE THIS VARIABLE
# The strings from the exercise template have to be adjusted here,
# otherwise the results are wrong.
mapping = {"Rd.": "Road",
           "St.": "Street",
           "Ave": "Avenue"}


def audit_street_type(street_types, street_name):
    """Record ``street_name`` under its street type if that type is unexpected.

    Args:
        street_types: Mapping of street type -> set of street names
            (e.g. a ``defaultdict(set)``); updated in place.
        street_name: Full street name to inspect.
    """
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected:
            street_types[street_type].add(street_name)


def is_street_name(elem):
    """Return True if the element's ``k`` attribute is "addr:street"."""
    return elem.attrib.get('k') == 'addr:street'


def audit(osmfile):
    """Group the street names of an OSM file by unexpected street type.

    Args:
        osmfile: Path of the OSM file, or an open binary file object.

    Returns:
        defaultdict(set) mapping each street type that is not listed in
        ``expected`` to the set of street names ending in that type.
    """
    street_types = defaultdict(set)
    # Attributes are already available on "start" events, whereas the
    # children of an element are not guaranteed to be, so every <tag>
    # element is examined directly instead of via its parent.
    for _, ele in ET.iterparse(osmfile, events=('start',)):
        if ele.tag == 'tag' and is_street_name(ele):
            audit_street_type(street_types, ele.attrib['v'])
    return street_types


def update_name(name, mapping):
    """Replace an abbreviated trailing street type with its full form.

    Only the street type at the end of the name is rewritten, so words
    that merely contain an abbreviation (e.g. "Avenue" contains "Ave")
    are left untouched.

    Args:
        name: Street name to clean.
        mapping: dict of abbreviation -> replacement.

    Returns:
        The cleaned street name; ``name`` unchanged if its street type is
        not a key of ``mapping``.
    """
    m = street_type_re.search(name)
    if m and m.group() in mapping:
        name = name[:m.start()] + mapping[m.group()]
    return name

def test():
    """Audit OSMFILE and check that known street names are cleaned correctly."""
    # If no assertion fails, the result is correct.
    st_types = audit(OSMFILE)
    assert len(st_types) == 3
    pprint.pprint(dict(st_types))

    for st_type, ways in st_types.items():
        for name in ways:
            better_name = update_name(name, mapping)
            # A single formatted string prints the same on Python 2 and 3.
            print("{0} => {1}".format(name, better_name))
            if name == "West Lexington St.":
                assert better_name == "West Lexington Street"
            if name == "Baldwin Rd.":
                assert better_name == "Baldwin Road"


if __name__ == "__main__":
    test()

5.数据清洗

  目标数据的结构

  node节点需要[id,user,uid,version,lat,lon,timestamp,changeset]

  node节点下的tags子节点需要[id,key,value,type]

{'node': {'id': 757860928,
          'user': 'uboot',
          'uid': 26299,
          'version': '2',
          'lat': 41.9747374,
          'lon': -87.6920102,
          'timestamp': '2010-07-22T16:16:51Z',
          'changeset': 5288876},
 'node_tags': [{'id': 757860928,
                'key': 'amenity',
                'value': 'fast_food',
                'type': 'regular'},
               {'id': 757860928,
                'key': 'cuisine',
                'value': 'sausage',
                'type': 'regular'},
               {'id': 757860928,
                'key': 'name',
                'value': "Shelly's Tasty Freeze",
                'type': 'regular'}]}

way节点需要[id,user,uid,version,timestamp,changeset]

way节点下的nodes子节点需要[id,node_id,position]

way节点下的tag子节点需要[id,key,value,type]

{'way': {'id': 209809850,
         'user': 'chicago-buildings',
         'uid': 674454,
         'version': '1',
         'timestamp': '2013-03-13T15:58:04Z',
         'changeset': 15353317},
 'way_nodes': [{'id': 209809850, 'node_id': 2199822281, 'position': 0},
               {'id': 209809850, 'node_id': 2199822390, 'position': 1},
               {'id': 209809850, 'node_id': 2199822392, 'position': 2},
               {'id': 209809850, 'node_id': 2199822369, 'position': 3},
               {'id': 209809850, 'node_id': 2199822370, 'position': 4},
               {'id': 209809850, 'node_id': 2199822284, 'position': 5},
               {'id': 209809850, 'node_id': 2199822281, 'position': 6}],
 'way_tags': [{'id': 209809850,
               'key': 'housenumber',
               'type': 'addr',
               'value': '1412'},
              {'id': 209809850,
               'key': 'street',
               'type': 'addr',
               'value': 'West Lexington St.'},
              {'id': 209809850,
               'key': 'street:name',
               'type': 'addr',
               'value': 'Lexington'},
              {'id': 209809850,
               'key': 'street:prefix',
               'type': 'addr',
               'value': 'West'},
              {'id': 209809850,
               'key': 'street:type',
               'type': 'addr',
               'value': 'Street'},
              {'id': 209809850,
               'key': 'building',
               'type': 'regular',
               'value': 'yes'},
              {'id': 209809850,
               'key': 'levels',
               'type': 'building',
               'value': '1'},
              {'id': 209809850,
               'key': 'building_id',
               'type': 'chicago',
               'value': '366409'}]}

思路: 1.使用iterparse遍历xml中每一个顶层标签

         2.使用自定义函数将每个元素变成多个数据结构

         3.利用架构和验证库保证数据格式的正确

         4.将每个数据结构写入相应的csv文件

OSM_PATH = "example1.osm"

NODES_PATH = "nodes.csv"  # output file for <node> elements
NODE_TAGS_PATH = "nodes_tags.csv"  # output file for <tag> children of nodes
WAYS_PATH = "ways.csv"  # output file for <way> elements
WAY_NODES_PATH = "ways_nodes.csv"  # output file for <nd> children of ways
WAY_TAGS_PATH = "ways_tags.csv"  # output file for <tag> children of ways

# Keys that start with a lowercase "type:key" pair, e.g. "addr:street".
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Keys containing at least one problematic character.
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')
SCHEMA = schema.schema  # validation schema taken from the schema module

# Make sure the fields order in the csvs matches the column order in the sql table schema
# Header row of each generated CSV file.
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']

# True on Python 2, where ``str`` is the byte-string type and the csv
# module cannot write unicode objects directly.
_PY2 = str is bytes


def _shape_child_tag(parent_id, child, problem_chars, default_tag_type):
    """Shape one <tag> child into a row dict, or None if its key is unusable.

    A key such as "addr:housenumber" is split at the first colon into
    type "addr" and key "housenumber"; any other key keeps its full text
    and gets ``default_tag_type``.
    """
    key = child.attrib['k']
    # search(), not match(): a problematic character anywhere in the key
    # disqualifies the tag, not only one in the first position.
    if problem_chars.search(key):
        return None
    if LOWER_COLON.match(key):
        tag_type, key = key.split(':', 1)
    else:
        tag_type = default_tag_type
    return {'id': parent_id,
            'key': key,
            'value': child.attrib['v'],
            'type': tag_type}


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict

    Args:
        element: ElementTree element to shape.
        node_attr_fields: Attribute names copied from a <node> element.
        way_attr_fields: Attribute names copied from a <way> element.
        problem_chars: Compiled pattern; <tag> children whose key matches
            it anywhere are skipped.
        default_tag_type: Type assigned to tags whose key has no
            "type:key" prefix.

    Returns:
        ``{'node': ..., 'node_tags': [...]}`` for a node,
        ``{'way': ..., 'way_nodes': [...], 'way_tags': [...]}`` for a way,
        and None for any other element.

    Raises:
        KeyError: if a required attribute is missing from the element.
    """
    if element.tag == 'node':
        element_id = element.attrib['id']
        node_attribs = {field: element.attrib[field] for field in node_attr_fields}
        tags = []
        for child in element:
            if child.tag != 'tag':
                continue
            shaped = _shape_child_tag(element_id, child, problem_chars, default_tag_type)
            if shaped is not None:
                tags.append(shaped)
        return {'node': node_attribs, 'node_tags': tags}

    if element.tag == 'way':
        element_id = element.attrib['id']
        way_attribs = {field: element.attrib[field] for field in way_attr_fields}
        way_nodes = []
        tags = []
        for child in element:
            if child.tag == 'nd':
                # position is the zero-based index of this <nd> within the way.
                way_nodes.append({'id': element_id,
                                  'node_id': child.attrib['ref'],
                                  'position': len(way_nodes)})
            elif child.tag == 'tag':
                shaped = _shape_child_tag(element_id, child, problem_chars, default_tag_type)
                if shaped is not None:
                    tags.append(shaped)
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}

    return None


def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag

    Elements are yielded once they are completely parsed ("end" event).
    After the consumer is done with an element the root is cleared, so
    already processed elements do not accumulate in memory.
    """
    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise an Exception if element does not match schema

    Args:
        element: Shaped element dict as returned by shape_element.
        validator: Object offering ``validate(document, schema)`` and an
            ``errors`` dict (a cerberus Validator in process_map).
        schema: Schema the element is validated against.

    Raises:
        Exception: describing the first failing field and its errors.
    """
    if validator.validate(element, schema) is not True:
        field, errors = next(iter(validator.errors.items()))
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)

        raise Exception(message_string.format(field, error_string))


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        """Write one row, UTF-8 encoding unicode values on Python 2."""
        super(UnicodeDictWriter, self).writerow({
            # Python 3's csv module writes text natively, so values are
            # only encoded where the csv module requires byte strings.
            k: (v.encode('utf-8') if _PY2 and isinstance(v, type(u'')) else v)
            for k, v in row.items()
        })

    def writerows(self, rows):
        """Write every row of ``rows`` through writerow."""
        for row in rows:
            self.writerow(row)


def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)

    Args:
        file_in: Path of the OSM file (or open binary file object).
        validate: When exactly True, every shaped element is checked
            against SCHEMA before it is written.
    """
    with codecs.open(NODES_PATH, 'w') as nodes_file, \
            codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
            codecs.open(WAYS_PATH, 'w') as ways_file, \
            codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
            codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])

第二种方法

高阶解法

def shape_tag(el, tag, default_tag_type='regular'):
    """Shape one <tag> child of ``el`` into a row dict.

    Args:
        el: Parent node/way element; supplies the ``id``.
        tag: The <tag> element; supplies ``k`` and ``v``.
        default_tag_type: Type used when the key has no "type:key" prefix.

    Returns:
        dict with the keys "id", "key", "value" and "type".
    """
    shaped = {
        'id': el.attrib['id'],
        'key': tag.attrib['k'],
        'value': tag.attrib['v'],
        'type': default_tag_type,
    }
    if LOWER_COLON.match(shaped['key']):
        # partition() always yields three parts and splits at the FIRST
        # colon only: "addr:street:name" -> type "addr", key "street:name".
        shaped['type'], _, shaped['key'] = shaped['key'].partition(':')
    return shaped


def shape_way_node(el, i, nd):
    """Shape the ``i``-th <nd> child of way ``el`` into a row dict."""
    return {
        'id': el.attrib['id'],
        'node_id': nd.attrib['ref'],
        'position': i,
    }


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict

    Args:
        element: ElementTree element to shape.
        node_attr_fields: Attribute names copied from a <node> element.
        way_attr_fields: Attribute names copied from a <way> element.
        problem_chars: Compiled pattern; <tag> elements whose key matches
            it anywhere are skipped.
        default_tag_type: Type assigned to tags without a "type:key" prefix.

    Returns:
        ``{'node': ..., 'node_tags': [...]}`` for a node,
        ``{'way': ..., 'way_nodes': [...], 'way_tags': [...]}`` for a way,
        and None for any other element.
    """
    # Secondary tags are handled the same way for both node and way elements.
    tags = [shape_tag(element, t, default_tag_type)
            for t in element.iter('tag')
            if not problem_chars.search(t.attrib['k'])]

    if element.tag == 'node':
        node_attribs = {f: element.attrib[f] for f in node_attr_fields}
        return {'node': node_attribs, 'node_tags': tags}
    elif element.tag == 'way':
        way_attribs = {f: element.attrib[f] for f in way_attr_fields}
        # One row per <nd> child, numbered in document order.
        way_nodes = [shape_way_node(element, i, nd)
                     for i, nd in enumerate(element.iter('nd'))]
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
    return None

 

OpenStreetMap数据清洗(SQL版本)

标签:class   sage   ror   mat   ignore   erp   order   free   form   

原文地址:http://www.cnblogs.com/luhuajun/p/7977561.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!