目标
- 查询最近的50条数据。可以分页,符合 Antd Pro Table 的显示规范。
- 查询指定时间段内的数据。
- 在时间跨度大的情况下,可以聚合数据,避免取出的数据过多。
- 指定 field key 查询数据,单 field,多 field
- golang sdk 相关的查询方法使用
Flux
Flux 是 InfluxDB 2.0 引入的一门查询语言,号称借鉴了 Js 的语法。
相关英文单词
- influx: n. 流入;汇集;河流的汇集处
- flux: n. 流量;变迁;不稳定;流出; vt. 使熔融;用焊剂处理; vi. 熔化;流出
- mean: 平均值
- aggregate: 聚合
- downsampling: 降采样
通过实例学习 Flux 查询语法
实例一:
from(bucket:"example-bucket")
|> range(start: -1h, stop: -10m)
- 第一行指定数据源,即指定 bucket
- 第二行限定时间区间。Flux 查询必须限定时间区间。
- 如果 stop 省略,就默认是当前时间。
- 竖杠右箭头组合,是 Pipe-forward operator, 类似 linux 管道。即将上一步的查询结果,传递给下一步操作
实例二:
from(bucket:"example-bucket")
|> range(start: -15m)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_system" and
r.cpu == "cpu-total"
)
|> yield()
- filter 对每行 record 进行过滤
- fn 是 function 的缩写,类似 js 中的匿名函数,这里非常像箭头函数
- r 是 record 的缩写
- yield 默认是可以省略的,只有当包含多个查询时,才需要使用 yield。但是我没理解什么是多个查询。。。
- measurement, field 比较好理解,最后一个过滤条件 cpu 是 tag 过滤
分页
https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/built-in/transformations/limit/#offset
limit(
n:10,
offset: 0
)
代码:
from(bucket:"example-bucket")
|> range(start:-1h)
|> limit(n:10)
需要注意的是,这里的 limit 数,是指每个 table 返回一条数据,所以如果有 4 个 table,就会返回 4 条。。。
例如,有4个 field key,就是 4 个 Record,分属于 4 个不同的 table。
按时间倒序
from(bucket:"example-bucket")
|> order(desc: true)
|> limit(n:10)
https://docs.influxdata.com/flux/v0.x/stdlib/universe/sort/
注意:但是使用 pivot 时,这个倒序的排序很诡异,尽量不要使用
golang sdk
InfluxDB Golang Client 能返回两种格式的查询结果:
- QueryTableResult
- Raw string
如何将 QueryTableResult 转成 JSON 输出:
https://stackoverflow.com/questions/65425375/convert-data-obtained-from-influxdb-to-json
// QueryTableResult parses streamed flux query response into structures representing flux table parts
// Walking though the result is done by repeatedly calling Next() until returns false.
// Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
// Data are acquired by Record() method.
// Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
type QueryTableResult struct {
io.Closer
csvReader *csv.Reader
tablePosition int
tableChanged bool
table *query.FluxTableMetadata
record *query.FluxRecord
err error
}
// FluxRecord represents row in the flux query result table
type FluxRecord struct {
table int
values map[string]interface{}
}
// Value returns the default _value column value or nil if not present
func (r *FluxRecord) Value() interface{} {
return r.ValueByKey("_value")
}
// Values returns map of the values where key is the column name
func (r *FluxRecord) Values() map[string]interface{} {
return r.values
}
从 QueryTableResult 的结构看,还是定义一个 struct slice 然后循环写入比较靠谱。
打印一行看看数据结构:
// get QueryTableResult
result, err := models.QueryAPI.Query(context.Background(), `
from(bucket:"sunzhongwei")
|> range(start: -7d)
|> filter(fn: (r) =>
r._measurement == "sunzhongwei"
)`)
if err == nil {
// Iterate over query response
for result.Next() {
// Access data
//fmt.Printf("value: %v\n", result.Record().Value())
fmt.Printf("value: %v\n", result.Record().Values())
}
// check for an error
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
对输出进行了排版,方便查看
value: map[
_field:temperature
_measurement:sunzhongwei
_start:2021-06-09 08:42:28.6152305 +0000 UTC
_stop:2021-06-16 08:42:28.6152305 +0000 UTC
_time:2021-06-15 08:15:04.0473985 +0000 UTC
_value:87.32
deviceId:justtest
result:_result
table:4
]
可以看到里面有不少多余的字段,例如 measurement, start, stop; 实际上只需要保留 time, value,field, deviceId (自定义 tag) 就足够在前端展示了。
参考
- https://docs.influxdata.com/influxdb/v2.0/query-data/get-started/
- https://github.com/influxdata/influxdb-client-go#queries
微信关注我哦 👍
我是来自山东烟台的一名开发者,有感兴趣的话题,或者软件开发需求,欢迎加微信 zhongwei 聊聊, 查看更多联系方式