数据采集-批量处理

为了提升系统性能,对一些频繁的操作,使用数据采集本地缓冲、后续批量处理的模式。将这个模式做成一个可插拔的组件。

顶层接口设计

数据收集器 DataCollector

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
/**
 * Data collector: accepts individual data items for later batch processing.
 */
public interface DataCollector<T> {

    /**
     * Collects one data item into the buffer.
     *
     * @param data the data item to collect
     */
    void collect(T data);
}

数据批量处理器 DataBatchProcessor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Batch processor for collected data.
 */
public interface DataBatchProcessor<T> {
    /**
     * Batch size used when draining buffered data.
     *
     * @return the batch size; defaults to 100.
     */
    default int getBatchSize() {
        return 100;
    }

    /**
     * Interval between processing runs.
     *
     * @return the interval; defaults to 1 second.
     */
    default Duration getInterval() {
        return Duration.ofSeconds(1);
    }


    /**
     * Processes one batch of data.
     * Implementations should avoid long-running logic here, otherwise data
     * collection may be delayed; offload slow business logic to a thread pool.
     *
     * @param dataList the batch of data to process
     */
    void process(List<T> dataList);
}

实现

抽象实现 AbstractDataCollector

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Slf4j
public abstract class AbstractDataCollector<T> implements DataCollector<T>, DataBatchProcessor<T> {
    /**
     * 数据采集队列
     */
    private final LinkedBlockingDeque<T> dataCollector = new LinkedBlockingDeque<>();
    /**
     * 上次执行时间
     */
    private volatile long lastCallTimeMills;

    /**
     * 数据收集,禁止重写
     *
     * @param data 数据
     */
    @Override
    public final void collect(T data) {
        dataCollector.add(data);
    }


    void callProcess() {
        if (dataCollector.size() < getBatchSize() && System.currentTimeMillis() - lastCallTimeMills < getInterval().toMillis()) {
            return;
        }
        List<List<T>> list = pollInBatches(getBatchSize());
        if (ObjectUtils.isEmpty(list)) {
            lastCallTimeMills = System.currentTimeMillis();
            return;
        }
        StringBuilder logStr = new StringBuilder("\r\n数据收集器 ").append(this.getClass().getSimpleName()).append(" ")
                .append(getBatchSize()).append(" ").append(getInterval()).append("\r\n");
        int no = 0;
        int count = 0;
        for (List<T> ts : list) {
            try {
                long t1 = System.currentTimeMillis();
                process(ts);
                long t2 = System.currentTimeMillis();
                count += ts.size();
                logStr.append("批次 ").append(++no).append(" 共 ").append(ts.size()).append(" 条数据,耗时:").append(t2 - t1).append("ms.\r\n");
            } catch (Exception ex) {
                log.info("数据收集器执行异常 " + ex.getMessage() + " , 异常数据: " + JSON.toJSONString(ts), ex);
            }
        }
        log.debug(logStr + "本次数据采集、批量处理任务完成,共 {} 个批次,共处理 {} 条数据", list.size(), count);
        lastCallTimeMills = System.currentTimeMillis();
    }

    private List<List<T>> pollInBatches(int batchSize) {
        List<List<T>> list = new ArrayList<>();
        T t;
        List<T> midList = new ArrayList<>();
        while ((t = dataCollector.poll()) != null) {
            if (midList.size() == batchSize) {
                list.add(midList);
                midList = new ArrayList<>();
            }
            midList.add(t);
        }
        if (!ObjectUtils.isEmpty(midList)) {
            list.add(midList);
        }
        return list;
    }
}

任务执行处理器 DataBatchProcessSupport

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * Support component that periodically triggers every registered data
 * collector's batch processing on a single scheduler thread.
 */
@Slf4j
public class DataBatchProcessSupport {
    /**
     * Single-threaded scheduler driving all collectors.
     */
    private final ScheduledExecutorService collectorExecutor;
    /**
     * Registered collectors; never null (defaults to an empty list).
     */
    private final List<AbstractDataCollector<?>> collectors;

    public DataBatchProcessSupport(@Autowired(required = false) List<AbstractDataCollector<?>> collectors) {
        // @Autowired(required = false) injects null when no collector beans exist;
        // guard here so execute() cannot throw an NPE while iterating.
        this.collectors = collectors == null ? List.of() : collectors;
        collectorExecutor = Executors.newScheduledThreadPool(1, new CollectorThreadFactory());
        collectorExecutor.scheduleWithFixedDelay(this::execute, 3, 1, TimeUnit.SECONDS);
        log.info("数据采集-批处理器组件初始化完成。");
    }

