search
数据采集 采集器 DataWay 基于Lua脚本数据处理

通过 Lua 脚本处理数据

简介

DataWay 支持引入 Lua 脚本对数据进行处理后再上报到 DataFlux 中心。当你有下列需求时可考虑使用 Lua 脚本:

  • 需要对采集的数据进行清洗,比如添加、修改、删除标签,甚至删除整个指标集
  • 需要将第三方数据源(比如 MySQL、Postgres、SQL Server、API 请求、CSV 文件、JSON 文件)中数据合并到采集的数据点然后在上报 DataFlux 中

在 DataWay 中引入 Lua 脚本

DataWay 中支持 2 种作用域的 Lua 脚本:global lua 和 route lua。

global lua

全局 lua 程序,独立于所有功能之外,可设置定时执行该脚本,多用于更新全局 cache 数据,比如需要定时请求某个 API 接口,获取某个采集对象的属性信息并缓存到 DataWay 中。

配置方式

打开 DataWay 的配置文件 dataway.yaml,配置 global_lua 项,其中 path 为 lua 文件路径,circle 为当前文件的执行周期,例如:

global_lua:
    - path: xxx.lua
        circle: 0 * * * 0,6

    - path: xxx.lua
        circle: 0 * * * 0,6

circle 使用 Unix Cron 格式,支持 ”秒、分、时、天、月”,具体如下表:

Field name Mandatory? Allowed values Example to
Seconds Yes 0-59 31 * * * *
Minutes Yes 0-59 0 25 * * *
Hours Yes 0-23 0 0 17 * *
Day of month Yes 1-31 0 0 0 12 *
Month Yes 1-12 or JAN-DEC 0 0 0 0 9

route lua

路由级别的 lua 脚本,一个路由可配置多个 lua 脚本,可对上报到路由的数据依次执行 Lua 脚本

配置方式

打开 DataWay 的配置文件 dataway.yaml,配置文件 routes _config 项

# route config
routes_config:
    - name: default
        lua :
          - path: default01.lua
          - path: default02.lua
          - path: default03.lua

    - name: test
        lua :
          - path: default01.lua
          - path: default02.lua
          - path: default03.lua

注意:任意一组执行出错,将不再执行下一组,并丢弃此次数据内容不再上报,对执行错误写入日志

编写 Lua 脚本

在配置文件中指定 lua script 文件路径,该文件内容格式为:

function handle(points)

    -- lua code

    return points
end

其中:

  • function handle(points)为整个 script 的执行入口,所有的 lua 脚本都在该函数内部,points为传入的数据参数

  • points类型为 table,类似于c语言系的struct[],其中struct[]的各个成员变量分别是name, tags, fields, time,详细介绍如下:

    • name string,本组数据的存储表名,一般不会改变
    • tags table,标签组,该 table 中只允许存放 string 类型的数据
    • fields table,数据组,可以存放 string/number/bool 类型的数据
    • time 本条记录的时间戳,类型 number ,一般不会改变
  • points类型为数据所用类型,handle 函数传入 points,函数结束返回 points,points 格式固定,但支持用户自定义其中的字段内容,这也是 lua 脚本的核心功能

  • points示例

points
{
    {
        name = "t_name",
        tags = {
            t1 = "tags_01",
            t2 = "tags_02"
        },
        fields = {
            f1 = "fields_01",
            f2 = 12345,
            f3 = true
        },
        time = 1575970133841355008 
    },
    {
        name = "t_name",
        tags = {
            t1 = "tags_03",
            t2 = "tags_04"
        },
        fields = {
            f1 = "fields_02",
            f2 = 66666,
            f3 = false
        },
        time = 1575970133841355008 
    }
}
  • 在函数末尾依照 lua 语法添加 end

内置函数

为了方便用户进行处理,DataWay 中的 lua 脚本内置了一些常用函数库

http_request

函数签名:

response_table, error_string = http_request(method_string, URL_string, headers_table {
                                        query_string,
                                        headers_table,
                                        body_string
                                    })

query,headers,body如果不存在可以不写

示例:

function handle(points)

    print("http test:")

    response, error = http_request("GET", "http://www.baidu.com", {
        query="",
        headers={
            Accept="*/*"
        },
        body="test123"
    })

    if error == nil then
        print(response.body)
        print(response.status_code)
    else
        print(error)
    end

    return points
end

函数有2个返回值,response 和 error,如果函数执行错误,则 response 为 nil,error 为 string 类型的错误说明。

response 是 table 类型,各个字段名和类型分别为:

  • headers (table)
  • cookies (table)
  • status_code (number)
  • url (string)
  • body (string)
  • body_size (number)

数据库连接

sql 采用“数据库+连接信息”的方式进行连接,现支持的数据库和对应的信息模板如下:

