大厂中Flink SQL开发流程及模板

Flink SQL开发链路全流程

先确定源,一般都是Mysql到kafka/mq,通过读binlog获取,也可以直接读后端提供kafka/mq,其次是流表,流表大多数都在中间件中存放,最后落地结果时可落starrocks/doris中。

流程如下:

流表创建

首先需要先创建流表的库就是流表kafka、olap的ods、dwd、dws、ads方便后续操作,再创建配置流表。

实时开发存在3版块内容,1flink sql实现实时sql方式查询,2.实时jar包上传,3.flink cdc来接入来源库

数据源CDC接入

方法1:Binlog获取,DTS配置,生成ODS流表

这里任务名和ODS任务保持一致即可,配置好MQ信息,及数据源(这里数据源指的是mysql实例信息,即连接串)

创建任务后,再进行Topic的订阅,即可把Mysql Binlog同步到MQ

方法2:使用CDC接生成ODS流表

实时表命名

dwd_一级域_二级域_ri

ads_一级主题域_二级主题域_ri

Flink SQL开发

选择flink sql模板创建flink sql任务

由于我们之前已经创建过流表如果需要把流表数据全部导入可以直接使用无代码模式

如果想写flink sql也可以单独去写

配置完flink参数可上线发布

实时运维

实时运维和任务运维一样可以看到当前实时任务在线上运行,具体细节需要flink webui定位,后续单独成一个板块讲。

DWD代码模板

以订单表,主订单为例,任务为dwd_trade_order_detail_ri,明细数据落kafka流表及hive中

--创建Source表
CREATE TABLE ods_trade_trade_order_ri (
    `message`   varchar
) WITH (
  'connector' = 'kafka',
  'topic' = 'YSHOPPING_TRADE_ORDER_CONNECTOR',
  'properties.bootstrap.servers' = 'xxx:9092,xxxx:9092',
  'properties.group.id' = 'GID_YSHOPPING_TRADE_ORDER_CONNECTOR',
  'scan.topic-partition-discovery.interval' = '20s',
  'format' = 'raw',
  'json.fail-on-missing-field' = 'false',
  'scan.startup.mode'='timestamp'--指定开始时间
  'scan.startup.timestamp-millis'='1706630400000'--指定获取数据的开始时间
);


--创建sink数据表
create table dwd_trade_order_detail_ri (
    order_id      varchar, --主订单号
    order_status_code int, --主订单状态code
    order_status_name   varchar, --主订单状态name
    product_amt int , --商品总额分
    freight_amt int  , --运费总额分
    buyer_id varchar,
    create_time datetime,
    modify_time datetime
) WITH ( 
-- 这里接收ods数据就不用再写消费者组了
   'connector' = 'kafka',
  'topic' = 'YSHOPPING_TRADE_ORDER_CONNECTOR',
  'properties.bootstrap.servers' = 'xxx:9092,xxxx:9092',
  'format' = 'raw'
);


create table dwd_trade_order_detail_2hive_ri (
--先在hive建好表
    order_id      varchar, --主订单号
    order_status_code int, --主订单状态code
    order_status_name   varchar, --主订单状态name
    product_amt int , --商品总额分
    freight_amt int  , --运费总额分
    buyer_id varchar,
    create_time varchar,
    modify_time varchar
)
WITH (
'path' = ' hdfs://cluster100/user/bdms/hive_db/yx_dwd.db/dwd_trade_order_detail_ri/ds=2024-10-22',
'krb.keytab' = 'sloth.keytab',
'krb.principal' = **********',
'part.size' = '267386880',
'rollover.interval' = '900000',
'format' = 'parquet',
'compression' = 'none',
'partition.keys' = 'date',
'connector.type'='filesystem',
'connector.property-version'='1',
'update-mode'='append',
'hdfs-site'='hdfs-site.xml',
'core-site'='core-site.xml',
'krb.conf'='krb5.conf',
'is.related.mammunt'='false',
'part.prefix'='ndi-128',
'part.suffix'='-success',
'inactivity.interval'='300000'
);


