Elasticsearch 6.1.0 Startup Process


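Introduction

The main goal of this article is to walk through the core steps of Elasticsearch startup and to explain, at a high level, what Elasticsearch does while it starts.

I originally planned to draw a flowchart. Then I saw that others online had analyzed the whole process with XMind and that it came across very clearly, so this article also uses XMind and explains the process from top to bottom.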


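Startup process diagram

Notes:

  • 1. The whole ES startup flow is recorded in XMind.
  • 2. Read from top to bottom; the parts marked with a red flag are the core flow.
  • 3. I summarize the core flow as: configuration loading; Bootstrap initialization; Bootstrap setup; Bootstrap start.
  • 4. Each step breaks down into a lot of further logic; only the logic that ties the whole process together is covered here.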

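Elasticsearch startup process


Configuration loading


Bootstrap initialization

  • One important job of the Elasticsearch class is to parse command-line arguments.
  • Run the Elasticsearch start command with the -h option to see the help text.
  • The Elasticsearch constructor is shown below; its options match the help text.
    // options accepted by the elasticsearch start command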
    Elasticsearch() {
        super("starts elasticsearch", () -> {}); 
        versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
            "Prints elasticsearch version information and exits");
        daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
            "Starts Elasticsearch in the background")
            .availableUnless(versionOption);
        pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
            "Creates a pid file in the specified path on start")
            .availableUnless(versionOption)
            .withRequiredArg()
            .withValuesConvertedBy(new PathConverter());
        quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
            "Turns off standard output/error streams logging in console")
            .availableUnless(versionOption)
            .availableUnless(daemonizeOption);
    }



Elasticsearch.main

    // entry point for starting Elasticsearch
    public static void main(final String[] args) throws Exception {
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkPermission(Permission perm) {
            }
        });
        LogConfigurator.registerErrorListener();
        final Elasticsearch elasticsearch = new Elasticsearch();
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        if (status != ExitCodes.OK) {
            exit(status);
        }
    }

    // delegates to main() on the Elasticsearch instance
    static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
        return elasticsearch.main(args, terminal);
    }
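  • 1. Install a permissive SecurityManager whose checkPermission() allows everything.
  • 2. LogConfigurator.registerErrorListener() registers an error listener for logging.
  • 3. Create the Elasticsearch object.
  • 4. Enter elasticsearch.main().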



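elasticsearch.main

Notes:

  • The figure above shows the inheritance hierarchy of Elasticsearch: Elasticsearch extends EnvironmentAwareCommand, which extends Command.
  • elasticsearch.main() calls into EnvironmentAwareCommand, Command, and related classes.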
class Elasticsearch extends EnvironmentAwareCommand {

    static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
        return elasticsearch.main(args, terminal);
    }
}


public abstract class Command implements Closeable {

    public final int main(String[] args, Terminal terminal) throws Exception {
        if (addShutdownHook()) {
            shutdownHookThread = new Thread(() -> {
                try {
                    this.close();
                } catch (final IOException e) {
                    try (
                        StringWriter sw = new StringWriter();
                        PrintWriter pw = new PrintWriter(sw)) {
                        e.printStackTrace(pw);
                        terminal.println(sw.toString());
                    } catch (final IOException impossible) {
                    }
                }
            });
            Runtime.getRuntime().addShutdownHook(shutdownHookThread);
        }

        beforeMain.run();

        try {
            mainWithoutErrorHandling(args, terminal);
        } catch (OptionException e) {
            return ExitCodes.USAGE;
        } catch (UserException e) {
            return e.exitCode;
        }
        return ExitCodes.OK;
    }

    void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
        final OptionSet options = parser.parse(args);

        if (options.has(helpOption)) {
            printHelp(terminal);
            return;
        }

        if (options.has(silentOption)) {
            terminal.setVerbosity(Terminal.Verbosity.SILENT);
        } else if (options.has(verboseOption)) {
            terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
        } else {
            terminal.setVerbosity(Terminal.Verbosity.NORMAL);
        }

        execute(terminal, options);
    }
}
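  • 5. elasticsearch.main() goes straight into Command.main().
  • 6. Command.main() registers a shutdown hook thread with Runtime. When the JVM shuts down, the hook calls close() on the command and prints the stack trace to the terminal if close() throws an IOException.
  • 7. Command.mainWithoutErrorHandling() parses the command-line arguments, prints the help text or sets the terminal verbosity accordingly, and then executes the command.
  • 8. Enter EnvironmentAwareCommand.execute().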
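
Steps 5 to 8 follow a common pattern: a final main() wraps the real work, turns exceptions into exit codes, and delegates to an abstract execute(). The following is a minimal stand-alone sketch of that pattern, not the Elasticsearch source. MiniCommand, UsageError, CommandDemo, and the two exit code constants are hypothetical names introduced for illustration, and the shutdown hook is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a user-facing error that carries an exit code.
class UsageError extends Exception {
    final int exitCode;

    UsageError(int exitCode, String message) {
        super(message);
        this.exitCode = exitCode;
    }
}

// Hypothetical, simplified analogue of Command: main() wraps the real work,
// translates exceptions into exit codes, and delegates to execute().
abstract class MiniCommand {
    static final int OK = 0;      // assumed value, for illustration only
    static final int USAGE = 64;  // assumed value, for illustration only

    // Collected output lines; stands in for the Terminal in this sketch.
    final List<String> output = new ArrayList<>();

    final int main(String[] args) {
        try {
            mainWithoutErrorHandling(args);
        } catch (UsageError e) {
            output.add("ERROR: " + e.getMessage());
            return e.exitCode;
        }
        return OK;
    }

    private void mainWithoutErrorHandling(String[] args) throws UsageError {
        for (String arg : args) {
            if (arg.equals("-h")) {
                output.add("usage: demo [-h]");
                return; // help was printed, so the command is not executed
            }
        }
        execute(args);
    }

    protected abstract void execute(String[] args) throws UsageError;
}

class CommandDemo {
    // A command that rejects any argument, to exercise the error path.
    static MiniCommand strictCommand() {
        return new MiniCommand() {
            @Override
            protected void execute(String[] args) throws UsageError {
                if (args.length > 0) {
                    throw new UsageError(USAGE, "unexpected argument [" + args[0] + "]");
                }
                output.add("started");
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(strictCommand().main(new String[0]));
        System.out.println(strictCommand().main(new String[] {"foo"}));
    }
}
```

Keeping the error handling in one final method means that subclasses only implement execute() and never decide exit codes themselves.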

EnvironmentAwareCommand.execute

public abstract class EnvironmentAwareCommand extends Command {
    protected void execute(Terminal terminal, OptionSet options) throws Exception {
        final Map<String, String> settings = new HashMap<>();
        for (final KeyValuePair kvp : settingOption.values(options)) {
            if (kvp.value.isEmpty()) {
                throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
            }
            if (settings.containsKey(kvp.key)) {
                final String message = String.format(
                        Locale.ROOT,
                        "setting [%s] already set, saw [%s] and [%s]",
                        kvp.key,
                        settings.get(kvp.key),
                        kvp.value);
                throw new UserException(ExitCodes.USAGE, message);
            }
            settings.put(kvp.key, kvp.value);
        }

        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");

        execute(terminal, options, createEnv(terminal, settings));
    }


    protected Environment createEnv(final Terminal terminal, final Map<String, String> settings) throws UserException {
        final String esPathConf = System.getProperty("es.path.conf");
        if (esPathConf == null) {
            throw new UserException(ExitCodes.CONFIG, "the system property [es.path.conf] must be set");
        }
        return InternalSettingsPreparer.prepareEnvironment(Settings.EMPTY, terminal, settings, getConfigPath(esPathConf));
    }


    protected abstract void execute(Terminal terminal, OptionSet options,
                                    Environment env) throws Exception;
}
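  • 9. EnvironmentAwareCommand.execute() collects the settings passed on the command line, then makes sure path.data, path.home, and path.logs are set; any that are missing are read from the system properties es.path.data, es.path.home, and es.path.logs.
  • 10. createEnv() returns an Environment object that holds the locations of directories such as plugins, bin, lib, and modules.
  • 11. Enter Elasticsearch.execute().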
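
Step 9 can be sketched with plain collections. The excerpt above does not show the body of putSystemPropertyIfSettingIsMissing, so the fallback method below is an assumption about its general intent rather than a copy of it. SettingsSketch, collect, putPropertyIfMissing, and the sketch.* property names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class SettingsSketch {
    // Collects key/value pairs, rejecting empty values and duplicate keys,
    // in the spirit of the loop in EnvironmentAwareCommand.execute.
    static Map<String, String> collect(String[][] pairs) {
        Map<String, String> settings = new HashMap<>();
        for (String[] kvp : pairs) {
            String key = kvp[0];
            String value = kvp[1];
            if (value.isEmpty()) {
                throw new IllegalArgumentException("setting [" + key + "] must not be empty");
            }
            if (settings.containsKey(key)) {
                throw new IllegalArgumentException("setting [" + key + "] already set");
            }
            settings.put(key, value);
        }
        return settings;
    }

    // Assumed behavior of the fallback step: copy a system property into the
    // settings map only when the setting is absent and the property exists.
    static void putPropertyIfMissing(Map<String, String> settings, String setting, String propertyKey) {
        if (settings.containsKey(setting)) {
            return;
        }
        String value = System.getProperty(propertyKey);
        if (value != null) {
            settings.put(setting, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> settings = collect(new String[][] {{"path.data", "/tmp/data"}});
        System.setProperty("sketch.path.home", "/opt/es");
        putPropertyIfMissing(settings, "path.home", "sketch.path.home");
        System.out.println(settings);
    }
}
```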



prepareEnvironment

public class InternalSettingsPreparer {

    public static Environment prepareEnvironment(Settings input, Terminal terminal, 
                                                 Map<String, String> properties, Path configPath) {
        // just create enough settings to build the environment, to get the config dir
        Settings.Builder output = Settings.builder();
        initializeSettings(output, input, properties);
        Environment environment = new Environment(output.build(), configPath);

        if (Files.exists(environment.configFile().resolve("elasticsearch.yaml"))) {
            throw new SettingsException("elasticsearch.yaml was deprecated in 5.5.0 and must be renamed to elasticsearch.yml");
        }

        if (Files.exists(environment.configFile().resolve("elasticsearch.json"))) {
            throw new SettingsException("elasticsearch.json was deprecated in 5.5.0 and must be converted to elasticsearch.yml");
        }

        output = Settings.builder(); // start with a fresh output
        Path path = environment.configFile().resolve("elasticsearch.yml");
        if (Files.exists(path)) {
            try {
                output.loadFromPath(path);
            } catch (IOException e) {
                throw new SettingsException("Failed to load settings from " + path.toString(), e);
            }
        }

        // re-initialize settings now that the config file has been loaded
        initializeSettings(output, input, properties);
        finalizeSettings(output, terminal);

        environment = new Environment(output.build(), configPath);

        // we put back the path.logs so we can use it in the logging configuration file
        output.put(Environment.PATH_LOGS_SETTING.getKey(), environment.logsFile().toAbsolutePath().normalize().toString());
        return new Environment(output.build(), configPath);
    }
}


public class Environment {

    public Environment(final Settings settings, final Path configPath) {
        final Path homeFile;
        if (PATH_HOME_SETTING.exists(settings)) {
            homeFile = PathUtils.get(PATH_HOME_SETTING.get(settings)).normalize();
        } else {
            throw new IllegalStateException(PATH_HOME_SETTING.getKey() + " is not configured");
        }

        if (configPath != null) {
            configFile = configPath.normalize();
        } else {
            configFile = homeFile.resolve("config");
        }

        pluginsFile = homeFile.resolve("plugins");

        List<String> dataPaths = PATH_DATA_SETTING.get(settings);
        final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        if (DiscoveryNode.nodeRequiresLocalStorage(settings)) {
            if (dataPaths.isEmpty() == false) {
                dataFiles = new Path[dataPaths.size()];
                dataWithClusterFiles = new Path[dataPaths.size()];
                for (int i = 0; i < dataPaths.size(); i++) {
                    dataFiles[i] = PathUtils.get(dataPaths.get(i));
                    dataWithClusterFiles[i] = dataFiles[i].resolve(clusterName.value());
                }
            } else {
                dataFiles = new Path[]{homeFile.resolve("data")};
                dataWithClusterFiles = new Path[]{homeFile.resolve("data").resolve(clusterName.value())};
            }
        } else {
            if (dataPaths.isEmpty()) {
                dataFiles = dataWithClusterFiles = EMPTY_PATH_ARRAY;
            } else {
                final String paths = String.join(",", dataPaths);
                throw new IllegalStateException("node does not require local storage yet path.data is set to [" + paths + "]");
            }
        }
        if (PATH_SHARED_DATA_SETTING.exists(settings)) {
            sharedDataFile = PathUtils.get(PATH_SHARED_DATA_SETTING.get(settings)).normalize();
        } else {
            sharedDataFile = null;
        }
        List<String> repoPaths = PATH_REPO_SETTING.get(settings);
        if (repoPaths.isEmpty()) {
            repoFiles = EMPTY_PATH_ARRAY;
        } else {
            repoFiles = new Path[repoPaths.size()];
            for (int i = 0; i < repoPaths.size(); i++) {
                repoFiles[i] = PathUtils.get(repoPaths.get(i));
            }
        }

        // this is trappy, Setting#get(Settings) will get a fallback setting yet return false for Settings#exists(Settings)
        if (PATH_LOGS_SETTING.exists(settings)) {
            logsFile = PathUtils.get(PATH_LOGS_SETTING.get(settings)).normalize();
        } else {
            logsFile = homeFile.resolve("logs");
        }

        if (PIDFILE_SETTING.exists(settings)) {
            pidFile = PathUtils.get(PIDFILE_SETTING.get(settings)).normalize();
        } else {
            pidFile = null;
        }

        binFile = homeFile.resolve("bin");
        libFile = homeFile.resolve("lib");
        modulesFile = homeFile.resolve("modules");

        Settings.Builder finalSettings = Settings.builder().put(settings);
        finalSettings.put(PATH_HOME_SETTING.getKey(), homeFile);
        if (PATH_DATA_SETTING.exists(settings)) {
            finalSettings.putList(PATH_DATA_SETTING.getKey(), dataPaths);
        }
        finalSettings.put(PATH_LOGS_SETTING.getKey(), logsFile.toString());
        this.settings = finalSettings.build();
    }
}
  • 12. createEnv() finally returns an Environment object.
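
The Environment constructor above mostly resolves directories relative to path.home, with explicit settings taking precedence. A reduced sketch of that resolution logic using only java.nio.file follows. EnvironmentSketch is a hypothetical class that covers just four of the paths and takes a plain Map instead of Settings.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, reduced analogue of Environment: only home, config, plugins and logs.
class EnvironmentSketch {
    final Path homeFile;
    final Path configFile;
    final Path pluginsFile;
    final Path logsFile;

    EnvironmentSketch(Map<String, String> settings, Path configPath) {
        String home = settings.get("path.home");
        if (home == null) {
            throw new IllegalStateException("path.home is not configured");
        }
        homeFile = Paths.get(home).normalize();

        // An explicit config path wins; otherwise fall back to <home>/config.
        configFile = configPath != null ? configPath.normalize() : homeFile.resolve("config");

        pluginsFile = homeFile.resolve("plugins");

        // An explicit path.logs wins; otherwise fall back to <home>/logs.
        String logs = settings.get("path.logs");
        logsFile = logs != null ? Paths.get(logs).normalize() : homeFile.resolve("logs");
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("path.home", "/opt/es");
        EnvironmentSketch env = new EnvironmentSketch(settings, null);
        System.out.println(env.configFile + " " + env.pluginsFile + " " + env.logsFile);
    }
}
```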



elasticsearch.execute

class Elasticsearch extends EnvironmentAwareCommand {

    protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
        if (options.nonOptionArguments().isEmpty() == false) {
            // handling of positional arguments is omitted in this excerpt
        }
        if (options.has(versionOption)) {
            terminal.println("Version: " + Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot())
                    + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()
                    + ", JVM: " + JvmInfo.jvmInfo().version());
            return;
        }

        final boolean daemonize = options.has(daemonizeOption);
        final Path pidFile = pidfileOption.value(options);
        final boolean quiet = options.has(quietOption);

        try {
            init(daemonize, pidFile, quiet, env);
        } catch (NodeValidationException e) {
        }
    }

    void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
        throws NodeValidationException, UserException {
        try {
            Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
        } catch (BootstrapException | RuntimeException e) {
        }
    }
}
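  • 13. Elasticsearch.execute() reads the daemonize, pidFile, and quiet values and makes sure that the configured temporary directory (temp) is a valid directory.
  • 14. Enter the Bootstrap initialization phase.


Bootstrap init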

    static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {

        BootstrapInfo.init();

        INSTANCE = new Bootstrap();

        final SecureSettings keystore = loadSecureSettings(initialEnv);
        final Environment environment = createEnvironment(foreground, 
                                                          pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
        try {
            LogConfigurator.configure(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        if (environment.pidFile() != null) {
            try {
                PidFile.create(environment.pidFile(), true);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
        }

        final boolean closeStandardStreams = (foreground == false) || quiet;
        try {
            if (closeStandardStreams) {
                final Logger rootLogger = ESLoggerFactory.getRootLogger();
                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                if (maybeConsoleAppender != null) {
                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                }
                closeSystOut();
            }

            checkLucene();

            Thread.setDefaultUncaughtExceptionHandler(
                new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));

            INSTANCE.setup(true, environment);

            try {
                IOUtils.close(keystore);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }

            INSTANCE.start();

            if (closeStandardStreams) {
                closeSysError();
            }
        } catch (NodeValidationException | RuntimeException e) {
            throw e;
        }
    }
  • 15. Create the Bootstrap object: INSTANCE = new Bootstrap().
  • 16. Set up the Bootstrap object: INSTANCE.setup(true, environment).
  • 17. Start the Bootstrap object: INSTANCE.start().


Bootstrap constructor

    Bootstrap() {
        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                    // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        keepAliveThread.setDaemon(false);
        // keep this thread alive (non daemon thread) until we shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                keepAliveLatch.countDown();
            }
        });
    }
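  • 18. Create the Bootstrap object. The constructor creates a non-daemon keep-alive thread that waits on keepAliveLatch, and registers a shutdown hook with Runtime that calls countDown() on the latch so that the thread can exit.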
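
The keep-alive mechanism can be tried in isolation. The sketch below is a simplified illustration with hypothetical names (KeepAliveSketch, release, awaitExit): a non-daemon thread blocks on a CountDownLatch, and counting the latch down lets it finish. In Bootstrap the countDown() call is made from a JVM shutdown hook; here it is an ordinary method so that the behavior can be observed directly.

```java
import java.util.concurrent.CountDownLatch;

class KeepAliveSketch {
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    final Thread keepAliveThread;

    KeepAliveSketch() {
        keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await(); // blocks until release() is called
            } catch (InterruptedException e) {
                // bail out
            }
        }, "sketch[keepAlive]");
        // A non-daemon thread keeps the JVM alive for as long as it runs.
        keepAliveThread.setDaemon(false);
    }

    void start() {
        keepAliveThread.start();
    }

    // Stands in for the shutdown hook that counts the latch down.
    void release() {
        keepAliveLatch.countDown();
    }

    // Waits up to the given time and reports whether the thread has exited.
    boolean awaitExit(long millis) {
        try {
            keepAliveThread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !keepAliveThread.isAlive();
    }

    public static void main(String[] args) {
        KeepAliveSketch sketch = new KeepAliveSketch();
        sketch.start();
        System.out.println("running: " + sketch.keepAliveThread.isAlive());
        sketch.release();
        System.out.println("exited: " + sketch.awaitExit(5000));
    }
}
```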


Bootstrap setup

    private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
        Settings settings = environment.settings();

        try {
            spawner.spawnNativePluginControllers(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }

        initializeNatives(
                environment.tmpFile(),
                BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
                BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
                BootstrapSettings.CTRLHANDLER_SETTING.get(settings));

        // initialize probes before the security manager is installed
        initializeProbes();

        if (addShutdownHook) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        IOUtils.close(node, spawner);
                        LoggerContext context = (LoggerContext) LogManager.getContext(false);
                        Configurator.shutdown(context);
                    } catch (IOException ex) {
                        throw new ElasticsearchException("failed to stop node", ex);
                    }
                }
            });
        }

        try {
            JarHell.checkJarHell();
        } catch (IOException | URISyntaxException e) {
        }

        IfConfig.logIfNecessary();

        try {
            Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
        } catch (IOException | NoSuchAlgorithmException e) {
        }

        node = new Node(environment) {
            @Override
            protected void validateNodeBeforeAcceptingRequests(
                final BootstrapContext context,
                final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks)
                                                               throws NodeValidationException {
                BootstrapChecks.check(context, boundTransportAddress, checks);
            }
        };
    }
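  • 19. Spawn native controllers for plugins with spawner.spawnNativePluginControllers(environment), which tries to spawn a native controller daemon for each module that provides one. The spawned processes stay connected to this JVM through their stdin, stdout, and stderr streams, but references to those streams are not available to code outside this package.
  • 20. Initialize native resources with initializeNatives().
  • 21. Run JarHell.checkJarHell() to check for duplicate classes across the jars on the classpath.
  • 22. Create the node with new Node(environment). This is the core step!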
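
To make the idea behind step 21 concrete, here is a small sketch of duplicate detection across jars. It is only an illustration of the concept and not how JarHell is implemented: JarHellSketch and findDuplicates are hypothetical, and the input is a hand-written map from jar name to class names instead of a scan of the real classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class JarHellSketch {
    // Returns class name -> jars, for every class that appears in more than one jar.
    static Map<String, List<String>> findDuplicates(Map<String, List<String>> classesByJar) {
        Map<String, List<String>> owners = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : classesByJar.entrySet()) {
            for (String className : entry.getValue()) {
                owners.computeIfAbsent(className, k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        // Keep only the classes that are provided by at least two jars.
        owners.values().removeIf(jars -> jars.size() < 2);
        return owners;
    }

    public static void main(String[] args) {
        Map<String, List<String>> classesByJar = new LinkedHashMap<>();
        classesByJar.put("a.jar", Arrays.asList("org.example.Foo", "org.example.Bar"));
        classesByJar.put("b.jar", Arrays.asList("org.example.Foo"));
        System.out.println(findDuplicates(classesByJar));
    }
}
```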


Bootstrap start

    private void start() throws NodeValidationException {
        node.start();
        keepAliveThread.start();
    }
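  • 23. Start the node object with node.start().
  • 24. Start the keep-alive thread with keepAliveThread.start(). It is a non-daemon thread, so it keeps the JVM running.


Node creation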

    protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
 
        try {
            Settings tmpSettings = Settings.builder().put(environment.settings())
                .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

            try {
                nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
                resourcesToClose.add(nodeEnvironment);
            } catch (IOException ex) {
            }
            final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
            Logger logger = Loggers.getLogger(Node.class, tmpSettings);
            final String nodeId = nodeEnvironment.nodeId();
            tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);

            final JvmInfo jvmInfo = JvmInfo.jvmInfo();

            this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), 
                                                     environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
            this.settings = pluginsService.updatedSettings();
            localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

            this.environment = new Environment(this.settings, environment.configFile());

            final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
            final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
            resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));

            DeprecationLogger.setThreadContext(threadPool.getThreadContext());
            resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
            
            final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
            final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
            for (final ExecutorBuilder<?> builder : threadPool.builders()) {
                additionalSettings.addAll(builder.getRegisteredSettings());
            }
            client = new NodeClient(settings, threadPool);
            ..................
}
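  • 25. Create a NodeEnvironment object that holds the node's environment information, such as the paths of the various data files.
  • 26. Read the JVM information.
  • 27. Create the PluginsService object; while it is being created it reads and loads all modules and plugins.
  • 28. Create the final Environment object.
  • 29. Create the ThreadPool. Most of the objects created later provide their services on threads, and this pool manages those threads.
  • 30. Create the node client, NodeClient.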
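
The constructor excerpt registers each long-lived component in resourcesToClose as soon as it is created. The excerpt does not show where that list is consumed, so the sketch below assumes the usual purpose of such a list: if construction fails midway, everything created so far is closed. ResourceTrackingSketch and its methods are hypothetical, and closing in reverse creation order is a choice made for this sketch.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ResourceTrackingSketch {
    // Records what happened, so that the order of events can be inspected.
    final List<String> log = new ArrayList<>();

    private Closeable open(String name) {
        log.add("open " + name);
        return () -> log.add("close " + name);
    }

    // Creates components in order. If construction fails, everything that was
    // already created is closed again (assumed cleanup strategy).
    boolean build(boolean failMidway) {
        List<Closeable> resourcesToClose = new ArrayList<>();
        boolean success = false;
        try {
            resourcesToClose.add(open("nodeEnvironment"));
            resourcesToClose.add(open("threadPool"));
            if (failMidway) {
                throw new IllegalStateException("simulated construction failure");
            }
            success = true;
        } catch (IllegalStateException e) {
            log.add("failed: " + e.getMessage());
        } finally {
            if (!success) {
                Collections.reverse(resourcesToClose);
                for (Closeable resource : resourcesToClose) {
                    try {
                        resource.close();
                    } catch (IOException ignored) {
                        // keep closing the remaining resources
                    }
                }
            }
        }
        return success;
    }

    public static void main(String[] args) {
        ResourceTrackingSketch sketch = new ResourceTrackingSketch();
        sketch.build(true);
        sketch.log.forEach(System.out::println);
    }
}
```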
    protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
            ..................
            final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
            final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
            AnalysisModule analysisModule = new AnalysisModule(this.environment,
                pluginsService.filterPlugins(AnalysisPlugin.class));
            final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings,
                additionalSettingsFilter);
            scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
            resourcesToClose.add(resourceWatcherService);
            final NetworkService networkService = new NetworkService(
                getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));

            List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
            final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
               ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
            clusterService.addListener(scriptModule.getScriptService());
            resourcesToClose.add(clusterService);
            final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
                scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), 
                       pluginsService.filterPlugins(IngestPlugin.class));
            final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
                clusterService.getClusterSettings(), client);
            final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
                listener::onNewInfo);
            final UsageService usageService = new UsageService(settings);
}
  • 31. Create the service objects: ResourceWatcherService, NetworkService, ClusterService, IngestService, ClusterInfoService, UsageService, MonitorService, CircuitBreakerService, MetaStateService, IndicesService, MetaDataIndexUpgradeService, TemplateUpgradeService, TransportService, ResponseCollectorService, SearchTransportService, NodeService, SearchService, PersistentTasksClusterService. The role of each service can be roughly inferred from its name.
    protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
            ..................
            ModulesBuilder modules = new ModulesBuilder();
            // plugin modules must be added here, before others or we can get crazy injection errors...
            for (Module pluginModule : pluginsService.createGuiceModules()) {
                modules.add(pluginModule);
            }
            final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, 
                                   threadPool, clusterInfoService);
            ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
            modules.add(clusterModule);
            IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
            modules.add(indicesModule);

            SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
            CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
                settingsModule.getClusterSettings());
            resourcesToClose.add(circuitBreakerService);
            modules.add(new GatewayModule());


            ActionModule actionModule = new ActionModule(false, settings, 
                        clusterModule.getIndexNameExpressionResolver(),
                settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), 
                 settingsModule.getSettingsFilter(),
                threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
            modules.add(actionModule);

            final RestController restController = actionModule.getRestController();
            final NetworkModule networkModule = new NetworkModule(settings, false, 
                                                        pluginsService.filterPlugins(NetworkPlugin.class),
                    threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, 
                                            networkService, restController);
            Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
                pluginsService.filterPlugins(Plugin.class)
               .stream().map(Plugin::getCustomMetaDataUpgrader)
                    .collect(Collectors.toList());
            Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders =
                     pluginsService.filterPlugins(Plugin.class).stream()
                    .map(Plugin::getIndexTemplateMetaDataUpgrader)
                    .collect(Collectors.toList());
            Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class)
            .stream().map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
            final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, 
                                                   indexTemplateMetaDataUpgraders);
            final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, 
                xContentRegistry,indicesModule.getMapperRegistry(),
               settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders);
            final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService,
                metaDataIndexUpgradeService, metaDataUpgrader);
            new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
            final Transport transport = networkModule.getTransportSupplier().get();
            final TransportService transportService = newTransportService(settings, transport, threadPool,
                networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
            final ResponseCollectorService responseCollectorService = 
                                                              new ResponseCollectorService(this.settings, clusterService);
            final SearchTransportService searchTransportService =  new SearchTransportService(settings, 
               transportService,SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
            final Consumer<Binder> httpBind;
            final HttpServerTransport httpServerTransport;
            if (networkModule.isHttpEnabled()) {
                httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
                httpBind = b -> {
                    b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
                };
            } else {
                httpBind = b -> {
                    b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
                };
                httpServerTransport = null;
            }

            final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, 
                                   threadPool, transportService, namedWriteableRegistry,
                networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
                clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
                clusterModule.getAllocationService());

            this.nodeService = new NodeService(settings, threadPool, monitorService, 
                                  discoveryModule.getDiscovery(),
                transportService, indicesService, pluginsService, circuitBreakerService, 
                                   scriptModule.getScriptService(),
                httpServerTransport, ingestService, clusterService,
                                 settingsModule.getSettingsFilter(), responseCollectorService,
                searchTransportService);
  • 32. ModulesBuilder collects the various modules: ScriptModule, AnalysisModule, SettingsModule, pluginModule, ClusterModule, IndicesModule, SearchModule, GatewayModule, RepositoriesModule, ActionModule, NetworkModule, DiscoveryModule.
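
The idea of collecting modules first and applying them later can be shown with a toy registry. This is not Guice and not the Elasticsearch ModulesBuilder. BinderSketch and ModulesBuilderSketch are hypothetical classes that only mimic the shape of the pattern: each module is a callback that binds types to instances, and the builder applies all callbacks when the injector is created.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy registry: maps a type to a single, already constructed instance.
class BinderSketch {
    private final Map<Class<?>, Object> instances = new HashMap<>();

    <T> void bind(Class<T> type, T instance) {
        if (instances.putIfAbsent(type, instance) != null) {
            throw new IllegalStateException(type.getName() + " is already bound");
        }
    }

    <T> T getInstance(Class<T> type) {
        Object instance = instances.get(type);
        if (instance == null) {
            throw new IllegalStateException(type.getName() + " is not bound");
        }
        return type.cast(instance);
    }
}

// Hypothetical builder: collects modules and applies them in insertion order.
class ModulesBuilderSketch {
    private final List<Consumer<BinderSketch>> modules = new ArrayList<>();

    ModulesBuilderSketch add(Consumer<BinderSketch> module) {
        modules.add(module);
        return this;
    }

    BinderSketch createInjector() {
        BinderSketch binder = new BinderSketch();
        for (Consumer<BinderSketch> module : modules) {
            module.accept(binder);
        }
        return binder;
    }

    public static void main(String[] args) {
        BinderSketch injector = new ModulesBuilderSketch()
            .add(b -> b.bind(String.class, "node-1"))
            .add(b -> b.bind(Integer.class, 9300))
            .createInjector();
        System.out.println(injector.getInstance(String.class) + ":" + injector.getInstance(Integer.class));
    }
}
```

Because bindings are only applied in createInjector(), the order in which modules are added decides which binding is registered first, which is one reason the order of modules.add(...) calls can matter in this style of wiring.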
    protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
            ..................
            modules.add(b -> {
                    b.bind(Node.class).toInstance(this);
                    b.bind(NodeService.class).toInstance(nodeService);
                    b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
                    b.bind(PluginsService.class).toInstance(pluginsService);
                    b.bind(Client.class).toInstance(client);
                    b.bind(NodeClient.class).toInstance(client);
                    b.bind(Environment.class).toInstance(this.environment);
                    b.bind(ThreadPool.class).toInstance(threadPool);
                    b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
                    b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
                    b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
                    b.bind(BigArrays.class).toInstance(bigArrays);
                    b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
                    b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
                    b.bind(IngestService.class).toInstance(ingestService);
                    b.bind(UsageService.class).toInstance(usageService);
                    b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
                    b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
                    b.bind(MetaStateService.class).toInstance(metaStateService);
                    b.bind(IndicesService.class).toInstance(indicesService);
                    b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
                        threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
                        responseCollectorService));
                    b.bind(SearchTransportService.class).toInstance(searchTransportService);
                    b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
                            scriptModule.getScriptService()));
                    b.bind(Transport.class).toInstance(transport);
                    b.bind(TransportService.class).toInstance(transportService);
                    b.bind(NetworkService.class).toInstance(networkService);
                    b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
                    b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
                    b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
                    b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
                    b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
                    {
                        RecoverySettings recoverySettings =
                            new RecoverySettings(settings, settingsModule.getClusterSettings());

                        processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);

                        b.bind(PeerRecoverySourceService.class).toInstance(
                            new PeerRecoverySourceService(settings, transportService,
                                indicesService, recoverySettings));
                        b.bind(PeerRecoveryTargetService.class).toInstance(
                            new PeerRecoveryTargetService(settings, threadPool,
                                transportService, recoverySettings, clusterService));
                    }
                    httpBind.accept(b);
                    pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
                }
            );
            injector = modules.createInjector();

            clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));

            List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
                .filter(p -> p instanceof LifecycleComponent)
                .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
            pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
                .map(injector::getInstance).collect(Collectors.toList()));
            resourcesToClose.addAll(pluginLifecycleComponents);
            this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
            client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
                    () -> clusterService.localNode().getId());

            if (NetworkModule.HTTP_ENABLED.get(settings)) {
                logger.debug("initializing HTTP handlers ...");
                actionModule.initRestHandlers(() -> clusterService.state().nodes());
            }

            success = true;
        } catch (IOException ex) {
        } finally {
        }
    }
  • 33.Almost every Elasticsearch component is managed as a module. Elasticsearch wraps Guice, and the ModulesBuilder class assembles the modules from which the injector is created.
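
The bind-then-inject pattern used above can be illustrated without Guice. The following is a minimal sketch only: MiniModules, MiniBinder and MiniInjector are hypothetical names invented for this example and are not Elasticsearch or Guice classes. It shows modules registering type-to-instance bindings and an injector handing the instances back by type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a modules builder: collects modules, then builds an injector.
final class MiniModules {
    // Hypothetical binder: records type -> instance bindings.
    static final class MiniBinder {
        private final Map<Class<?>, Object> bindings = new HashMap<>();

        <T> void bind(Class<T> type, T instance) {
            if (bindings.putIfAbsent(type, instance) != null) {
                throw new IllegalStateException("duplicate binding for " + type.getName());
            }
        }
    }

    // Hypothetical injector: looks up the instance bound to a type.
    static final class MiniInjector {
        private final Map<Class<?>, Object> bindings;

        MiniInjector(Map<Class<?>, Object> bindings) {
            this.bindings = bindings;
        }

        <T> T getInstance(Class<T> type) {
            Object instance = bindings.get(type);
            if (instance == null) {
                throw new IllegalStateException("no binding for " + type.getName());
            }
            return type.cast(instance);
        }
    }

    private final List<Consumer<MiniBinder>> modules = new ArrayList<>();

    // Each module is a callback that registers its bindings on the binder.
    MiniModules add(Consumer<MiniBinder> module) {
        modules.add(module);
        return this;
    }

    // Runs every module against one binder and freezes the result into an injector.
    MiniInjector createInjector() {
        MiniBinder binder = new MiniBinder();
        modules.forEach(m -> m.accept(binder));
        return new MiniInjector(new HashMap<>(binder.bindings));
    }

    public static void main(String[] args) {
        MiniModules modules = new MiniModules();
        modules.add(b -> b.bind(String.class, "node-1"));
        modules.add(b -> b.bind(Integer.class, 9300));
        MiniInjector injector = modules.createInjector();
        System.out.println(injector.getInstance(String.class) + ":" + injector.getInstance(Integer.class));
    }
}
```

Real Guice additionally resolves constructor dependencies and scopes; this sketch covers only the toInstance style of binding that the Node constructor uses for most services.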


Node startup process

    public Node start() throws NodeValidationException {
        if (!lifecycle.moveToStarted()) {
            return this;
        }

        Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
        logger.info("starting ...");
        pluginLifecycleComponents.forEach(LifecycleComponent::start);

        injector.getInstance(MappingUpdatedAction.class).setClient(client);
        injector.getInstance(IndicesService.class).start();
        injector.getInstance(IndicesClusterStateService.class).start();
        injector.getInstance(SnapshotsService.class).start();
        injector.getInstance(SnapshotShardsService.class).start();
        injector.getInstance(RoutingService.class).start();
        injector.getInstance(SearchService.class).start();
        nodeService.getMonitorService().start();

        final ClusterService clusterService = injector.getInstance(ClusterService.class);

        final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
        nodeConnectionsService.start();
        clusterService.setNodeConnectionsService(nodeConnectionsService);

        injector.getInstance(ResourceWatcherService.class).start();
        injector.getInstance(GatewayService.class).start();
        Discovery discovery = injector.getInstance(Discovery.class);
        clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

        // Start the transport service now so the publish address will be added to the local disco node in ClusterService
        TransportService transportService = injector.getInstance(TransportService.class);
        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        transportService.start();
        assert localNodeFactory.getNode() != null;
        assert transportService.getLocalNode().equals(localNodeFactory.getNode())
            : "transportService has a different local node than the factory provided";
        final MetaData onDiskMetadata;
        try {
            // we load the global state here (the persistent part of the cluster state stored on disk) to
            // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
            if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
                onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
            } else {
                onDiskMetadata = MetaData.EMPTY_META_DATA;
            }
            assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata),
            transportService.boundAddress(),
            pluginsService.filterPlugins(Plugin.class).stream()
                .flatMap(p -> p.getBootstrapChecks().stream())
                .collect(Collectors.toList()));

        clusterService.addStateApplier(transportService.getTaskManager());
        // start after transport service so the local disco is known
        discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
        clusterService.start();
        assert clusterService.localNode().equals(localNodeFactory.getNode())
            : "clusterService has a different local node than the factory provided";
        transportService.acceptIncomingRequests();
        discovery.startInitialJoin();
        // tribe nodes don't have a master so we shouldn't register an observer
        final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
        if (initialStateTimeout.millis() > 0) {
            final ThreadPool thread = injector.getInstance(ThreadPool.class);
            ClusterState clusterState = clusterService.state();
            ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, 
                                                              null, logger, thread.getThreadContext());
            if (clusterState.nodes().getMasterNodeId() == null) {
                logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
                final CountDownLatch latch = new CountDownLatch(1);
                observer.waitForNextChange(new ClusterStateObserver.Listener() {
                    @Override
                    public void onNewClusterState(ClusterState state) { latch.countDown(); }

                    @Override
                    public void onClusterServiceClose() {
                        latch.countDown();
                    }

                    @Override
                    public void onTimeout(TimeValue timeout) {
                        logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                            initialStateTimeout);
                        latch.countDown();
                    }
                }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
                }
            }
        }


        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            injector.getInstance(HttpServerTransport.class).start();
        }

        if (WRITE_PORTS_FILE_SETTING.get(settings)) {
            if (NetworkModule.HTTP_ENABLED.get(settings)) {
                HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
                writePortsFile("http", http.boundAddress());
            }
            TransportService transport = injector.getInstance(TransportService.class);
            writePortsFile("transport", transport.boundAddress());
        }

        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

        return this;
    }
  • 34.The injector supplies an instance of each class, and its start() method is called (which ends up in that class's doStart method): LifecycleComponent, IndicesService, IndicesClusterStateService, SnapshotsService, SnapshotShardsService, RoutingService, SearchService, MonitorService, NodeConnectionsService, ResourceWatcherService, GatewayService, Discovery, TransportService.
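  • 35.What each service does:
    IndicesService: index management.
    IndicesClusterStateService: applies cluster state changes to the indices and shards on the local node.
    SnapshotsService: responsible for creating snapshots.
    SnapshotShardsService: runs on data and master nodes and controls the shards of the snapshots currently running on those nodes.
    RoutingService: listens to the cluster state; when it receives a cluster-changed event it validates the cluster state, and the routing table may be updated.
    SearchService: search.
    MonitorService: monitoring.
    NodeConnectionsService: connects to nodes once they are added to the cluster state and disconnects when they are removed. It also periodically checks that all connections are still open and restores them if needed. It is not responsible for removing nodes from the cluster when they disconnect or stop responding to pings; that is done by NodesFaultDetection. Master fault detection is done by MasterFaultDetection.
    ResourceWatcherService: generic resource watcher service.
    GatewayService: gateway.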

  • 36.Cluster discovery, monitoring and related services start, then HttpServerTransport is started and binds the service port.
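
The start() to doStart() delegation mentioned in step 34 is a template-method pattern. The sketch below is a simplified illustration under stated assumptions: LifecycleDemo, Component and NamedService are hypothetical names, and the real Elasticsearch lifecycle tracks more states than a single boolean. It shows that calling start() a second time does not run doStart() again, which is the same guard Node.start() applies via lifecycle.moveToStarted().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified lifecycle demo; not the Elasticsearch classes.
final class LifecycleDemo {
    // Callers use start(); subclasses put their startup work in doStart().
    abstract static class Component {
        private boolean started;

        final synchronized void start() {
            if (started) {
                return; // already started, doStart() must not run twice
            }
            doStart();
            started = true;
        }

        final synchronized boolean isStarted() {
            return started;
        }

        protected abstract void doStart();
    }

    // A service that records its name in a shared log when it starts.
    static final class NamedService extends Component {
        private final String name;
        private final List<String> log;

        NamedService(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        protected void doStart() {
            log.add(name);
        }
    }

    // Starts the services in the given order, twice, and returns the start log.
    static List<String> startInOrder(String... names) {
        List<String> log = new ArrayList<>();
        List<Component> services = new ArrayList<>();
        for (String name : names) {
            services.add(new NamedService(name, log));
        }
        services.forEach(Component::start);
        services.forEach(Component::start); // second pass is a no-op
        return log;
    }

    public static void main(String[] args) {
        System.out.println(startInOrder("IndicesService", "SearchService", "TransportService"));
    }
}
```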


Thread pool

    public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
        super(settings);

        assert Node.NODE_NAME_SETTING.exists(settings);

        final Map<String, ExecutorBuilder> builders = new HashMap<>();
        final int availableProcessors = EsExecutors.numberOfProcessors(settings);
        final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
        final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
        final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
        builders.put(Names.GENERIC,
            new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
        builders.put(Names.INDEX,
            new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
        builders.put(Names.BULK,
            new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200));
        builders.put(Names.GET,
            new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
        builders.put(Names.SEARCH,
            new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH,
                searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
        builders.put(Names.MANAGEMENT,
            new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.LISTENER,
            new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));
        builders.put(Names.FLUSH,
            new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.REFRESH,
            new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
        builders.put(Names.WARMER,
            new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.SNAPSHOT,
            new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.FETCH_SHARD_STARTED,
            new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
        builders.put(Names.FORCE_MERGE,
            new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
        builders.put(Names.FETCH_SHARD_STORE,
            new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
        for (final ExecutorBuilder<?> builder : customBuilders) {
            if (builders.containsKey(builder.name())) {
                throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
            }
            builders.put(builder.name(), builder);
        }
        this.builders = Collections.unmodifiableMap(builders);

        threadContext = new ThreadContext(settings);

        final Map<String, ExecutorHolder> executors = new HashMap<>();
        for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
            final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
            final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
            if (executors.containsKey(executorHolder.info.getName())) {
            }
            logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
            executors.put(entry.getKey(), executorHolder);
        }

        executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
        this.executors = unmodifiableMap(executors);
        this.scheduler = Scheduler.initScheduler(settings);
        TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
        this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(
                                                  settings, "[timer]"), estimatedTimeInterval.millis());
        this.cachedTimeThread.start();
    }

Thread pool types (ThreadPoolType)

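fixed: a fixed thread pool has a fixed number of threads to handle requests; when no thread is idle, requests wait in a queue. The queue_size parameter controls how many requests may be queued while no thread is idle.
fixed_auto_queue_size: this type is experimental and may be changed or removed; it is not covered here.
scaling: a scaling thread pool has a variable number of threads, between the configured core and max values. The keep_alive parameter controls how long an idle thread is kept in the pool.
direct: an executor that does not support shutdown, so once in use it stays alive. In the constructor above it backs the same pool (DIRECT_EXECUTOR), where the task runs on the calling thread.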
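
The three main types can be approximated with plain java.util.concurrent classes. This is a rough sketch, not how Elasticsearch builds its executors: PoolTypesDemo and its method names are hypothetical, and Elasticsearch uses its own executor and queue implementations (EsExecutors) rather than the JDK defaults shown here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the fixed / scaling / direct ideas using only the JDK.
final class PoolTypesDemo {
    // fixed: core == max threads and a bounded queue for requests that have to wait.
    static ThreadPoolExecutor fixed(int size, int queueSize) {
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize));
    }

    // scaling: grows from core to max on demand; idle threads above core exit after keepAlive.
    // A SynchronousQueue is used so the JDK executor actually adds threads up to max.
    static ThreadPoolExecutor scaling(int core, int max, long keepAliveSeconds) {
        return new ThreadPoolExecutor(core, max, keepAliveSeconds, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }

    // direct: no pool at all, the task runs on the thread that submits it.
    static Executor direct() {
        return Runnable::run;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor get = fixed(4, 1000);
        ThreadPoolExecutor flush = scaling(1, 5, 300);
        System.out.println("fixed max threads: " + get.getMaximumPoolSize());
        System.out.println("scaling range: " + flush.getCorePoolSize() + ".." + flush.getMaximumPoolSize());
        direct().execute(() -> System.out.println("direct runs on: " + Thread.currentThread().getName()));
        get.shutdown();
        flush.shutdown();
    }
}
```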

Some important thread pools

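generic: for generic operations (e.g. background node discovery). Type scaling.
index: for index/delete operations. Type fixed, size equal to the number of processors, queue size 200, maximum size 1 + number of processors.
search: for count/search/suggest operations. Size int((number of processors * 3) / 2) + 1, queue size 1000. In the 6.1.0 constructor above it is built with AutoQueueAdjustingExecutorBuilder, i.e. type fixed_auto_queue_size rather than plain fixed.
get: for get operations. Type fixed, size equal to the number of processors, queue size 1000.
analyze: for analyze requests. Type fixed, size 1, queue size 16. This pool is not among those built in the 6.1.0 constructor above.
write: for single-document index/delete/update requests and bulk requests. Type fixed, size equal to the number of processors, queue size 200, maximum size 1 + number of processors. This pool is not among those built in the 6.1.0 constructor above, which registers separate index and bulk pools.
snapshot: for snapshot/restore operations. Type scaling, keep-alive 5 minutes, maximum size min(5, (number of processors) / 2).
warmer: for segment warm-up operations. Type scaling, keep-alive 5 minutes, maximum size min(5, (number of processors) / 2).
refresh: for refresh operations. Type scaling, keep-alive 5 minutes, maximum size min(10, (number of processors) / 2).
listener: mainly used to run Java client actions when the listener threaded option is set to true. The constructor above builds it with FixedExecutorBuilder (type fixed, queue size -1), with size min(10, (number of processors) / 2).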
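
The sizing formulas quoted above can be checked with a few lines of arithmetic. The sketch below is an illustration, not a copy of the Elasticsearch helpers: ThreadPoolSizing and its method names are hypothetical, the generic bound mirrors `boundedBy(4 * availableProcessors, 128, 512)` from the constructor, and rounding the half-processor count up via `(processors + 1) / 2` is an assumption about how the real helpers round.

```java
// Hypothetical helper class for working through the pool-size formulas.
final class ThreadPoolSizing {
    // Clamps value into the inclusive range [min, max].
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    // Half the processors, at least 1 and at most 5 (assumed to round up).
    static int halfProcessorsMaxFive(int processors) {
        return boundedBy((processors + 1) / 2, 1, 5);
    }

    // Half the processors, at least 1 and at most 10 (assumed to round up).
    static int halfProcessorsMaxTen(int processors) {
        return boundedBy((processors + 1) / 2, 1, 10);
    }

    // int((processors * 3) / 2) + 1, the search pool size from the list above.
    static int searchPoolSize(int processors) {
        return ((processors * 3) / 2) + 1;
    }

    // Upper bound of the generic scaling pool, as in the constructor above.
    static int genericMax(int processors) {
        return boundedBy(4 * processors, 128, 512);
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("processors       = " + processors);
        System.out.println("generic max      = " + genericMax(processors));
        System.out.println("search size      = " + searchPoolSize(processors));
        System.out.println("flush/warmer max = " + halfProcessorsMaxFive(processors));
        System.out.println("refresh max      = " + halfProcessorsMaxTen(processors));
    }
}
```

For example, with 8 processors these formulas give a search pool of 13 threads and a generic pool capped at 128, because 4 * 8 = 32 falls below the lower bound.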


PluginsService: the plugin service

    public static final String ES_PLUGIN_PROPERTIES = "plugin-descriptor.properties";
    public static final String ES_PLUGIN_POLICY = "plugin-security.policy";

    public static PluginInfo readFromProperties(final Path path) throws IOException {
        final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);
        final Properties props = new Properties();
        try (InputStream stream = Files.newInputStream(descriptor)) {
            props.load(stream);
        }
        final String name = props.getProperty("name");
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException(
                    "property [name] is missing in [" + descriptor + "]");
        }
        final String description = props.getProperty("description");
        if (description == null) {
            throw new IllegalArgumentException(
                    "property [description] is missing for plugin [" + name + "]");
        }
        final String version = props.getProperty("version");
        if (version == null) {
            throw new IllegalArgumentException(
                    "property [version] is missing for plugin [" + name + "]");
        }

        final String esVersionString = props.getProperty("elasticsearch.version");
        if (esVersionString == null) {
            throw new IllegalArgumentException(
                    "property [elasticsearch.version] is missing for plugin [" + name + "]");
        }
        final Version esVersion = Version.fromString(esVersionString);
        if (esVersion.equals(Version.CURRENT) == false) {
            final String message = String.format(
                    Locale.ROOT,
                    "plugin [%s] is incompatible with version [%s]; was designed for version [%s]",
                    name,
                    Version.CURRENT.toString(),
                    esVersionString);
            throw new IllegalArgumentException(message);
        }
        final String javaVersionString = props.getProperty("java.version");
        if (javaVersionString == null) {
            throw new IllegalArgumentException(
                    "property [java.version] is missing for plugin [" + name + "]");
        }
        JarHell.checkVersionFormat(javaVersionString);
        JarHell.checkJavaVersion(name, javaVersionString);
        final String classname = props.getProperty("classname");
        if (classname == null) {
            throw new IllegalArgumentException(
                    "property [classname] is missing for plugin [" + name + "]");
        }

        final String hasNativeControllerValue = props.getProperty("has.native.controller");
        final boolean hasNativeController;
        if (hasNativeControllerValue == null) {
            hasNativeController = false;
        } else {
            switch (hasNativeControllerValue) {
                case "true":
                    hasNativeController = true;
                    break;
                case "false":
                    hasNativeController = false;
                    break;
                default:
                    final String message = String.format(
                        Locale.ROOT,
                        "property [%s] must be [%s], [%s], or unspecified but was [%s]",
                        "has_native_controller",
                        "true",
                        "false",
                        hasNativeControllerValue);
                    throw new IllegalArgumentException(message);
            }
        }

        final String requiresKeystoreValue = props.getProperty("requires.keystore", "false");
        final boolean requiresKeystore;
        try {
            requiresKeystore = Booleans.parseBoolean(requiresKeystoreValue);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("property [requires.keystore] must be [true] or [false]," +
                                               " but was [" + requiresKeystoreValue + "]", e);
        }

        return new PluginInfo(name, description, version, classname, hasNativeController, requiresKeystore);
    }
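  • Reads the plugin's descriptor file, plugin-descriptor.properties, and loads its contents into a Properties object.
  • Validates name, description, version, elasticsearch.version, java.version and classname, which are required, and has.native.controller and requires.keystore, which are optional and default to false. A missing or malformed value raises an IllegalArgumentException.
  • Builds a PluginInfo object from these properties and returns it.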
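
The required-key check described above can be tried in isolation. The sketch below is a minimal illustration: DescriptorCheck and its methods are hypothetical, the descriptor values in main are made up for the example, and it only checks for presence, leaving out the version comparison and JarHell checks that the real method performs.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: checks that a plugin descriptor has every required key.
final class DescriptorCheck {
    // Keys that readFromProperties in the listing above treats as mandatory.
    static final List<String> REQUIRED = Arrays.asList(
            "name", "description", "version", "elasticsearch.version", "java.version", "classname");

    // Parses descriptor text in java.util.Properties format.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the first missing required key, or null when all are present.
    static String firstMissing(Properties props) {
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                return key;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Made-up descriptor contents, for illustration only.
        String descriptor = String.join("\n",
                "name=example-plugin",
                "description=An example plugin",
                "version=1.0.0",
                "elasticsearch.version=6.1.0",
                "java.version=1.8",
                "classname=org.example.ExamplePlugin");
        System.out.println("first missing key: " + firstMissing(parse(descriptor)));
    }
}
```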


Reference article

Elasticsearch 6.3.2 startup process
