为了提升系统性能,对于一些高频操作,可以采用"先在本地缓冲采集数据、随后批量处理"的模式。本文将这一模式封装成一个可插拔的组件。
顶层接口设计
数据收集器 DataCollector
1
2
3
4
5
6
7
8
9
10
11
12
|
/**
 * Data collector: the entry point through which callers hand over individual data items.
 *
 * @param <T> type of the collected data
 */
public interface DataCollector<T> {
/**
 * Collects one data item.
 *
 * @param data the data item to collect
 */
void collect(T data);
}
|
数据批量处理器 DataBatchProcessor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
/**
 * Batch processor for collected data.
 *
 * @param <T> type of the items contained in a batch
 */
public interface DataBatchProcessor<T> {
/**
 * Batch size used when processing data.
 *
 * @return the batch size; 100 by default
 */
default int getBatchSize() {
return 100;
}
/**
 * Interval between data-processing runs.
 *
 * @return the interval; 1 second by default
 */
default Duration getInterval() {
return Duration.ofSeconds(1);
}
/**
 * Processes one batch of data.
 * Implementations should avoid long-running logic here, otherwise data collection may be
 * affected. If the business logic is slow, hand it off to a thread pool for asynchronous
 * processing.
 *
 * @param dataList the batch of data items
 */
void process(List<T> dataList);
}
|
实现
抽象实现 AbstractDataCollector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
/**
 * Base class that buffers collected items in an in-memory queue and later hands them to
 * {@link #process(List)} in batches.
 *
 * <p>{@link #collect(Object)} only enqueues the item. {@link #callProcess()} is package-private
 * and is driven by {@code DataBatchProcessSupport}; it drains the queue once either the batch
 * size or the processing interval has been reached.
 *
 * <p>Items are removed from the queue before they are processed, so a batch whose
 * {@code process} call throws is logged and dropped; it is not retried.
 *
 * @param <T> type of the collected data
 */
@Slf4j
public abstract class AbstractDataCollector<T> implements DataCollector<T>, DataBatchProcessor<T> {

    /** Queue holding collected items until the next processing round. */
    private final LinkedBlockingDeque<T> dataCollector = new LinkedBlockingDeque<>();

    /** Wall-clock time (epoch millis) at which the last processing round ended. */
    private volatile long lastCallTimeMills;

    /**
     * Enqueues one item for later batch processing. Final: subclasses must not override it.
     *
     * @param data the item to buffer; the underlying queue rejects {@code null}
     */
    @Override
    public final void collect(T data) {
        dataCollector.add(data);
    }

    /**
     * Runs one processing round if it is due.
     *
     * <p>The round is skipped while fewer than {@link #getBatchSize()} items are queued AND less
     * than {@link #getInterval()} has passed since the previous round. Otherwise the queue is
     * drained, split into batches and each batch is passed to {@link #process(List)}. A failure
     * in one batch never prevents the remaining batches from being processed.
     */
    void callProcess() {
        int batchSize = getBatchSize();
        long sinceLastRun = System.currentTimeMillis() - lastCallTimeMills;
        if (dataCollector.size() < batchSize && sinceLastRun < getInterval().toMillis()) {
            return;
        }

        List<List<T>> batches = pollInBatches(batchSize);
        if (batches.isEmpty()) {
            lastCallTimeMills = System.currentTimeMillis();
            return;
        }

        // The per-batch report is only needed for the DEBUG line below, so skip building it
        // when DEBUG is disabled.
        StringBuilder report = null;
        if (log.isDebugEnabled()) {
            report = new StringBuilder("\r\n数据收集器 ").append(this.getClass().getSimpleName()).append(" ")
                    .append(batchSize).append(" ").append(getInterval()).append("\r\n");
        }

        int succeededBatches = 0;
        int processedCount = 0;
        for (List<T> batch : batches) {
            try {
                long startedAt = System.currentTimeMillis();
                process(batch);
                long costMillis = System.currentTimeMillis() - startedAt;
                processedCount += batch.size();
                succeededBatches++;
                if (report != null) {
                    report.append("批次 ").append(succeededBatches).append(" 共 ").append(batch.size())
                            .append(" 条数据,耗时:").append(costMillis).append("ms.\r\n");
                }
            } catch (Exception ex) {
                // WARN, not INFO: the batch has already left the queue and is lost at this point.
                log.warn("数据收集器执行异常 {} , 异常数据: {}", ex.getMessage(), describeBatch(batch), ex);
            }
        }

        if (report != null) {
            // The report is passed as an argument so its content is never parsed as a pattern.
            log.debug("{}本次数据采集、批量处理任务完成,共 {} 个批次,共处理 {} 条数据", report, batches.size(), processedCount);
        }
        lastCallTimeMills = System.currentTimeMillis();
    }

