【OpenYurt 深度解析,Java编程教程视频

实现一个缓存数据的反向代理,第一想法就是从 response.Body 中读取数据,然后分别返回给请求 client 和本地的 Cache 模块。伪代码如下:


func HandleResponse(rw http.ResponseWriter, resp *http.Response) {

        bodyBytes, _ := ioutil.ReadAll(resp.Body)

        go func() {

                // cache response on local disk

                cacher.Write(bodyBytes)

        }



        // client reads data from response

        rw.Write(bodyBytes)

}



当深入思考后,在 Kubernetes 系统中,上述实现会引发下面的问题:

  • 问题 1:流式数据需要如何处理(如: K8s 中的 watch 请求),意味 ioutil.ReadAll() 一次调用无法返回所有数据。即如何可以返回流数据同时又缓存流数据。

  • 问题 2:同时在本地缓存数据前,有可能需要对传入的 byte slice 数据先进行清洗处理。这意味着需要修改 byte slice,或者先备份 byte slice 再处理。这样会造成内存的大量消耗,同时针对流式数据,到底申请多大的 slice 也不好处理。

3. 优雅实现探讨


针对上面的问题,我们将问题逐个抽象,可以发现更优雅的实现方法。

  • 问题 1:如何对流数据同时进行读写

针对流式数据的读写(一边返回一边缓存),如下图所示,其实需要的不过是把 response.Body(io.Reader) 转换成一个 io.Reader 和一个 io.Writer。或者说是一个 io.Reader 和 io.Writer 合成一个 io.Reader。这很容易就联想到 Linux 里面的 Tee 命令。

【OpenYurt 深度解析,Java编程教程视频

而在 Golang 中 Tee 命令是实现就是io.TeeReader,那问题 1 的伪代码如下:


func HandleResponse(rw http.ResponseWriter, resp *http.Response) {

        // create TeeReader with response.Body and cacher

        newRespBody := io.TeeReader(resp.Body, cacher)



        // client reads data from response

        io.Copy(rw, newRespBody)

}



通过 TeeReader 的对 Response.Body 和 Cacher 的整合,当请求 client 端从 response.Body 中读取数据时,将同时向 Cache 中写入返回数据,优雅的解决了流式数据的处理。

  • 问题 2:如何在缓存前先清洗流数据

如下图所示,缓存前先清洗流数据,请求端和过滤端需要同时读取 response.Body(2 次读取问题)。也就是需要将 response.Body(io.Reader) 转换成两个 io.Reader。

【OpenYurt 深度解析,Java编程教程视频

也意味着问题 2 转化成:问题 1 中缓存端的 io.Writer 转换成 Data Filter 的 io.Reader。其实在 Linux 命令中也能找到类似命令,就是管道。因此问题 2 的伪代码如下:


func HandleResponse(rw http.ResponseWriter, resp *http.Response) {

        pr, pw := io.Pipe()

        // create TeeReader with response.Body and Pipe writer

        newRespBody := io.TeeReader(resp.Body, pw)

        go func() {

                // filter reads data from response 

                io.Copy(dataFilter, pr)

        }



        // client reads data from response

        io.Copy(rw, newRespBody)

}



通过 io.TeeReader 和 io.PiPe,当请求 client 端从 response.Body 中读取数据时,Filter 将同时从 Response 读取到数据,优雅的解决了流式数据的 2 次读取问题。

YurtHub 实现

===============================================================================

最后看一下 YurtHub 中相关实现,由于 Response.Body 为 io.ReadCloser,所以实现了 dualReadCloser。同时 YurtHub 可能也面临对 http.Request 的缓存,所以增加了 isRespBody 参数用于判定是否需要负责关闭 response.Body。


// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/util/util.go#L156

// NewDualReadCloser create an dualReadCloser object

func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) {

    pr, pw := io.Pipe()

    dr := &dualReadCloser{

        rc:         rc,

        pw:         pw,

        isRespBody: isRespBody,

    }



    return dr, pr

}



type dualReadCloser struct {

    rc io.ReadCloser

    pw *io.PipeWriter

    // isRespBody shows rc(is.ReadCloser) is a response.Body

    // or not(maybe a request.Body). if it is true(it's a response.Body),

    // we should close the response body in Close func, else not,

    // it(request body) will be closed by http request caller

    isRespBody bool

}



// Read read data into p and write into pipe

func (dr *dualReadCloser) Read(p []byte) (n int, err error) {

    n, err = dr.rc.Read(p)

    if n > 0 {

        if n, err := dr.pw.Write(p[:n]); err != nil {

            klog.Errorf("dualReader: failed to write %v", err)

            return n, err

        }

    }



    return

}



// Close close two readers

func (dr *dualReadCloser) Close() error {

    errs := make([]error, 0)

    if dr.isRespBody {

        if err := dr.rc.Close(); err != nil {

            errs = append(errs, err)

        }

    }



    if err := dr.pw.Close(); err != nil {

        errs = append(errs, err)


# 写在最后

**[CodeChina开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频】](https://codechina.****.net/m0_60958482/java-p7)**


还有一份JAVA核心知识点整理(PDF):**JVM,JAVA集合,JAVA多线程并发,JAVA基础,Spring原理,微服务,Netty与RPC**,网络,日志,Zookeeper,Kafka,RabbitMQ,Hbase,**MongoDB,Cassandra,设计模式,负载均衡,数据库,一致性哈希,JAVA算法,数据结构,加密算法,分布式缓存**,Hadoop,Spark,Storm,YARN,机器学习,云计算...

tps://codechina.****.net/m0_60958482/java-p7)**


还有一份JAVA核心知识点整理(PDF):**JVM,JAVA集合,JAVA多线程并发,JAVA基础,Spring原理,微服务,Netty与RPC**,网络,日志,Zookeeper,Kafka,RabbitMQ,Hbase,**MongoDB,Cassandra,设计模式,负载均衡,数据库,一致性哈希,JAVA算法,数据结构,加密算法,分布式缓存**,Hadoop,Spark,Storm,YARN,机器学习,云计算...

![image](https://www.icode9.com/i/ll/?i=img_convert/7a3eb87b619244894e58c940554e8631.png)
上一篇:Python之replace()方法失效


下一篇:树链剖分入门