离线数据需求自动化Skill
Skill功能
demand-skill解决简单报表类需求投入时间过多问题,通过投入需求背景、指标口径(如有)、数据表来源,MCP信息一键生成sql任务并将表创建(如有),并生成对应的报表样式。
Skill设计思考的点(提示词太长了这里就不写了,把如下内容复制进行生成)
1.首先这个Skill只用于SQL代码生成,如不设计涉及运行去看数据情况,也不需要开发运行的python脚本,如需要完整的检验及字段信息校验那需要连接mcp,或者提供账号密码及连接地址,最终在数据平台生成任务。
2.需要考虑数据源支持类型,列举一下,如用户没提供该类信息则默认使用Spark SQL
- Hive SQL
- Spark SQL
- Flink SQL
- MaxCompute SQL
3.我们思考一下功能,一个需求开发需要提供哪些信息,可以按照mrd来配置
背景:讲清楚当前业务背景及要解决什么问题
交付物:表、报表等
指标口径:指标名称、指标中文名、指标业务口径、指标技术口径
数据来源:来源的表结构、关联关系、枚举值等
建模DWD考虑:如建设dwd中涉及枚举字段则需要生成对应的code(为原字段)、name,如dwd中存在id_del/delete字段,则需要在where里面进行限制,区分事实表和维度表(事实表为用户xxx行为,维度表为xxx属性),维度表需要优先从ods表开发至dim表,同时ods事实表需要left join dim表才能生成dwd表
建模DWS/ADS考虑:开发dws时将指标按照1d/30d/90d等且进行同粒度进行划分,将指标放入对应的dws表,如没提到关联操作对于mrd查询多表情况需要优先搜索2张表里同名字段进行关联,如没有同名字段,则将更符合的字段进行关联
分区考虑:同时对于分区表使用谓词下推,同时分区获取和上述要求一致,增量分区筛选:对于_di/_hi/_ri等结尾的增量表需要考虑筛选表分区(根据时间来定),全量(如_df/_hf)优先匹配用户描T+1、H+1的分区,如没写分区要求则取最大分区
4.输出内容形式
输出内容:按照mrd内容输出报表样式、insert 表的建表语句、sql语句,如给的数据来源为ods表则需要提供dwd、ads建表语句及代码,并生成对应的报表样式
5.数仓规范要求
元数据规范
表命名
ODS层(接入层)
ods__{业务数据库名}_{业务数据表名}_存储策略(df/di)
DWD层(明细层)
dwd_{一级数据域}_{二级数据域}_{业务过程(不清楚或没有写detail)}_存储策略(df/di)
DWS层(汇总层)
dws_{一级数据域}_{二级数据域}_{颗粒度}(例如买家/卖家)_{业务过程(不清楚或没有写target)}_{周期}(例如近30天写30d、90天写3m)
ADS层(应用层)
ads_{一级应用主题}_{二级应用主题}_{颗粒度}(例如买家/卖家)_{业务过程}
dim表(维度表)
dim__{维度定义}_存储策略(df/di)(例如日期写date)
tmp表(临时表)
tmp_{表名}_{临时表编号}
view(视图)
view_{表名}
备份表
{表名}_bak
存储策略
i:increment,f:full
颗粒度
统计周期
1d 近1天指标统计
1m 近1个月指标统计
1y 近1年指标统计
3m 近3个月指标统计
6m 近6个月指标统计
nd 无法确定具体天可用nd替代
td 历史累计昨日
字段命名
是否xxxxx用户,类型字段命名规范
is_{内容}
枚举值类型字段命名规范
xxxx_type
时间戳类型字段命名规范
xxx_date,xxx_time
百分比命名
xxx(维度,可不加)_原子指标_rate_周期(1d、30d(最近30天)、td(截止至今)、mtd)
数值类型(小数)金额命名
xxx(维度,可不加)_原子指标_amt_周期(1d、30d(最近30天)、td(截止至今)、mtd)
数值类型(整数)金额命名
xxx(维度,可不加)_原子指标_cnt/days_周期(1d、30d(最近30天)、td(截止至今)、mtd)
最近一次相关指标
xxx(维度,可不加)_原子指标_cnt/amt_周期(1d、30d(最近30天)、td(截止至今)、mtd、last1(最近一次))
两个时间段之间统计命名
xxx(时间段1内行为)_xxx(时间段2内操作)_dur
字段类型
文本 string
日期 string
整数 bigint
小数
高精度金额用decimal
正常使用double
枚举值
单枚举1/0-bigint
多枚举-string
各类id string
任务命名
传输任务
数据源: 生产端2消费端_库名_表名 mysql2hive_ods_xx_xxx
hive :生产端2消费端_表名 hive2doris_xx
开发任务
任务名与表名一致
子模块且为sql模块
转换处理:trans_表名
备份:bak_表名
开发:sql_表名
子模块为java/python
java/python_{操作内容}_{表名}
模型分区生命周期
全量表
ODS:小时全量(3天)、天级全量(7~90天)
DWD:小时全量(7~15天)、天级全量(7~30天)
DWS:小时全量(7~30天)、天级全量(7~60天)
ADS:小时全量 (7~30天)、天级全量(7~90天)
DIM:小时全量(3天)、天级全量(90~360天)
增量表
ODS:小时增量(30~180天)、天级增量(90~365天 or 永久)
DWD:小时增量(60~180天)、天级增量(180~365天)
DWS:小时增量(60~365天)、天级增量(180~730天)
ADS:小时增量(90~365天)、天级增量(180~730天)
DIM :默认使用全量表
如用户没指定数据域则按照如下方式进行划分
数据域划分
一级域交易, 二级域订单,trade_order,表名dwd_trade_order_detail_di
一级域交易, 二级域退货,trade_refund,表名dwd_trade_refund_order_detail_di
一级域客服, 二级域工单,cs_ticket,表名dwd_cs_ticket_detail_di
一级域客服, 二级域理赔,cs_repay,表名dwd_cs_repay_bill_detail_df
一级域优惠券,coupon,表名dwd_coupon_send_detail_di
6.Spark SQL数据倾斜skill放入其中
7.案例
各Agent token消耗监控报表mrd
1.需求背景
搭建大模型平台,通过大模型对各业务线运营分析工作中(主要要覆盖异步场景)需要泛化理解、重复性较高的环节进行自动化、智能化替代,数开团队配合大模型平台团队开发模型监控数据资产,监控各Agent token消耗,防止因高额消耗问题出现。
2.数据源
CREATE TABLE IF NOT EXISTS ods_ai_workflow_workflow_runs_df(
id STRING,
tenant_id STRING comment '租户id',
app_id STRING comment 'agent 应用id',
sequence_number STRING comment '序列号',
workflow_id STRING comment '工作流id',
`type` STRING commnt '工作流运行类型',
triggered_from STRING comment '触发来源',
`version` STRING comment '版本',
graph STRING comment '图',
inputs STRING comment '传参',
`status` STRING comment '工作流运行状态',
outputs STRING comment '输出',
error STRING comment '报错',
elapsed_dur DOUBLE comment'耗时',
total_tokens BIGINT comment '总计tokens',
total_steps BIGINT '总计步数',
create_user_role STRING comment '创建者角色',
create_user STRING comment '创建者',
create_time STRING comment '创建时间',
finish_time STRING comment '完成时间',
exceptions_cnt BIGINT comment '异常数'
)
COMMENT 'ods-大模型-工作流执行-全量表'
PARTITIONED BY (`pt` STRING COMMENT '业务日期')
STORED AS PARQUET
TBLPROPERTIES ('table.source'='自定义', **********', 'SYNC_METASTORE'='on','7d');
CREATE TABLE IF NOT EXISTS ods_dwd_risk_ai_apps_df(
id STRING,
tenant_id STRING comment '租户id',
name STRING comment '名称',
mode STRING comment '模式',
icon STRING comment 'icon',
is_able bigint comment '是否可用',
create_time string comment '创建时间',
update_time string comment '更新时间',
workflow_id STRING comment '工作流id',
description STRING comment '描述',
create_user STRING comment '创建者'
)
COMMENT 'ods-大模型-agent-全量表'
PARTITIONED BY (`pt` STRING COMMENT '业务日期')
STORED AS PARQUET
TBLPROPERTIES ('table.source'='自定义', **********', 'SYNC_METASTORE'='on','7d');
3.指标口径
指标名:agent_invoke_cnt_1d,指标comment:大模型调用数-最近一天,指标口径:最近一天workflow_runs调用次数,sql口径:count(workflow_runs里字段id)
指标名:invoke_tokens_cnt_1d,指标comment:大模型调用tokens数-最近1天,指标口径:最近一天workflow_runs下total_tokens消耗量,sql口径:sum(workflow_runs里字段total_tokens)
4.维度
ods_dwd_risk_ai_apps_df里应用名称、date_id取ods_ai_workflow_workflow_runs_df的create_time
5.输出案例
建表语句
CREATE TABLE IF NOT EXISTS yshopping.dwd_ai_workflow_workflow_runs_detail_df(
workflow_runs_id STRING,
tenant_id STRING comment '租户id',
app_id STRING comment 'agent 应用id',
sequence_id STRING comment '序列号',
workflow_id STRING comment '工作流id',
`workflow_runs_type_code` string commnt '工作流运行类型',
workflow_runs_triggered_from_code STRING comment '触发来源',
`version` STRING comment '版本',
graph STRING comment '图',
inputs STRING comment '传参',
`workflow_runs_status_code` STRING comment '工作流运行状态',
outputs STRING comment '输出',
error STRING comment '报错',
elapsed_dur DOUBLE comment'耗时',
total_tokens BIGINT comment '总计tokens',
total_steps BIGINT '总计步数',
create_user_role STRING comment '创建者角色',
creator_id STRING comment '创建者',
create_time STRING comment '创建时间',
finish_time STRING comment '完成时间',
exceptions_cnt BIGINT comment '异常数',
biz_id STRING comment '业务id',
)
COMMENT 'dwd-大模型-工作流执行-全量表'
PARTITIONED BY (`pt` STRING COMMENT '业务日期')
STORED AS PARQUET
TBLPROPERTIES ('table.source'='自定义', **********', 'PARTITION_LIFECYCLE'='30d', 'SYNC_METASTORE'='on');
代码
INSERT OVERWRITE TABLE yshopping.dwd_ai_workflow_workflow_runs_detail_df PARTITION(pt='${bizdate}')
select ,id as workflow_runs_id--STRING,
,tenant_id --STRING comment '租户id',
,app_id --STRING comment 'agent 应用id',
,sequence_number as sequence_id--STRING comment '序列号',
,workflow_id --STRING comment '工作流id',
,`type` as workflow_runs_type_code--STRING commnt '工作流运行类型',这个我不太想转换了
,triggered_from as workflow_runs_triggered_from_code--STRING comment '触发来源',这个我不太想转换了
,`version` --STRING comment '版本',
,graph --STRING comment '图',
,inputs --STRING comment '传参',
,`status` as workflow_runs_status_code--STRING comment '工作流运行状态',这个我不太想转换了
,outputs --STRING comment '输出',
,error --STRING comment '报错',
,elapsed_dur --DOUBLE comment'耗时秒',
,total_tokens --BIGINT comment '总计tokens'
,total_steps --BIGINT '总计步数',
,create_user_role --STRING comment '创建者角色',
,create_user as creator_id--STRING comment '创建者',
,from_unixtime(unix_timestamp(create_time) + 28800) as create_time--由于大模型执行底层时间差8小时因此需要补充转换成+8小时后时间
,from_unixtime(unix_timestamp(finish_time) + 28800) as finish_time--由于大模型执行底层时间差8小时因此需要补充转换成+8小时后时间
,exceptions_cnt --BIGINT comment '异常数'
,get_json_object(input,'$.id') as biz_id
from yshopping.ods_ai_workflow_workflow_runs_df
where pt='${bizdate}'
;
建表语句
CREATE TABLE IF NOT EXISTS yshopping.dim_ai_workflow_app_df(
app_id STRING,
tenant_id STRING comment '租户id',
app_name STRING comment '名称',
mode_code STRING comment '模式',
icon STRING comment 'icon',
is_able bigint comment '是否可用',
create_time string comment '创建时间',
update_time string comment '更新时间',
workflow_id STRING comment '工作流id',
description STRING comment '描述',
creator_id STRING comment '创建者'
)
COMMENT 'ods-大模型-agent-全量表'
PARTITIONED BY (`pt` STRING COMMENT '业务日期')
STORED AS PARQUET
TBLPROPERTIES ('table.source'='自定义', **********', 'SYNC_METASTORE'='on','7d');
代码
INSERT OVERWRITE TABLE `yshopping`.`dim_ai_workflow_app_df` PARTITION(pt= '${bizdate}')
select t0.id as app_id --STRING,
,t0.tenant_id --STRING comment '租户id',
,t0.name as app_name--STRING comment '名称',
,t0.mode as mode_code --STRING comment '模式',
,t0.icon --STRING comment 'icon',
,t0.is_able --bigint comment '是否可用',
,t0.create_time --string comment '创建时间',
,t0.update_time --string comment '更新时间',
,t0.workflow_id --STRING comment '工作流id',
,t0.description --STRING comment '描述',
,t0.create_user as creator_id--STRING comment '创建者'
from yshopping.ods_ai_workflow_app_df t0
where t0.pt='${bizdate}'
建表语句
CREATE TABLE IF NOT EXISTS ads_ai_workflow_invoke_target(
app_name STRING comment '应用名称',
date_id STRING comment '日期yyyy-mm-dd',
agent_invoke_cnt_1d bigint comment '大模型调用数-最近1天',
invoke_tokens_cnt_1d bigint comment '大模型调用tokens数-最近1天'
)
COMMENT 'ads-大模型-工作流域-大模型调用监控指标'
PARTITIONED BY (`pt` STRING COMMENT '业务日期')
STORED AS PARQUET
TBLPROPERTIES ('table.source'='自定义', **********', 'SYNC_METASTORE'='on','7d');
代码
INSERT OVERWRITE TABLE yshopping.ads_ai_workflow_invoke_target PARTITION(pt='${bizdate}')
select
t1.app_name
,substr(t0.create_time,1,10) as date_id
,count(t0.workflow_runs_id) as agent_invoke_cnt_1d
,sum(t0.total_tokens) as invoke_tokens_cnt_1d
from yshopping.dwd_ai_workflow_workflow_runs_detail_df t0
left join
yshopping.dim_ai_workflow_app_df t1
on t0.app_id=t1.app_id
and t1.pt='${bizdate}'
where t0.pt='${bizdate}'
group by t1.app_name
,substr(t0.create_time,1,10)