基于flink sql构建报警系统的若干技术点


声明:本文转载自https://my.oschina.net/qiangzigege/blog/3035897,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

1)选择滑动窗口

滑动窗口会导致一个时间点的数据会分布到多个window里来,其它跟滚动窗口没区别

2)滑动数据重新汇聚

计算源是单个时间窗口内预汇聚的数据,不适用于滑动窗口的计算值,比如前面单个窗口计算出来p90,那么滑动窗口的p90怎么算?

需要把前面的时间窗口的现场保留下来传到当前滑动窗口内才可以继续计算,具体代码如下:

//拿到原始数据后,重新计算
    @SuppressWarnings("unchecked")
    public void accumulate(Object value) {
        if (null == value) {
            return;
        }
        HashMap<String, Object> mapVal = (HashMap<String, Object>) value;
        //1)在历史值和当前值中设置新的max值
        Object maxObj = mapVal.get("max");
        Object minObj = mapVal.get("min");
        Object countObj = mapVal.get("count");
        Object sumObj = mapVal.get("sum");
        Object afObj = mapVal.get("af");
        Object ccObj = mapVal.get("cc");
        if (null == maxObj || null == minObj || null == countObj || null == sumObj || null == afObj
            || null == ccObj) {
            return;
        }
        if (false == (ccObj instanceof JSONArray)) {
            return;
        }
        JSONArray jsonArray = (JSONArray) ccObj;
        int jsonSize = jsonArray.size();
        if (0 == jsonSize) {
            return;
        }

        this.max = Math.max(this.max, ((Number) maxObj).doubleValue());
        this.min = Math.min(this.min, ((Number) minObj).doubleValue());
        this.count += ((Number) countObj).intValue();
        this.sum += ((Number) sumObj).doubleValue();
        //每次都是直接替换
        this.augmentFactor = ((Number) afObj).intValue();
        //5)开始汇总以便后面计算各种95线之类的值
        Integer[] intArray = new Integer[jsonSize];
        intArray = jsonArray.toArray(intArray);
        for (int index = 0; index < jsonSize; index++) {
            countContainer[index] += intArray[index];
        }
        //6)over
    }

3)海量tag的发现

如果上传的metric tag数据非常多,怎么去重是个问题,我采取的方案是

3.1)使用采样率

    @Override
    public void filter(String metric, TreeMap<String, String> tagValues, boolean tagValuesEmpty) {
        Random randomGenerator = RANDOM_THREAD_LOCAL.get();
        if (0 == randomGenerator.nextInt(20)) {
            //取5%的采样率
            ReportQueue.put(ReportQueue.METRIC_TAG,
                new MetricAndTags(metric, tagValues, tagValuesEmpty));
            //结束
        } else {

        }
    }

3.2)布隆过滤器判重

    //普通数据-bloom filter
    private static Integer     SIZE                  = 100 * 1000 * 1000;
    private static Integer     BITS                  = 20;
    private static Integer     HASH_FUNCTION         = 1;
    private static BloomFilter DATA_BLOOM_FILTER     = new BloomFilter(SIZE, BITS, HASH_FUNCTION);
    //哈希code-bloom filter
    private static BloomFilter HASHCODE_BLOOM_FILTER = new BloomFilter(SIZE, BITS, HASH_FUNCTION); 


   public static synchronized boolean isNewKey(String data) {
        Key dataKey = new Key(data.getBytes());
        if (false == DATA_BLOOM_FILTER.membershipTest(dataKey)) {
            //不存在就是真的不存在
            return true;
        }
        //再做hashcode的2次判断
        if (false == HASHCODE_BLOOM_FILTER.membershipTest(hashCodeKey(data))) {
            //不存在就是真的不存在
            return true;
        }
        //(如果2次都说存在,也没办法了,这条数据丢弃)
        //返回false表示不是new key
        return false;
    }

3.3)元数据幂等性保存到es

注意幂等性,之前存在的数据会被更新,而不是新增一条数据,因为我们是保存元数据

具体就是设置请求体里的upsert为true

@Data
public class ExecutionMetricTagValue {
    private Boolean doc_as_upsert=true;
    private EsMetricTagValue doc;
}

3.4)限流防对远程ES的流量冲击

这个是构建一个 Guava对象

private static final RateLimiter RATE_LIMITER = RateLimiter.create(500);

//在JVM级别限流,防止对ES产生冲击
RATE_LIMITER.acquire(1);

 

 

4)用户配置数据拉取

用户配置的一些规则,通过另外一个JVM级别的线程拉取到本地内存,这样就可以不影响flink的计算速度
 

5)报警屏蔽周期

这是为了防止报警洪灾,实现思路

                String res;
                try {
                    SetParams setParams = new SetParams();
                    setParams.nx();
                    setParams.ex(alarmInterval);
                    //仅仅是当前timeSpan内有效,10s不影响30s 1m这种
                    res = JEDIS_CLUSTER.set(timeSpan + "_hubble_alarm_" + fullKey, "1", setParams);
                } catch (Exception e) {
                    LOG.error(e.toString());
                    return;
                }
                if (null != res) {
                    //LOG.info("初次插入,可以报警");

主要就是这些,很难的点没有,就是要注意各个细节

-----------------------------------------------------

其实我觉得报警系统的精髓在于阈值的设置上,傻乎乎的设置静态值是没有技术含量的,整个报警系统的精髓就在于自动设置报警阈值

-----------------------------------------------------

 

下面放界面图

 

本文发表于2019年04月12日 12:00
(c)注:本文转载自https://my.oschina.net/qiangzigege/blog/3035897,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1755 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1