database name connect information format
mysql username:password@protocol(address)/dbname?param=value
postgres postgres://user:password@localhost/dbname?sslmode=disable&param=value
mssql sqlserver://username:password@host:port?database=dbname&param=value

sql_connect 接收的类型为连接数据库所用的 table,返回一条连接和错误信息。该连接含有一个成员函数 query,参数为查询语句,支持占位符操作,例如:

function handle(points)
    print("mysql test:")
    conn, err = sql_connect("mysql", "root:123456@tcp(192.168.0.2:3306)/test_mysql?charset=utf8")
    if err ~= "" then
        res, err = conn:query('SELECT * FROM students where year>?;', 1986)
        if res ~= nil then
            for _, row in pairs(res) do
                for k, v in pairs(row) do
                    print(k, v)
                end
            end
        else
            print(err)
        end
    else
        print(err)
    end
    conn:close()

    return points
end

注:该连接使用结束后,需调用close()关闭连接

json 编码和解析

json_decode 解析字符串为table json_encode 将 table 或 string 转为 json 格式的字符串

function handle(points)
    json_str = '{ "hostname":"ubuntu18.04LTS", "date":"2019年12月10日 星期二 11时14分47秒 CST", "ip":["127.0.0.1","192.168.0.1","172.16.0.1"] }'

    print("json_str:", json_str)
    json_table = json_decode(json_str)

    print(json_table)

    for k, v in pairs(json_table) do
        print(k, v)
    end
    for _, v in pairs(json_table["ip"]) do
        print(, v)
    end
    return points
end

xml 解析

xml_decode 将 xml 格式字符串解析为 table

function handle(points)
        xml_str ="<booklist><book>100</book><book>100.5</book><book>200</book></booklist>"
    print("xml test:", xml_str)
    xml_table = xml_decode(xml_str)

    for _, row in pairs(xml_table) do
        for k, v in pairs(row) do
            for _, vv in pairs(v) do
                print(vv)
                print("number add 1", vv+1)
            end
        end
        print("----------------")
    end

   return points
end

数值类型的解析优先级高于字符型,示例代码中100、100.5、200都解析成 Number 类型,可以进行数值操作(例如 +1)。

csv 解析

csv_decode 将 csv 格式字符串解析为N维 table ,N为csv行数减1

table 的 key 与首行 header 对应

function handle(points)
    csv_str = "name,year,address\nAA,22, NewYork\nBB, 21, Seattle"
    print("csv test:", csv_str)
    csv_table = csv_decode(csv_str)

    for _, row in pairs(csv_table) do
        for k, v in pairs(row) do
            print(k, v)
        end
        print("----------------")
    end

    return points
end

标准 csv 格式,分隔符为英文状态下逗号“,”

cache 缓存数据

KV 缓存,key 为 string 类型,value 为 string/number/table 任意类型

  • cache_set 缓存写入
  • cache_get 缓存读取,如果没有找到对应的 key,则返回 nil
  • cache_list 缓存队列中所有的key值,类型为 table,可使用#listtable.getn(list)得到该 list 的长度
