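1. SphU
Anyone who has used Sentinel knows that SphU is where everything starts.
entry = SphU.entry(target, EntryType.IN); This line acquires an access token (an Entry). If the entry is obtained, the caller may access the target resource; if it is not, the resource cannot be accessed.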
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
throws BlockException {
// Note: the 4th argument is 1
return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}
public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType, Object[] args)
throws BlockException {
return Env.sph.asyncEntryWithType(name, resourceType, trafficType, 1, false, args);
}
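There are two ways to obtain an entry. The first suits the traditional thread-per-request model, such as Spring MVC or ordinary business code. The second suits reactive, signal-driven processing, such as Spring WebFlux or Spring Cloud Gateway (SCG).
That is not the topic here, though. Sentinel initialization is shared by both, and later articles cover how each one is handled.
Parameters:
- name: name of the protected resource
- resourceType: resource type, usually web or rpc
- trafficType (EntryType): entry/token type, either IN or OUT; usually IN, meaning inbound traffic
- args: arguments needed by parameter flow control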
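To make the "get an entry or get blocked" idea concrete, here is a minimal sketch that uses only the JDK. It is not Sentinel code: ToyGate, Token, and BlockedException are hypothetical stand-ins for SphU, Entry, and BlockException, and a Semaphore plays the role of the rule check.

```java
import java.util.concurrent.Semaphore;

/** Toy stand-in for the "acquire an entry or get blocked" pattern; not Sentinel code. */
class ToyGate {
    /** Hypothetical exception playing the role of BlockException. */
    static class BlockedException extends Exception {
        BlockedException(String resource) {
            super("blocked: " + resource);
        }
    }

    /** Hypothetical token; closing it hands the permit back. */
    final class Token implements AutoCloseable {
        private boolean released;

        @Override
        public void close() {
            if (!released) {
                released = true;
                permits.release();
            }
        }
    }

    private final Semaphore permits;

    ToyGate(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Returns a token if a permit is free; otherwise throws instead of waiting. */
    Token entry(String resource) throws BlockedException {
        if (!permits.tryAcquire()) {
            throw new BlockedException(resource);
        }
        return new Token();
    }

    /** Runs the guarded action and reports whether the caller was let through. */
    static boolean tryAccess(ToyGate gate, String resource, Runnable action) {
        try (Token token = gate.entry(resource)) {
            action.run();
            return true;
        } catch (BlockedException e) {
            return false;
        }
    }
}
```

The shape to notice is acquire, do the work, always release. A caller that cannot get a token never touches the resource.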
2. Env.sph
public class Env {
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
InitExecutor.doInit();
}
}
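Env is Sentinel's trigger. The first call to SphU.entry causes the Env class to be loaded, and its static initializer then initializes Sentinel's basic components.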
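The "initialize on first use" behaviour comes from plain Java class initialization, which a small sketch can show. LazyEnv and InitLog are hypothetical names used only for illustration; they are not part of Sentinel.

```java
import java.util.ArrayList;
import java.util.List;

/** Records initialization events so they can be inspected afterwards. */
class InitLog {
    static final List<String> EVENTS = new ArrayList<>();
}

/** Hypothetical counterpart of Env: the static block runs once, on first use of the class. */
class LazyEnv {
    // Not a compile-time constant, so reading it forces class initialization.
    static final Object SPH = new Object();

    static {
        InitLog.EVENTS.add("LazyEnv initialized");
    }
}

class LazyEnvDemo {
    public static void main(String[] args) {
        System.out.println("before first use: " + InitLog.EVENTS);
        Object first = LazyEnv.SPH;  // triggers the static initializer
        Object second = LazyEnv.SPH; // class already initialized, nothing runs again
        System.out.println("after two reads: " + InitLog.EVENTS
            + ", same instance: " + (first == second));
    }
}
```

The JVM guarantees that a static initializer runs at most once per class loader, which is why a static block is a convenient place for one-time setup.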
/**
* If one {@link InitFunc} throws an exception, the init process
* will immediately be interrupted and the application will exit.
*
* The initialization will be executed only once.
*/
public static void doInit() {
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : initFuncs) {
RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
w.func.init();
RecordLog.info("[InitExecutor] Executing {} with order {}",
w.func.getClass().getCanonicalName(), w.order);
}
} catch (Exception ex) {
RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
ex.printStackTrace();
} catch (Error error) {
RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
error.printStackTrace();
}
}
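The compareAndSet guard at the top of doInit is what makes initialization run only once, even when several threads arrive together. Below is a minimal JDK-only sketch of that guard; OnceInit and raceThreads are hypothetical names, not Sentinel classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a run-at-most-once initializer guarded by compareAndSet. */
class OnceInit {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger runs = new AtomicInteger();

    /** Only the caller that flips the flag from false to true runs the body. */
    boolean doInit(Runnable body) {
        if (!initialized.compareAndSet(false, true)) {
            return false; // another caller already started initialization
        }
        runs.incrementAndGet();
        body.run();
        return true;
    }

    int runCount() {
        return runs.get();
    }

    /** Calls doInit from several threads and returns how many times the body ran. */
    static int raceThreads(int threadCount) {
        OnceInit once = new OnceInit();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> once.doInit(() -> { }));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return once.runCount();
    }
}
```

As in doInit, the flag flips before the body runs, so a failed initialization is not retried by a later call.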
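SpiLoader.of(InitFunc.class).loadInstanceListSorted() uses the Java SPI convention to find all InitFunc implementations. Sentinel implements its own SPI loading mechanism rather than calling ServiceLoader.load. Let's look at that implementation.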
public List<S> loadInstanceListSorted() {
load();
return createInstanceList(sortedClassList);
}
/**
* Load the Provider class from Provider configuration file
*/
public void load() {
if (!loaded.compareAndSet(false, true)) {
return;
}
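// Build the SPI file path: META-INF/services/ + the fully qualified name of the SPI interface (here InitFunc)
String fullFileName = SPI_FILE_PREFIX + service.getName();
// Pick the class loader used to load the implementation classes of the SPI interface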
ClassLoader classLoader;
if (SentinelConfig.shouldUseContextClassloader()) {
classLoader = Thread.currentThread().getContextClassLoader();
} else {
classLoader = service.getClassLoader();
}
if (classLoader == null) {
classLoader = ClassLoader.getSystemClassLoader();
}
Enumeration<URL> urls = null;
try {
// Locate the SPI configuration files via classLoader.getResources; each entry in them is the class name of an SPI implementation
urls = classLoader.getResources(fullFileName);
} catch (IOException e) {
fail("Error locating SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader, e);
}
if (urls == null || !urls.hasMoreElements()) {
RecordLog.warn("No SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader);
return;
}
// Load each listed implementation class to obtain its Class object
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
InputStream in = null;
BufferedReader br = null;
try {
in = url.openStream();
br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
String line;
while ((line = br.readLine()) != null) {
if (StringUtil.isBlank(line)) {
// Skip blank line
continue;
}
line = line.trim();
int commentIndex = line.indexOf("#");
if (commentIndex == 0) {
// Skip comment line
continue;
}
if (commentIndex > 0) {
line = line.substring(0, commentIndex);
}
line = line.trim();
Class<S> clazz = null;
try {
// Obtain the Class object of the SPI implementation (without initializing the class)
clazz = (Class<S>) Class.forName(line, false, classLoader);
} catch (ClassNotFoundException e) {
fail("class " + line + " not found", e);
}
if (!service.isAssignableFrom(clazz)) {
fail("class " + clazz.getName() + "is not subtype of " + service.getName() + ",SPI configuration file=" + fullFileName);
}
classList.add(clazz);
Spi spi = clazz.getAnnotation(Spi.class);
String aliasName = spi == null || "".equals(spi.value()) ? clazz.getName() : spi.value();
if (classMap.containsKey(aliasName)) {
Class<? extends S> existClass = classMap.get(aliasName);
fail("Found repeat alias name for " + clazz.getName() + " and "
+ existClass.getName() + ",SPI configuration file=" + fullFileName);
}
classMap.put(aliasName, clazz);
if (spi != null && spi.isDefault()) {
if (defaultClass != null) {
fail("Found more than one default Provider, SPI configuration file=" + fullFileName);
}
defaultClass = clazz;
}
RecordLog.info("[SpiLoader] Found SPI implementation for SPI {}, provider={}, aliasName={}"
+ ", isSingleton={}, isDefault={}, order={}",
service.getName(), line, aliasName
, spi == null ? true : spi.isSingleton()
, spi == null ? false : spi.isDefault()
, spi == null ? 0 : spi.order());
}
} catch (IOException e) {
fail("error reading SPI configuration file", e);
} finally {
closeResources(in, br);
}
}
// Sort the implementation classes by their @Spi order
sortedClassList.addAll(classList);
Collections.sort(sortedClassList, new Comparator<Class<? extends S>>() {
@Override
public int compare(Class<? extends S> o1, Class<? extends S> o2) {
Spi spi1 = o1.getAnnotation(Spi.class);
int order1 = spi1 == null ? 0 : spi1.order();
Spi spi2 = o2.getAnnotation(Spi.class);
int order2 = spi2 == null ? 0 : spi2.order();
return Integer.compare(order1, order2);
}
});
}
List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
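In essence, this step reads the SPI configuration files, loads the class named on each line, and ends up with the full list of implementation Class objects, from which the instances are created.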
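The per-line handling in load() (skip blank lines, skip comment lines, cut trailing comments) is easy to isolate. Here is a minimal sketch of just that part. ProviderFileParser is a hypothetical helper and the class names in main are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of the line handling used when reading a META-INF/services style file. */
class ProviderFileParser {
    /** Returns the class names found in the given lines, ignoring blanks and '#' comments. */
    static List<String> parse(List<String> lines) {
        List<String> classNames = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue; // blank line
            }
            int commentIndex = line.indexOf('#');
            if (commentIndex == 0) {
                continue; // whole line is a comment
            }
            if (commentIndex > 0) {
                line = line.substring(0, commentIndex).trim(); // strip trailing comment
            }
            classNames.add(line);
        }
        return classNames;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "# hypothetical provider file",
            "",
            "  com.example.FirstInitFunc  ",
            "com.example.SecondInitFunc # trailing comment");
        System.out.println(parse(lines));
    }
}
```

Each surviving name would then be passed to Class.forName with the chosen class loader, as the code above does.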
There are quite a few implementations. What are they all for?
for (OrderWrapper w : initList) {
w.func.init();
RecordLog.info("[InitExecutor] Executing {} with order {}",
w.func.getClass().getCanonicalName(), w.order);
}
The code above iterates over the components and initializes each one in order.
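Ordering by an annotation value and then running each component can be sketched with nothing but the JDK. The @Order annotation and the Step classes below are hypothetical. The rule that a missing annotation counts as order 0 mirrors the comparator in load() shown earlier and is an assumption for this sketch, not necessarily how every Sentinel loader resolves defaults.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical ordering annotation: smaller values run first. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Order {
    int value() default 0;
}

/** Hypothetical init component. */
interface Step {
    String init();
}

@Order(10)
class MetricsStep implements Step {
    public String init() { return "metrics"; }
}

@Order(-1)
class CommandStep implements Step {
    public String init() { return "command"; }
}

class PlainStep implements Step { // no annotation, treated as order 0
    public String init() { return "plain"; }
}

class OrderedInit {
    static int orderOf(Class<?> type) {
        Order order = type.getAnnotation(Order.class);
        return order == null ? 0 : order.value();
    }

    /** Sorts a copy of the steps by order, runs them, and returns the execution sequence. */
    static List<String> runAll(List<Step> steps) {
        List<Step> sorted = new ArrayList<>(steps);
        sorted.sort(Comparator.comparingInt((Step s) -> orderOf(s.getClass())));
        List<String> executed = new ArrayList<>();
        for (Step step : sorted) {
            executed.add(step.init());
        }
        return executed;
    }
}
```

With this rule, a component annotated with a negative order, like the @InitOrder(-1) seen on CommandCenterInitFunc, runs ahead of the ones with order 0 or higher.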
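3. CommandCenterInitFunc
As the name suggests, this is Sentinel's command center component. It receives commands sent by the dashboard, parses them, and handles them. For example, when a flow rule is added in the dashboard, the dashboard sends it to the Sentinel command center, which puts the rule into effect.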
@InitOrder(-1)
public class CommandCenterInitFunc implements InitFunc {
@Override
public void init() throws Exception {
CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
if (commandCenter == null) {
RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
return;
}
commandCenter.beforeStart();
commandCenter.start();
RecordLog.info("[CommandCenterInit] Starting command center: "
+ commandCenter.getClass().getCanonicalName());
}
}
CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
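CommandCenterProvider resolves the CommandCenter implementation through SPI. Once the command center receives a command, it hands it to the matching command handler.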
public final class CommandCenterProvider {
private static CommandCenter commandCenter = null;
static {
resolveInstance();
}
private static void resolveInstance() {
CommandCenter resolveCommandCenter = SpiLoader.of(CommandCenter.class).loadHighestPriorityInstance();
if (resolveCommandCenter == null) {
RecordLog.warn("[CommandCenterProvider] WARN: No existing CommandCenter found");
} else {
commandCenter = resolveCommandCenter;
RecordLog.info("[CommandCenterProvider] CommandCenter resolved: {}", resolveCommandCenter.getClass()
.getCanonicalName());
}
}
/**
* Get resolved {@link CommandCenter} instance.
*
* @return resolved {@code CommandCenter} instance
*/
public static CommandCenter getCommandCenter() {
return commandCenter;
}
private CommandCenterProvider() {}
}
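This simply loads the CommandCenter implementations through SPI, as described above, so I won't repeat it. Which implementations of the CommandCenter interface exist?
There are two command center implementations: one receives commands through Netty, the other through a plain ServerSocket-based HTTP server. Internally they work in a similar way.
- NettyHttpCommandCenter
- SimpleHttpCommandCenter
We'll use the default, SimpleHttpCommandCenter, for the walkthrough. Note that loadHighestPriorityInstance() loads the CommandCenter with the highest priority.
3.1. SimpleHttpCommandCenter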
commandCenter.beforeStart();
Once the CommandCenter instance has been resolved, beforeStart() is called:
@Override
@SuppressWarnings("rawtypes")
public void beforeStart() throws Exception {
// Register handlers
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers);
}
Clearly this is SPI loading again. This time it obtains all CommandHandler implementations, that is, all the command handlers.
public Map<String, CommandHandler> namedHandlers() {
Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
// Same routine: load the implementations via SPI
List<CommandHandler> handlers = spiLoader.loadInstanceList();
for (CommandHandler handler : handlers) {
String name = parseCommandName(handler);
if (!StringUtil.isEmpty(name)) {
map.put(name, handler);
}
}
return map;
}
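There are quite a few default CommandHandler implementations, but only a handful are commonly used, such as the handlers for flow rules and circuit-breaking rules. Have a look if you're interested; they aren't hard.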
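The "name to handler" map built by namedHandlers() can be sketched like this. Everything here is hypothetical: the @Command annotation, the Handler interface, and the sample handlers only illustrate the lookup-by-name idea and are not Sentinel's actual types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical annotation carrying the command name of a handler. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Command {
    String name();
}

/** Hypothetical handler contract: parameters in, response text out. */
interface Handler {
    String handle(Map<String, String> params);
}

@Command(name = "version")
class VersionHandler implements Handler {
    public String handle(Map<String, String> params) { return "1.0"; }
}

@Command(name = "echo")
class EchoHandler implements Handler {
    public String handle(Map<String, String> params) { return params.getOrDefault("msg", ""); }
}

class UnnamedHandler implements Handler { // no name, so it is never registered
    public String handle(Map<String, String> params) { return "unreachable"; }
}

class HandlerRegistry {
    /** Builds the name-to-handler map, skipping handlers that have no usable name. */
    static Map<String, Handler> namedHandlers(List<Handler> handlers) {
        Map<String, Handler> map = new HashMap<>();
        for (Handler handler : handlers) {
            Command command = handler.getClass().getAnnotation(Command.class);
            if (command != null && !command.name().isEmpty()) {
                map.put(command.name(), handler);
            }
        }
        return map;
    }
}
```

Building the map once at startup means that handling a request later is a single map lookup by command name.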
/**
*
* @throws Exception
*/
@Override
@SuppressWarnings("rawtypes")
public void beforeStart() throws Exception {
// Register handlers
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
// Register (cache) the command handlers for later lookup
registerCommands(handlers);
}
commandCenter.start();
@Override
public void start() throws Exception {
// Number of processors available to the JVM
int nThreads = Runtime.getRuntime().availableProcessors();
// Create a thread pool with core size = max size = processor count
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
// Define a server initialization task that starts the command center so it can receive commands
Runnable serverInitTask = new Runnable() {
int port;
// Port the command center listens on for dashboard requests (falls back to DEFAULT_PORT)
{
try {
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
// Create the ServerSocket used to receive commands
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
// Start a ServerThread that accepts connections and dispatches them for handling
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
};
new Thread(serverInitTask).start();
}
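A fixed-size pool with a small bounded queue and a throwing rejection handler starts refusing work as soon as every thread is busy and the queue is full. The sketch below reproduces that configuration with the JDK only. BoundedPoolDemo and countRejected are hypothetical names, and the tasks simply block on a latch to simulate slow command handling.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Sketch: how a fixed pool with a bounded queue behaves once it is saturated. */
class BoundedPoolDemo {
    /** Submits blocking tasks and returns how many of them were rejected. */
    static int countRejected(int threads, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            threads, threads, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(queueCapacity),
            (task, executor) -> {
                throw new RejectedExecutionException("pool saturated");
            });
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // hold the worker until the demo is over
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

The capacity of such a pool is thread count plus queue size; anything beyond that is rejected immediately rather than piling up.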
private static ServerSocket getServerSocketFromBasePort(int basePort) {
int tryCount = 0;
while (true) {
try {
ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
server.setReuseAddress(true);
return server;
} catch (IOException e) {
tryCount++;
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e1) {
break;
}
}
}
return null;
}
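If creating the server socket fails, it retries automatically, and the port number increases by one after every three failed attempts (basePort + tryCount / 3).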
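The port arithmetic is easier to see in isolation. In this sketch, PortProbe, portForAttempt, and bindFromBase are hypothetical names. Unlike the loop above, bindFromBase gives up after maxAttempts, which is an assumption added to keep the example bounded.

```java
import java.io.IOException;
import java.net.ServerSocket;

/** Sketch of the "try each port three times, then move to the next one" probing. */
class PortProbe {
    /** Port used for the given zero-based attempt: integer division groups attempts in threes. */
    static int portForAttempt(int basePort, int tryCount) {
        return basePort + tryCount / 3;
    }

    /** Tries to bind starting at basePort; returns null if no attempt succeeds. */
    static ServerSocket bindFromBase(int basePort, int maxAttempts) {
        for (int tryCount = 0; tryCount < maxAttempts; tryCount++) {
            try {
                return new ServerSocket(portForAttempt(basePort, tryCount), 100);
            } catch (IOException e) {
                try {
                    Thread.sleep(30); // brief pause before the next attempt
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 7; attempt++) {
            System.out.println("attempt " + attempt + " -> port " + portForAttempt(8719, attempt));
        }
    }
}
```

Because the final port may differ from the configured one, the caller has to read it back from the socket, which is what serverSocket.getLocalPort() does in the start() code.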
executor.submit(new ServerThread(serverSocket));
ServerThread.run
@Override
public void run() {
while (true) {
Socket socket = null;
try {
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
HttpEventTask eventTask = new HttpEventTask(socket);
bizExecutor.submit(eventTask);
} catch (Exception e) {
CommandCenterLog.info("Server error", e);
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
CommandCenterLog.info("Error when closing an opened socket", e1);
}
}
try {
// In case of infinite log.
Thread.sleep(10);
} catch (InterruptedException e1) {
// Indicates the task should stop.
break;
}
}
}
}
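It listens on the serverSocket, accepts each incoming connection, wraps it in an HttpEventTask, and hands the task to the thread pool.
The logic is as follows: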
HttpEventTask.run
@Override
public void run() {
if (socket == null) {
return;
}
PrintWriter printWriter = null;
InputStream inputStream = null;
try {
long start = System.currentTimeMillis();
inputStream = new BufferedInputStream(socket.getInputStream());
OutputStream outputStream = socket.getOutputStream();
printWriter = new PrintWriter(
new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
String firstLine = readLine(inputStream);
CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine
+ ", addr: " + socket.getInetAddress());
CommandRequest request = processQueryString(firstLine);
if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
// Deal with post method
processPostRequest(inputStream, request);
}
// Validate the target command.
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
return;
}
// Find the matching command handler.
System.out.println("sentinel dashboard request first line: " + firstLine);
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter);
} else {
// No matching command handler.
writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
}
long cost = System.currentTimeMillis() - start;
CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine
+ ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms");
} catch (RequestException e) {
writeResponse(printWriter, e.getStatusCode(), e.getMessage());
} catch (Throwable e) {
CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e);
try {
if (printWriter != null) {
String errorMessage = SERVER_ERROR_MESSAGE;
e.printStackTrace();
if (!writtenHead) {
writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
} else {
printWriter.println(errorMessage);
}
printWriter.flush();
}
} catch (Exception e1) {
CommandCenterLog.warn("Failed to write error response", e1);
}
} finally {
closeResource(inputStream);
closeResource(printWriter);
closeResource(socket);
}
}
This simply parses the command and uses the command name to find the matching CommandHandler, which then handles it.
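Extracting the command name and parameters from the first request line can be sketched as a small pure function. RequestLine is a hypothetical class, the sample URIs are illustrative, and URL decoding and POST bodies are deliberately left out, so this is much simpler than what a real HTTP parser does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: split "METHOD /target?k=v HTTP/1.1" into a command name and parameters. */
class RequestLine {
    final String target;
    final Map<String, String> params;

    private RequestLine(String target, Map<String, String> params) {
        this.target = target;
        this.params = params;
    }

    static RequestLine parse(String firstLine) {
        String[] parts = firstLine.trim().split("\\s+");
        if (parts.length < 2) {
            throw new IllegalArgumentException("malformed request line: " + firstLine);
        }
        String uri = parts[1];
        String path = uri;
        String query = "";
        int questionMark = uri.indexOf('?');
        if (questionMark >= 0) {
            path = uri.substring(0, questionMark);
            query = uri.substring(questionMark + 1);
        }
        // The command name is the path without its leading slash.
        String target = path.startsWith("/") ? path.substring(1) : path;
        Map<String, String> params = new LinkedHashMap<>();
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int equals = pair.indexOf('=');
            if (equals < 0) {
                params.put(pair, ""); // key without a value
            } else {
                params.put(pair.substring(0, equals), pair.substring(equals + 1));
            }
        }
        return new RequestLine(target, params);
    }
}
```

An empty target corresponds to the BAD_REQUEST branch in the code above, and an unknown target to the "Unknown command" branch.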
4. HeartbeatSenderInitFunc
As the name suggests, this is the component that sends heartbeats.
@Override
public void init() {
// Get the default HeartbeatSender
HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
if (sender == null) {
RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
return;
}
// Initialize the scheduler if needed (2 threads)
initSchedulerIfNeeded();
// Get the heartbeat interval
long interval = retrieveInterval(sender);
setIntervalIfNotExists(interval);
scheduleHeartbeatTask(sender, interval);
}
HeartbeatSenderProvider.getHeartbeatSender(); obtains the heartbeat sender.
private static void resolveInstance() {
HeartbeatSender resolved = SpiLoader.of(HeartbeatSender.class).loadHighestPriorityInstance();
if (resolved == null) {
RecordLog.warn("[HeartbeatSenderProvider] WARN: No existing HeartbeatSender found");
} else {
heartbeatSender = resolved;
RecordLog.info("[HeartbeatSenderProvider] HeartbeatSender activated: {}", resolved.getClass()
.getCanonicalName());
}
}
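Likewise, the HeartbeatSender implementation is loaded through SPI.
There are two implementations by default, and the one with the higher priority is loaded. SimpleHttpHeartbeatSender has the higher priority, so it is the default.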
By default, the heartbeat interval is read from the corresponding configuration item.
private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
sender.sendHeartbeat();
} catch (Throwable e) {
RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
}
}
}, 5000, interval, TimeUnit.MILLISECONDS);
RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
+ sender.getClass().getCanonicalName());
}
A scheduler then sends the heartbeat periodically: the first one after a 5 s delay, and then presumably every 10 s.
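The try/catch around sendHeartbeat() matters: with scheduleAtFixedRate, an exception that escapes the task suppresses all later runs. Here is a minimal sketch of the same scheduling pattern, with a task that fails on every run but keeps being scheduled. HeartbeatScheduleDemo and survivesFailures are hypothetical names, and the short delays are chosen only to keep the demo fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch: a fixed-rate task keeps running as long as its exceptions are caught inside. */
class HeartbeatScheduleDemo {
    /** Returns true if the task ran at least `beats` times although every run fails. */
    static boolean survivesFailures(int beats, long delayMs, long intervalMs) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        CountDownLatch seen = new CountDownLatch(beats);
        try {
            pool.scheduleAtFixedRate(() -> {
                try {
                    seen.countDown();
                    throw new IllegalStateException("simulated send failure");
                } catch (Throwable t) {
                    // Swallowed on purpose: an escaping exception would cancel future runs.
                }
            }, delayMs, intervalMs, TimeUnit.MILLISECONDS);
            return seen.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In the code above, the initial delay is 5000 ms and the period is whatever interval was resolved for the sender.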
@Override
public boolean sendHeartbeat() throws Exception {
if (TransportConfig.getRuntimePort() <= 0) {
RecordLog.info("[SimpleHttpHeartbeatSender] Command server port not initialized, won't send heartbeat");
return false;
}
Endpoint addrInfo = getAvailableAddress();
if (addrInfo == null) {
return false;
}
SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
request.setParams(heartBeat.generateCurrentMessage());
try {
SimpleHttpResponse response = httpClient.post(request);
if (response.getStatusCode() == OK_STATUS) {
return true;
} else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) {
RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo
+ ", http status code: " + response.getStatusCode());
}
} catch (Exception e) {
RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo, e);
}
return false;
}
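Sending a heartbeat simply means posting some basic information about the current service to keep the heartbeat with the dashboard alive.
5. Summary
Sentinel initialization involves a few core objects:
- SphU: Sentinel's basic API
- Env: component initialization
- CommandCenterInitFunc: the command center, which receives commands
- CommandHandler: the handler that ultimately processes a received command
- HeartbeatSenderInitFunc: the heartbeat sender, which maintains the heartbeat with the dashboard, roughly once every 10 s
Simple, right?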