标签:
由于最近在整理公司的培训事情,首先培训的就是Avro,故这里做一个记录
一、介绍,直接看官网来得快
官方网站:http://avro.apache.org/
1.1、Maven项目构建pom示例
所需要的jar包
<!-- 测试类 --> <dependency> <groupId> junit</groupId > <artifactId> junit</artifactId > <version> 4.12</version > </dependency> <!-- 序列化需要的jar --> <dependency> <groupId> org.apache.avro</groupId > <artifactId> avro</artifactId > <version> 1.7.7</version > </dependency> <!-- rpc 通讯需要的jar --> <dependency> <groupId> org.apache.avro</groupId > <artifactId> avro-ipc</artifactId> <version> 1.7.7</version > </dependency>
所需要的插件,如果是在外部生成,可以不要
<plugin > <groupId > org.apache.avro</ groupId > <artifactId > avro-maven-plugin</ artifactId > <version > 1.7.7</ version > <executions > <execution> <phase> generate-sources</phase > <goals> <!-- Schema序列化 --> <!-- <goal>schema</goal> --> <!-- RPC协议通讯 --> <goal> protocol</goal > </goals> <configuration> <!-- 源目录,用于存放 avro的schema文件及protocol文件 --> <sourceDirectory> ${project.basedir}/src/main/resources/</ sourceDirectory > <outputDirectory> ${project.basedir}/src/main/java/</ outputDirectory > </configuration> </execution> </executions > </plugin >
二、消息结构
Avro的模式主要由JSON对象来表示,Avro支持8种基本类型(Primitive Type)和6种复杂类型(Complex Type:records、enums、arrays、maps、unions 和fixed),基本类型可以由JSON字符串来表示。
Avro支持两种序列化编码方式:二进制编码和JSON编码,使用二进制编码会高效序列化,并且序列化后得到的结果会比较小。
2.1、基本数据类型
null: 表示没有值 0字节
boolean: 表示一个二进制布尔值 一个字节 0-false,1-true
int: 表示32位有符号整数
long: 表示64位有符号整数
float: 表示32位的单精度浮点数(IEEE 754)4字节
double: 表示64位双精度浮点数(IEEE 754) 8字节
bytes: 表示8位的无符号字节序列
string: Unicode 编码的字符序列
总共就这8种原生数据类型,这些原生数据类型均没有明确的属性。
原生数据类型也可以使用JSON定义类型名称,比如schema "string"和{"type": "string"}是同义且相等的。
2.2、复合数据类型
2.2.1、records使用类型名称“record”,并且支持三个必选属性。
type: 必有属性。
name: 必有属性,是一个JSON string,提供了记录的名字。
namespace,也是一个JSON string,用来限定和修饰name属性,动态方式生成后,为Java的包名
doc: 可选属性,是一个JSON string,为使用这个Schema的用户提供文档。
aliases: 可选属性,是JSON的一个string数组,为这条记录提供别名。
fields: 必选属性,是一个JSON数组,数组中列举了所有的field。每一个field都是一个JSON对象,并且具有如下属性:
name: 必选属性,field的名字,是一个JSON string,类似一个Java属性。
doc: 可选属性,为使用此Schema的用户提供了描述此field的文档。
type: 必选属性,定义Schema的一个JSON对象,或者是命名一条记录定义的JSON string。
default: 可选属性,即field的默认值,当读到缺少这个field的实例时用到。
例如:
{"namespace": "example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
说明:以上表示建立了一个复合类型User,具有三个字段属性,分别是字符串类型的name,整型的favorite_number,字符串类型的favorite_color,报名:example.avro,类名为:user
2.2.2、“enum”的type并且支持如下的属性:
name: 必有属性,是一个JSON string,提供了enum的名字。
namespace,也是一个JSON string,用来限定和修饰name属性。
aliases: 可选属性,是JSON的一个string数组,为这个enum提供别名。
doc: 可选属性,是一个JSON string,为使用这个Schema的用户提供文档。
symbols: 必有属性,是一个JSON string数组,列举了所有的symbol,在enum中的所有symbol都必须是唯一的,不允许重复。比如下面的例子:
例如
{ "type": "enum", "name": "Suit", "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] }
说明:以上表示建立了一个Suit的枚举类,含有symbols里面的属性
2.2.3、Array使用名为"array"的type,并且支持一个属性 items: array中元素的Schema
例如
{ "type": "record", "name": "ArrAvro", "fields" : [ {"name": "arr", "type": ["null",{"type":"array", "items":"string"}]} ] }
2.2.4、Map使用名为"map"的type,并且支持一个属性 values: 用来定义map的值的Schema。Maps的key都是string。
比如一个key为string,value为long的maps定义为:
例如
{
"type": "record",
"name": "MapAvro",
"fields" : [
{"name": "map", "type": ["null",{"type":"map", "values":"string"}]}
]
}
2.2.5、序列化文件样式及RPC通讯协议样式编写
序列化schema文件
Members.avsc
{ "namespace":"com.ifree.serrpc.builder", "type":"record", "name":"Members", "fields":[ { "name":"userName", "type":"string" }, { "name":"userPwd", "type":"string" }, { "name":"realName", "type":"string" } ] }
说明:该文件中的namespace命名空间在生成代码的时候会自动生成包路径;type类型是record复合类型;name为Members,在生成的时候,会生成一个Members类,建议首字母大写;fields是一个字段集合,里面的每一个字段类似与Java的实体字段,如userName字段,类型为String
RPC通讯协议protocol文件
Members.avpr
{ "namespace":"com.ifree.serrpc.builder", "protocol":"MemberIFace", "types":[ { "type":"record", "name":"Members", "fields":[ { "name":"userName", "type":"string" }, { "name":"userPwd", "type":"string" }, { "name":"realName", "type":[ "string", "null" ] } ] }, { "type":"record", "name":"Retmsg", "fields":[ { "name":"msg", "type":"string" } ] } ], "messages":{ "login":{ "doc":"member login.", "request":[ { "name":"m", "type":"Members" } ], "response":"Retmsg" } } }
说明:该文件中,namespace表示包路径;protocol表示协议,名字是MemberIFace,这里在工具生成的时候会生成一个类,故首字母大写;types是一个类型集合,返回类型和请求类型都可以在这里定义,里面的定义方式可参考序列化化定义文件;messages是表示一个请求返回消息体,login表示一个Java方法,里面有两部分组成,请求和响应,request表示请求,里面是一个members类型的对象,response表示相应,返回值,这里是Retmsg对象。
三、序列化编码
Avro有两种序列化编码:binary和JSON。
四、序列化
在使用序列化时,我们可以有两种方式来实现,一种是静态方式,采用schema文件来生成所需要的类,然后直接调用类里面的实现;另一种是动态方式,直接采用代码解析schema文件内容,动态设置内容。
五、RPC
在使用Avro进行RPC通讯时,我们可以有两种方式来实现,一种是静态方式,采用protocol文件来生成所需要的类,然后直接调用类里面的实现;另一种是动态方式,直接采用代码解析protocol文件内容,动态设置内容。
六、综合示例
序列化Schema和RPC的Protocol都是上面的文件,这里不做列出
Java代码:
服务端代码:含(动态|工具)序列化、RPC通讯代码
package com.ifree.serrpc.avro; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Random; import org.apache.avro.Protocol; import org.apache.avro.Protocol.Message; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.ipc.HttpServer; import org.apache.avro.ipc.NettyServer; import org.apache.avro.ipc.Server; import org.apache.avro.ipc.generic.GenericResponder; import org.apache.avro.ipc.specific.SpecificResponder; import org.apache.avro.specific.SpecificDatumWriter; import org.junit.Test; import com.ifree.serrpc.builder.MemberIFace; import com.ifree.serrpc.builder.Members; import com.ifree.serrpc.impl.MemberIFaceImpl; /** * 会员信息处理服务端 * * @author ifree * */ public class MemberServerProvider { /** * 动态序列化:通过动态解析Schema文件进行内容设置,并序列化内容 * * @throws IOException */ @Test public void MemberInfoDynSer() throws IOException { // 1.解析schema文件内容 Parser parser = new Parser(); Schema mSchema = parser.parse(this.getClass().getResourceAsStream("/Members.avsc")); // 2.构建数据写对象 DatumWriter<GenericRecord> mGr = new SpecificDatumWriter<GenericRecord>(mSchema); DataFileWriter<GenericRecord> mDfw = new DataFileWriter<GenericRecord>(mGr); // 3.创建序列化文件 mDfw.create(mSchema, new File("E:/avro/members.avro")); // 4.添加序列化数据 for (int i = 0; i < 20; i++) { GenericRecord gr = new GenericData.Record(mSchema); int r = i * new Random().nextInt(50); gr.put("userName", "xiaoming-" + r); gr.put("userPwd", "9999" + r); gr.put("realName", "小明" + r + "号"); mDfw.append(gr); } // 5.关闭数据文件写对象 mDfw.close(); System.out.println("Dyn Builder Ser Start Complete."); } /** * 通过Java工具生成文件方式进行序列化操作 命令:C:\Users\Administrator>java -jar * E:\avro\avro-tools-1.7.7.jar compile schema E:\avro\Members.avsc E:\avro * * @throws IOException */ @Test public void MemberInfoToolsSer() throws IOException { // 1.为Member生成对象进行设置必要的内容,这里实现三种设置方式的演示 // 1.1、构造方式 Members m1 = new Members("xiaoming", "123456", "校名"); // 1.2、属性设置 Members m2 = new Members(); m2.setUserName("xiaoyi"); m2.setUserPwd("888888"); m2.setRealName("小艺"); // 1.3、Builder方式设置 Members m3 = Members.newBuilder().setUserName("xiaohong").setUserPwd("999999").setRealName("小红").build(); // 2.构建反序列化写对象 DatumWriter<Members> mDw = new SpecificDatumWriter<Members>(Members.class); DataFileWriter<Members> mDfw = new DataFileWriter<Members>(mDw); // 2.1.通过对Members.avsc的解析创建Schema Schema schema = new Parser().parse(AvroSerProvider.class.getClass().getResourceAsStream("/Members.avsc")); // 2.2.打开一个通道,把schema和输出的序列化文件关联起来 mDfw.create(schema, new File("E:/avro/members.avro")); // 4.把刚刚创建的Users类数据追加到数据文件写入对象中 mDfw.append(m1); mDfw.append(m2); mDfw.append(m3); // 5.关闭数据文件写入对象 mDfw.close(); System.out.println("Tools Builder Ser Start Complete."); } // ******************************************************ser // end********************************************************* /** * 服务端支持的网络通讯协议有:NettyServer、SocketServer、HttpServer * 采用HTTPSERVER方式调用 * * @throws IOException * @throws InterruptedException */ @Test public void MemberHttpRPCDynBuilderServer() throws IOException, InterruptedException { // 1.进行业务处理 GenericResponder gr = bussinessDeal(); // 2.开启一个HTTP服务端,进行等待客户端的连接 Server server = new HttpServer(gr, 60090); server.start(); System.out.println("Dyn Builder PRC Start Complete."); server.join(); } /** * 服务端支持的网络通讯协议有:NettyServer、SocketServer、HttpServer * 采用Netty方式调用 * * @throws IOException * @throws InterruptedException */ @Test public void MemberNettyRPCDynBuilderServer() throws IOException, InterruptedException { // 1.进行业务处理 GenericResponder gr = bussinessDeal(); // 2.开启一个Netty服务端,进行等待客户端的连接 Server server = new NettyServer(gr, new InetSocketAddress(60090)); server.start(); System.out.println("Dyn Builder PRC Start Complete."); server.join(); } /** * 主要进行业务处理 服务端逻辑处理 采用动态生成代码处理方式,客户端和服务端只需要有protocol文件即可,不需要手工生成代码 * * @return * @throws IOException */ private GenericResponder bussinessDeal() throws IOException { // 1.构建协议 final Protocol protocol = Protocol.parse(this.getClass().getResourceAsStream("/Members.avpr")); // 2.构建业务逻辑及响应客户端 GenericResponder gr = new GenericResponder(protocol) { @Override public Object respond(Message message, Object request) throws Exception { System.err.println("request:" + request); // 3.获取请求信息 GenericRecord record = (GenericRecord) request; GenericRecord retGr = null; // 4.判断请求的方法 if (message.getName().equals("login")) { // 5.获取到传输的参数 Object obj = record.get("m"); GenericRecord mGr = (GenericRecord) obj; String userName = mGr.get("userName").toString(); String userPwd = mGr.get("userPwd").toString(); // 6.进行相应的业务逻辑处理 System.out.println("Members:" + ",userName:" + userName + mGr + ",userPwd:" + userPwd); String retMsg; if (userName.equalsIgnoreCase("rita") && userPwd.equals("123456")) { retMsg = "哈哈,恭喜你,成功登录。"; System.out.println(retMsg); } else { retMsg = "登录失败。"; System.out.println(retMsg); } // 7.获取返回值类型 retGr = new GenericData.Record(protocol.getMessages().get("login").getResponse()); // 8.构造回复消息 retGr.put("msg", retMsg); } System.err.println("DEAL SUCCESS!"); return retGr; } }; return gr; } /** * Java工具生成协议代码方式:java -jar E:\avro\avro-tools-1.7.7.jar compile protocol E:\avro\Members.avpr E:\avro * 功能和动态调用方式一致 * @throws InterruptedException */ @Test public void MemberNettyRPCToolsBuilderServer() throws InterruptedException{ //1.构造接口和实现类的映射相应对象,MemberIFaceImpl该类为具体的业务实现类 SpecificResponder responder=new SpecificResponder(MemberIFace.class, new MemberIFaceImpl()); //2.Netty启动RPC服务 Server server=new NettyServer(responder, new InetSocketAddress(60090)); server.start(); System.out.println("Tools Builder PRC Start Complete."); server.join(); } }
客户端代码:含(动态|工具)反序列化、RPC通讯代码
package com.ifree.serrpc.avro; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URL; import org.apache.avro.Protocol; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.ipc.HttpTransceiver; import org.apache.avro.ipc.NettyTransceiver; import org.apache.avro.ipc.Transceiver; import org.apache.avro.ipc.generic.GenericRequestor; import org.apache.avro.ipc.specific.SpecificRequestor; import org.apache.avro.specific.SpecificDatumReader; import org.junit.Test; import com.ifree.serrpc.builder.MemberIFace; import com.ifree.serrpc.builder.Members; import com.ifree.serrpc.builder.Retmsg; /** * 服务消费者 该类测试了通过工具和动态序列化及反序列化两种方式,同时测试了通过工具生成代码及动态调用RPC服务两种方式 * * @author ifree * */ public class MemberServerConsumer { /** * 动态反序列:通过Schema文件进行动态反序列化操作 * * @throws IOException */ @Test public void MemberInfoDynDeser() throws IOException { // 1.schema文件解析 Parser parser = new Parser(); Schema mSchema = parser.parse(this.getClass().getResourceAsStream("/Members.avsc")); // 2.构建数据读对象 DatumReader<GenericRecord> mGr = new SpecificDatumReader<GenericRecord>(mSchema); DataFileReader<GenericRecord> mDfr = new DataFileReader<GenericRecord>(new File("E:/avro/members.avro"), mGr); // 3.从序列化文件中进行数据反序列化取出数据 GenericRecord gr = null; while (mDfr.hasNext()) { gr = mDfr.next(); System.err.println("deser data:" + gr.toString()); } mDfr.close(); System.out.println("Dyn Builder Ser Start Complete."); } /** * 通过Java工具来生成必要的类,进行反序列化操作 * * @throws IOException */ @Test public void MemberInfoToolsDeser() throws IOException { // 1.构建反序列化读取对象 DatumReader<Members> mDr = new SpecificDatumReader<Members>(Members.class); DataFileReader<Members> mDfr = new DataFileReader<Members>(new File("E:/avro/members.avro"), mDr); Members m = null; // 2.循环读取文件数据 while (mDfr.hasNext()) { m = mDfr.next(); System.err.println("tools deser data :" + m); } // 3.关闭读取对象 mDfr.close(); System.out.println("Tools Builder Ser Start Complete."); } /** * 采用HTTP方式建立和服务端的连接 * * @throws IOException */ @Test public void MemberHttpRPCDynBuilderClient() throws IOException { // 1.建立和服务端的http通讯 Transceiver transceiver = new HttpTransceiver(new URL("http://192.168.1.116:60090")); bussinessDeal(transceiver); } /** * 采用Netty方式建立和服务端的连接 * * @throws IOException */ @Test public void MemberNettyRPCDynBuilderClient() throws IOException { // 1.建立和服务端的Netty通讯 Transceiver transceiver = new NettyTransceiver(new InetSocketAddress("192.168.1.116", 60090)); // 2.进行必要的业务处理 bussinessDeal(transceiver); } /** * 进行必要的业务处理 * * @param transceiver * @throws IOException */ private void bussinessDeal(Transceiver transceiver) throws IOException { // 2.获取协议 Protocol protocol = Protocol.parse(this.getClass().getResourceAsStream("/Members.avpr")); // 3.根据协议和通讯构造请求对象 GenericRequestor requestor = new GenericRequestor(protocol, transceiver); // 4.根据schema获取messages主节点内容 GenericRecord loginGr = new GenericData.Record(protocol.getMessages().get("login").getRequest()); // 5.在根据协议里面获取request中的schema GenericRecord mGr = new GenericData.Record(protocol.getType("Members")); // 6.设置request中的请求数据 mGr.put("userName", "rita"); mGr.put("userPwd", "123456"); // 7、把二级内容加入到一级message的主节点中 loginGr.put("m", mGr); // 8.设置完毕后,请求方法,正式发送访问请求信息,并得到响应内容 Object retObj = requestor.request("login", loginGr); // 9.进行解析操作 GenericRecord upGr = (GenericRecord) retObj; System.out.println(upGr.get("msg")); } /** * Java工具生成协议代码方式:java -jar E:\avro\avro-tools-1.7.7.jar compile protocol * E:\avro\Members.avpr E:\avro 功能和动态调用方式一致 * * @throws InterruptedException * @throws IOException */ @Test public void MemberNettyRPCToolsBuilderClient() throws InterruptedException, IOException { // 1.和服务端建立通讯 Transceiver transceiver = new NettyTransceiver(new InetSocketAddress("192.168.1.116", 60090)); // 2.获取客户端对象 MemberIFace memberIFace = SpecificRequestor.getClient(MemberIFace.class, transceiver); // 3.进行数据设置 Members members = new Members(); members.setUserName("rita"); members.setUserPwd("123456"); // 开始调用登录方法 Retmsg retmsg = memberIFace.login(members); System.out.println("Recive Msg:" + retmsg.getMsg()); } }
具体业务实现类
package com.ifree.serrpc.impl; import org.apache.avro.AvroRemoteException; import com.ifree.serrpc.builder.MemberIFace; import com.ifree.serrpc.builder.Members; import com.ifree.serrpc.builder.Retmsg; /** * 具体的业务处理类 * @author Administrator * */ public class MemberIFaceImpl implements MemberIFace { final String userName="rita"; final String userPwd="888888"; /** * 登录业务处理 */ @Override public Retmsg login(Members m) throws AvroRemoteException { //验证登录权限 if(m.getUserName().equals(userName)&&m.getUserPwd().equals(userPwd)){ return new Retmsg("恭喜你,登录成功,欢迎进入AVRO测试环境。"); } return new Retmsg("对不起,权限不足,不能登录。"); } }
demo地址:https://git.oschina.net/ifree613/SerRpcDemo.git
标签:
原文地址:http://my.oschina.net/tearsky/blog/509610