    /**
     * Renders a failed batch for the log without ever throwing.
     *
     * <p>Serialization happens inside the failure handler; if it threw, the exception would
     * escape the batch loop and every remaining (already dequeued) batch would be lost.
     *
     * @param batch the batch whose processing failed
     * @return the JSON form of the batch, or a short placeholder if it cannot be serialized
     */
    private String describeBatch(List<T> batch) {
        try {
            return JSON.toJSONString(batch);
        } catch (RuntimeException serializationFailure) {
            return "<" + batch.size() + " items, serialization failed: " + serializationFailure.getMessage() + ">";
        }
    }

    /**
     * Drains the queue until it is empty and splits the drained items into batches.
     *
     * @param batchSize maximum number of items per batch; a non-positive value means
     *                  "no limit", i.e. everything goes into a single batch
     * @return the batches in queue order; empty when nothing was queued, never containing an
     *         empty batch
     */
    private List<List<T>> pollInBatches(int batchSize) {
        int limit = batchSize > 0 ? batchSize : Integer.MAX_VALUE;
        List<List<T>> batches = new ArrayList<>();
        while (true) {
            List<T> batch = new ArrayList<>();
            // drainTo moves at most `limit` items and returns 0 once the queue is empty.
            if (dataCollector.drainTo(batch, limit) == 0) {
                return batches;
            }
            batches.add(batch);
        }
    }
}
|
任务执行处理器 DataBatchProcessSupport
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
/**
 * Drives all registered {@link AbstractDataCollector}s: a single scheduler thread periodically
 * calls {@code callProcess()} on each of them, and one last round is run on shutdown.
 */
@Slf4j
public class DataBatchProcessSupport {

    /** How long {@link #destroy()} waits for an in-flight round before running the final one. */
    private static final long SHUTDOWN_WAIT_SECONDS = 5L;

    /** Single-threaded scheduler that runs the periodic processing round. */
    private final ScheduledExecutorService collectorExecutor;

    /** Registered collectors; never {@code null}. */
    private final List<AbstractDataCollector<?>> collectors;

    /**
     * Creates the component and starts the periodic round (first run after 3 seconds, then
     * 1 second after each run completes).
     *
     * @param collectors the collectors to drive; may be {@code null} when none are registered,
     *                   because the injection point is optional
     */
    public DataBatchProcessSupport(@Autowired(required = false) List<AbstractDataCollector<?>> collectors) {
        // An optional injection point may receive null; without this guard execute() would throw
        // a NullPointerException outside its try block, which cancels the periodic task and also
        // breaks destroy().
        this.collectors = collectors == null ? List.of() : collectors;
        collectorExecutor = Executors.newScheduledThreadPool(1, new CollectorThreadFactory());
        collectorExecutor.scheduleWithFixedDelay(this::execute, 3, 1, TimeUnit.SECONDS);
        log.info("数据采集-批处理器组件初始化完成。");
    }

    /**
     * Stops the scheduler and runs one last processing round on the calling thread.
     *
     * <p>The scheduler is shut down and awaited first so that the final round does not run
     * concurrently with a round still executing on the scheduler thread.
     */
    @PreDestroy
    public void destroy() {
        if (collectorExecutor.isShutdown()) {
            return;
        }
        log.debug("关闭 {} 线程池...", this.getClass().getSimpleName());
        collectorExecutor.shutdown();
        try {
            if (!collectorExecutor.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
                log.warn("等待 {} 线程池中的任务结束超时,仍将执行最后一次处理。", this.getClass().getSimpleName());
            }
        } catch (InterruptedException ex) {
            // Preserve the interrupt for the caller; the final round below is still attempted.
            Thread.currentThread().interrupt();
        }
        // NOTE(review): callProcess() skips the round while a collector is below its batch size
        // and its interval has not elapsed, so this final call is not a guaranteed flush.
        execute();
        log.debug("成功关闭 {} 线程池。", this.getClass().getSimpleName());
    }

