使用CortexAPI 实现Cortex的基本操作

官网上的 API 介绍

在官网上,提供了一下几种 API Remote API / Alerts & Rules API / Configs API / Endpoints / Manage Rules . 这里呢,我只说这里的 RemoteAPI .

我们先看一下这个 Remote API :
使用CortexAPI 实现Cortex的基本操作

写的很详细,目前是算比较详细的,笔者在一开始看的时候就几行,第一行:支持Prometheus 的 remote_read / remote_write , 第二行 , 对于Http post 里面的请求体首先是 Protocol Buffers , 再来一个 snappy . 第三行是 远程读写 api . 目前来看是又修改了下。而且我们也能看到,当前的这个 Cortex 的APIS 正在编写中。

Remote API

这里的 remote api 官网讲了两个,这两个对应着的是 Prometheus 中的 remote_write / remote_read . 这里对这两个做个简单的介绍,笔者这里也主要用到了的 remote_write .

remote_write

刚刚说了,这里的 remote_write 对应着的是 Prometheus 里面的那个 remote_write .所以呢,我们现在需要到 Prometheus 里面看看这个remote_write 的基本配置是什么样的。

我们进入到Prometheus 的官网,去看一下。 我这里不贴全部的,全部的大家去官网查看就好:

# The URL of the endpoint to send samples to.
url: <string>

# Timeout for requests to the remote write endpoint.
[ remote_timeout: <duration> | default = 30s ]

# List of remote write relabel configurations.
write_relabel_configs:
  [ - <relabel_config> ... ]

# Sets the `Authorization` header on every remote write request with the
# configured username and password.
# password and password_file are mutually exclusive.
basic_auth:
  [ username: <string> ]
  [ password: <string> ]
  [ password_file: <string> ]

# Sets the `Authorization` header on every remote write request with
# the configured bearer token. It is mutually exclusive with `bearer_token_file`.
[ bearer_token: <string> ]

# Sets the `Authorization` header on every remote write request with the bearer token
# read from the configured file. It is mutually exclusive with `bearer_token`.
[ bearer_token_file: /path/to/bearer/token/file ]

# Configures the remote write request's TLS settings.
tls_config:
  [ <tls_config> ]

# Optional proxy URL.
[ proxy_url: <string> ]

这里的配置,写的很清楚,这里笔者用到的主要是两个配置,一个是 url , 一个是 basic_auth ,大家可以参考我之前的一个文章使用Cortex实现Prometheus的多租户管理 . 很简单的一个remote_write 。

隐藏的 API

如果你以为,我这就完了,那你就错了。我这里讲的是 Cortex 里面的查询的 APIs , 这里的几个API 目前官网我还没看到,也可能是因为我没找到吧。

这里呢,没必要去跟大家介绍每一个API。大家看一下这个API接口就清楚了:

// API provides bindings for Prometheus's v1 API.
type API interface {
    // Alerts returns a list of all active alerts.
    Alerts(ctx context.Context) (AlertsResult, error)
    // AlertManagers returns an overview of the current state of the Prometheus alert manager discovery.
    AlertManagers(ctx context.Context) (AlertManagersResult, error)
    // CleanTombstones removes the deleted data from disk and cleans up the existing tombstones.
    CleanTombstones(ctx context.Context) error
    // Config returns the current Prometheus configuration.
    Config(ctx context.Context) (ConfigResult, error)
    // DeleteSeries deletes data for a selection of series in a time range.
    DeleteSeries(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) error
    // Flags returns the flag values that Prometheus was launched with.
    Flags(ctx context.Context) (FlagsResult, error)
    // LabelNames returns all the unique label names present in the block in sorted order.
    LabelNames(ctx context.Context) ([]string, api.Warnings, error)
    // LabelValues performs a query for the values of the given label.
    LabelValues(ctx context.Context, label string) (model.LabelValues, api.Warnings, error)
    // Query performs a query for the given time.
    Query(ctx context.Context, query string, ts time.Time) (model.Value, api.Warnings, error)
    // QueryRange performs a query for the given range.
    QueryRange(ctx context.Context, query string, r Range) (model.Value, api.Warnings, error)
    // Series finds series by label matchers.
    Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, api.Warnings, error)
    // Snapshot creates a snapshot of all current data into snapshots/<datetime>-<rand>
    // under the TSDB's data directory and returns the directory as response.
    Snapshot(ctx context.Context, skipHead bool) (SnapshotResult, error)
    // Rules returns a list of alerting and recording rules that are currently loaded.
    Rules(ctx context.Context) (RulesResult, error)
    // Targets returns an overview of the current state of the Prometheus target discovery.
    Targets(ctx context.Context) (TargetsResult, error)
    // TargetsMetadata returns metadata about metrics currently scraped by the target.
    TargetsMetadata(ctx context.Context, matchTarget string, metric string, limit string) ([]MetricMetadata, error)
}

