开篇
JVM Profiler采集完数据后可以通过多种途径上报数据,对接Console、File、Redis、Kafka等。这篇文章会把源码罗列一下,毕竟都很简单。
JVM Profiler提供灵活的框架可以集成更多的Reporter,只要实现Reporter接口即可,看你个人意愿了,反正github上有源码,直接集成编译打包即可。
ConsoleOutputReporter
- 简单明了地通过System.out.println来上报监控数据。
/**
 * Reporter that prints every metric batch to standard output,
 * one line per report, with the metrics serialized as JSON.
 */
public class ConsoleOutputReporter implements Reporter {

    @Override
    public void report(String profilerName, Map<String, Object> metrics) {
        // Serialize first, then emit a single line: "ConsoleOutputReporter - <name>: <json>"
        String payload = JsonUtils.serialize(metrics);
        System.out.println("ConsoleOutputReporter - " + profilerName + ": " + payload);
    }

    @Override
    public void close() {
        // Nothing to release: stdout is owned by the JVM.
    }
}
FileOutputReporter
- 在指定的目录创建采集数据记录文件。
- 通过FileWriter来往文件写入数据。
/**
 * Reporter that appends each metric batch, serialized as one JSON line,
 * to a per-profiler file ("&lt;profilerName&gt;.json") under {@link #getDirectory()}.
 * If no directory is configured before the first report, a temporary one is created.
 *
 * Thread-safety: report/close are synchronized; the writer map is a ConcurrentHashMap.
 */
public class FileOutputReporter implements Reporter {
    private static final AgentLogger logger = AgentLogger.getLogger(FileOutputReporter.class.getName());

    // Output directory; set once via setDirectory() or lazily to a temp dir.
    private String directory;
    // One open FileWriter per profiler name, created on first report.
    private final ConcurrentHashMap<String, FileWriter> fileWriters = new ConcurrentHashMap<>();
    private volatile boolean closed = false;

    public FileOutputReporter() {
    }

    public String getDirectory() {
        return directory;
    }

    /**
     * Sets the output directory. May only be called while the directory is unset;
     * a second call with a different value fails fast.
     *
     * @throws RuntimeException if the directory has already been assigned
     */
    public void setDirectory(String directory) {
        synchronized (this) {
            if (this.directory == null || this.directory.isEmpty()) {
                this.directory = directory;
            } else {
                throw new RuntimeException(String.format("Cannot set directory to %s because it already has value %s", directory, this.directory));
            }
        }
    }

    @Override
    public synchronized void report(String profilerName, Map<String, Object> metrics) {
        if (closed) {
            logger.info("Report already closed, do not report metrics");
            return;
        }
        FileWriter writer = ensureFile(profilerName);
        try {
            writer.write(JsonUtils.serialize(metrics));
            writer.write(System.lineSeparator());
            writer.flush();
        } catch (IOException e) {
            // Log instead of printStackTrace so output goes through the agent logger.
            logger.warn("Failed to write metrics for " + profilerName + ": " + e.toString());
        }
    }

    @Override
    public synchronized void close() {
        closed = true;
        List<FileWriter> copy = new ArrayList<>(fileWriters.values());
        // Drop the writers so a stale reference cannot be reused after close.
        fileWriters.clear();
        for (FileWriter entry : copy) {
            try {
                entry.flush();
                entry.close();
            } catch (IOException e) {
                logger.warn("Failed to close file writer: " + e.toString());
            }
        }
    }

    /** Returns (creating if needed) the writer for the given profiler. */
    private FileWriter ensureFile(String profilerName) {
        synchronized (this) {
            if (directory == null || directory.isEmpty()) {
                try {
                    directory = Files.createTempDirectory("jvm_profiler_").toString();
                } catch (IOException e) {
                    // Fail fast: silently continuing would leave directory null and
                    // cause an NPE in Paths.get(null, ...) inside createFileWriter.
                    throw new RuntimeException("Failed to create temp directory for metric files", e);
                }
            }
        }
        return fileWriters.computeIfAbsent(profilerName, t -> createFileWriter(t));
    }