    /**
     * Runs one round over every collector. A failure in one collector is logged and does not
     * stop the others. Only {@link Exception} is caught here.
     */
    private void execute() {
        for (AbstractDataCollector<?> collector : collectors) {
            try {
                collector.callProcess();
            } catch (Exception ex) {
                log.warn("AbstractDataCollector调用异常,异常信息: " + ex.getMessage(), ex);
            }
        }
    }

    /** Names scheduler threads {@code collector-0}, {@code collector-1}, ... */
    static class CollectorThreadFactory implements ThreadFactory {
        private static final String THREAD_NAME_PREFIX = "collector-";
        private final AtomicInteger threadNumber = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, THREAD_NAME_PREFIX + threadNumber.getAndIncrement());
        }
    }
}
|
自动配置
Enable注解 EnableDataBatchProcessor
1
2
3
4
5
6
7
8
9
10
|
/**
 * Enables batch data processing.
 * Placing this annotation on a type imports {@link DataBatchSelector}.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DataBatchSelector.class)
public @interface EnableDataBatchProcessor {
}
|
导入类 DataBatchSelector
1
2
3
4
5
6
7
8
9
10
|
/**
 * Import selector behind {@link EnableDataBatchProcessor}.
 */
public class DataBatchSelector implements DeferredImportSelector {

    /**
     * Selects {@link DataBatchProcessSupport} for import when the importing class carries
     * {@link EnableDataBatchProcessor}; otherwise selects nothing.
     *
     * @param importingClassMetadata annotation metadata of the importing class
     * @return the class names to import, possibly empty
     */
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        String annotationName = EnableDataBatchProcessor.class.getName();
        if (!importingClassMetadata.hasAnnotation(annotationName)) {
            return new String[0];
        }
        return new String[]{DataBatchProcessSupport.class.getName()};
    }
}
|
应用示例(用户访问统计)
收集器 AccessCollector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
/**
 * Collector for API access statistics: aggregates a batch of {@link AccessStatisticsInfo}
 * records in memory and adds the totals to Redis hashes in one pipelined call.
 *
 * <p>Processes in batches of 1000 items or every 5 seconds.
 */
@Slf4j
public class AccessCollector extends AbstractDataCollector<AccessStatisticsInfo> {

    /** Day suffix used for the daily Redis keys; the formatter is thread-safe, so it is cached. */
    private static final DateTimeFormatter DAY_FORMATTER = DateTimeFormatter.ofPattern("yyMMdd");

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * @return 1000
     */
    @Override
    public int getBatchSize() {
        return 1000;
    }

    /**
     * @return 5 seconds
     */
    @Override
    public Duration getInterval() {
        return Duration.ofSeconds(5);
    }

    /**
     * Sums the access counts of the batch per resource code and per {@code ip:resourceCode},
     * then increments the all-time hash, the daily per-resource hash and the daily per-IP hash.
     *
     * @param dataList the batch to aggregate; every record is expected to carry a non-null count
     */
    @Override
    public void process(List<AccessStatisticsInfo> dataList) {
        if (dataList == null || dataList.isEmpty()) {
            return;
        }

        Map<String, Long> pathMap = new HashMap<>();
        Map<String, Long> ipMap = new HashMap<>();
        for (AccessStatisticsInfo info : dataList) {
            // AccessStatisticsInfo identifies the accessed API by its resource code (it has no
            // path property), so the resource code is the hash field.
            String resourceKey = String.valueOf(info.getResourceCode());
            pathMap.merge(resourceKey, info.getCount(), Long::sum);
            ipMap.merge(info.getIp() + ':' + resourceKey, info.getCount(), Long::sum);
        }
        log.info("访问统计{接口编号=访问数量}: {}", pathMap);

        String today = LocalDate.now().format(DAY_FORMATTER);
        byte[] apiDailyKey = APP_STATISTICS_ACCESS_API_DAILY.generationKey(today).getBytes(StandardCharsets.UTF_8);
        byte[] ipDailyKey = APP_STATISTICS_ACCESS_IP_DAILY.generationKey(today).getBytes(StandardCharsets.UTF_8);
        redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            RedisHashCommands hashCmd = connection.hashCommands();
            // Computed once per call instead of once per entry.
            byte[] allKey = RedisKeyEnum.APP_STATISTICS_ACCESS_API.generationKey().getBytes(StandardCharsets.UTF_8);
            for (Map.Entry<String, Long> entry : pathMap.entrySet()) {
                byte[] hashKey = entry.getKey().getBytes(StandardCharsets.UTF_8);
                hashCmd.hIncrBy(allKey, hashKey, entry.getValue());
                hashCmd.hIncrBy(apiDailyKey, hashKey, entry.getValue());
            }
            for (Map.Entry<String, Long> ipEntry : ipMap.entrySet()) {
                byte[] hashKey = ipEntry.getKey().getBytes(StandardCharsets.UTF_8);
                hashCmd.hIncrBy(ipDailyKey, hashKey, ipEntry.getValue());
            }
            // Pipelined callbacks must return null.
            return null;
        });
    }
}
|
实体类 AccessStatisticsInfo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
/**
 * One access-statistics record: which resource was accessed, from which IP, and how many times.
 */
