基于Flink和Drools的實時日志處理
分類:互聯網熱點
編輯:聊聊云計算
瀏覽量:429
2020-07-13 16:55:23
**背景**
日志系統接入的日志種類多、格式復雜多樣,主流的有以下幾種日志:
- Filebeat采集到的文本日志,格式多樣
- Winbeat采集到的操作系統日志
- 設備上報到Logstash的syslog日志
- 接入到Kafka的業務日志
以上通過各種渠道接入的日志,存在2個主要的問題:
- 格式不統一、不規范、標準化不夠
- 如何從各類日志中提取出用戶關心的指標,挖掘更多的業務價值
為了解決上面2個問題,我們基于Flink和Drools規則引擎做了實時的日志處理服務。
**系統架構**
架構比較簡單,架構圖如下:
各類日志都是通過Kafka匯總,做日志中轉。
Flink消費Kafka的數據,同時通過API調用拉取Drools規則引擎,對日志做解析處理后,將解析后的數據存儲到Elasticsearch中,用于日志的搜索和分析等業務。
為了監控日志解析的實時狀態,Flink會將日志處理的統計數據,如每分鐘處理的日志量,每種日志從各個機器IP來的日志量寫到Redis中,用于監控統計。
**模塊介紹**
系統項目命名為Eagle。
eagle-api:基于Spring Boot,作為Drools規則引擎的寫入和讀取API服務。
eagle-common:通用類模塊。
eagle-log:基于Flink的日志處理服務。
重點講一下eagle-log:
**對接kafka、ES和Redis**
對接Kafka和ES都比較簡單,用的官方的connector(flink-connector-kafka-0.10和flink-connector-elasticsearch6),詳見代碼。
對接Redis,最開始用的是org.apache.bahir提供的redis connector,后來發現靈活度不夠,就使用了Jedis。
在將統計數據寫入redis的時候,最開始用的keyby分組后緩存了分組數據,在sink中做統計處理后寫入,參考代碼如下:
```
String name = "redis-agg-log";
DataStream>> keyedStream = dataSource.keyBy((KeySelector) log -> log.getIndex())
.timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
.process(new ProcessWindowFunction>, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable iterable, Collector>> collector) {
ArrayList logs = Lists.newArrayList(iterable);
if (logs.size() > 0) {
collector.collect(new Tuple2(s, logs));
}
}
}).setParallelism(redisSinkParallelism).name(name).uid(name);
```
后來發現這樣做對內存消耗比較大,其實不需要緩存整個分組的原始數據,只需要一個統計數據就OK了,優化后:
```
String name = "redis-agg-log";
DataStream keyedStream = dataSource.keyBy((KeySelector) log -> log.getIndex())
.timeWindow(Time.seconds(windowTime))
.trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
.aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())
.setParallelism(redisSinkParallelism).name(name).uid(name);
```
這里使用了Flink的聚合函數和Accumulator,通過Flink的agg操作做統計,減輕了內存消耗的壓力。
**使用Broadcast廣播Drools規則引擎**
1、Drools規則流通過broadcast map state廣播出去。
2、Kafka的數據流connect規則流處理日志。
```
//廣播規則流
env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1)
.broadcast(ruleStateDescriptor);
//Kafka數據流
FlinkKafkaConsumer010 source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);
env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);
//數據流connect規則流處理日志
BroadcastConnectedStream connectedStreams = dataSource.connect(ruleSource);
connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name);
```
具體細節參考開源代碼。
**小結**
本系統提供了一個基于Flink的實時數據處理參考,對接了Kafka、Redis和Elasticsearch,通過可配置的Drools規則引擎,將數據處理邏輯配置化和動態化。
對于處理后的數據,也可以對接到其他sink,為其他各類業務平臺提供數據的解析、清洗和標準化服務。
> 【云棲號在線課堂】每天都有產品技術專家分享!
> 課程地址:https://yqh.aliyun.com/live
> 立即加入社群,與專家面對面,及時了解課程最新動態!
> 【云棲號在線課堂 社群】https://c.tb.cn/F3.Z8gvnK
原文發布時間:2020-07-09
本文作者: aoxiang
本文來自:“”,了解相關信息可以關注“dockone”
聲明:免責聲明:本文內容由互聯網用戶自發貢獻自行上傳,本網站不擁有所有權,也不承認相關法律責任。如果您發現本社區中有涉嫌抄襲的內容,請發
送郵件至:operations@xinnet.com進行舉報,并提供相關證據,一經查實,本站將立刻刪除涉嫌侵權內容。本站原創內容未經允許不得轉載,或轉載時
需注明出處:新網idc知識百科