网站首页 文章专栏 Apache DolphinScheduler源码分析 -- master模块
Apache DolphinScheduler源码分析 -- master模块

源码分析之 master 模块

从主类MasterServer开始,从启动到运行,master主要做了以下三件事情


  • Zookeeper 节点初始化

  • 构建并提交工作流实例,跟踪运行状态

  • 监控其他MasterServer和WorkerServer的健康状态并容错

  • 维系心跳

@PostConstruct
public void run(){
      //详情见1.zookeeper初始化
      zkMasterClient.init();
      //详情见2.MasterSchedulerThread线程
      masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
      //详情见3.heartBeatThread线程
      heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM);
}

1. zookeeper初始化

创建DS在zookeeper的相关节点,并判断是否对系统做failover,恢复异常的工作流实例和任务实例。

  • 用于master的failover /dolphinscheduler/lock/failover/master

  • 系统节点,保存master和worker的心跳信息 /dolphinscheduler/masters; /dolphinscheduler/workers;/dolphinscheduler/dead-servers

public void init(){
  logger.info("initialize master client...");
  this.initDao();
  InterProcessMutex mutex = null;
  try {
      // 创建分布式锁节点,用于master节点的failover
      String znodeLock = getMasterStartUpLockPath();
      mutex = new InterProcessMutex(zkClient, znodeLock);
      mutex.acquire();
      // 在ZK中初始化系统节点,
      this.initSystemZNode();
      // 向ZK的/masters节点注册当前的master信息
      this.registerMaster();
      // 通过监听Zookeeper临时节点变化来进行容错处理(如果活跃的master只有自身一个,则进行failover)
      if (getActiveMasterNum() == 1) {
          failoverWorker(null, true);  // 恢复任务实例 详情见1.1.
          failoverMaster(null);   // 恢复工作流实例 详情见1.2.
      }
  }catch (Exception e){
      logger.error("master start up  exception",e);
  }finally {
      releaseMutex(mutex);
  }
}


1.1. failoverWorker 恢复任务实例

private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
 logger.info("start worker[{}] failover ...", workerHost);
 
 List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
 for(TaskInstance taskInstance : needFailoverTaskInstanceList){
    if(needCheckWorkerAlive){
       if(!checkTaskInstanceNeedFailover(taskInstance)){
           // 不需要failover的两种情况
           // 1.任务详情中不存在host信息
           // 2.任务在ZK中存在,则判断启动时间是否小于worker启动时间,小于则不用failover
          continue;
        }
    }
 
    ProcessInstance instance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
    if(instance!=null){
       taskInstance.setProcessInstance(instance);
    }
    // 如果任务中有yarn的任务则杀掉,kill的方式,日志中用正则匹配containId的格式,获取containID,用yarn命令kill。
    ProcessUtils.killYarnJob(taskInstance);
    // 把任务的状态从“running”改为“need failover”
    taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
    processService.saveTaskInstance(taskInstance);
 }
 logger.info("end worker[{}] failover ...", workerHost);
}

1.2. failoverMaster 恢复工作流实例

private void failoverMaster(String masterHost) {
   logger.info("start master failover ...");
  // 获取需要failover的工作流实例
   List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
 
   for(ProcessInstance processInstance : needFailoverProcessInstanceList){
       // 1.更新工作流实例的host为null
       // 2.写入 t_ds_commond 表一条恢复工作流实例的命令
      processService.processNeedFailoverProcessInstances(processInstance);
   }
 
   logger.info("master failover end");
}

2. MasterSchedulerThread 线程

该线程主要对command进行解析生成工作流实例