    /**
     * Flushes buffered data one last time, then shuts the scheduler down.
     */
    @PreDestroy
    public void destroy() {
        if (!collectorExecutor.isShutdown()) {
            log.debug("关闭 {} 线程池...", this.getClass().getSimpleName());
            // Final flush so data still buffered at shutdown is not lost.
            execute();
            collectorExecutor.shutdown();
            // Fixed log message: original read "成功 {} 线程池。" (missing the verb 关闭).
            log.debug("成功关闭 {} 线程池。", this.getClass().getSimpleName());
        }
    }

    /**
     * Runs each collector's processing cycle, isolating failures per collector
     * so one faulty collector cannot starve the others.
     */
    private void execute() {
        for (AbstractDataCollector<?> collector : collectors) {
            try {
                collector.callProcess();
            } catch (Exception ex) {
                log.warn("AbstractDataCollector调用异常,异常信息: " + ex.getMessage(), ex);
            }
        }
    }

    /**
     * Names scheduler threads "collector-N" for easier diagnostics.
     */
    static class CollectorThreadFactory implements ThreadFactory {
        private static final String THREAD_NAME_PREFIX = "collector-";
        private final AtomicInteger threadNumber = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, THREAD_NAME_PREFIX + threadNumber.getAndIncrement());
        }
    }

}

自动配置

Enable注解 EnableDataBatchProcessor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10

/**
 * Enables the data batch processing component.
 * Importing {@link DataBatchSelector} registers {@code DataBatchProcessSupport}
 * when this annotation is present on a configuration class.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DataBatchSelector.class)
public @interface EnableDataBatchProcessor {
}

导入类 DataBatchSelector

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**
 * Deferred import selector: registers {@code DataBatchProcessSupport} only
 * when the importing class carries {@code @EnableDataBatchProcessor}.
 */
public class DataBatchSelector implements DeferredImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        // Guard clause: nothing to import unless the enabling annotation is present.
        if (!importingClassMetadata.hasAnnotation(EnableDataBatchProcessor.class.getName())) {
            return new String[0];
        }
        return new String[]{DataBatchProcessSupport.class.getName()};
    }
}

应用示例(用户访问统计)

收集器 AccessCollector

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

/**
 * Collector for access statistics: aggregates buffered access records in
 * memory, then increments Redis hash counters in a single pipeline.
 */
@Slf4j
public class AccessCollector extends AbstractDataCollector<AccessStatisticsInfo> {
    /**
     * Date suffix for daily Redis keys (e.g. "240131"). DateTimeFormatter is
     * thread-safe and immutable, so it is cached instead of rebuilt per call.
     */
    private static final DateTimeFormatter DAY_FORMATTER = DateTimeFormatter.ofPattern("yyMMdd");

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public int getBatchSize() {
        return 1000;
    }

    @Override
    public Duration getInterval() {
        return Duration.ofSeconds(5);
    }

    /**
     * Aggregates a batch of access records by resource code and by ip:resource,
     * then flushes the counts to Redis via a pipelined hIncrBy per entry.
     *
     * @param dataList the batch of access records
     */
    @Override
    public void process(List<AccessStatisticsInfo> dataList) {
        Map<String, Long> pathMap = new HashMap<>();
        Map<String, Long> ipMap = new HashMap<>();
        for (AccessStatisticsInfo info : dataList) {
            // AccessStatisticsInfo declares resourceCode/count/ip — it has no
            // "path" property, so the original info.getPath() could not compile.
            // Key by resource code, matching the "接口编号=访问数量" log label.
            String k = String.valueOf(info.getResourceCode());
            pathMap.merge(k, info.getCount(), Long::sum);
            ipMap.merge(info.getIp() + ':' + k, info.getCount(), Long::sum);
        }
        log.info("访问统计{接口编号=访问数量}: {}", pathMap);

        String today = LocalDate.now().format(DAY_FORMATTER);
        byte[] apiDailyKey = APP_STATISTICS_ACCESS_API_DAILY.generationKey(today).getBytes(StandardCharsets.UTF_8);
        byte[] ipDailyKey = APP_STATISTICS_ACCESS_IP_DAILY.generationKey(today).getBytes(StandardCharsets.UTF_8);
        // Loop-invariant: the all-time key does not depend on the entry, so
        // compute it once instead of once per map entry.
        byte[] allKey = RedisKeyEnum.APP_STATISTICS_ACCESS_API.generationKey().getBytes(StandardCharsets.UTF_8);
        redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            RedisHashCommands hashCmd = connection.hashCommands();
            for (Map.Entry<String, Long> entry : pathMap.entrySet()) {
                byte[] hashKey = entry.getKey().getBytes(StandardCharsets.UTF_8);
                hashCmd.hIncrBy(allKey, hashKey, entry.getValue());
                hashCmd.hIncrBy(apiDailyKey, hashKey, entry.getValue());
            }
            for (Map.Entry<String, Long> ipEntry : ipMap.entrySet()) {
                byte[] hashKey = ipEntry.getKey().getBytes(StandardCharsets.UTF_8);
                hashCmd.hIncrBy(ipDailyKey, hashKey, ipEntry.getValue());
            }
            return null;
        });
    }
}

