InfluxDB 2.0 的数据查询语法

文章目录

    目标

    • 查询最近的50条数据。可以分页,符合 Antd Pro Table 的显示规范。
    • 查询指定时间段内的数据。
    • 在时间跨度大的情况下,可以聚合数据,避免取出的数据过多。
    • 指定 field key 查询数据,单 field,多 field
    • golang sdk 相关的查询方法使用

    Flux

    Flux 是 InfluxDB 2.0 引入的一门查询语言,号称借鉴了 Js 的语法。

    相关英文单词

    • influx: n. 流入;汇集;河流的汇集处
    • flux: n. 流量;变迁;不稳定;流出; vt. 使熔融;用焊剂处理; vi. 熔化;流出
    • mean: 平均值
    • aggregate: 聚合
    • downsampling: 降采样

    通过实例学习 Flux 查询语法

    实例一:

    from(bucket:"example-bucket")
      |> range(start: -1h, stop: -10m)
    
    • 第一行指定数据源,即指定 bucket
    • 第二行限定时间区间。Flux 查询必须限定时间区间。
    • 如果 stop 省略,就默认是当前时间。
    • 竖杠右箭头组合,是 Pipe-forward operator, 类似 linux 管道。即将上一步的查询结果,传递给下一步操作

    实例二:

    from(bucket:"example-bucket")
      |> range(start: -15m)
      |> filter(fn: (r) =>
        r._measurement == "cpu" and
        r._field == "usage_system" and
        r.cpu == "cpu-total"
      )
      |> yield()
    
    • filter 对每行 record 进行过滤
    • fn 是 function 的缩写,类似 js 中的匿名函数,这里非常像箭头函数
    • r 是 record 的缩写
    • yield 默认是可以省略的,只有当包含多个查询时,才需要使用 yield。但是我没理解什么是多个查询。。。
    • measurement, field 比较好理解,最后一个过滤条件 cpu 是 tag 过滤

    分页

    https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/built-in/transformations/limit/#offset

    limit(
      n:10,
      offset: 0
    )
    

    代码:

    from(bucket:"example-bucket")
      |> range(start:-1h)
      |> limit(n:10)
    

    需要注意的是,这里的 limit 数,是指每个 table 返回一条数据,所以如果有 4 个 table,就会返回 4 条。。。

    例如,有4个 field key,就是 4 个 Record,分属于 4 个不同的 table。

    按时间倒序

    from(bucket:"example-bucket")
      |> order(desc: true)
      |> limit(n:10)
    

    https://docs.influxdata.com/flux/v0.x/stdlib/universe/sort/

    注意:但是使用 pivot 时,这个倒序的排序很诡异,尽量不要使用

    golang sdk

    InfluxDB Golang Client 能返回两种格式的查询结果:

    • QueryTableResult
    • Raw string

    如何将 QueryTableResult 转成 JSON 输出:

    https://stackoverflow.com/questions/65425375/convert-data-obtained-from-influxdb-to-json

    // QueryTableResult parses streamed flux query response into structures representing flux table parts
    // Walking though the result is done by repeatedly calling Next() until returns false.
    // Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
    // Data are acquired by Record() method.
    // Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
    type QueryTableResult struct {
    	io.Closer
    	csvReader     *csv.Reader
    	tablePosition int
    	tableChanged  bool
    	table         *query.FluxTableMetadata
    	record        *query.FluxRecord
    	err           error
    }
    
    // FluxRecord represents row in the flux query result table
    type FluxRecord struct {
    	table  int
    	values map[string]interface{}
    }
    
    // Value returns the default _value column value or nil if not present
    func (r *FluxRecord) Value() interface{} {
    	return r.ValueByKey("_value")
    }
    
    // Values returns map of the values where key is the column name
    func (r *FluxRecord) Values() map[string]interface{} {
    	return r.values
    }
    

    从 QueryTableResult 的结构看,还是定义一个 struct slice 然后循环写入比较靠谱。

    打印一行看看数据结构:

    	// get QueryTableResult
    	result, err := models.QueryAPI.Query(context.Background(), `
    		from(bucket:"sunzhongwei")
    			|> range(start: -7d) 
    			|> filter(fn: (r) => 
    				r._measurement == "sunzhongwei"
    			)`)
    	if err == nil {
    		// Iterate over query response
    		for result.Next() {
    			// Access data
    			//fmt.Printf("value: %v\n", result.Record().Value())
    			fmt.Printf("value: %v\n", result.Record().Values())
    		}
    		// check for an error
    		if result.Err() != nil {
    			fmt.Printf("query parsing error: %s\n", result.Err().Error())
    		}
    	} else {
    		panic(err)
    	}
    

    对输出进行了排版,方便查看

    value: map[
    	_field:temperature 
    	_measurement:sunzhongwei 
    	_start:2021-06-09 08:42:28.6152305 +0000 UTC 
    	_stop:2021-06-16 08:42:28.6152305 +0000 UTC 
    	_time:2021-06-15 08:15:04.0473985 +0000 UTC 
    	_value:87.32 
    	deviceId:justtest 
    	result:_result 
    	table:4
    ]
    

    可以看到里面有不少多余的字段,例如 measurement, start, stop;
    实际上只需要保留 time, value,field, deviceId (自定义 tag) 就足够在前端展示了。

    参考

    • https://docs.influxdata.com/influxdb/v2.0/query-data/get-started/
    • https://github.com/influxdata/influxdb-client-go#queries

    关于作者 🌱

    我是来自山东烟台的一名开发者,有感兴趣的话题,或者软件开发需求,欢迎加微信 zhongwei 聊聊,或者关注我的个人公众号“大象工具”, 查看更多联系方式