public void run() {
    logger.info("master scheduler start successfully...");
    while (Stopper.isRunning()){
 
      // process instance
      ProcessInstance processInstance = null;
 
      InterProcessMutex mutex = null;
      try {
          // 调用OSUtils.checkResource,检查当前资源(内存、CPU)资源超出阈值,则休眠1秒进入下一次循环
          if(OSUtils.checkResource(conf, true)){
              // 检查zookeeper是否连接成功
              if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
 
                  // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
                  // 创建分布式锁 /dolphinscheduler/lock/masters
                  // 获取一个InterProcessMutex锁(分布式的公平可重入互斥锁)。也就是只有一个master可以获取到这个锁
                  String znodeLock = zkMasterClient.getMasterLockPath();
 
                  mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
                  mutex.acquire();
 
                  ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
                  int activeCount = poolExecutor.getActiveCount();
                  // make sure to scan and delete command  table in one transaction
                  // 需要确保实例构建存储过程和command数据从表中删除的过程在一个事务中
                  // 查询一个Command,不为null时进行后续逻辑。
                  Command command = processDao.findOneCommand();
                  if (command != null) {
                      logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
 
                      try{
                          // handleCommand将commond解析成processInstance
                          processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
                          if (processInstance != null) {
                              logger.info("start master exec thread , split DAG ...");
                              // masterExecService,master执行线程
                              masterExecService.execute(new MasterExecThread(processInstance,processDao));
                          }
                      }catch (Exception e){
                          logger.error("scan command error ", e);
                          processDao.moveToErrorCommand(command, e.toString());
                      }
                  }
              }
          }
          // 休眠1秒,进入下一次循环
          // accessing the command table every SLEEP_TIME_MILLIS milliseconds
          Thread.sleep(Constants.SLEEP_TIME_MILLIS);
 
      }catch (Exception e){
          logger.error("master scheduler thread exception : " + e.getMessage(),e);
      }finally{
          //进入下一次循环之前,释放InterProcessMutex锁
          AbstractZKClient.releaseMutex(mutex);
      }
 
    }
    logger.info("master server stopped...");
}

Stopper.isRunning() 的逻辑

 /**
 *  if the process closes, a signal is placed as true, and all threads get this flag to stop working
 */
public class Stopper {
 
 private static volatile AtomicBoolean signal = new AtomicBoolean(false);
 
 public static final boolean isStoped(){
   return signal.get();
 }
 
 public static final boolean isRunning(){
   return !signal.get();
 }
 
 public static final void stop(){
   signal.getAndSet(true);
 }
}

其逻辑就是用一个原子布尔值,标志当前进程是否要退出。如果收到了退出信号,则signal为true,该进程内所有的线程都退出当前循环。


下面我们来分析查询到一个Command之后的逻辑:

if (command != null) {
    logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
 
    try{
        processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
        if (processInstance != null) {
            logger.info("start master exec thread , split DAG ...");
            masterExecService.execute(new MasterExecThread(processInstance,processDao));
        }
    }catch (Exception e){
        logger.error("scan command error ", e);
        processDao.moveToErrorCommand(command, e.toString());
    }
}

其实就是根据Command创建了一个ProcessInstance(流程实例),流程定义是由Scheduler自动创建的,而Quartz已经根据Schedule信息创建了Command保存到了数据库。至此,流程定义与定时的关联逻辑就已经串起来了。

创建流程实例的时候传入了当前可用(masterExecThreadNum - activeCount)的线程数量,如果满足当前dag,则返回ProcessInstance,否则返回null。

ProcessInstance最终交由MasterExecThread去执行。

2.1. handleCommand

根据command对象构建工作流实例,构建后把该条command从t_ds_command表中删除,需要确保的是实例构建存储过程和command数据从表中删除的过程在一个事务中。 command所有类型如下


  • 0 start a new process

  • 1 start a new process from current nodes

  • 2 recover tolerance fault process

  • 3 recover suspended process

  • 4 start process from failure task nodes

  • 5 complement data

  • 6 start a new process from scheduler

  • 7 repeat running a process

  • 8 pause a process

  • 9 stop a process

  • 10 recover waiting thread

@Transactional(rollbackFor = Exception.class)
public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
   //根据command命令生成新的工作流程实例
   ProcessInstance processInstance = constructProcessInstance(command, host);
  //cannot construct process instance, return null;
  if(processInstance == null){
      logger.error("scan command, command parameter is error: %s", command.toString());
      moveToErrorCommand(command, "process instance is null");
      return null;
  }
  if(!checkThreadNum(command, validThreadNum)){
      logger.info("there is not enough thread for this command: {}",command.toString() );
      return setWaitingThreadProcess(command, processInstance);
  }
  processInstance.setCommandType(command.getCommandType());
  processInstance.addHistoryCmd(command.getCommandType());
  saveProcessInstance(processInstance);
  this.setSubProcessParam(processInstance);
  //保存了任务流实例后将该命令删除
  delCommandByid(command.getId());
  return processInstance;
}

