
Python implementation: a protobuf interpreter

Date: 2015-09-18 15:19:46

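For an earlier project I wanted more automation, so I wrote a protobuf interpreter to generate the formats the project needed.

Nowadays, of course, I follow the guide linked below, skip the manual parsing, and generate the code directly.

https://developers.google.com/protocol-buffers/docs/reference/cpp-generated

This write-up mainly describes how to parse the protobuf format and how to collect the symbols that are needed.

The text processing is done with Python 2.7 scripts.

The program is split into 4 modules:

expression: parses the format.

symbol: the messages and other objects defined in the protobuf file, together with their hierarchy. At this level nothing of the protobuf syntax is visible any more.

typecollection: defines the basic types and collects the message and other objects.

builder: walks the symbols and creates whatever output files are needed, with typecollection serving as the index.

The builder is not demonstrated this time.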
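
Since the builder itself is not shown, here is a minimal sketch of the kind of traversal it performs. It is not the original builder: the Node class and the walk function are hypothetical stand-ins for the real pb_symbol classes, and the only output is an indented outline.

```python
# Hypothetical stand-in for the symbol tree; the real classes live in pb_symbol.
class Node(object):
    def __init__(self, kind, name, children=None):
        self.kind = kind            # 'package', 'message' or 'enum'
        self.name = name
        self.children = children or []


def walk(node, depth=0, out=None):
    """Depth-first walk that records one indented line per symbol."""
    if out is None:
        out = []
    out.append('%s%s %s' % ('  ' * depth, node.kind, node.name))
    for child in node.children:
        walk(child, depth + 1, out)
    return out


# The same hierarchy as the tutorial .proto file used below.
tree = Node('package', 'tutorial', [
    Node('message', 'Person', [
        Node('enum', 'PhoneType'),
        Node('message', 'PhoneNumber'),
    ]),
    Node('message', 'AddressBook'),
])

outline = walk(tree)
print('\n'.join(outline))
```

A real builder would emit declarations instead of outline lines at each node, but the recursion has the same shape.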

 

1 The protobuf file used for testing (taken from the Google example)

package tutorial;

message Person {
  required string name = 1;
  required int32 id = 2 ;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    required string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phone = 4;
}

message AddressBook {
  repeated Person person = 1;
}

 

 

2 expression implementation: the simplest scanning approach, analyzing one word at a time

# -*- coding: UTF-8 -*-

import sys
import os
import string
import shutil
import io
import pb_symbol

class StringBuffer(object):
     def __init__(self,src):
          self.src  = src
          self.Data = ''

     def OpenFile(self):
          # Read the whole .proto file into memory and close the handle promptly.
          with open(self.src) as f:
               self.Data = f.read()

class Expression(object):

     desc_set = set(['required','optional','repeated'])

     # Upper-case letters, lower-case letters and digits that may appear in a symbol.
     b_char_set = set(string.ascii_uppercase)
     l_char_set = set(string.ascii_lowercase)
     # Digits are stored as characters so they can be compared with scanned text.
     digit_set = set(string.digits)

     equals_char = '='
     space_char = ' '
     openbrace_char = '{'
     closebrace_char = '}'
     semicolon_char = ';'
     tab_char = chr(9)
     newline_char = chr(10)
     return_char = chr(13)
     slash_char = chr(47)
     # Characters that terminate a word.
     ctl_char_set = set([openbrace_char,closebrace_char,semicolon_char,equals_char,'\n','\r','\t',space_char])

     empty_char_set = set ([space_char,tab_char,newline_char,return_char])

     symbol_char_set = b_char_set | l_char_set | digit_set
     all_char_set = symbol_char_set | ctl_char_set

     def backup(self):
          return self.index;

     def restore(self,prevIndex):
          self.index = prevIndex;
          
  
     
     def forwardChar(self):
          if(self.index < self.count):
               self.index = self.index +1
               
     def backChar(self):
          if(self.index > 0):
               self.index = self.index -1

     def getchar(self):
          if( self.index < self.count):
               char = self.Buf.Data[self.index]
               self.forwardChar()
               return char

          return None;

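     def skipComment(self):
          # Skip one '//' line comment if the scanner is positioned on it;
          # otherwise leave the position unchanged.
          bkIndex = self.backup()
          char = self.getchar()
          next_char = self.getchar()
          if(char != self.slash_char or next_char != self.slash_char):
               self.restore(bkIndex)
               return
          while 1:
               char = self.getchar()
               # Stop at the end of the line, or at end of input when the
               # comment is the last thing in the file.
               if(char is None or char == self.newline_char):
                    return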
                    

     def getSpecialChar(self,currentchar):	  
          while 1:
               self.skipComment()
               char = self.getchar();
               if(char == None):
                    break;
               else:
                    if(char == currentchar):
                         break;	    
          return char;     
     
   

     def getVisibleChar(self):
          while 1:
               self.skipComment()
               char = self.getchar();
               if(char is None):
                    break;
               else:
                    if(char not in self.empty_char_set):
                         break;	    
          return char;         
     

     def getNextword(self):
          word = None
          got1st = 0
          while 1:
               self.skipComment()
               char = self.getchar()
               if(char == None): 
                    break;
               if(got1st == 0):
                    if(char not in self.ctl_char_set):
                         word = char
                         got1st = 1
               else:
                    if(char in self.ctl_char_set):
                         self.backChar()
                         break;
                    else:
                         word = word + char			 
          return word;



     def do_enum_item(self,pbEnum):
          memText = self.getNextword();	
          self.getSpecialChar(self.equals_char);
          memValue = self.getNextword();	
          self.getSpecialChar(self.semicolon_char);	  
          pbEnum.append_Member(memText,memValue)


     def do_enum_proc(self):	  
          symbol = self.getNextword();          
          pbEnum = pb_symbol.PBEnum(symbol)
          while 1:	     
               currentIndex = self.backup()
               word = self.getNextword();	       
               if(word == None):
                    break;
               self.restore(currentIndex)
               self.do_enum_item(pbEnum)    	
               end_char_Index = self.backup();
               char = self.getVisibleChar();	      
               if(char == self.closebrace_char):
                    break;
               else:
                    self.restore(end_char_Index);   
          self.symbol.append_enum(pbEnum)

     def do_message_proc(self):  
          symbol = self.getNextword();     
          pbMsg = pb_symbol.PBMessage(symbol)
          while 1:
               currentIndex = self.backup()
               word = self.getNextword();

               if(word == None):
                    break;

               if(word in self.token_set):		    		   
                    subSymbol = pb_symbol.Symbol(self.symbol.tpDict,self.symbol.entity_full_path,False);
                    subSymbol.update_namespace(symbol);			 
                    self.restore(currentIndex);
                    subExp = Expression(self.Buf,subSymbol);
                    subExp.index = self.index;
                    subExp.do_expression();               
                    self.index = subExp.index
                    self.symbol.append_symbol(subSymbol)
                    pbMsg.enableSymbol = 1
               else:
                    if(word in self.desc_set):			 
                         memType = self.getNextword();			 			 
                         memText = self.getNextword();				 			 
                         pbMsg.append_Member(word,memType,memText)
                         self.getSpecialChar(self.semicolon_char);				 

                    end_char_Index = self.backup();
                    char = self.getVisibleChar();		    
                    if(char == self.closebrace_char):
                         break;
                    else:			
                         self.restore(end_char_Index);

          self.symbol.append_message(pbMsg)


     def do_import_proc(self):  		  
          self.getSpecialChar(self.semicolon_char);

     def do_package_proc(self):
          word = self.getNextword();   
          self.symbol.update_namespace(word)
          self.getSpecialChar(self.semicolon_char);

     token_set = { 'message':do_message_proc
                   ,'enum':do_enum_proc
                   ,'import':do_import_proc
                   ,'package':do_package_proc
                   }

     def do_expression(self):
          while 1:	   
               current_index = self.backup();
               token = self.getNextword();          
               if(token == None): 
                    break;	       
               if(token in self.token_set):
                    proc = self.token_set[token];
                    proc(self);
               else:
                    self.restore(current_index)
                    break;	


     def __init__(self,sBuf,symbol):
          self.Buf = sBuf;
          self.index = 0;
          self.count = len(self.Buf.Data)
          self.symbol = symbol;	  
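
The word-by-word scanning idea can be tried out in isolation. The sketch below is not the Expression class: DELIMITERS, strip_line_comments and words are names made up for this example, delimiters are simply discarded instead of being left for a later read, and the comment stripping is naive (it would also cut a '//' inside a string literal).

```python
DELIMITERS = set('{};= \t\r\n')   # plays the same role as ctl_char_set in the listing


def strip_line_comments(text):
    """Drop everything from '//' to the end of each line."""
    return '\n'.join(line.split('//', 1)[0] for line in text.splitlines())


def words(text):
    """Yield each run of non-delimiter characters, in order."""
    word = ''
    for ch in strip_line_comments(text):
        if ch in DELIMITERS:
            if word:
                yield word
                word = ''
        else:
            word += ch
    if word:
        yield word


sample = 'message Person {\n  required int32 id = 2 ; // unique id\n}'
tokens = list(words(sample))
print(tokens)   # ['message', 'Person', 'required', 'int32', 'id', '2']
```

The listing above keeps an index into the buffer instead of using a generator, which is what lets it back up and re-read a word when it has to hand control to a nested parser.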

  

 

3 symbol: defines the object types and their hierarchy

# -*- coding: UTF-8 -*-

import os
import string
import pb_typecollection




class PBEntity(object):
      def __init__(self,entName,rtname):
            self.entName = entName;    
            self.orgName = entName[1:]   
            self.rtname = rtname
          
            
      def outputDebug(self):
            pass;
      
      def create_impl(self,entity_indent,top_ns):
            batch_list = list();
            return batch_list;
    
      def mem_include(self,entName):
            return False;

class PBMessageMember(object):
      def __init__(self,option,memType,memText):            
            self.option = option;
            self.memType = memType;
            self.memText = memText;
      
      def outputDebug(self):
            print(self.option,self.memType,self.memText)
      
      @property
      def mem_option(self):
            return self.option
      
      @property
      def mem_type(self):
            return self.memType;

      
      @property
      def mem_text(self):
            return self.memText      
      
class PBMessage(PBEntity):          
      
      def __init__(self,entName):         
            PBEntity.__init__(self,entName, entName );
            self.members  = []            
            self.enableSymbol = 0;   
            self.rt_ns = '';
            self.tpDict = None
       
      @property
      def Members(self):
            return self.members
            
            
      def attach_tp_dict(self,tpDict):
            self.tpDict = tpDict;     
            
      def append_Member(self,option,memType,memText):      
            msgMem = PBMessageMember(option,memType,memText)
            self.members.append(msgMem)
            
      def enable_Symbol(self,enable):
            self.enableSymbol = enable;
            
      def outputDebug(self,ns):
            print(ns,'message',self.entName);
            for entMsg in self.members:                  
                  entMsg.outputDebug();
            print('');      
            
                        
      def set_rt_ns(self,rt_entity_full_path):
            self.rt_ns = rt_entity_full_path
            
      def mem_include(self,entName):
            for entMsg in self.members:   
                  if(entName == entMsg.memType):
                        return True;
            return False;
      
      def detect_request(self):
            # list.count is a method, so test the length of the list instead.
            if(len(self.members) > 0 ): 
                  return True;
            return False;
                  
   
                

class PBEnumMember(object):
      def __init__(self,memText,memValue):                        
            self.memText = memText;
            self.memValue = memValue;           
            
      def outputDebug(self):
            print(self.memText,self.memValue)
            
            
class PBEnum( PBEntity):
      def __init__(self,entName):
            PBEntity.__init__(self,entName,entName);
            self.members  = []
            
      def append_Member(self,memText,memValue):      
            msgMem = PBEnumMember(memText,memValue)
            self.members.append(msgMem)      
            
      def outputDebug(self,ns):
            print(ns,'enum',self.entName);
            for entEnum in self.members:                  
                  entEnum.outputDebug();
            print('');      
            
    

class Symbol(object):
      def __init__(self,tpDict,fullpath,rooted):
            self.namespace = ''
            self.tpDict = tpDict
            self.rooted = rooted
            self.entity_full_path = fullpath
            self.rt_entity_full_path = fullpath            
            self.entitylist = []
            self.containerlist = []
            
      def __del__(self):
            pass;
      
      def update_namespace(self,namespace):            
            self.namespace = namespace;
            if(self.rooted == False):
                  if(self.entity_full_path == ''):
                        self.entity_full_path = namespace     
                        self.rt_entity_full_path = namespace
                  else:
                        # Extend each path from its own previous value, so the
                        # namespace is appended to the runtime path only once.
                        self.rt_entity_full_path = '%s_%s' %(self.rt_entity_full_path,namespace)
                        self.entity_full_path = '%s_%s' %(self.entity_full_path,namespace)
        
      def append_type_dict(self,entity,isMsg):
            if(isMsg == True):
                  if(self.entity_full_path == ''):
                        self.tpDict.insert_type(entity.entName                                            
                                                ,entity.rtname
                                                ,entity
                                                ,'')
                  else:
                        self.tpDict.insert_type(entity.entName
                                                ,'%s::%s' % (self.rt_entity_full_path, entity.rtname)
                                                ,entity
                                                ,'')
            else:
                  if(self.entity_full_path == ''):
                        self.tpDict.insert_type(entity.entName                                            
                                                ,entity.rtname
                                                ,entity
                                                ,entity.rtname)
                  else:
                        self.tpDict.insert_type(entity.entName
                                                ,'%s::%s' % (self.rt_entity_full_path, entity.rtname)
                                                ,entity
                                                ,'%s::%s' % (self.entity_full_path, entity.rtname))
                  
      
      def append_message(self,msg):            
            self.entitylist.append(msg)
            self.containerlist.append(msg)
            msg.attach_tp_dict(self.tpDict);
            if(self.rt_entity_full_path == ''):
                  msg.set_rt_ns(self.rt_entity_full_path)
            else:
                  msg.set_rt_ns(self.rt_entity_full_path + '_')
            self.append_type_dict(msg,True)
            
      def append_enum(self,enum):
            self.entitylist.append(enum)
            self.append_type_dict(enum,False)
            
      def append_symbol(self,symbol):
            self.entitylist.append(symbol)
            self.containerlist.append(symbol)
            
      def outputDebug(self,ns):
            for entity in self.entitylist:                  
                  entity.outputDebug(ns +'::'+self.namespace);
                        
      def query_entitylist(self):
            return self.entitylist;
      
      def query_containerlist(self):
            return self.containerlist;
      
      def query_pb_ns(self):
            return self.namespace;
      
      def mem_include(self,entName):
            for entity in self.entitylist:     
                  if(entity.mem_include(entName) == True):
                        return True;
            return False;
                         
                  
class PBProxy(object):
      def __init__(self,entity):
            self.entity = entity
      
      @property
      def enableSymbol(self):
            return self.entity.enableSymbol
      
      def mem_include(self,entName):      
            return self.entity.mem_include(entName)

      def create_impl(self,entity_indent,top_ns):
            return self.entity.create_impl(entity_indent,top_ns)
            
      
      @property
      def entName(self):
            return self.entity.entName;      
      
      @property
      def rtname(self):
            return self.entity.rtname;
      
      @property
      def orgName(self):
            return self.entity.orgName;
      
      @property
      def members(self):
            return self.entity.members;
             
      @property
      def rt_ns(self):
            return self.entity.rt_ns;
      
      
      @property
      def namespace(self):
            return self.entity.namespace;
      
      
      @property
      def rooted(self):
            return self.entity.rooted;
        
      @property
      def entity_full_path(self):
            return self.entity.entity_full_path;
      
      
      @property
      def rt_entity_full_path(self):
            return self.entity.rt_entity_full_path;
      
      @property
      def entitylist(self):
            return self.entity.entitylist
      
      
      @property
      def containerlist(self):
            return self.entity.containerlist    
     
      
      @property
      def tpDict(self):
            return self.entity.tpDict;
      
     
      def detect_request(self):
            return self.entity.detect_request()
      
      
      @property
      def Members(self):
            return self.entity.members
      
      @property
      def mem_option(self):
            return self.entity.mem_option
      
      @property
      def mem_type(self):
            return self.entity.mem_type;
      
      @property
      def mem_text(self):
            return self.entity.mem_text      
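
The naming scheme used by Symbol can be summarised in a few lines. This is only a sketch of the convention as I read it from the listing: nested message names are joined with '_' to form the scope path, and a type is then qualified with '::'. The helper names scope_path and qualified_name, and the nested type 'Kind', are made up for the example.

```python
def scope_path(names):
    """Join the enclosing message names with '_', like entity_full_path does."""
    return '_'.join(names)


def qualified_name(scope_names, type_name):
    """Prefix a type with its enclosing scope using '::'; top-level types stay bare."""
    path = scope_path(scope_names)
    if path == '':
        return type_name
    return '%s::%s' % (path, type_name)


print(qualified_name([], 'Person'))                        # Person
print(qualified_name(['Person'], 'PhoneNumber'))           # Person::PhoneNumber
print(qualified_name(['Person', 'PhoneNumber'], 'Kind'))   # Person_PhoneNumber::Kind
```

The package name does not take part in the path, because the root Symbol is created with rooted set to True.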

 

 

4 typecollection

# -*- coding: UTF-8 -*-


import os
import pb_symbol


class typeDict(object):
     op_req_desc = 'required'
     op_opt_desc = 'optional'
     op_rep_desc = 'repeated'
     def __init__(self):
          self.collection  = dict()
          self.insert_type('int32','__int32',pb_symbol.PBEntity('int32','int32'),'')
          self.insert_type('int64','__int64',pb_symbol.PBEntity('int64','int64'),'')
          self.insert_type('uint32','unsigned int',pb_symbol.PBEntity('uint32','uint32'),'')
          self.insert_type('bool','bool',pb_symbol.PBEntity('bool','bool'),'')
          self.insert_type('float','float',pb_symbol.PBEntity('float','float'),'')
          self.insert_type('double','double',pb_symbol.PBEntity('double','double'),'')
          self.insert_type('string','const char*',pb_symbol.PBEntity('string','string'),'')
          self.insert_type('bytes','const char*',pb_symbol.PBEntity('bytes','bytes'),'')        
               
        
        
     def insert_type(self, entName, rtType,entity,orgType):        
          self.collection[entName] = (rtType,entity,orgType);        
          
     def output_debug(self):
          print('type collection')
          for item in self.collection.items():
               print(item);
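
To show how the collection can act as an index, here is a small stand-alone sketch. It does not import the modules above: the module-level collection dict and the runtime_type helper are assumptions made for the example, and None is used in place of the entity objects.

```python
# Hypothetical miniature of the type index: name -> (runtime type, entity, original type).
collection = {
    'int32': ('__int32', None, ''),
    'string': ('const char*', None, ''),
}


def insert_type(name, rt_type, entity, org_type):
    """Register or overwrite one entry, mirroring typeDict.insert_type."""
    collection[name] = (rt_type, entity, org_type)


def runtime_type(name):
    """Return the runtime type for a protobuf type name, or None if unknown."""
    entry = collection.get(name)
    return entry[0] if entry is not None else None


# A user-defined enum is registered as it is discovered during parsing.
insert_type('PhoneType', 'Person::PhoneType', None, 'Person::PhoneType')

print(runtime_type('string'))      # const char*
print(runtime_type('PhoneType'))   # Person::PhoneType
print(runtime_type('sint32'))      # None, because it was never registered
```

Because entries are keyed by the bare type name, two nested types with the same name in different messages would overwrite each other, which is worth keeping in mind for larger .proto files.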
       

 

5 Test script

# -*- coding: UTF-8 -*-

import pb_symbol
import pb_expression
import pb_typecollection

if __name__ == '__main__':
     
     pb_file = 'google_tutorial.proto'
     sBuf = pb_expression.StringBuffer(pb_file);     
     tpDict = pb_typecollection.typeDict()
     symbol = pb_symbol.Symbol(tpDict,'',True);
     try:
          sBuf.OpenFile();
          exp = pb_expression.Expression(sBuf,symbol);
          exp.do_expression();
          symbol.outputDebug('');    
          tpDict.output_debug();       
     except Exception as exc:     
          # Format the exception into the message instead of printing a tuple.
          print("%s" % exc);
     print("done");

 

6 Output

Namespace: ::tutorial::Person

Type name: PhoneType

('::tutorial::Person', 'enum', 'PhoneType')
('MOBILE', '0')
('HOME', '1')
('WORK', '2')

('::tutorial::Person', 'message', 'PhoneNumber')
('required', 'string', 'number')
('optional', 'PhoneType', 'type')

('::tutorial', 'message', 'Person')
('required', 'string', 'name')
('required', 'int32', 'id')
('optional', 'string', 'email')
('repeated', 'PhoneNumber', 'phone')

('::tutorial', 'message', 'AddressBook')
('repeated', 'Person', 'person')

type collection
('PhoneNumber', ('Person::PhoneNumber', <pb_symbol.PBMessage object at 0x02B9DED0>, ''))
('int32', ('__int32', <pb_symbol.PBEntity object at 0x02BE3F70>, ''))
('string', ('const char*', <pb_symbol.PBEntity object at 0x02BEE0F0>, ''))
('double', ('double', <pb_symbol.PBEntity object at 0x02BEE0B0>, ''))
('float', ('float', <pb_symbol.PBEntity object at 0x02BEE070>, ''))
('bytes', ('const char*', <pb_symbol.PBEntity object at 0x02BEE130>, ''))
('Person', ('Person', <pb_symbol.PBMessage object at 0x02BEE210>, ''))
('bool', ('bool', <pb_symbol.PBEntity object at 0x02BEE050>, ''))
('PhoneType', ('Person::PhoneType', <pb_symbol.PBEnum object at 0x02BEE450>, 'Person::PhoneType'))
('int64', ('__int64', <pb_symbol.PBEntity object at 0x02BE3FB0>, ''))
('uint32', ('unsigned int', <pb_symbol.PBEntity object at 0x02BE3FF0>, ''))
('AddressBook', ('AddressBook', <pb_symbol.PBMessage object at 0x02BEE7B0>, ''))

 

Reference

protobuf git repository: https://github.com/google/protobuf


Original article: http://www.cnblogs.com/febwave/p/4819186.html