实体类 AccessStatisticsInfo

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@Getter
@Setter
public class AccessStatisticsInfo {
    /**
     * Resource code identifying the accessed API.
     */
    private Integer resourceCode;
    /**
     * Access count.
     */
    private Long count;
    /**
     * Client IP of the access.
     */
    private String ip;
}

访问统计拦截器 AccessStatisticsInterceptor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * MVC interceptor that records one access-statistics record per handled
 * request and forwards it to the {@link AccessCollector} buffer.
 */
@Slf4j
public class AccessStatisticsInterceptor implements HandlerInterceptor, WebMvcConfigurer {
    private static final String UN_KNOW_IP = "UN_KNOW";

    @Resource
    private AccessCollector accessCollector;

    /**
     * Minimum gap between error log lines, to avoid flooding the log.
     */
    private final long printIntervalIfError = Duration.ofSeconds(30).toMillis();
    /**
     * Last time an error was logged. Deliberately not locked/atomic:
     * strict consistency is unnecessary here and a few extra log lines
     * under contention are harmless.
     */
    private volatile long latestPrintErrorMillis;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(this).addPathPatterns("/**");
        log.info("API访问统计组件初始化完成。");
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        try {
            // Only controller-method handlers are counted; static resources etc. pass through.
            if (handler instanceof HandlerMethod handlerMethod) {
                statistics(request, handlerMethod);
            }
        } catch (Exception ex) {
            long now = System.currentTimeMillis();
            // Rate-limited error logging: at most one warning per interval.
            if (now - latestPrintErrorMillis > printIntervalIfError) {
                latestPrintErrorMillis = now;
                log.warn("App访问统计报错了," + ex.getMessage(), ex);
            }
        }
        return true;
    }

    /**
     * Builds one statistics record for this request and hands it to the collector.
     */
    private void statistics(HttpServletRequest request, HandlerMethod handlerMethod) {
        String realAddress = IpAddressUtil.getRealAddress(request);
        AppResource appResource = handlerMethod.getMethodAnnotation(AppResource.class);
        AccessStatisticsInfo info = new AccessStatisticsInfo();
        info.setIp(ObjectUtils.isEmpty(realAddress) ? UN_KNOW_IP : realAddress);
        // Methods without @AppResource are bucketed under resource code -1.
        info.setResourceCode(appResource == null ? -1 : appResource.value());
        info.setCount(1L);
        accessCollector.collect(info);
    }
}

自动配置 selector

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**
 * Deferred import selector: registers the access-statistics interceptor and
 * collector only when the importing class carries {@code @EnableAccessStatistics}.
 */
public class AccessStatisticsSelector implements DeferredImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        // Guard clause: nothing to import unless the enabling annotation is present.
        if (!importingClassMetadata.hasAnnotation(EnableAccessStatistics.class.getName())) {
            return new String[0];
        }
        return new String[]{AccessStatisticsInterceptor.class.getName(), AccessCollector.class.getName()};
    }
}

自动配置注解 EnableAccessStatistics

1
2
3
4
5
6
/**
 * Enables API access statistics.
 * Importing {@link AccessStatisticsSelector} registers the statistics
 * interceptor and the access collector when this annotation is present.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(AccessStatisticsSelector.class)
public @interface EnableAccessStatistics {
}