The NameServer startup flow in RocketMQ (Part 3)


1. Program entry point

The entry point is in org.apache.rocketmq.namesrv.NamesrvStartup:

public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            NamesrvController controller = createNamesrvController(args); // 2. Create the NamesrvController
            start(controller); // 3. Start the NameServer
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

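The main startup flow of the NameServer is clear and does just two things: it creates the controller and then starts it. Each of the two steps, however, involves quite a few details.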


2. Creation flow

I have removed some scattered logic; the main logic is below.

 public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        // 2.1 Create the NameServer configuration object
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        // 2.2 Create the Netty server configuration object
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // Override the default listen port with the NameServer port
        nettyServerConfig.setListenPort(9876);
        // 2.3 Create the NamesrvController
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
        return controller;
    }

2.1 Creating the NameServer configuration (NamesrvConfig)

    // RocketMQ home directory; configuration files such as logback.xml can be placed under it
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    // Path of the persisted KV configuration
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    // Path where the NameServer properties are stored
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    // Whether ordered messages are enabled
    private boolean orderMessageEnable = false;
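
The rocketmqHome field reads a JVM system property first and only falls back to an environment variable when the property is absent. Here is a minimal sketch of that lookup order. The class and method names are hypothetical, and the literal keys rocketmq.home.dir and ROCKETMQ_HOME are what I believe the two MixAll constants hold, so treat them as assumptions.

```java
import java.util.Map;
import java.util.Properties;

final class HomeResolutionSketch {
    // Same shape as System.getProperty(key, System.getenv(envKey)):
    // the JVM property wins and the environment variable is only the default.
    // Both sources are passed in so the lookup can be tried without touching
    // the real process environment.
    static String resolve(Properties props, Map<String, String> env, String propKey, String envKey) {
        return props.getProperty(propKey, env.get(envKey));
    }

    public static void main(String[] args) {
        // Lookup against the running JVM; prints null when neither source is set.
        // The two key names are assumptions, see the note above.
        System.out.println(resolve(System.getProperties(), System.getenv(),
            "rocketmq.home.dir", "ROCKETMQ_HOME"));
    }
}
```

In practice this means a -D flag on the java command line overrides whatever the shell environment exports.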

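2.2 Creating the server-side configuration (NettyServerConfig)

    // Listen port; the default is 8888, but NamesrvStartup overrides it to 9876 for the NameServer
    private int listenPort = 8888;
    // Number of worker threads
    private int serverWorkerThreads = 8;
    // Thread count for the public executor; when it is <= 0, NettyRemotingServer falls back to 4
    private int serverCallbackExecutorThreads = 0;
    // Number of selector threads
    private int serverSelectorThreads = 3;
    // Semaphore permits for one-way requests
    private int serverOnewaySemaphoreValue = 256;
    // Semaphore permits for asynchronous requests
    private int serverAsyncSemaphoreValue = 64;
    // Maximum idle time of a channel, in seconds
    private int serverChannelMaxIdleTimeSeconds = 120;
    // Socket send buffer size
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    // Socket receive buffer size
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    // Whether to use the pooled ByteBuf allocator
    private boolean serverPooledByteBufAllocatorEnable = true;
    // Whether to use the native epoll selector instead of NIO
    private boolean useEpollNativeSelector = false;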
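
As far as I understand, serverOnewaySemaphoreValue and serverAsyncSemaphoreValue cap how many one-way and asynchronous requests may be in flight at the same time. The sketch below shows the general technique with java.util.concurrent.Semaphore; the class and method names are hypothetical and this is not the RocketMQ implementation.

```java
import java.util.concurrent.Semaphore;

final class InFlightLimiterSketch {
    private final Semaphore permits;

    InFlightLimiterSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Tries to admit one request without blocking; the caller must call release() once it completes. */
    boolean tryAdmit() {
        return permits.tryAcquire();
    }

    /** Returns the permit taken by a successful tryAdmit(). */
    void release() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        InFlightLimiterSketch limiter = new InFlightLimiterSketch(2);
        System.out.println(limiter.tryAdmit()); // first request is admitted
        System.out.println(limiter.tryAdmit()); // second request is admitted
        System.out.println(limiter.tryAdmit()); // third is rejected until a permit is released
    }
}
```

Rejecting instead of blocking keeps a slow peer from piling up unbounded pending work; whether to fail fast or wait with a timeout is a design choice.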

2.3 Creating the NamesrvController

 public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        // NameServer configuration
        this.namesrvConfig = namesrvConfig;
        // Netty server configuration
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        // Routing information; this is the most important piece
        this.routeInfoManager = new RouteInfoManager();
        // Listens for brokers going offline and removes them from memory through RouteInfoManager
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        // Where the configuration is persisted
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

// Let's look at what RouteInfoManager holds
public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); // topic -> queue data of that topic
        this.brokerAddrTable = new HashMap<String, BrokerData>(128); // broker name -> broker data (addresses)
        this.clusterAddrTable = new HashMap<String, Set<String>>(32); // cluster name -> names of the brokers in the cluster
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); // broker address -> liveness info of active brokers
        this.filterServerTable = new HashMap<String, List<String>>(256); // broker address -> filter server list
    }
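
To make the relationship between these tables concrete, here is a heavily simplified sketch: plain strings and timestamps stand in for QueueData, BrokerData and BrokerLiveInfo, and every method name is hypothetical. It only illustrates how a registration fans out into several maps and how a liveness scan can evict a stale broker; the real RouteInfoManager does much more.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class RouteTablesSketch {
    final Map<String, Set<String>> topicQueueTable = new HashMap<>();       // topic -> broker names
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // broker name -> (broker id -> address)
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster name -> broker names
    final Map<String, Long> brokerLiveTable = new HashMap<>();              // broker address -> last heartbeat (ms)

    /** Records one broker heartbeat for one topic in all four tables. */
    void register(String cluster, String brokerName, long brokerId, String addr, String topic, long nowMillis) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        topicQueueTable.computeIfAbsent(topic, k -> new HashSet<>()).add(brokerName);
        brokerLiveTable.put(addr, nowMillis);
    }

    /** Resolves a topic to the addresses of the brokers that currently serve it. */
    List<String> lookup(String topic) {
        List<String> addrs = new ArrayList<>();
        Set<String> brokerNames = topicQueueTable.getOrDefault(topic, Collections.<String>emptySet());
        for (String brokerName : brokerNames) {
            Map<Long, String> byId = brokerAddrTable.get(brokerName);
            if (byId != null) {
                addrs.addAll(byId.values());
            }
        }
        return addrs;
    }

    /** Evicts brokers whose last heartbeat is older than timeoutMillis; returns how many were removed. */
    int scanNotActive(long nowMillis, long timeoutMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String addr = entry.getKey();
            if (nowMillis - entry.getValue() > timeoutMillis) {
                it.remove();
                for (Map<Long, String> byId : brokerAddrTable.values()) {
                    byId.values().remove(addr);
                }
                removed++;
            }
        }
        return removed;
    }
}
```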

2.4 Summary of the creation flow

This step mainly sets configuration parameters and initializes the in-memory routing tables.


3. Starting the NameServer

First, the overall startup flow:

public static NamesrvController start(final NamesrvController controller) throws Exception {
        // 3.1 Initialize the NameServer
        boolean initResult = controller.initialize();
        // 3.2 Register a shutdown hook with the JVM; it runs before the JVM exits
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));
        // 3.3 Start the NameServer
        controller.start();
        return controller;
    }

The flow has three parts: initialization, registering a shutdown hook with the JVM, and starting the NameServer.


3.1 Initializing the NameServer

public boolean initialize() {
        // Load the KV configuration
        this.kvConfigManager.load();
        // 3.1.1 Initialize the Netty server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        // 3.1.2 Initialize the worker thread pool (8 threads by default)
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        // 3.1.3 Register the default processor
        this.registerProcessor();
        // 3.1.4 Every 10 seconds (after a 5-second initial delay), scan for brokers that are no longer alive and remove them
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        // 3.1.5 Every 10 minutes (after a 1-minute initial delay), print the KV configuration
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
        return true;
    }

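Initialization creates the Netty server, creates the worker thread pool, registers the default processor, and schedules two periodic tasks. The subsections below look at each step.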
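
One detail of scheduleAtFixedRate is worth knowing when reading the two periodic tasks: per the JDK documentation, if an execution throws, all later executions are suppressed. The sketch below wraps a task so that a failure is logged and the schedule keeps going. The class and helper names are hypothetical, and the millisecond delays are only there to keep the demo fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FixedRateSketch {
    /** Wraps a task so an exception is reported instead of cancelling the schedule. */
    static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                System.err.println("periodic task failed: " + t);
            }
        };
    }

    /** Schedules the guarded task and waits until it has started `runs` times or the timeout passes. */
    static boolean awaitRuns(Runnable task, int runs, long initialDelayMs, long periodMs, long timeoutMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        Runnable counted = () -> {
            latch.countDown(); // count the attempt before the task body may throw
            task.run();
        };
        try {
            scheduler.scheduleAtFixedRate(guarded(counted), initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A task that always fails still runs three times because of the guard.
        boolean ok = awaitRuns(() -> { throw new IllegalStateException("boom"); }, 3, 5, 10, 2000);
        System.out.println("ran three times: " + ok);
    }
}
```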

3.1.1 Initializing the Netty server
 public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {
        // Set the semaphore permits for one-way and asynchronous requests
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        // Keep the listener (here the BrokerHousekeepingService)
        this.channelEventListener = channelEventListener;
        // Read the configured thread count for the public executor
        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }
        // Public executor: roughly, the fallback pool for tasks that have no dedicated thread pool
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
        // Choose epoll or NIO depending on the operating system and configuration
        if (useEpoll()) {
            // Boss group: accepts incoming TCP connections
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
            // Selector group: accepted sockets are registered with one of its selectors (3 by default)
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
			
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
    }
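
The four anonymous ThreadFactory classes above differ only in the name prefix, and their purpose is to give threads readable names in thread dumps. A minimal reusable version could look like the sketch below; NamedThreadFactorySketch is a hypothetical name, not a RocketMQ class.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreadFactorySketch implements ThreadFactory {
    private final AtomicInteger threadIndex = new AtomicInteger(0);
    private final String prefix;

    NamedThreadFactorySketch(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Each thread gets the prefix plus a 1-based counter, e.g. NettyNIOBoss_1
        return new Thread(r, prefix + threadIndex.incrementAndGet());
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedThreadFactorySketch("NettyNIOBoss_");
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```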
3.1.2 Initializing the worker thread pool (simple enough to skip)
3.1.3 Registering the default processor
private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) { // clusterTest defaults to false in NamesrvConfig, so the else branch runs
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {
            // The key point: this binds the processor to the thread pool that will run it
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }
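
The idea of binding a processor to an executor can be sketched as a small dispatch table: each request code maps to a (processor, executor) pair, and unknown codes fall back to the default pair. Everything below is a hypothetical simplification in which a Function over strings stands in for the real request processor interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class ProcessorTableSketch {
    /** A processor together with the executor that runs it. */
    static final class Pair {
        final Function<String, String> processor;
        final ExecutorService executor;

        Pair(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Pair> processorTable = new HashMap<>();
    private Pair defaultPair;

    void registerProcessor(int requestCode, Function<String, String> processor, ExecutorService executor) {
        processorTable.put(requestCode, new Pair(processor, executor));
    }

    void registerDefaultProcessor(Function<String, String> processor, ExecutorService executor) {
        this.defaultPair = new Pair(processor, executor);
    }

    /** Runs the matching processor on its own executor and waits for the result. */
    String dispatchAndWait(int requestCode, String body) {
        Pair pair = processorTable.getOrDefault(requestCode, defaultPair);
        if (pair == null) {
            throw new IllegalStateException("no processor for request code " + requestCode);
        }
        Callable<String> task = () -> pair.processor.apply(body);
        try {
            return pair.executor.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ProcessorTableSketch table = new ProcessorTableSketch();
        table.registerDefaultProcessor(body -> "default:" + body, pool);
        System.out.println(table.dispatchAndWait(42, "ping"));
        pool.shutdown();
    }
}
```

Keeping the executor next to the processor lets slow request types get their own pool without blocking the others.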
3.1.4 and 3.1.5 start the two scheduled tasks

3.2 Registering a hook with the JVM

// This allows a graceful shutdown
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));
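
A shutdown hook is just an unstarted Thread handed to Runtime.addShutdownHook. The sketch below shows one way to make such a hook run its callback at most once, even if run() is invoked again; RunOnceHookSketch is a hypothetical class, not RocketMQ's ShutdownHookThread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

final class RunOnceHookSketch extends Thread {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Callable<Void> callback;

    RunOnceHookSketch(Callable<Void> callback) {
        super("ShutdownHookSketch");
        this.callback = callback;
    }

    @Override
    public void run() {
        // Only the first caller flips the flag and executes the callback.
        if (!done.compareAndSet(false, true)) {
            return;
        }
        try {
            callback.call();
        } catch (Exception e) {
            System.err.println("shutdown callback failed: " + e);
        }
    }

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new RunOnceHookSketch(() -> {
            System.out.println("releasing resources before the JVM exits");
            return null;
        }));
    }
}
```

Note that hooks run on normal exit and on signals such as SIGTERM, but not when the process is killed with SIGKILL.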

3.3 Starting the NameServer

This is where the NameServer actually starts; all the preceding steps prepare for it.

  public void start() throws Exception {
        // Start the remoting server
        this.remotingServer.start();
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

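   // NettyRemotingServer.start(), which the controller's start() calls
   public void start() {
        // Executor group that runs the channel handlers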
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
        // Prepare the sharable handlers
        prepareSharableHandlers();
		
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), // idle-connection detection
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });
        // Use the pooled ByteBuf allocator if enabled
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
        // Start the executor that dispatches channel events to the listener (here the BrokerHousekeepingService)
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
        // Start the periodic scan for timed-out requests (first run after 3 seconds, then every second)
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
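
The timer task at the end periodically calls scanResponseTable. The source of that method is not shown here, so the following is only a sketch of the general technique under my own assumptions: pending requests are kept in a concurrent map keyed by a request id, and each scan removes the entries whose deadline has passed. All names are hypothetical.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ResponseTableSketch {
    /** A request that is still waiting for its response. */
    static final class PendingRequest {
        final long beginMillis;
        final long timeoutMillis;

        PendingRequest(long beginMillis, long timeoutMillis) {
            this.beginMillis = beginMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    private final ConcurrentHashMap<Integer, PendingRequest> responseTable = new ConcurrentHashMap<>();

    void register(int requestId, long beginMillis, long timeoutMillis) {
        responseTable.put(requestId, new PendingRequest(beginMillis, timeoutMillis));
    }

    boolean isPending(int requestId) {
        return responseTable.containsKey(requestId);
    }

    /** Removes every request whose deadline is at or before nowMillis; returns how many expired. */
    int scan(long nowMillis) {
        int expired = 0;
        Iterator<Map.Entry<Integer, PendingRequest>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest pending = it.next().getValue();
            if (pending.beginMillis + pending.timeoutMillis <= nowMillis) {
                it.remove();
                expired++;
            }
        }
        return expired;
    }
}
```

Passing the current time in as a parameter keeps the scan deterministic and easy to test; a real caller would pass System.currentTimeMillis().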

4. Summary

During startup the NameServer sets up its configuration parameters, initializes several thread pools, and starts its scheduled jobs.

5. References

http://rocketmq.apache.org/