@Getter
@Setter
public class AccessStatisticsInfo {
/**
 * Resource code.
 */
private Integer resourceCode;
/**
 * Number of accesses.
 */
private Long count;
/**
 * IP address the access came from.
 */
private String ip;
}
|
访问统计拦截器 AccessStatisticsInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
/**
 * MVC interceptor that records one access per handled request and forwards it to the
 * {@link AccessCollector}. It registers itself for every path and never blocks a request.
 */
@Slf4j
public class AccessStatisticsInterceptor implements HandlerInterceptor, WebMvcConfigurer {

    /** IP placeholder used when the real client address cannot be determined. */
    private static final String UN_KNOW_IP = "UN_KNOW";

    @Resource
    private AccessCollector accessCollector;

    /** Minimum gap, in milliseconds, between two logged statistics errors (30 seconds). */
    private final long printIntervalIfError = Duration.ofSeconds(30).toMillis();

    /**
     * Time of the most recently logged error.
     * Deliberately not guarded by a lock or atomic: consistency is not required here and a few
     * extra log lines are harmless.
     */
    private volatile long latestPrintErrorMillis;

    /**
     * Registers this interceptor for all request paths.
     *
     * @param registry the interceptor registry
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(this).addPathPatterns("/**");
        log.info("API访问统计组件初始化完成。");
    }

    /**
     * Records the access when the handler is a {@link HandlerMethod}. Any failure is logged
     * (at most once per throttle interval) and otherwise ignored.
     *
     * @return always {@code true}, so the request continues
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        try {
            if (handler instanceof HandlerMethod handlerMethod) {
                statistics(request, handlerMethod);
            }
        } catch (Exception ex) {
            long now = System.currentTimeMillis();
            boolean logDue = now - latestPrintErrorMillis > printIntervalIfError;
            if (logDue) {
                latestPrintErrorMillis = now;
                log.warn("App访问统计报错了," + ex.getMessage(), ex);
            }
        }
        return true;
    }

    /**
     * Builds a single-access record for the request and hands it to the collector.
     * Handler methods without an {@code AppResource} annotation are counted under code -1.
     */
    private void statistics(HttpServletRequest request, HandlerMethod handlerMethod) {
        String realAddress = IpAddressUtil.getRealAddress(request);
        AccessStatisticsInfo info = new AccessStatisticsInfo();
        if (ObjectUtils.isEmpty(realAddress)) {
            info.setIp(UN_KNOW_IP);
        } else {
            info.setIp(realAddress);
        }

        AppResource appResource = handlerMethod.getMethodAnnotation(AppResource.class);
        int resourceCode = (appResource != null) ? appResource.value() : -1;
        info.setResourceCode(resourceCode);
        info.setCount(1L);
        accessCollector.collect(info);
    }
}
|
自动配置导入类 AccessStatisticsSelector
1
2
3
4
5
6
7
8
9
10
|
/**
 * Import selector behind {@link EnableAccessStatistics}.
 */
public class AccessStatisticsSelector implements DeferredImportSelector {

    /**
     * Selects {@link AccessStatisticsInterceptor} and {@link AccessCollector} for import when
     * the importing class carries {@link EnableAccessStatistics}; otherwise selects nothing.
     *
     * @param importingClassMetadata annotation metadata of the importing class
     * @return the class names to import, possibly empty
     */
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        String annotationName = EnableAccessStatistics.class.getName();
        if (!importingClassMetadata.hasAnnotation(annotationName)) {
            return new String[0];
        }
        return new String[]{AccessStatisticsInterceptor.class.getName(), AccessCollector.class.getName()};
    }
}
|
自动配置注解 EnableAccessStatistics
1
2
3
4
5
6
|
/**
 * Enables access statistics.
 * Placing this annotation on a type imports {@link AccessStatisticsSelector}.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(AccessStatisticsSelector.class)
public @interface EnableAccessStatistics {
}
|