2.2. MasterExecThread 执行线程

与MasterSchedulerThread一样,MasterExecThread也是实现了Runnable的线程类,先来看MasterExecThread的构造函数。

taskExecService 它是一个固定大小(20)的后台线程池。这意味着,一个DAG最大的并发任务数就是20。

public MasterExecThread(ProcessInstance processInstance,ProcessDao processDao){
    this.processDao = processDao;
 
    this.processInstance = processInstance;
 
    int masterTaskExecNum = conf.getInt(Constants.MASTER_EXEC_TASK_THREADS,
            Constants.defaultMasterTaskExecNum);
    this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
            masterTaskExecNum);
}

另外这段代码,多次出现,可以风格统一下,不要每次都初始化

/**
 * load configuration file
 */
private static Configuration conf;
static {
        try {
            conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
        }catch (ConfigurationException e){
            logger.error("load configuration failed : " + e.getMessage(),e);
            System.exit(1);
        }
    }

下面分析该类的run方法。

public void run() {
    // process instance is null
    // 判断processInstance是否为null。为null则退出
    if (processInstance == null){
        logger.info("process instance is not exists");
        return;
    }
    // check to see if it's done
    // 判断processInstance是否已经完成(成功、报错、取消、暂停、等待)
    if (processInstance.getState().typeIsFinished()){
        logger.info("process instance is done : {}",processInstance.getId());
        return;
    }
 
    try {
        //检查此过程是否是补数 且 流程实例是否为子流程
        if (processInstance.isComplementData() &&  Flag.NO == processInstance.getIsSubProcess()){
            // 详情见2.2.2. 执行补数
            executeComplementProcess();
        }else{
            //详情见2.2.1. 执行流程实例
            executeProcess();
        }
    ......
}

2.2.1. executeProcess() 执行流程实例

private void executeProcess() throws Exception {
    //1.根据流程实例id查找有效的任务列表 initTaskQueue()
    //2.构建DAG处理流程 buildFlowDag() 返回DAG对象,主要包括两个信息:vertex 点,即任务执行节点;edge 边,即任务之间的依赖关系
    prepareProcess();
    //提交并监控任务,直到工作流停止 详情见2.2.1.1
    runProcess();
    //当线程池不足以供流程实例使用时,创建恢复等待线程命令。
    //子工作流程实例无需创建恢复命令。
    //创建恢复等待线程命令并同时删除origin命令。
   //如果存在recovery命令,则仅更新字段update_time
    endProcess();
}

2.2.1.1. runProcess()提交并监控任务

submitPostNode方法传入父任务节点的名字,通过节点名,DAG,获取任务节点列表,并生成任务实例列表readyToSubmitTaskList

private void runProcess(){
    submitPostNode(null);

submitStandByTask()方法里面会遍历任务实例列表readyToSubmitTaskList,判断任务实例的依赖关系,依赖项运行成功则会提交任务执行线程,失败则把当前节点状态改为失败。

       if(canSubmitTaskToQueue()){
           submitStandByTask();
       }
       try {
           Thread.sleep(Constants.SLEEP_TIME_MILLIS);
       } catch (InterruptedException e) {
           logger.error(e.getMessage(),e);
       }
       updateProcessInstanceState();
   }
 
   logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
}

submitStandByTask()最终会调用submitTaskExec,这里有个MasterBaseTaskExecThread线程 MasterBaseTaskExecThread线程有两个主要作用

  • 用于把任务实例信息提交到数据库中submitTask()

  • 把任务信息写进zookeeper队列 submitTaskToQueue(),后续worker会来认领任务。(节点命名方式:${processInstancePriority}${processInstanceId}${taskInstancePriority}${taskInstanceId}${task executed by ip1},${ip2}...)