INSERT INTO dwd_trade_order_detail_ri
SELECT
    COALESCE(json_value(message,'$.orderId'),'')  as order_id
    ,cast(json_value(message,'$.orderStatusCode') as int) as order_status_code
    ,case when json_value(message,'$.orderStatusCode')='1'
          then '未付款'
          when json_value(message,'$.orderStatusCode')='2'
          then '已付款'
          when json_value(message,'$.orderStatusCode')='3'
          then '待收货'
          when json_value(message,'$.orderStatusCode')='4'
          then '已收货'
          when json_value(message,'$.orderStatusCode')='5'
          then '已完结'
          when json_value(message,'$.orderStatusCode')='6'
          then '退货'
     end as order_status_name
    ,COALESCE(cast(json_value(message,'$.productAmt') as int),0) as product_amt
    ,COALESCE(json_value(message,'$.freightAmt'),0) as freight_amt
    ,COALESCE(json_value(message,'$.buyerId'),0) as buyer_id
    ,TO_TIMESTAMP(json_value(message,'$.data'),'yyyy-MM-dd HH:mm:ss') as create_time
    ,TO_TIMESTAMP(json_value(message,'$.data'),'yyyy-MM-dd HH:mm:ss') as modify_time
    ,from_unixtime(cast(substring(JSON_VALUE (CAST (`message` as VARCHAR), '$.ts'),1,10) as bigint)) ts 
from ( 
select split_index(message,'orderLog:',1) as message
from 
ods_trade_trade_order_ri
where message like '%xxxxx%'
)
lateral table (
    json_array_to_map(message, 'data')
  ) as t (data)
where is_del=0
;

INSERT INTO dwd_trade_order_detail_2hive_ri
select order_id
      ,order_status_code
      ,order_status_name
      ,product_amt
      ,freight_amt
      ,buyer_id
      ,cast(create_time as varchar) as create_time
      ,cast(modify_time as varchar) as modify_time      
from dwd_trade_order_detail_ri


全部评论

相关推荐

咦哟,从去年八月份开始长跑,两处实习转正都失败了,风雨飘摇,终于拿到offer了更新一下面试记录:秋招:多部门反复面试然后挂掉然后复活,具体问了啥已经忘了,只是被反复煎炸,直至焦香😋春招:base北京抖音hr打来电话说再次复活,准备面试,gogogo北京抖音一面:六道笔试题:1.promise顺序2.定义域问题3.flat展开4.并发请求5.岛屿数量算法(力扣)深度,广度都写6.忘记了,好像也是算法,难度中等其他问题多是框架底层设计,实习项目重难点~~~秒过😇北京抖音二面:三道笔试题:(为什么只有三道是因为第三道没做出来,卡住了)1.中等难度算法(忘记啥题了,应该是个数组的)2.认识js的继承本质(手写继承模式,深入js的面相对象开发)3.手写vue的响应式(卡在了watch,导致挂掉)---后知后觉是我的注册副作用函数写得有问题,有点紧张了其他题目多是项目拷打,项目亮点,对实习项目的贡献~~~第二天,挂,but立马复活转战深圳客服当天约面深圳客服一面:六道笔试题,由于面过太多次字节,面试官叫我直接写,不用讲,快些写完😋,具体都是些继承,深拷贝(注意对数组对象分开处理,深层次对象,循环引用),加中等难度算法题~~~秒过深圳客服二面:口诉八股大战:大概囊括网络,浏览器渲染原理,动画优化,时间循环,任务队列等等(你能想到的简单八股通通拉出来鞭尸😋)算法题:笔试题6道:1:找出数组内重复的数,arr[0]-arr[n]内的数大小为[1-n],例如[1,2,2,3,3]返回[2,3],要求o(n),且不使用任何额外空间(做到了o(n),空间方面欠佳,给面试官说进入下一题,做不来了)2:原滋原味的继承(所以继承真滴很重要)3:力扣股票购买时机难度中等其他滴也忘记了,因为拿到offer后鼠鼠一下子就落地了,脑子自动过滤掉可能会攻击鼠鼠的记忆😷~~~秒过深圳客服三面:项目大战参与战斗的人员有:成员1:表单封装及其底层原理,使用成本的优化,声明式表单成员2:公司内部库生命周期管理成员3:第三方库和内部库冲突如何源码断点调试并打补丁解决成员4:埋点的艺术成员5:线上项目捷报频传如何查出内鬼成员6:大文件分片的风流趣事成员7:设计模式对对碰成员8:我构建hooks应对经理的新增的小需求的故事可能项目回答的比较流利,笔试题3道,都很简单,相信大家应该都可以手拿把掐😇~~~过过过无hr面后续煎熬等待几天直接hr打电话发offer了,希望大家也可以拿到自己心仪的offer
法力无边年:牛哇,你真是准备得充分,我对你没有嫉妒,都是实打实付出
查看19道真题和解析
点赞 评论 收藏
分享
头像
05-16 11:16
已编辑
东华理工大学 Java
牛客737698141号:盲猜几十人小公司,庙小妖风大,咋不叫她去4️⃣呢😁
点赞 评论 收藏
分享
评论
1
7
分享

创作者周榜

更多
牛客网
牛客企业服务