看到这个是不是就很好理解了?

我们在使用的时候,首先需要先知道一个结构体:

type Client struct {
    alertmanagerClient promapi.Client
    distributorAddress string
    timeout            time.Duration
    httpClient         *http.Client
    querierClient      promv1.API
    orgID              string
}

可以看到,在这个 Client 里面,有几个client , alertmanagerClient / httpClient / querierClient . 先不要好奇为什么会有这么多的 client . 这里的每一个 client 对应着的都不一样。这里会涉及到Cortex 的微服务的架构问题,这个我们后期再说。我这里先说怎么使用 API 做查询。 以及我们的 Push 。

Query

func NewClient(distributorAddress string, querierAddress string, alertmanagerAddress string, orgID string) (*Client, error) {
    // Create querier API client
    querierAPIClient, err := promapi.NewClient(promapi.Config{
        Address:      "http://" + querierAddress + "/api/prom",
        RoundTripper: &addOrgIDRoundTripper{orgID: orgID, next: http.DefaultTransport},
    })
    if err != nil {
        return nil, err
    }

    c := &Client{
        distributorAddress: distributorAddress,
        timeout:            5 * time.Second,
        httpClient:         &http.Client{},
        querierClient:      promv1.NewAPI(querierAPIClient),
        orgID:              orgID,
    }
    return c, nil
}

我们先得到一个 client , 这里的 promv1 是包 "github.com/prometheus/client_golang/api/prometheus/v1" .

// Query runs a query
func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
    value, _, err := c.querierClient.Query(context.Background(), query, ts)
    return value, err
}

这样就好了。

Push

这里的 Push ,就是发送数据到 Cortex 里面的。Client 用我们之前拿到的那个 Client

// Push the input timeseries to the remote endpoint
func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
    // Create write request
    data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: timeseries})
    if err != nil {
        return nil, err
    }

    // Create HTTP request
    compressed := snappy.Encode(nil, data)
    req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
    if err != nil {
        return nil, err
    }

    req.Header.Add("Content-Encoding", "snappy")
    req.Header.Set("Content-Type", "application/x-protobuf")
    req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
    req.Header.Set("X-Scope-OrgID", c.orgID)

    ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
    defer cancel()

    // Execute HTTP request
    res, err := c.httpClient.Do(req.WithContext(ctx))
    if err != nil {
        return nil, err
    }

    defer res.Body.Close()
    return res, nil
}

其实就是一个客户端在往服务器里面写数据的过程。

总结

当然了,上面的程序都是有的。大家可以到 cortex 的源程序里面找到 , github 的速度很慢,大家可以到 这个地方去看 cortex

大家可以克隆到自己本地:

git clone https://gitee.com/sun-iot/cortex.git

在里面我们可以找到这个 Cortex 的几个隐藏的 API。

上一篇:squirrelmail(小松鼠web邮件系统)


下一篇:史上最全!阿里智能人机交互的核心技术解析