function handle(points)

    cache_set("AAA", "hello,world")
    cache_set("BBB", 123456)
    cache_set("CCC", true)
    cache_set("DDD", { host = '10.100.64.106', port = 13306, database = 'golua_mysql', user = 'root', password = '123456' })

    list = cache_list()
    print("cache_list:")
    for k,v in pairs(list) do 
        print(k, v)
    end
    print("cache key list: ", #list)
    print("cache key list: ", table.getn(list))

    print("AAA: ", cache_get("AAA"))
    print("BBB: ", cache_get("BBB"))
    print("CCC: ", cache_get("CCC"))

    print("DDD --------")
    dd = cache_get("DDD")
    for k,v in pairs(dd) do
        print(k, v)
    end

    return points
end

Regex 正则表达式

  • re_quote 转义
  • re_find 查找
  • re_gsub 替换
  • re_match 匹配

示例:

function handle(points)
    -- quote ----------

    print(re_quote("^$.?a"))
    -- "\^\$\.\?a"

    -- find ----------

    print(re_find('', ''))
    -- 1   0

    print(re_find("abcd efgh ijk", "cd e", 1, true))
    -- 3   6

    print(re_find("abcd efgh ijk", "cd e", -1, true))
    -- nil

    print(re_find("abcd efgh ijk", "i([jk])"))
    -- 11  12  "j"

    -- gsub ----------

    print(re_gsub("hello world", [[(\w+)]], "${1} ${1}"))
    -- "hello hello world world"  2

    print(re_gsub("name version", [[\w+]], {name="lua", version="5.1"}))
    -- "lua-5.1.tar.gz"  2

    print(re_gsub("name version", [[\w+]], {name="lua", version="5.1"}))
    -- "lua 5.1"  2

    print(re_gsub("$ world", "\\w+", string.upper))
    -- "$ WORLD"  1

    print(re_gsub("4+5 = $return 4+5$", "\\$(.*)\\$", function (s)
            return loadstring(s)()
        end))
    -- "4+5 = 9"  1

    -- match ----------

    print(re_match("$$$ hello", "z"))
    -- nil

    print(re_match("$$$ hello", "\\w+"))
    -- "hello"

    print(re_match("hello world", "\\w+", 6))
    -- "world"

    print(re_match("hello world", "\\w+", -5))
    -- "world"

    print(re_match("from=world", "(\\w+)=(\\w+)"))
    -- "from" "world"

    return points
end

crypto 加密

加密解密以及 base64 转换

function signature:

  • base64_encode(data_str)
  • base64_decode(data_str)
  • hex_encode(data_str)
  • hmac(method_str, data_str, key_str, notHexString_bool)

method 为方法枚举 { "md5", "sha1", "sha256", "sha512" } 最后一个参数notHexString是 bool 类型,含义是“不进行编码为字符串操作”,默认是false,即对输入或输出进行编码为字符串的操作

  • encrypt(data_str, method_str, key_str, iv_str, notHexString)

method 为方法枚举 { "aes_cbc", "des_cbc", "des_ecb" }

当方法为aes_cbc时,key 的长度可以是 16,24,32,iv 向量长度为 16

当方法为des_cbcdes_ecb时,key 的长度可以是 8,iv 向量长度为 8

第三个形参和hmac()的 notHexString 意义相同。

  • decrypt(data_str, method_str, key_str, iv_str, notHexString)

    同上

示例:

function handle(points)
    print("----------- base64 ------------")
    b64 = base64_encode("hello,world!")
    print(b64)
    print(base64_decode(b64))

    print("---------- hex encode -----------")
    print(hex_encode("Hello"))

    print("----------- crc32 ------------")
    print(crc32("hello,world!"))

    print("----------- hmac ------------")
    print(hmac("md5", "hello", "world"))
    print(hmac("sha1", "hello", "world", true))
    print(hmac("sha256", "hello", "world"))
    print(hmac("sha512", "hello", "world", true))

    print("----------- encrypt/decrypt ------------")

    -- aes-cbc  key length 16,24,32, iv length 16
    aescbc = encrypt("hello", "aes-cbc", "1234abcd1234abcd", "iviv12345678abcd", true)
    print(aescbc)
    print(decrypt(aescbc, "aes-cbc", "1234abcd1234abcd", "iviv12345678abcd", true))

    -- des-cbc key length 8, iv length 8
    descbc = encrypt("hello", "des-cbc", "1234abcd", "iviv1234")
    print(descbc)
    print(decrypt(descbc, "des-cbc", "1234abcd", "iviv1234"))

    -- des-cbc key length 8, iv length 8
    desecb = encrypt("hello", "des-ecb", "1234abcd", "iviv1234")
    print(desecb)
    print(decrypt(descbc, "des-ecb", "1234abcd", "iviv1234"))

    return points
end

例子

采集的数据:

用户统计,平台=京东,性别=男,地区=广东,user_id=4 在线用户数=16920i,下单量=173i 1577113606410407469

采集的数据中有 user_id 这个标签,user_id 表示当前用户的ID

另外有一个 JSON 格式的数据,存储了用户了其他的一些基本信息。

user.json

[
    {
        "id": 1,
        "name": "张三",
        "sex": "male",
        "email": "zhansan@demo.com"
    },
    {
        "id": 2,
        "name": "李四",
        "sex": "female",
        "email": "lisi@demo.com"
    },
    {
        "id": 3,
        "name": "王五",
        "sex": "male",
        "email": "wangwu@demo.com"
    }
]

我希望根据用户ID,找到该用户的 namesexemail 信息,并且将这些信息作为标签合并到采集的数据点中,则 lua 脚本可以这样编写:

-- 读取JSON文件
function fileRead(path)
    local file = io.open(path, 'r')
    if(file == nil) then
        error('read file error')
    end
    local json = file:read('*a')
    file:close()
    return json
end

-- 查找用户对象
function findUser(id,userTables)
    local findUser = nil;
    for i,user in pairs(userTables) do
        if(tostring(user.id) == id)
        then
            findUser = user
            break
        end
    end
    return findUser;
end    

-- 入口函数
function handle(points)

    -- 读取 JSON 文件
    local userJson = fileRead('user.json')

    -- 解析 JSON 文件
    local jsonTable = json_decode(userJson)

    for i, v in pairs(points) do
        if (v.name == '用户统计' and v.tags.user_id ~= nil) then
            local user = findUser(v.tags.user_id,jsonTable)
            -- print(json.encode(user))
            if(user ~= nil) then
                v.tags.name = user.name
                v.tags.sex = user.sex
                v.tags.email = user.email
            end    
        end
    end
    return points;
end

在路由上配置好 lua 脚本,重启 DataWay 即可生效