透视业务监控系统
约 3709 字大约 12 分钟
2025-07-24
组件描述:该组件不同于普罗米修斯以及skywalking等系统可用性监控,它是一套以用户行为为维度的一套业务流程监控系统。通过 Logback 日志框架的扩展,降低接入侵入性,采集用户行为链路日志,通过公式化配置解析存储行为数据实现监控处理。
核心技术:SpringBoot、MyBatis、MySQL、Kafka、Logback、OGNL,Docker、Gojs
核心职责:
- 调研市面同类 BCP 业务监控系统,分析设计分布式业务监控平台
- 设计并实现了基于Logback的高扩展性日志采集SDK,通过自定义AppenderBase接口实现了对现有系统的非侵入式日志采集。
- 设计开发 Ognl 表达式分析标准,实现数据的解析处理。
- 构建了基于Kafka为中心的消息处理系统,确保日志数据的实时性和可靠性。
- 设计gojs 页面透视业务流程可视化,以及节点监控报警等功能。
全链路监控和字节码增强
非入侵、全链路监控以及字节码增强技术是现代分布式系统和微服务架构中非常重要的概念和技术,用于实现对应用程序的性能监控和故障排查,而不需要修改应用本身的源代码。以下是这些技术的基本介绍:
非入侵监控
非入侵监控指的是在不修改或极少修改应用程序源代码的情况下,通过外部手段对应用进行监控。这种方法可以减少因添加监控逻辑而导致的潜在风险(如引入新的bug),并且能够快速部署到生产环境中。常见的非入侵监控工具包括Prometheus、Grafana等,它们通常通过收集和分析应用程序暴露的metrics来进行监控。
全链路监控
全链路监控是一种面向分布式系统的监控方式,旨在追踪和监控一次用户请求在整个系统中的完整调用链路。它不仅关注单个服务的表现,还会跟踪跨服务边界的数据流动情况,帮助识别性能瓶颈和故障点。全链路监控通常涉及三个关键组件:
- 分布式追踪:例如Jaeger或Zipkin,用来记录和可视化跨多个服务的请求路径。
- 度量指标收集:如Prometheus,用于收集和报警系统和服务级别的度量标准。
- 日志聚合:比如ELK Stack(Elasticsearch, Logstash, Kibana)或阿里云的日志服务(SLS),用于集中管理和分析日志数据。
字节码增强技术
字节码增强技术是指在Java等支持字节码的语言中,在编译后的类加载阶段或运行时动态修改或插入额外的字节码,以增加新的功能或改变现有行为,而不必修改原始源代码。这在实现AOP(面向切面编程)、性能监控等方面非常有用。常用的字节码操作库包括ASM、Javassist和Byte Buddy。Spring AOP就是一个广泛应用的例子,它利用了字节码增强技术来实现在方法执行前后自动插入自定义逻辑的功能。
在实践中,这些技术和方法经常结合使用,为复杂的分布式系统提供全面的监控解决方案。例如,通过字节码增强技术可以在不修改业务代码的前提下,将监控代码注入到目标应用中;然后,利用全链路监控工具追踪整个请求的处理流程,从而达到高效的问题定位和性能优化的目的。
透视业务流程
我先来通过5W1H思想分析下这个东西:
What:它是一个可以通过流程图来透视业务执行过程的组件
Why:它可以在出现问题时,清晰的看到业务的执行流程以及出现问题的节点,同时让未接触过的开发者更容易排查问题,让领导更容易知道当前业务的执行流程
When:它在程序执行每一步关键节点操作的时候记录数据
Who:它可以为团队中的领导、组长、PM等提供价值,受益人可以是想要了解流程的Leader、PM与业务等等
Where:这个东西使用的场景就是,通过监控来定位问题、了解问题、排查问题,缩短排查时间,提高人效。
How:对流程的分解,计划采用日志扩展点的方式+异步数据上报到Server端的方式进行执行节点的分析、关系可视化等等。
典型监控系统
典型的监控系统如SkyWalking、Prometheus和ELK(Elasticsearch, Logstash, Kibana)日志采集系统各自有着不同的应用场景和优势,但同时也可能存在一些限制或挑战:
SkyWalking全链路监控
优势:
- 提供了无侵入式的分布式追踪和性能分析能力。
- 支持多种语言的探针,适用于微服务架构。
- 可视化界面友好,便于快速定位问题。
潜在问题:
- 对于非常复杂的微服务环境,配置和服务映射可能变得复杂。
- 性能开销:虽然字节码增强技术是非侵入式的,但它可能会引入一定的性能开销。
Prometheus
优势:
- 强大的数据模型和灵活的查询语言PromQL。
- 高效的时间序列数据库,适合大规模指标数据的收集和存储。
- 良好的报警机制,可以与Grafana等可视化工具无缝集成。
潜在问题:
- 不直接支持日志数据的处理,需要与其他系统(如ELK)结合使用以提供完整的监控解决方案。
- 数据持久化方案相对有限,长时间的数据保留可能会导致磁盘空间消耗较大。
ELK日志采集系统
优势:
- 强大的日志搜索和分析功能,适用于日志密集型应用。
- 易于扩展,可以根据需求调整集群规模。
- Kibana提供了丰富的可视化选项,便于数据分析和展示。
潜在问题:
- 日志索引管理较为复杂,特别是当数据量巨大时,维护成本会增加。
- 对于高吞吐量的日志数据,可能需要进行额外的优化来确保系统的稳定性和性能。例如,Logstash在处理大量数据时可能会遇到瓶颈,通常建议使用Filebeat作为轻量级的替代方案进行日志采集。
- 存储成本较高,特别是长期保存详细日志的情况下。
综上所述,每种监控系统都有其适用场景和局限性。实际部署时,根据业务需求和系统特性选择合适的工具或组合使用这些工具(如Prometheus用于指标监控,ELK用于日志分析),能够更有效地满足全面的监控需求。
开始编码
日志采集组件SDK实现
标准logback配置
我们采用logback实现日志,常见的logback-spring.xml日志设置内容如下
<?xml version="1.0" encoding="UTF-8"?>
<!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
<configuration scan="true" scanPeriod="10 seconds">
<contextName>logback</contextName>
<!-- name的值是变量的名称,value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义变量后,可以使“${}”来使用变量。 -->
<springProperty scope="context" name="log.path" source="logging.path"/>
<!-- 日志格式 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
<conversionRule conversionWord="wex"
converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
<conversionRule conversionWord="wEx"
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
<!-- 输出到控制台 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<!-- 此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息 -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>info</level>
</filter>
<encoder>
<pattern>%d{yy-MM-dd.HH:mm:ss.SSS} [%-16t] %-5p %-22c{0}%X{ServiceId} -%X{trace-id} %m%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!--输出到文件-->
<!-- 时间滚动输出 level为 INFO 日志 -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>./data/log/log_info.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yy-MM-dd.HH:mm:ss.SSS} [%-16t] %-5p %-22c{0}%X{ServiceId} -%X{trace-id} %m%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天日志归档路径以及格式 -->
<fileNamePattern>./data/log/log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
</appender>
<!-- 时间滚动输出 level为 ERROR 日志 -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>./data/log/log_error.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yy-MM-dd.HH:mm:ss.SSS} [%-16t] %-5p %-22c{0}%X{ServiceId} -%X{trace-id} %m%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>./data/log/log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!-- 日志文件保留天数【根据服务器预留,可自行调整】 -->
<maxHistory>7</maxHistory>
<totalSizeCap>5GB</totalSizeCap>
</rollingPolicy>
<!-- WARN 级别及以上 -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
</appender>
<!-- 异步输出 -->
<appender name="ASYNC_FILE_INFO" class="ch.qos.logback.classic.AsyncAppender">
<!-- 队列剩余容量小于discardingThreshold,则会丢弃TRACT、DEBUG、INFO级别的日志;默认值-1,为queueSize的20%;0不丢失日志 -->
<discardingThreshold>0</discardingThreshold>
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
<queueSize>8192</queueSize>
<!-- neverBlock:true 会丢失日志,但业务性能不受影响 -->
<neverBlock>true</neverBlock>
<!--是否提取调用者数据-->
<includeCallerData>false</includeCallerData>
<appender-ref ref="INFO_FILE"/>
</appender>
<appender name="ASYNC_FILE_ERROR" class="ch.qos.logback.classic.AsyncAppender">
<!-- 队列剩余容量小于discardingThreshold,则会丢弃TRACT、DEBUG、INFO级别的日志;默认值-1,为queueSize的20%;0不丢失日志 -->
<discardingThreshold>0</discardingThreshold>
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
<queueSize>1024</queueSize>
<!-- neverBlock:true 会丢失日志,但业务性能不受影响 -->
<neverBlock>true</neverBlock>
<!--是否提取调用者数据-->
<includeCallerData>false</includeCallerData>
<appender-ref ref="ERROR_FILE"/>
</appender>
<!-- 开发环境:控制台打印 -->
<springProfile name="dev">
<logger name="com.nmys.view" level="debug"/>
</springProfile>
<root level="info">
<appender-ref ref="CONSOLE"/>
<!-- 异步日志-INFO -->
<appender-ref ref="ASYNC_FILE_INFO"/>
<!-- 异步日志-ERROR -->
<appender-ref ref="ASYNC_FILE_ERROR"/>
</root>
</configuration>
我们需要自定义自己的日志组件,可以使用appender配置,例如名称、类位置等基础信息
自定义日志SDK
我们需要使用Logback提供的扩展方式,并且实现日志推送。
- 引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ognl</groupId>
<artifactId>ognl</artifactId>
<version>3.0.8</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.28</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.26.0</version>
</dependency>
</dependencies>
- 我们需要在SDK中自定义一个类实现AppenderBase
需要读取当前的系统名称、Redis地址和端口、方法名称、工程信息、只有符合当前的工程ID的位置的日志才能读取
/**
* @author flycode
* 日志监控组件SDK
*/
public class MonitorLogAppender<E> extends AppenderBase<E> {
private final RecordLogEntity recordLogEntity = new RecordLogEntity();
public void setSystemName(String systemName) {
recordLogEntity.setSystemName(systemName);
}
public void setGroupId(String groupId) {
recordLogEntity.setGroupId(groupId);
}
public void setHost(String host) {
recordLogEntity.setHost(host);
}
public void setPort(int port) {
recordLogEntity.setPort(port);
}
@Override
protected void append(E e) {
// 1. 只处理log日志
if (e instanceof ILoggingEvent) {
ILoggingEvent loggingEvent = (ILoggingEvent) e;
String className = "unknown";
String methodName = "unknown";
// 2. 读取基本配置信息,例如类名、方法名
StackTraceElement[] callerData = loggingEvent.getCallerData();
if (callerData != null && callerData.length > 0) {
StackTraceElement caller = callerData[0];
className = caller.getClassName();
methodName = caller.getMethodName();
}
// 3. 判断是否包含在指定工程里面
if (!className.startsWith(recordLogEntity.getGroupId())) {
return;
}
// 4. 获取其他日志信息
String formattedMessage = ((ILoggingEvent) e).getFormattedMessage();
LocalLogMessage logMessage = new LocalLogMessage(recordLogEntity.getSystemName(), className, methodName, Arrays.asList(formattedMessage.split(" ")));
System.out.println("当前日志信息:"+logMessage);
}
}
}
里面有两个类,分别记录系统配置和日志信息
/**
* Appender需要提供的相关信息配置
*/
public class RecordLogEntity {
private String systemName;
/**
* 只采集部分符合当前groupId的日志信息
*/
private String groupId;
/**
* redis 地址
*/
private String host;
/**
* redis 端口
*/
private int port;
// 省略Get、Set方法
}
/**
* 记录调用方法信息
*/
public class LocalLogMessage {
private String systemName;
private String className;
private String methodName;
private List<String> logList;
public LocalLogMessage(String systemName, String className, String methodName, List<String> logList) {
this.systemName = systemName;
this.className = className;
this.methodName = methodName;
this.logList = logList;
}
// 省略Get、Set、toString
}
其他项目首先需要引入这个SDK
编写logback.xml配置信息
<appender name="Monitor" class="com.flycode.appender.MonitorLogAppender">
<systemName>business-behavior-monitor-test</systemName>
<groupId>com.flycode</groupId>
<host>127.0.0.1</host>
<port>6379</port>
</appender>
<root level="info">
<appender-ref ref="Monitor"/>
</root>
- 编写测试
@Slf4j
@ExtendWith(SpringExtension.class)
@SpringBootTest
public class ApiTest {
@Test
public void test() {
String name = "sss";
int age = 18;
log.info("name:{},age:{}", name, age);
}
}
- 调用结果
当前日志信息:LocalLogMessage{systemName='business-behavior-monitor-test', className='com.flycode.ApiTest', methodName='test', logList=[name:sss,age:18]}
引入Redis发布订阅
/**
* 发布订阅
*
* @author flycode
*/
public interface IPush {
/**
* 开启Redis连接
* @param host
* @param port
*/
void open(String host,int port);
/**
* 发送日志
* @param localLogMessage
*/
void send(LocalLogMessage localLogMessage);
}
- 实现Redis发布订阅
/**
* @author flycode
* Redis实现发布订阅
*/
public class RedisPushService implements IPush {
private final Logger logger = LoggerFactory.getLogger(RedisPushService.class);
private RedissonClient redissonClient;
@Override
public void open(String host, int port) {
if (null != redissonClient && !redissonClient.isShutdown()) {
return;
}
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port)
.setConnectionPoolSize(64)
.setConnectionMinimumIdleSize(10)
.setRetryAttempts(3)
.setConnectTimeout(3000)
.setPingConnectionInterval(1000)
.setKeepAlive(true);
this.redissonClient = Redisson.create(config);
RTopic topic = redissonClient.getTopic("business-behavior-monitor-sdk-topic");
topic.addListener(LocalLogMessage.class, new MonitorLogListener());
}
@Override
public void send(LocalLogMessage localLogMessage) {
try {
RTopic topic = redissonClient.getTopic("business-behavior-monitor-sdk-topic");
topic.publish(localLogMessage);
} catch (Exception e) {
logger.error("发布订阅失败{}", e.getMessage());
}
}
}
- 添加监听器
/**
* Redis监听器
* @author flycode
*/
public class MonitorLogListener implements MessageListener<LocalLogMessage> {
private final Logger logger = LoggerFactory.getLogger(MonitorLogListener.class);
public void onMessage(CharSequence charSequence, LocalLogMessage logMessage) {
// logger.info("接受消息{}", JSON.toJSONString(logMessage));,不要使用这个logger,会引起无线递归
System.out.println("接受消息: " + JSON.toJSONString(logMessage));
}
}
- 引入发布订阅功能到MonitorLogAppender
private final IPush push = new RedisPushService();
@Override
protected synchronized void append(E e) {
// 开启推送
push.open(recordLogEntity.getHost(), recordLogEntity.getPort());
// 省略代码
// 推送日志
push.send(logMessage);
}
- 测试,需要使用CountDownLatch进行等待Redis的发布订阅,因为是异步的
@Test
public void test() throws InterruptedException {
String name = "sss";
int age = 18;
log.info("name:{},age:{}", name, age);
new CountDownLatch(1).await();
}
当前日志信息:LocalLogMessage{systemName='business-behavior-monitor-test', className='com.flycode.ApiTest', methodName='test', logList=[name:sss,age:18]}
25-07-28.11:00:09.459 [main ] INFO ApiTest - name:sss,age:18
接受消息: {"className":"com.flycode.ApiTest","logList":["name:sss,age:18"],"methodName":"test","systemName":"business-behavior-monitor-test"}
实现了简单的日志监听SDK组件
流程可视化库表设计

贡献者
flycodeu
版权所有
版权归属:flycodeu