Implementing RPC with Avro in Python

Download

Avro implementations for C, C++, C#, Java, PHP, Python, and Ruby can be downloaded from the Apache Avro™ Releases page. This guide uses Avro 1.7.7, the latest version at the time of writing. Download and unzip avro-1.7.7.tar.gz, and install via python setup.py (this will probably require root privileges). Ensure that you can import avro from a Python prompt.

$ tar xvf avro-1.7.7.tar.gz
$ cd avro-1.7.7
$ sudo python setup.py install
$ python
>>> import avro # should not raise ImportError
      

Alternatively, you may build the Avro Python library from source. From the root Avro directory, run the commands

$ cd lang/py/
$ ant
$ sudo python setup.py install
$ python
>>> import avro # should not raise ImportError



Using Avro for data serialization

Defining a schema

Avro schemas are defined using JSON. Schemas are composed of primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed). You can learn more about Avro schemas and types from the specification, but for now let's start with a simple schema example, user.avsc:

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
      

This schema defines a record representing a hypothetical user. (Note that a schema file can only contain a single schema definition.) At minimum, a record definition must include its type ("type": "record"), a name ("name": "User"), and fields, in this case name, favorite_number, and favorite_color. We also define a namespace ("namespace": "example.avro"), which together with the name attribute defines the "full name" of the schema (example.avro.User in this case).

Fields are defined via an array of objects, each of which defines a name and type (other attributes are optional, see the record specification for more details). The type attribute of a field is another schema object, which can be either a primitive or complex type. For example, the name field of our User schema is the primitive type string, whereas the favorite_number and favorite_color fields are both unions, represented by JSON arrays. A union is a complex type whose value can be any of the types listed in the array; e.g., favorite_number can be either an int or null, essentially making it an optional field.
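As a rough illustration of the two ideas above (the full name, and unions with null acting as optional fields), the sketch below uses only Python's standard json module, not the Avro library, to inspect the user.avsc schema. The helper names full_name and optional_fields are made up for this example.

```python
import json

# The user.avsc schema from above, inlined so the example is self-contained.
USER_SCHEMA_JSON = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
"""

def full_name(schema):
    """Join namespace and name into the schema's full name (hypothetical helper)."""
    namespace = schema.get("namespace")
    return namespace + "." + schema["name"] if namespace else schema["name"]

def optional_fields(schema):
    """Names of fields whose type is a union (a JSON array) that contains "null"."""
    return [field["name"] for field in schema["fields"]
            if isinstance(field["type"], list) and "null" in field["type"]]

schema = json.loads(USER_SCHEMA_JSON)
print(full_name(schema))        # example.avro.User
print(optional_fields(schema))  # ['favorite_number', 'favorite_color']
```

This only reads the JSON; real validation and serialization are still done by avro.schema and DatumWriter.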

Serializing and deserializing without code generation

Data in Avro is always stored with its corresponding schema, meaning we can always read a serialized item, regardless of whether we know the schema ahead of time. This allows us to perform serialization and deserialization without code generation. Note that the Avro Python library does not support code generation.

Try running the following code snippet, which serializes two users to a data file on disk, and then reads back and deserializes the data file:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

schema = avro.schema.parse(open("user.avsc").read())

writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()

reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
    print user
reader.close()
      

This outputs:

{u'favorite_color': None, u'favorite_number': 256, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'favorite_number': 7, u'name': u'Ben'}
      

Let's take a closer look at what's going on here.

schema = avro.schema.parse(open("user.avsc").read())
      

avro.schema.parse takes a string containing a JSON schema definition as input and outputs an avro.schema.Schema object (specifically a subclass of Schema, in this case RecordSchema). We're passing in the contents of our user.avsc schema file here.

writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
      

We create a DataFileWriter, which we'll use to write serialized items to a data file on disk. The DataFileWriter constructor takes three arguments:

  • The file we'll serialize to
  • A DatumWriter, which is responsible for actually serializing the items to Avro's binary format (DatumWriters can be used separately from DataFileWriters, e.g., to perform IPC with Avro).
  • The schema we're using. The DataFileWriter needs the schema both to write the schema to the data file, and to verify that the items we write are valid items and write the appropriate fields.

writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
        

We use DataFileWriter.append to add items to our data file. Avro records are represented as Python dicts. Since the field favorite_color has type ["string", "null"], we are not required to specify this field, as shown in the first append. Were we to omit the required name field, an exception would be raised. Any extra entries in the dict that do not correspond to a field are ignored.

reader = DataFileReader(open("users.avro", "rb"), DatumReader())
        

We open the file again, this time for reading back from disk. We use a DataFileReader and DatumReader analogous to the DataFileWriter and DatumWriter above.

for user in reader:
    print user
        

The DataFileReader is an iterator that returns dicts corresponding to the serialized items.

 

Avro RPC

 

Server:

 

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import avro.ipc as ipc
import avro.protocol as protocol
import avro.schema as schema

PROTOCOL = protocol.parse(open("../avro/mail.avpr").read())

class MailResponder(ipc.Responder):
    def __init__(self):
        ipc.Responder.__init__(self, PROTOCOL)

    def invoke(self, msg, req):
        if msg.name == 'send':
            message = req['message']
            return ("Sent message to " + message['to']
                    + " from " + message['from']
                    + " with body " + message['body'])
        else:
            raise schema.AvroException("unexpected message:", msg.name)

class MailHandler(BaseHTTPRequestHandler):
  def do_POST(self):
    self.responder = MailResponder()
    call_request_reader = ipc.FramedReader(self.rfile)
    call_request = call_request_reader.read_framed_message()
    resp_body = self.responder.respond(call_request)
    self.send_response(200)
    self.send_header('Content-Type', 'avro/binary')
    self.end_headers()
    resp_writer = ipc.FramedWriter(self.wfile)
    resp_writer.write_framed_message(resp_body)

server_addr = ('localhost', 9090)

if __name__ == '__main__':
    server = HTTPServer(server_addr, MailHandler)
    server.allow_reuse_address = True
    server.serve_forever()

Client:

    print("Result: " + requestor.request('send', params))

    # cleanup
    client.close()

First run the server as:

./start_server.py

Then run the client as:

./send_message.py avro_user pat Hello_World

 

The Python Avro implementation uses an HTTP transport. Avro RPC also has a second transport, built on Netty (a Java framework).

How can we run it over a plain socket instead of HTTPServer?

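Debugging shows that the line

call_request = call_request_reader.read_framed_message()

calls into the ipc module to read the data from the network. If I use a TCPServer and read the socket directly, it fails.
Debugging the example above, the data that was read prints as: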

\xda\xad\xf2s\xb13\xf48R\xcds\xcaT\xbf\xccQ\x00\xda\xad\xf2s\xb13\xf48R\xcds\xcaT\xbf\xccQ\x00\x00\x08send\ntanzh\x0ckuiper\nhello

Meanwhile, capturing the traffic with Wireshark shows the following data:

 00 00 00 4c da ad f2 73 b1 33 f4 38 52 cd 73 ca ...L...s.3.8R.s.
 54 bf cc 51 00 da ad f2 73 b1 33 f4 38 52 cd 73 T..Q....s.3.8R.s
 ca 54 bf cc 51 00 00 08 73 65 6e 64 16 62 6a 6b .T..Q...send.bjk
 6a 64 78 62 62 62 62 62 16 68 6e 63 6c 79 7a 61 jdxbbbbb.hnclyza
 61 61 61 61 16 68 65 6c 6c 6f 20 77 6f 72 6c 64 aaaa.hello world
 00 00 00 00 ....

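The first four bytes (00 00 00 4c, i.e. 76) are a length prefix, and the body on the wire is exactly 4 + content length + 4 bytes. In other words, somewhere in the example the first four bytes are stripped off: read_framed_message returns only the content, without the framing.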
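The arithmetic can be checked directly against the capture. The sketch below copies the hex bytes from the Wireshark dump and splits them with struct; it assumes the length fields are big-endian unsigned 32-bit integers, which is what the 00 00 00 4c prefix suggests.

```python
import struct

# Hex bytes copied from the Wireshark capture above (ASCII column omitted).
CAPTURE_HEX = """
00 00 00 4c da ad f2 73 b1 33 f4 38 52 cd 73 ca
54 bf cc 51 00 da ad f2 73 b1 33 f4 38 52 cd 73
ca 54 bf cc 51 00 00 08 73 65 6e 64 16 62 6a 6b
6a 64 78 62 62 62 62 62 16 68 6e 63 6c 79 7a 61
61 61 61 61 16 68 65 6c 6c 6f 20 77 6f 72 6c 64
00 00 00 00
"""
body = bytes(bytearray(int(pair, 16) for pair in CAPTURE_HEX.split()))

# ">I" means big-endian unsigned 32-bit integer (an assumption, see above).
(block_len,) = struct.unpack(">I", body[:4])
block = body[4:4 + block_len]
(terminator,) = struct.unpack(">I", body[4 + block_len:4 + block_len + 4])

print(len(body))    # 84 bytes on the wire
print(block_len)    # 76, the value of the length prefix
print(terminator)   # 0, the empty block that ends the message
```

So 84 = 4 + 76 + 4: one length prefix, the 76-byte block, and a four-byte zero terminator.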

Likewise, resp_body is \x00\x00\x00\x00\x00\x00bSent message to tanzh from kuiper with body hello

In the packet capture, it also carries 4 extra bytes at the front and 4 at the end.
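The byte b in front of "Sent message" is not stray text: Avro's binary encoding writes a string as a zigzag-encoded variable-length long holding the byte length, followed by the UTF-8 bytes. The sketch below decodes the tail of resp_body that way. The helper names read_long and read_string are made up for this example, and the offset 6 simply skips the six leading zero bytes, which I take to be the handshake response and the call-response header.

```python
def read_long(buf, pos):
    """Decode one zigzag varint (Avro int/long) at buf[pos]; return (value, new_pos)."""
    shift = 0
    accum = 0
    while True:
        byte = buf[pos]
        pos += 1
        accum |= (byte & 0x7F) << shift
        if not (byte & 0x80):   # high bit clear: last byte of the varint
            break
        shift += 7
    return (accum >> 1) ^ -(accum & 1), pos   # undo zigzag encoding

def read_string(buf, pos):
    """Decode an Avro string: a long byte length, then that many UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length

# resp_body as printed above; bytearray gives integer indexing on Python 2 and 3.
resp_body = bytearray(
    b"\x00\x00\x00\x00\x00\x00bSent message to tanzh from kuiper with body hello")

text, end = read_string(resp_body, 6)
print(text)                   # Sent message to tanzh from kuiper with body hello
print(end == len(resp_body))  # True
```

Here b is 0x62 = 98, which zigzag-decodes to 49, the length of the message text. The same rule explains \x08send and \ntanzh in the request: 8 decodes to 4 and 0x0a to 5.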

 

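Summary: Avro RPC generally comes in an HTTP implementation and a Netty implementation, or in other words a stateless one and a stateful one. The HTTP-based transport is stateless, and its format is a 4-byte block length + the block + 4 zero bytes that mark the end of the frame.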

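Flume appears to work this way, though I am not sure whether the Flume project defined the format itself: it runs directly on a socket, and the format is xid (4 bytes, a serial number) + number of blocks (4 bytes) + the blocks, where each block is a 4-byte length + content.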
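The socket format described above can be sketched as a pair of pack and unpack functions. This is only an illustration of the layout as stated in the text, not the Netty codec itself; the function names are hypothetical, and big-endian signed 32-bit integers are an assumption.

```python
import struct

def pack_netty_frame(serial, blocks):
    """Encode: 4-byte serial + 4-byte block count + (4-byte length + bytes) per block."""
    out = [struct.pack(">ii", serial, len(blocks))]
    for block in blocks:
        out.append(struct.pack(">i", len(block)))
        out.append(block)
    return b"".join(out)

def unpack_netty_frame(data):
    """Decode a frame produced by pack_netty_frame; return (serial, list_of_blocks)."""
    serial, count = struct.unpack_from(">ii", data, 0)
    pos = 8
    blocks = []
    for _ in range(count):
        (length,) = struct.unpack_from(">i", data, pos)
        pos += 4
        blocks.append(bytes(data[pos:pos + length]))
        pos += length
    return serial, blocks

frame = pack_netty_frame(1, [b"handshake", b"send"])
print(len(frame))                 # 29 = 8 + (4 + 9) + (4 + 4)
print(unpack_netty_frame(frame))
```

Unlike the HTTP framing, there is no zero-length terminator here: the block count up front tells the reader when the message is complete.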

 

Also, the stateless transport requires a handshake before every request, whereas the stateful one needs only a single handshake.

In Python, a socket-based, non-HTTP transport is easy to implement by modifying FramedReader, FramedWriter and the Responder class in ipc.py.
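A minimal sketch of that idea, using only the standard library, might look like the following. It reimplements the length-prefixed framing from the summary instead of patching ipc.py, and respond is a hypothetical stand-in for MailResponder().respond(call_request); FramedTCPHandler and serve are likewise names invented here. It handles one request per connection and does nothing about handshake state.

```python
import struct
try:
    import socketserver                  # Python 3
except ImportError:
    import SocketServer as socketserver  # Python 2

def read_framed_message(rfile):
    """Read length-prefixed blocks until a zero-length block; return the joined payload."""
    chunks = []
    while True:
        header = rfile.read(4)
        if len(header) < 4:
            raise EOFError("connection closed before the frame ended")
        (length,) = struct.unpack(">I", header)
        if length == 0:                  # empty block marks end of message
            return b"".join(chunks)
        chunk = rfile.read(length)
        if len(chunk) < length:
            raise EOFError("connection closed inside a block")
        chunks.append(chunk)

def write_framed_message(wfile, payload):
    """Write payload as a single block followed by the zero-length terminator."""
    if payload:
        wfile.write(struct.pack(">I", len(payload)) + payload)
    wfile.write(struct.pack(">I", 0))
    wfile.flush()

def respond(call_request):
    """Hypothetical placeholder; a real server would call an ipc.Responder here."""
    return b"echo:" + call_request

class FramedTCPHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # rfile/wfile wrap the raw socket, so no HTTP headers are involved.
        call_request = read_framed_message(self.rfile)
        write_framed_message(self.wfile, respond(call_request))

def serve(host, port):
    """Start the sketch server; blocks until interrupted."""
    server = socketserver.TCPServer((host, port), FramedTCPHandler)
    server.serve_forever()
```

Calling serve('localhost', 9090) would start it on the same address as the HTTP example. A client then has to send the same framing over the socket, since the stock Python HTTP client cannot talk to it.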
