标签:class sage ror mat ignore erp order free form
目标:通过网上下载的OpenStreetMap.xml数据格式,将该文件的格式进行统计,清洗,并导出成CSV格式的文件,最后倒入到SQLite中
本案例中所需的包
import csv import codecs import pprint import re import xml.etree.cElementTree as ET from collections import defaultdict import cerberus import schema
1.统计文件中每一个标签出现的次数
思路:将xml文件使用sax解析,将每一个节点的的标签值设为字典的key,次数为value,初始化为0,
循环文件,如果可以找到key,那么value的值+1,否则不变
def count_tags(filename):
#1.读文件 osm = ET.ElementTree(file=filename)
#2.获取根节点 root = osm.getroot()
#3.获取根节点的标签,创建一个字典来存放标签名和次数 tags_count_dic = {root.tag:0}
#4.循环文件 for _,ele in ET.iterparse(filename,events=(‘start‘,)):
#5.如果有元素的tag在字典中,则value的值+1,否则表示该标签只出现一次 if ele.tag in tags_count_dic: tags_count_dic[ele.tag] += 1 else: tags_count_dic[ele.tag] = 1 return tags_count_dic
def test(): #测试函数的断言不出错,表示结果正确 tags = count_tags(‘example.osm‘) pprint.pprint(tags) assert tags == {‘bounds‘: 1, ‘member‘: 3, ‘nd‘: 4, ‘node‘: 20, ‘osm‘: 1, ‘relation‘: 1, ‘tag‘: 7, ‘way‘: 1} if __name__ == "__main__": test()
2.根据正则表达式,确定各种标签类型的数量
思路:获取根据传入的element,来获取tag,获取到tag即可获取到k的值,在根据正则表达式进行匹配,将匹配成功的值放入到不同的字典中
lower = re.compile(r‘^([a-z]|_)*$‘) #仅包含小写字母且有效的标记 lower_colon = re.compile(r‘^([a-z]|_)*:([a-z]|_)*$‘) #名称中有冒号的其他有效标记 problemchars = re.compile(r‘[=\+/&<>;\‘"\?%#$@\,\. \t\r\n]‘) #字符存在问题的标记 def key_type(element,keys):
#1.找到需要处理的标签进行处理 if element.tag == ‘tag‘:
#2.获取带匹配的字符串 key = element.attrib[‘k‘]
#逐次匹配,并将匹配成功的结果放到keys中,并返回 if lower.search(key): keys[‘lower‘] += 1 elif lower_colon.search(key): keys[‘lower_colon‘] += 1 elif problemchars.search(key): keys[‘problemchars‘] += 1 else: keys[‘other‘] += 1 return keys def process_map(filename): keys = {"lower": 0, "lower_colon": 0, "problemchars": 0, "other": 0} for _, element in ET.iterparse(filename): keys = key_type(element, keys) return keys
def test(): #测试函数的断言不报错,代码正确 keys = process_map(‘example.osm‘) pprint.pprint(keys) assert keys == {‘lower‘: 5, ‘lower_colon‘: 0, ‘other‘: 1, ‘problemchars‘: 1} if __name__ == "__main__": test()
3.搜索用户,返回一组唯一的用户ID
思路:找到uid所对应的tag,循环xml文件,如果标签存在uid,就加入到set中返回
def get_user(element):
#如果标签中包含‘uid‘这一属性,则返回该属性的值 if ‘uid‘ in element.attrib: return element.attrib[‘uid‘] def process_map(filename): users = set()
#循环xml文件,如果每行的元素中有‘uid‘这一标签,则其值取出加入到set中,返回 for _, element in ET.iterparse(filename): if get_user(element): users.add(get_user(element)) return users
def test(): #断言不出错,程序正确 users = process_map(‘example.osm‘) pprint.pprint(users) assert len(users) == 6 if __name__ == "__main__": test()
4.完善街道名,将街道中的一些不合法的值去除
思路:循环街道的字典,名称在mapping,则进行替换,返回替换后的字符串
OSMFILE = "example1.osm" street_type_re = re.compile(r‘\b\S+\.?$‘, re.IGNORECASE) expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road", "Trail", "Parkway", "Commons"] # UPDATE THIS VARIABLE
#题目这里的字符串需要更改,否则结果错误 mapping = { "Rd.": "Road", "St.": "Street", "Ave": "Avenue" } def audit_street_type(street_types,street_name): m = street_type_re.search(street_name) if m: street_type = m.group() if street_type not in expected: street_types[street_type].add(street_name) def is_street_name(elem): return (elem.attrib[‘k‘] == ‘addr:street‘) def audit(osmfile): osm_file = open(osmfile,‘r‘) street_types = defaultdict(set) for event,ele in ET.iterparse(osmfile,events=(‘start‘,)): if ele.tag == ‘tag‘ or ele.tag == ‘way‘: for tag in ele.iter(‘tag‘): if is_street_name(tag): audit_street_type(street_types,tag.attrib[‘v‘]) osm_file.close() return street_types def update_name(name, mapping): #获取需要修改的key changewords = mapping.keys()
#如果名称相同,则替换字符,并返回 for word in changewords: if word in name: name = name.replace(word,mapping.get(word)) return name
def test():
#断言不出错,则结果正确 st_types = audit(OSMFILE) assert len(st_types) == 3 pprint.pprint(dict(st_types)) for st_type, ways in st_types.iteritems(): for name in ways: better_name = update_name(name, mapping) print name, "=>", better_name if name == "West Lexington St.": assert better_name == "West Lexington Street" if name == "Baldwin Rd.": assert better_name == "Baldwin Road" if __name__ == "__main__": test()
5.数据清洗
目标数据的结构
node节点需要[id,user,uid,version,lat,lon,timestamp,changeset]
node节点下的tags子节点需要[id,key,value,type]
{‘node‘: {‘id‘: 757860928, ‘user‘: ‘uboot‘, ‘uid‘: 26299, ‘version‘: ‘2‘, ‘lat‘: 41.9747374, ‘lon‘: -87.6920102, ‘timestamp‘: ‘2010-07-22T16:16:51Z‘, ‘changeset‘: 5288876}, ‘node_tags‘: [{‘id‘: 757860928, ‘key‘: ‘amenity‘, ‘value‘: ‘fast_food‘, ‘type‘: ‘regular‘}, {‘id‘: 757860928, ‘key‘: ‘cuisine‘, ‘value‘: ‘sausage‘, ‘type‘: ‘regular‘}, {‘id‘: 757860928, ‘key‘: ‘name‘, ‘value‘: "Shelly‘s Tasty Freeze", ‘type‘: ‘regular‘}]}
way节点需要[id,user,uid,version,timestamp,changeset]
way节点下的nodes子节点需要[id,node_id,position]
way节点下的tag子节点需要[id,key,value,type]
{‘way‘: {‘id‘: 209809850, ‘user‘: ‘chicago-buildings‘, ‘uid‘: 674454, ‘version‘: ‘1‘, ‘timestamp‘: ‘2013-03-13T15:58:04Z‘, ‘changeset‘: 15353317}, ‘way_nodes‘: [{‘id‘: 209809850, ‘node_id‘: 2199822281, ‘position‘: 0}, {‘id‘: 209809850, ‘node_id‘: 2199822390, ‘position‘: 1}, {‘id‘: 209809850, ‘node_id‘: 2199822392, ‘position‘: 2}, {‘id‘: 209809850, ‘node_id‘: 2199822369, ‘position‘: 3}, {‘id‘: 209809850, ‘node_id‘: 2199822370, ‘position‘: 4}, {‘id‘: 209809850, ‘node_id‘: 2199822284, ‘position‘: 5}, {‘id‘: 209809850, ‘node_id‘: 2199822281, ‘position‘: 6}], ‘way_tags‘: [{‘id‘: 209809850, ‘key‘: ‘housenumber‘, ‘type‘: ‘addr‘, ‘value‘: ‘1412‘}, {‘id‘: 209809850, ‘key‘: ‘street‘, ‘type‘: ‘addr‘, ‘value‘: ‘West Lexington St.‘}, {‘id‘: 209809850, ‘key‘: ‘street:name‘, ‘type‘: ‘addr‘, ‘value‘: ‘Lexington‘}, {‘id‘: ‘209809850‘, ‘key‘: ‘street:prefix‘, ‘type‘: ‘addr‘, ‘value‘: ‘West‘}, {‘id‘: 209809850, ‘key‘: ‘street:type‘, ‘type‘: ‘addr‘, ‘value‘: ‘Street‘}, {‘id‘: 209809850, ‘key‘: ‘building‘, ‘type‘: ‘regular‘, ‘value‘: ‘yes‘}, {‘id‘: 209809850, ‘key‘: ‘levels‘, ‘type‘: ‘building‘, ‘value‘: ‘1‘}, {‘id‘: 209809850, ‘key‘: ‘building_id‘, ‘type‘: ‘chicago‘, ‘value‘: ‘366409‘}]}
思路: 1.使用iterparse便利xml中每一个顶层标签
2.使用自定义函数将每个元素变成多个数据结构
3.利用架构和验证库保证数据格式的正确
4.将每个数据结构写入相应的csv文件
OSM_PATH = "example1.osm"
NODES_PATH = "nodes.csv" #node标签生成的文件名
NODE_TAGS_PATH = "nodes_tags.csv" #node下的tag标签生成的文件名
WAYS_PATH = "ways.csv" #way标签生成的文件名
WAY_NODES_PATH = "ways_nodes.csv" #way标签下的node生成的文件名
WAY_TAGS_PATH = "ways_tags.csv" #way标签下的tag生成的文件名
LOWER_COLON = re.compile(r‘^([a-z]|_)+:([a-z]|_)+‘) #字符串中有冒号和小写字母的标记
PROBLEMCHARS = re.compile(r‘[=\+/&<>;\‘"\?%#$@\,\. \t\r\n]‘)#字符存在问题的标记
SCHEMA = schema.schema #模板文件
# Make sure the fields order in the csvs matches the column order in the sql table schema
#每一个生成的文件的表头
NODE_FIELDS = [‘id‘, ‘lat‘, ‘lon‘, ‘user‘, ‘uid‘, ‘version‘, ‘changeset‘, ‘timestamp‘]
NODE_TAGS_FIELDS = [‘id‘, ‘key‘, ‘value‘, ‘type‘]
WAY_FIELDS = [‘id‘, ‘user‘, ‘uid‘, ‘version‘, ‘changeset‘, ‘timestamp‘]
WAY_TAGS_FIELDS = [‘id‘, ‘key‘, ‘value‘, ‘type‘]
WAY_NODES_FIELDS = [‘id‘, ‘node_id‘, ‘position‘]
def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
problem_chars=PROBLEMCHARS, default_tag_type=‘regular‘):
"""Clean and shape node or way XML element to Python dict"""
node_attribs = {} #存放生成node的key和value,key作表头,value作内容
way_attribs = {} #存放生成way的key和value,key作表头,value作内容
way_nodes = [] #存放生成way标签下的nd子标签的值,[{...},{...}]
tags = [] #存放node和way下的tag子标签的值 ,[{...},{...},]# Handle secondary tags the same way for both node and way elements
# YOUR CODE HERE
#先提取node字段
if element.tag == ‘node‘:
#1.循环node_field表头,如果element中有key所对应的属性,则放入到node_attribs字典中
for key in NODE_FIELDS:
node_attribs[key] = element.attrib[key]
#2.循环子节点,获取tags元素的值
for child in element:
Node_Tags = {}
#匹配字母和冒号
colon = re.match(LOWER_COLON,child.attrib[‘k‘])
#匹配异常字符
problem = re.match(PROBLEMCHARS,child.attrib[‘k‘])
#异常字符直接跳过,进行下一次查找
if problem:
continue
#如果tag是包含字母和冒号<tag k="addr:housenumber" v="1412"/>需要解析成{‘id‘: 12345, ‘key‘: ‘housenumber‘, ‘value‘: ‘1412‘, ‘type‘: ‘addr‘}
elif colon:
#从父节点获取id属性的值
Node_Tags[‘id‘] = element.attrib[‘id‘]
#获取k="addr:housenumber"的值,以:拆分,第一个值为type的值
type_value = child.attrib[‘k‘].split(‘:‘,1)[0]
Node_Tags[‘type‘] = type_value
#获取k="addr:housenumber"的值,以:拆分,第二个值为key的值
Node_Tags[‘key‘] = child.attrib[‘k‘].split(‘:‘,1)[1]
#获取v=1412的值,为value的值
Node_Tags[‘value‘] = child.attrib[‘v‘]
#将处理后的数据加入到字典中
tags.append(Node_Tags)
#tag不包含冒号<tag k="building" v="yes"/>
else:
#从父节点获取id属性的值
Node_Tags[‘id‘] = element.attrib[‘id‘]
#type的值是 regular
Node_Tags[‘type‘] = ‘regular‘
#获取k=building的值,为key的值
Node_Tags[‘key‘] = child.attrib[‘k‘]
#获取v=yes的值,为value的值
Node_Tags[‘value‘] = child.attrib[‘v‘]
#将处理后的数据加入到字典中
tags.append(Node_Tags)
#返回node处理之后的结果
return {‘node‘: node_attribs, ‘node_tags‘: tags}
#在提取way字段
elif element.tag == ‘way‘:
1.循环way_field表头,如果element中有key所对应的属性,则放入到way_attribs字典中
for key in WAY_FIELDS:
way_attribs[key] = element.attrib[key]
counter = 0 #计数,用于填充way下面nd子标签的position的值
#循环父节点下的子节点
for child in element:
Way_Nodes = {} #存放nd子标签
Way_Tags = {} #存放tag子标签
#处理nd子标签
if child.tag == ‘nd‘:
#从父节点获取id属性的值
Way_Nodes[‘id‘] = element.attrib[‘id‘]
#从自身的ref,来获取该属性的值
Way_Nodes[‘node_id‘] = child.attrib[‘ref‘]
#获取position的值,每循环一次nd,counter + 1
Way_Nodes[‘position‘] = counter
counter += 1
将处理后的nd子节点数据加入到字典中
way_nodes.append(Way_Nodes)
#处理tag子标签
elif child.tag == ‘tag‘:
#同处理node下的tag子节点
colon = re.match(LOWER_COLON,child.attrib[‘k‘])
problem = re.match(PROBLEMCHARS,child.attrib[‘k‘])
if problem:
continue
elif colon:
Way_Tags[‘id‘] = element.attrib[‘id‘]
type_value = child.attrib[‘k‘].split(‘:‘,1)[0]
Way_Tags[‘key‘] = child.attrib[‘k‘].split(‘:‘,1)[1]
Way_Tags[‘type‘] = type_value
Way_Tags[‘value‘] = child.attrib[‘v‘]
tags.append(Way_Tags)
else:
Way_Tags[‘id‘] = element.attrib[‘id‘]
Way_Tags[‘key‘] = child.attrib[‘k‘]
Way_Tags[‘type‘] = ‘regular‘
Way_Tags[‘value‘] = child.attrib[‘v‘]
tags.append(Way_Tags)
return {‘way‘: way_attribs, ‘way_nodes‘: way_nodes, ‘way_tags‘: tags}
def get_element(osm_file, tags=(‘node‘, ‘way‘, ‘relation‘)):
"""Yield element if it is the right type of tag"""
"""如果是正确的类型时,返回标签中的tag"""
context = ET.iterparse(osm_file, events=(‘start‘, ‘end‘))
_, root = next(context)
for event, elem in context:
if event == ‘end‘ and elem.tag in tags:
yield elem
root.clear()
def validate_element(element, validator, schema=SCHEMA):
"""Raise ValidationError if element does not match schema"""
"""当和schema的数据格式不匹配时,抛出异常"""
if validator.validate(element, schema) is not True:
field, errors = next(validator.errors.iteritems())
message_string = "\nElement of type ‘{0}‘ has the following errors:\n{1}"
error_string = pprint.pformat(errors)
raise Exception(message_string.format(field, error_string))
class UnicodeDictWriter(csv.DictWriter, object):
"""Extend csv.DictWriter to handle Unicode input"""
"""扩展csv下的DictWriter方法的去支持Unicode输入"""
def writerow(self, row):
super(UnicodeDictWriter, self).writerow({
k: (v.encode(‘utf-8‘) if isinstance(v, unicode) else v) for k, v in row.iteritems()
})
def writerows(self, rows):
for row in rows:
self.writerow(row)
def process_map(file_in, validate):
"""Iteratively process each XML element and write to csv(s)"""
"""将处理好的xml文件写入到csv中"""
with codecs.open(NODES_PATH, ‘w‘) as nodes_file, codecs.open(NODE_TAGS_PATH, ‘w‘) as nodes_tags_file, codecs.open(WAYS_PATH, ‘w‘) as ways_file, codecs.open(WAY_NODES_PATH, ‘w‘) as way_nodes_file, codecs.open(WAY_TAGS_PATH, ‘w‘) as way_tags_file:
nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)
nodes_writer.writeheader()
node_tags_writer.writeheader()
ways_writer.writeheader()
way_nodes_writer.writeheader()
way_tags_writer.writeheader()
validator = cerberus.Validator()
for element in get_element(file_in, tags=(‘node‘, ‘way‘)):
el = shape_element(element)
if el:
if validate is True:
validate_element(el, validator)
if element.tag == ‘node‘:
nodes_writer.writerow(el[‘node‘])
node_tags_writer.writerows(el[‘node_tags‘])
elif element.tag == ‘way‘:
ways_writer.writerow(el[‘way‘])
way_nodes_writer.writerows(el[‘way_nodes‘])
way_tags_writer.writerows(el[‘way_tags‘])
第二种方法
高阶解法
def shape_tag(el, tag):
#tag标签返回的格式 tag = { ‘id‘ : el.attrib[‘id‘], ‘key‘ : tag.attrib[‘k‘], ‘value‘: tag.attrib[‘v‘], ‘type‘ : ‘regular‘ } if LOWER_COLON.match(tag[‘key‘]):
#如果tag的key中出现冒号<tag k="addr:housenumber" v="1412"/>,则根据:进行拆分,获取type和key tag[‘type‘], _, tag[‘key‘] = tag[‘key‘].split(‘:‘) return tag def shape_way_node(el, i, nd):
#way下的nd标签返回的格式 return { ‘id‘ : el.attrib[‘id‘], ‘node_id‘ : nd.attrib[‘ref‘], ‘position‘ : i } def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS, problem_chars=PROBLEMCHARS, default_tag_type=‘regular‘): """Clean and shape node or way XML element to Python dict""" node_attribs = {} way_attribs = {} way_nodes = []
#直接获取所有的tag子标签 tags = [shape_tag(element, t) for t in element.iter(‘tag‘)] # Handle secondary tags the same way for both node and way elements # YOUR CODE HERE if element.tag == ‘node‘: node_attribs = {f: element.attrib[f] for f in node_attr_fields} return {‘node‘: node_attribs, ‘node_tags‘: tags} elif element.tag == ‘way‘: way_attribs = {f: element.attrib[f] for f in way_attr_fields} #获取way标签下nd标签的各个值 way_nodes = [shape_way_node(element, i, nd) for i, nd in enumerate(element.iter(‘nd‘))] return {‘way‘: way_attribs, ‘way_nodes‘: way_nodes, ‘way_tags‘: tags}
标签:class sage ror mat ignore erp order free form
原文地址:http://www.cnblogs.com/luhuajun/p/7977561.html