另外MasterBaseTaskExecThread有两个子类,除了上面的两个作用外:

  • MasterTaskExecThread 任务执行完成后会把需要kill的任务信息写入zk队列中等待worker来kill任务。

  • SubProcessTaskExecThread 在当前工作流运行结束后会继续运行子工作流并做相关状态更新,子工作流完全完成才同步状态为子工作流的状态。


MasterBaseTaskExecThread线程异步提交,会把结果写入activeTaskNode。

 private TaskInstance submitTaskExec(TaskInstance taskInstance) {
        MasterBaseTaskExecThread abstractExecThread = null;
        if(taskInstance.isSubProcess()){
            abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance);
        }else {
            abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance);
        }
        Future<Boolean> future = taskExecService.submit(abstractExecThread);
        activeTaskNode.putIfAbsent(abstractExecThread, future);
        return abstractExecThread.getTaskInstance();
    }

然后会遍历activeTaskNode,判断线程是否执行完成,若完成则移除该线程信息,再判断节点是否执行成功

 for(Map.Entry<MasterBaseTaskExecThread,Future<Boolean>> entry: activeTaskNode.entrySet()) {
                Future<Boolean> future = entry.getValue();
                TaskInstance task  = entry.getKey().getTaskInstance();
 
                if(!future.isDone()){
                    continue;
                }
                // node monitor thread complete
                activeTaskNode.remove(entry.getKey());
                if(task == null){
                    this.taskFailedSubmit = true;
                    continue;
                }
                logger.info("task :{}, id:{} complete, state is {} ",
                        task.getName(), task.getId(), task.getState().toString());
                // 如果节点成功,则继续提交任务节点
                if(task.getState() == ExecutionStatus.SUCCESS){
                    completeTaskList.put(task.getName(), task);
                    submitPostNode(task.getName());
                    continue;
                }
                // 如果节点失败,先重试,然后再继续执行失败流程
                if(task.getState().typeIsFailure()){
                    if(task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
                        this.recoverToleranceFaultTaskList.add(task);
                    }
                    if(task.taskCanRetry()){
                        addTaskToStandByList(task);
                    }else{
                        completeTaskList.put(task.getName(), task);
                        if( task.getTaskType().equals(TaskType.CONDITIONS.toString()) ||
                                haveConditionsAfterNode(task.getName())) {
                            submitPostNode(task.getName());
                        }else{
                            errorTaskList.put(task.getName(), task);
                            if(processInstance.getFailureStrategy() == FailureStrategy.END){
                                killTheOtherTasks();
                            }
                        }
                    }
                    continue;
                }
                // other status stop/pause
                completeTaskList.put(task.getName(), task);
            }
            // send alert

2.2.2. executeComplementProcess() 执行补数流程实例

private void executeComplementProcess() throws Exception {
    ....
    //根据调度的时间规则和补数的时间范围计算出需要补数的日期列表
    int processDefinitionId = processInstance.getProcessDefinitionId();
    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
    List<Date> listDate = Lists.newLinkedList();
    if(!CollectionUtils.isEmpty(schedules)){
        for (Schedule schedule : schedules) {
            listDate.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedule.getCrontab()));
        }
}
//接下来是一个循环,用日期列表的每个日期执行一次
//以下三个方法同 2.2.1
....
prepareProcess();
....
runProcess();
....

3. heartBeatThread线程

每30秒上报一次心跳信息, 同时判断host是否在dead-servers节点下,即判断进程是否已经挂了。 进程正常则更新zookeeper的/dolphinscheduler/masters/${host}/ 下的节点名称,包括以下信息 ip, port ,cpUsage, memoryUsage, loadAverage, registerTIme, currentTime

 private Runnable heartBeatThread(){
        logger.info("start master heart beat thread...");
        Runnable heartBeatThread  = new Runnable() {
            @Override
            public void run() {
                if(Stopper.isRunning()) {
                    // send heartbeat to zk
                    if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
                        logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
                        return;
                    }
 
                    zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
                }
            }
        };
        return heartBeatThread;
    }




版权声明:本文由星尘阁原创出品,转载请注明出处!

本文链接:http://www.52xingchen.cn/detail/53




赞助本站,网站的发展离不开你们的支持!
来说两句吧
大侠留个名吧,或者可以使用QQ登录。
: 您已登陆!可以继续留言。
最新评论