    /** Opens "&lt;directory&gt;/&lt;profilerName&gt;.json" in append mode. */
    private FileWriter createFileWriter(String profilerName) {
        String path = Paths.get(directory, profilerName + ".json").toString();
        try {
            return new FileWriter(path, true);
        } catch (IOException e) {
            throw new RuntimeException("Failed to create file writer: " + path, e);
        }
    }
}
KafkaOutputReporter
- 依赖kafka-client的jar包来构建KafkaProducer。
- 通过producer.send来发送采集数据。
/**
 * Reporter that publishes each metric batch as a UTF-8 JSON message to a Kafka topic
 * derived from the profiler name. In sync mode every send is flushed and awaited.
 */
public class KafkaOutputReporter implements Reporter {
    private String brokerList = "localhost:9092";
    private boolean syncMode = false;
    private String topicPrefix;
    private ConcurrentHashMap<String, String> profilerTopics = new ConcurrentHashMap<>();
    // volatile: report() reads this field outside the lock taken in ensureProducer(),
    // so the reference must be safely published across threads.
    private volatile Producer<String, byte[]> producer;

    public KafkaOutputReporter() {
    }

    public KafkaOutputReporter(String brokerList, boolean syncMode, String topicPrefix) {
        this.brokerList = brokerList;
        this.syncMode = syncMode;
        this.topicPrefix = topicPrefix;
    }

    @Override
    public void report(String profilerName, Map<String, Object> metrics) {
        ensureProducer();
        String topicName = getTopic(profilerName);
        String str = JsonUtils.serialize(metrics);
        byte[] message = str.getBytes(StandardCharsets.UTF_8);
        Future<RecordMetadata> future = producer.send(
                new ProducerRecord<String, byte[]>(topicName, message));
        if (syncMode) {
            producer.flush();
            try {
                future.get();
            } catch (InterruptedException e) {
                // Restore the interrupt status before propagating so callers can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    // Some non-core code omitted

    /** Lazily builds the KafkaProducer on first use (double-checked via the lock). */
    private void ensureProducer() {
        synchronized (this) {
            if (producer != null) {
                return;
            }
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("retries", 10);
            props.put("batch.size", 16384);
            props.put("linger.ms", 0);
            props.put("buffer.memory", 16384000);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
            if (syncMode) {
                // All in-sync replicas must ack when the caller waits on the send.
                props.put("acks", "all");
            }
            producer = new KafkaProducer<>(props);
        }
    }
}
RedisOutputReporter
- 依赖jedis包来实现redis的读写。
- Redis当中存储的采集数据的key是profiler名、机器IP和时间戳的组合,value是采集的数据。
/**
 * Reporter that stores each metric batch in Redis. The key combines the profiler
 * name, the local host address (when resolvable) and the current timestamp; the
 * value is the JSON-serialized metrics map.
 */
public class RedisOutputReporter implements Reporter {
    private static final AgentLogger logger = AgentLogger.getLogger(RedisOutputReporter.class.getName());

    private JedisPool redisConn = null;

    //JedisPool should always be used as it is thread safe.
    public void report(String profilerName, Map<String, Object> metrics) {
        ensureJedisConn();
        try {
            Jedis jedisClient = redisConn.getResource();
            try {
                jedisClient.set(createOriginStamp(profilerName), JsonUtils.serialize(metrics));
            } finally {
                // Always return the client, even when set() throws — otherwise
                // the pooled resource leaks and the pool is eventually exhausted.
                redisConn.returnResource(jedisClient);
            }
        } catch (Exception err) {
            logger.warn(err.toString());
        }
    }

    /** Builds the Redis key: "&lt;profiler&gt;-&lt;hostAddress&gt;-&lt;millis&gt;" (host omitted if unresolvable). */
    public String createOriginStamp(String profilerName) {
        try {
            return (profilerName + "-" + InetAddress.getLocalHost().getHostAddress() + "-" + System.currentTimeMillis());
        } catch (UnknownHostException err) {
            logger.warn("Address could not be determined and will be omitted!");
            return (profilerName + "-" + System.currentTimeMillis());
        }
    }

    public void close() {
        synchronized (this) {
            // Guard against close() before any report() or a repeated close(),
            // where redisConn is still (or already) null.
            if (redisConn != null) {
                redisConn.close();
                redisConn = null;
            }
        }
    }

    /** Lazily (re)creates the pool from the JEDIS_PROFILER_CONNECTION env var. */
    private void ensureJedisConn() {
        synchronized (this) {
            if (redisConn == null || redisConn.isClosed()) {
                redisConn = new JedisPool(System.getenv("JEDIS_PROFILER_CONNECTION"));
            }
        }
    }
}