通过 Lua 脚本处理数据
简介
DataWay 支持引入 Lua 脚本对数据进行处理后再上报到 DataFlux 中心。当你有下列需求时可考虑使用 Lua 脚本:
- 需要对采集的数据进行清洗,比如添加、修改、删除标签,甚至删除整个指标集
- 需要将第三方数据源(比如 MySQL、Postgres、SQL Server、API 请求、CSV 文件、JSON 文件)中数据合并到采集的数据点然后在上报 DataFlux 中
在 DataWay 中引入 Lua 脚本
DataWay 中支持 2 种作用域的 Lua 脚本:global lua 和 route lua。
global lua
全局 lua 程序,独立于所有功能之外,可设置定时执行该脚本,多用于更新全局 cache 数据,比如需要定时请求某个 API 接口,获取某个采集对象的属性信息并缓存到 DataWay 中。
配置方式
打开 DataWay 的配置文件 dataway.yaml
,配置 global_lua 项,其中 path
为 lua 文件路径,circle
为当前文件的执行周期,例如:
global_lua:
- path: xxx.lua
circle: 0 * * * 0,6
- path: xxx.lua
circle: 0 * * * 0,6
circle
使用 Unix Cron 格式,支持 ”秒、分、时、天、月”,具体如下表:
Field name | Mandatory? | Allowed values | Example to |
---|---|---|---|
Seconds | Yes | 0-59 | 31 * * * * |
Minutes | Yes | 0-59 | 0 25 * * * |
Hours | Yes | 0-23 | 0 0 17 * * |
Day of month | Yes | 1-31 | 0 0 0 12 * |
Month | Yes | 1-12 or JAN-DEC | 0 0 0 0 9 |
route lua
路由级别的 lua 脚本,一个路由可配置多个 lua 脚本,可对上报到路由的数据依次执行 Lua 脚本
配置方式
打开 DataWay 的配置文件 dataway.yaml
,配置文件 routes _config 项
# route config
routes_config:
- name: default
lua :
- path: default01.lua
- path: default02.lua
- path: default03.lua
- name: test
lua :
- path: default01.lua
- path: default02.lua
- path: default03.lua
注意:任意一组执行出错,将不再执行下一组,并丢弃此次数据内容不再上报,对执行错误写入日志
编写 Lua 脚本
在配置文件中指定 lua script 文件路径,该文件内容格式为:
function handle(points)
-- lua code
return points
end
其中:
function handle(points)
为整个 script 的执行入口,所有的 lua 脚本都在该函数内部,points
为传入的数据参数points
类型为 table,类似于c语言系的struct[]
,其中struct[]
的各个成员变量分别是name, tags, fields, time
,详细介绍如下:name
string,本组数据的存储表名,一般不会改变tags
table,标签组,该 table 中只允许存放 string 类型的数据fields
table,数据组,可以存放 string/number/bool 类型的数据time
本条记录的时间戳,类型 number ,一般不会改变
points
类型为数据所用类型,handle 函数传入 points,函数结束返回 points,points 格式固定,但支持用户自定义其中的字段内容,这也是 lua 脚本的核心功能points
示例
points
{
{
name = "t_name",
tags = {
t1 = "tags_01",
t2 = "tags_02"
},
fields = {
f1 = "fields_01",
f2 = 12345,
f3 = true
},
time = 1575970133841355008
},
{
name = "t_name",
tags = {
t1 = "tags_03",
t2 = "tags_04"
},
fields = {
f1 = "fields_02",
f2 = 66666,
f3 = false
},
time = 1575970133841355008
}
}
- 在函数末尾依照 lua 语法添加
end
内置函数
为了方便用户进行处理,DataWay 中的 lua 脚本内置了一些常用函数库
http_request
函数签名:
response_table, error_string = http_request(method_string, URL_string, headers_table {
query_string,
headers_table,
body_string
})
query,headers,body如果不存在可以不写
示例:
function handle(points)
print("http test:")
response, error = http_request("GET", "http://www.baidu.com", {
query="",
headers={
Accept="*/*"
},
body="test123"
})
if error == nil then
print(response.body)
print(response.status_code)
else
print(error)
end
return points
end
函数有2个返回值,response 和 error,如果函数执行错误,则 response 为 nil,error 为 string 类型的错误说明。
response 是 table 类型,各个字段名和类型分别为:
- headers (table)
- cookies (table)
- status_code (number)
- url (string)
- body (string)
- body_size (number)
数据库连接
sql 采用“数据库+连接信息”的方式进行连接,现支持的数据库和对应的信息模板如下:
database name | connect information format |
---|---|
mysql | username:password@protocol(address)/dbname?param=value |
postgres | postgres://user:password@localhost/dbname?sslmode=disable¶m=value |
mssql | sqlserver://username:password@host:port?database=dbname¶m=value |
sql_connect 接收的类型为连接数据库所用的 table,返回一条连接和错误信息。该连接含有一个成员函数 query,参数为查询语句,支持占位符操作,例如:
function handle(points)
print("mysql test:")
conn, err = sql_connect("mysql", "root:123456@tcp(192.168.0.2:3306)/test_mysql?charset=utf8")
if err ~= "" then
res, err = conn:query('SELECT * FROM students where year>?;', 1986)
if res ~= nil then
for _, row in pairs(res) do
for k, v in pairs(row) do
print(k, v)
end
end
else
print(err)
end
else
print(err)
end
conn:close()
return points
end
注:该连接使用结束后,需调用close()
关闭连接
json 编码和解析
json_decode 解析字符串为table json_encode 将 table 或 string 转为 json 格式的字符串
function handle(points)
json_str = '{ "hostname":"ubuntu18.04LTS", "date":"2019年12月10日 星期二 11时14分47秒 CST", "ip":["127.0.0.1","192.168.0.1","172.16.0.1"] }'
print("json_str:", json_str)
json_table = json_decode(json_str)
print(json_table)
for k, v in pairs(json_table) do
print(k, v)
end
for _, v in pairs(json_table["ip"]) do
print(, v)
end
return points
end
xml 解析
xml_decode 将 xml 格式字符串解析为 table
function handle(points)
xml_str ="<booklist><book>100</book><book>100.5</book><book>200</book></booklist>"
print("xml test:", xml_str)
xml_table = xml_decode(xml_str)
for _, row in pairs(xml_table) do
for k, v in pairs(row) do
for _, vv in pairs(v) do
print(vv)
print("number add 1", vv+1)
end
end
print("----------------")
end
return points
end
数值类型的解析优先级高于字符型,示例代码中100、100.5、200都解析成 Number 类型,可以进行数值操作(例如 +1)。
csv 解析
csv_decode 将 csv 格式字符串解析为N维 table ,N为csv行数减1
table 的 key 与首行 header 对应
function handle(points)
csv_str = "name,year,address\nAA,22, NewYork\nBB, 21, Seattle"
print("csv test:", csv_str)
csv_table = csv_decode(csv_str)
for _, row in pairs(csv_table) do
for k, v in pairs(row) do
print(k, v)
end
print("----------------")
end
return points
end
标准 csv 格式,分隔符为英文状态下逗号“,”
cache 缓存数据
KV 缓存,key 为 string 类型,value 为 string/number/table 任意类型
- cache_set 缓存写入
- cache_get 缓存读取,如果没有找到对应的 key,则返回 nil
- cache_list 缓存队列中所有的key值,类型为 table,可使用
#list
或table.getn(list)
得到该 list 的长度
function handle(points)
cache_set("AAA", "hello,world")
cache_set("BBB", 123456)
cache_set("CCC", true)
cache_set("DDD", { host = '10.100.64.106', port = 13306, database = 'golua_mysql', user = 'root', password = '123456' })
list = cache_list()
print("cache_list:")
for k,v in pairs(list) do
print(k, v)
end
print("cache key list: ", #list)
print("cache key list: ", table.getn(list))
print("AAA: ", cache_get("AAA"))
print("BBB: ", cache_get("BBB"))
print("CCC: ", cache_get("CCC"))
print("DDD --------")
dd = cache_get("DDD")
for k,v in pairs(dd) do
print(k, v)
end
return points
end
Regex 正则表达式
- re_quote 转义
- re_find 查找
- re_gsub 替换
- re_match 匹配
示例:
function handle(points)
-- quote ----------
print(re_quote("^$.?a"))
-- "\^\$\.\?a"
-- find ----------
print(re_find('', ''))
-- 1 0
print(re_find("abcd efgh ijk", "cd e", 1, true))
-- 3 6
print(re_find("abcd efgh ijk", "cd e", -1, true))
-- nil
print(re_find("abcd efgh ijk", "i([jk])"))
-- 11 12 "j"
-- gsub ----------
print(re_gsub("hello world", [[(\w+)]], "${1} ${1}"))
-- "hello hello world world" 2
print(re_gsub("name version", [[\w+]], {name="lua", version="5.1"}))
-- "lua-5.1.tar.gz" 2
print(re_gsub("name version", [[\w+]], {name="lua", version="5.1"}))
-- "lua 5.1" 2
print(re_gsub("$ world", "\\w+", string.upper))
-- "$ WORLD" 1
print(re_gsub("4+5 = $return 4+5$", "\\$(.*)\\$", function (s)
return loadstring(s)()
end))
-- "4+5 = 9" 1
-- match ----------
print(re_match("$$$ hello", "z"))
-- nil
print(re_match("$$$ hello", "\\w+"))
-- "hello"
print(re_match("hello world", "\\w+", 6))
-- "world"
print(re_match("hello world", "\\w+", -5))
-- "world"
print(re_match("from=world", "(\\w+)=(\\w+)"))
-- "from" "world"
return points
end
crypto 加密
加密解密以及 base64 转换
function signature:
base64_encode(data_str)
base64_decode(data_str)
hex_encode(data_str)
hmac(method_str, data_str, key_str, notHexString_bool)
method 为方法枚举 { "md5", "sha1", "sha256", "sha512" } 最后一个参数
notHexString
是 bool 类型,含义是“不进行编码为字符串操作”,默认是false
,即对输入或输出进行编码为字符串的操作
。
encrypt(data_str, method_str, key_str, iv_str, notHexString)
method 为方法枚举 { "aes_cbc", "des_cbc", "des_ecb" }
当方法为
aes_cbc
时,key 的长度可以是 16,24,32,iv 向量长度为 16
当方法为
des_cbc
或des_ecb
时,key 的长度可以是 8,iv 向量长度为 8
第三个形参和
hmac()
的 notHexString 意义相同。
decrypt(data_str, method_str, key_str, iv_str, notHexString)
同上
示例:
function handle(points)
print("----------- base64 ------------")
b64 = base64_encode("hello,world!")
print(b64)
print(base64_decode(b64))
print("---------- hex encode -----------")
print(hex_encode("Hello"))
print("----------- crc32 ------------")
print(crc32("hello,world!"))
print("----------- hmac ------------")
print(hmac("md5", "hello", "world"))
print(hmac("sha1", "hello", "world", true))
print(hmac("sha256", "hello", "world"))
print(hmac("sha512", "hello", "world", true))
print("----------- encrypt/decrypt ------------")
-- aes-cbc key length 16,24,32, iv length 16
aescbc = encrypt("hello", "aes-cbc", "1234abcd1234abcd", "iviv12345678abcd", true)
print(aescbc)
print(decrypt(aescbc, "aes-cbc", "1234abcd1234abcd", "iviv12345678abcd", true))
-- des-cbc key length 8, iv length 8
descbc = encrypt("hello", "des-cbc", "1234abcd", "iviv1234")
print(descbc)
print(decrypt(descbc, "des-cbc", "1234abcd", "iviv1234"))
-- des-cbc key length 8, iv length 8
desecb = encrypt("hello", "des-ecb", "1234abcd", "iviv1234")
print(desecb)
print(decrypt(descbc, "des-ecb", "1234abcd", "iviv1234"))
return points
end
例子
采集的数据:
用户统计,平台=京东,性别=男,地区=广东,user_id=4 在线用户数=16920i,下单量=173i 1577113606410407469
采集的数据中有 user_id 这个标签,user_id 表示当前用户的ID
另外有一个 JSON 格式的数据,存储了用户了其他的一些基本信息。
user.json
[
{
"id": 1,
"name": "张三",
"sex": "male",
"email": "zhansan@demo.com"
},
{
"id": 2,
"name": "李四",
"sex": "female",
"email": "lisi@demo.com"
},
{
"id": 3,
"name": "王五",
"sex": "male",
"email": "wangwu@demo.com"
}
]
我希望根据用户ID,找到该用户的 name
、sex
、email
信息,并且将这些信息作为标签合并到采集的数据点中,则 lua 脚本可以这样编写:
-- 读取JSON文件
function fileRead(path)
local file = io.open(path, 'r')
if(file == nil) then
error('read file error')
end
local json = file:read('*a')
file:close()
return json
end
-- 查找用户对象
function findUser(id,userTables)
local findUser = nil;
for i,user in pairs(userTables) do
if(tostring(user.id) == id)
then
findUser = user
break
end
end
return findUser;
end
-- 入口函数
function handle(points)
-- 读取 JSON 文件
local userJson = fileRead('user.json')
-- 解析 JSON 文件
local jsonTable = json_decode(userJson)
for i, v in pairs(points) do
if (v.name == '用户统计' and v.tags.user_id ~= nil) then
local user = findUser(v.tags.user_id,jsonTable)
-- print(json.encode(user))
if(user ~= nil) then
v.tags.name = user.name
v.tags.sex = user.sex
v.tags.email = user.email
end
end
end
return points;
end
在路由上配置好 lua 脚本,重启 DataWay 即可生效