网站首页 文章专栏 Apache DolphinScheduler源码分析 -- master模块
从主类MasterServer开始,从启动到运行,master主要做了以下四件事情:
Zookeeper 节点初始化
构建并提交工作流实例,跟踪运行状态
监控其他MasterServer和WorkerServer的健康状态并容错
维系心跳
@PostConstruct public void run(){ //详情见1.zookeeper初始化 zkMasterClient.init(); //详情见2.MasterSchedulerThread线程 masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); //详情见3.heartBeatThread线程 heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM); }
创建DS在zookeeper的相关节点,并判断是否对系统做failover,恢复异常的工作流实例和任务实例。
用于master的failover /dolphinscheduler/lock/failover/master
系统节点,保存master和worker的心跳信息 /dolphinscheduler/masters; /dolphinscheduler/workers;/dolphinscheduler/dead-servers
public void init(){ logger.info("initialize master client..."); this.initDao(); InterProcessMutex mutex = null; try { // 创建分布式锁节点,用于master节点的failover String znodeLock = getMasterStartUpLockPath(); mutex = new InterProcessMutex(zkClient, znodeLock); mutex.acquire(); // 在ZK中初始化系统节点, this.initSystemZNode(); // 向ZK的/masters节点注册当前的master信息 this.registerMaster(); // 通过监听Zookeeper临时节点变化来进行容错处理(如果活跃的master只有自身一个,则进行failover) if (getActiveMasterNum() == 1) { failoverWorker(null, true); // 恢复任务实例 详情见1.1. failoverMaster(null); // 恢复工作流实例 详情见1.2. } }catch (Exception e){ logger.error("master start up exception",e); }finally { releaseMutex(mutex); } }
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { logger.info("start worker[{}] failover ...", workerHost); List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); for(TaskInstance taskInstance : needFailoverTaskInstanceList){ if(needCheckWorkerAlive){ if(!checkTaskInstanceNeedFailover(taskInstance)){ // 不需要failover的两种情况 // 1.任务详情中不存在host信息 // 2.任务在ZK中存在,则判断启动时间是否小于worker启动时间,小于则不用failover continue; } } ProcessInstance instance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); if(instance!=null){ taskInstance.setProcessInstance(instance); } // 如果任务中有yarn的任务则杀掉,kill的方式,日志中用正则匹配containId的格式,获取containID,用yarn命令kill。 ProcessUtils.killYarnJob(taskInstance); // 把任务的状态从“running”改为“need failover” taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); processService.saveTaskInstance(taskInstance); } logger.info("end worker[{}] failover ...", workerHost); }
private void failoverMaster(String masterHost) { logger.info("start master failover ..."); // 获取需要failover的工作流实例 List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); for(ProcessInstance processInstance : needFailoverProcessInstanceList){ // 1.更新工作流实例的host为null // 2.写入 t_ds_commond 表一条恢复工作流实例的命令 processService.processNeedFailoverProcessInstances(processInstance); } logger.info("master failover end"); }
该线程主要对command进行解析生成工作流实例
public void run() { logger.info("master scheduler start successfully..."); while (Stopper.isRunning()){ // process instance ProcessInstance processInstance = null; InterProcessMutex mutex = null; try { // 调用OSUtils.checkResource,检查当前资源(内存、CPU)资源超出阈值,则休眠1秒进入下一次循环 if(OSUtils.checkResource(conf, true)){ // 检查zookeeper是否连接成功 if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master // 创建分布式锁 /dolphinscheduler/lock/masters // 获取一个InterProcessMutex锁(分布式的公平可重入互斥锁)。也就是只有一个master可以获取到这个锁 String znodeLock = zkMasterClient.getMasterLockPath(); mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); mutex.acquire(); ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService; int activeCount = poolExecutor.getActiveCount(); // make sure to scan and delete command table in one transaction // 需要确保实例构建存储过程和command数据从表中删除的过程在一个事务中 // 查询一个Command,不为null时进行后续逻辑。 Command command = processDao.findOneCommand(); if (command != null) { logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); try{ // handleCommand将commond解析成processInstance processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); if (processInstance != null) { logger.info("start master exec thread , split DAG ..."); // masterExecService,master执行线程 masterExecService.execute(new MasterExecThread(processInstance,processDao)); } }catch (Exception e){ logger.error("scan command error ", e); processDao.moveToErrorCommand(command, e.toString()); } } } } // 休眠1秒,进入下一次循环 // accessing the command table every SLEEP_TIME_MILLIS milliseconds Thread.sleep(Constants.SLEEP_TIME_MILLIS); }catch (Exception e){ logger.error("master scheduler thread exception : " + e.getMessage(),e); }finally{ //进入下一次循环之前,释放InterProcessMutex锁 
AbstractZKClient.releaseMutex(mutex); } } logger.info("master server stopped..."); }
Stopper.isRunning() 的逻辑
/**
 * Process-wide stop flag: once the process is asked to close, the signal is
 * set to {@code true} and every thread polling it can stop working.
 *
 * <p>The flag only ever moves from {@code false} to {@code true}; this class
 * offers no way to reset it.
 */
public class Stopper {

    /**
     * {@code true} once {@link #stop()} has been called. The reference is never
     * reassigned, so it is {@code final}; visibility across threads comes from
     * the {@link AtomicBoolean} itself.
     */
    private static final AtomicBoolean signal = new AtomicBoolean(false);

    /**
     * @return {@code true} if a stop has been requested
     */
    public static final boolean isStoped() {
        return signal.get();
    }

    /**
     * @return {@code true} while no stop has been requested
     */
    public static final boolean isRunning() {
        return !signal.get();
    }

    /**
     * Requests a stop. Calling it more than once has no further effect.
     */
    public static final void stop() {
        signal.set(true);
    }
}
其逻辑就是用一个原子布尔值,标志当前进程是否要退出。如果收到了退出信号,则signal为true,该进程内所有的线程都退出当前循环。
下面我们来分析查询到一个Command之后的逻辑:
// Handle the command fetched from the command table, if there is one.
if (command != null) {
    // Parameterized logging, as used elsewhere in this file, avoids building the
    // message eagerly with String.format.
    logger.info("find one command: id: {}, type: {}", command.getId(), command.getCommandType());
    try {
        // Turn the command into a process instance; the number of currently idle
        // exec threads (masterExecThreadNum - activeCount) is passed along.
        processInstance = processDao.handleCommand(
                logger,
                OSUtils.getHost(),
                this.masterExecThreadNum - activeCount,
                command);
        if (processInstance != null) {
            logger.info("start master exec thread , split DAG ...");
            // Run the instance on the master exec pool.
            masterExecService.execute(new MasterExecThread(processInstance, processDao));
        }
    } catch (Exception e) {
        // A command that cannot be handled is moved to the error-command store.
        logger.error("scan command error ", e);
        processDao.moveToErrorCommand(command, e.toString());
    }
}
其实就是根据Command创建了一个ProcessInstance(流程实例),流程定义是由Scheduler自动创建的,而Quartz已经根据Schedule信息创建了Command保存到了数据库。至此,流程定义与定时的关联逻辑就已经串起来了。
创建流程实例的时候传入了当前可用(masterExecThreadNum - activeCount)的线程数量,如果满足当前dag,则返回ProcessInstance,否则返回null。
ProcessInstance最终交由MasterExecThread去执行。
根据command对象构建工作流实例,构建后把该条command从t_ds_command表中删除,需要确保的是实例构建存储过程和command数据从表中删除的过程在一个事务中。 command所有类型如下
0 start a new process
1 start a new process from current nodes
2 recover tolerance fault process
3 recover suspended process
4 start process from failure task nodes
5 complement data
6 start a new process from scheduler
7 repeat running a process
8 pause a process
9 stop a process
10 recover waiting thread
@Transactional(rollbackFor = Exception.class) public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) { //根据command命令生成新的工作流程实例 ProcessInstance processInstance = constructProcessInstance(command, host); //cannot construct process instance, return null; if(processInstance == null){ logger.error("scan command, command parameter is error: %s", command.toString()); moveToErrorCommand(command, "process instance is null"); return null; } if(!checkThreadNum(command, validThreadNum)){ logger.info("there is not enough thread for this command: {}",command.toString() ); return setWaitingThreadProcess(command, processInstance); } processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); //保存了任务流实例后将该命令删除 delCommandByid(command.getId()); return processInstance; }
与MasterSchedulerThread一样,MasterExecThread也是实现了Runnable的线程类,先来看MasterExecThread的构造函数。
taskExecService 它是一个固定大小(20)的后台线程池。这意味着,一个DAG最大的并发任务数就是20。
/**
 * Creates the executor thread for one process instance.
 *
 * <p>The task executor's size is read from the configuration key
 * {@code Constants.MASTER_EXEC_TASK_THREADS}, falling back to
 * {@code Constants.defaultMasterTaskExecNum}.
 *
 * @param processInstance the process instance to run
 * @param processDao      DAO stored for later use by this thread
 */
public MasterExecThread(ProcessInstance processInstance, ProcessDao processDao) {
    this.processInstance = processInstance;
    this.processDao = processDao;

    final int taskPoolSize = conf.getInt(
            Constants.MASTER_EXEC_TASK_THREADS,
            Constants.defaultMasterTaskExecNum);
    this.taskExecService =
            ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread", taskPoolSize);
}
另外,下面这段加载配置文件的代码在源码中多次出现,可以统一封装、统一风格,避免每次都重复初始化。
/**
 * Configuration loaded from {@code Constants.MASTER_PROPERTIES_PATH} when the
 * class is initialized.
 */
private static Configuration conf;

static {
    Configuration loaded = null;
    try {
        loaded = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
    } catch (ConfigurationException e) {
        // Without its configuration the process cannot continue: log and exit.
        logger.error("load configuration failed : " + e.getMessage(), e);
        System.exit(1);
    }
    conf = loaded;
}
下面分析该类的run方法。
public void run() { // process instance is null // 判断processInstance是否为null。为null则退出 if (processInstance == null){ logger.info("process instance is not exists"); return; } // check to see if it's done // 判断processInstance是否已经完成(成功、报错、取消、暂停、等待) if (processInstance.getState().typeIsFinished()){ logger.info("process instance is done : {}",processInstance.getId()); return; } try { //检查此过程是否是补数 且 流程实例是否为子流程 if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()){ // 详情见2.2.2. 执行补数 executeComplementProcess(); }else{ //详情见2.2.1. 执行流程实例 executeProcess(); } ...... }
private void executeProcess() throws Exception { //1.根据流程实例id查找有效的任务列表 initTaskQueue() //2.构建DAG处理流程 buildFlowDag() 返回DAG对象,主要包括两个信息:vertex 点,即任务执行节点;edge 边,即任务之间的依赖关系 prepareProcess(); //提交并监控任务,直到工作流停止 详情见2.2.1.1 runProcess(); //当线程池不足以供流程实例使用时,创建恢复等待线程命令。 //子工作流程实例无需创建恢复命令。 //创建恢复等待线程命令并同时删除origin命令。 //如果存在recovery命令,则仅更新字段update_time endProcess(); }
submitPostNode方法传入父任务节点的名字,通过节点名,DAG,获取任务节点列表,并生成任务实例列表readyToSubmitTaskList
private void runProcess(){ submitPostNode(null);
submitStandByTask()方法里面会遍历任务实例列表readyToSubmitTaskList,判断任务实例的依赖关系,依赖项运行成功则会提交任务执行线程,失败则把当前节点状态改为失败。
if(canSubmitTaskToQueue()){ submitStandByTask(); } try { Thread.sleep(Constants.SLEEP_TIME_MILLIS); } catch (InterruptedException e) { logger.error(e.getMessage(),e); } updateProcessInstanceState(); } logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState()); }
submitStandByTask()最终会调用submitTaskExec,这里会创建一个MasterBaseTaskExecThread线程。MasterBaseTaskExecThread线程有两个主要作用:
用于把任务实例信息提交到数据库中submitTask()
把任务信息写进zookeeper队列 submitTaskToQueue(),后续worker会来认领任务。(节点命名方式:${processInstancePriority}${processInstanceId}${taskInstancePriority}${taskInstanceId}${task executed by ip1},${ip2}...)
另外MasterBaseTaskExecThread有两个子类,除了上面的两个作用外:
MasterTaskExecThread 任务执行完成后会把需要kill的任务信息写入zk队列中等待worker来kill任务。
SubProcessTaskExecThread 在当前工作流运行结束后会继续运行子工作流并做相关状态更新,子工作流完全完成才同步状态为子工作流的状态。
MasterBaseTaskExecThread线程异步提交,会把结果写入activeTaskNode。
/**
 * Submits a task instance to the task executor and tracks its future.
 *
 * <p>A sub-process task is wrapped in a {@code SubProcessTaskExecThread}, any
 * other task in a {@code MasterTaskExecThread}. The resulting future is stored
 * in {@code activeTaskNode} keyed by the exec thread, unless an entry for that
 * key already exists.
 *
 * @param taskInstance the task instance to submit
 * @return the task instance held by the created exec thread
 */
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
    MasterBaseTaskExecThread execThread = taskInstance.isSubProcess()
            ? new SubProcessTaskExecThread(taskInstance, processInstance)
            : new MasterTaskExecThread(taskInstance, processInstance);

    Future<Boolean> pendingResult = taskExecService.submit(execThread);
    activeTaskNode.putIfAbsent(execThread, pendingResult);

    return execThread.getTaskInstance();
}
然后会遍历activeTaskNode,判断线程是否执行完成,若完成则移除该线程信息,再判断节点是否执行成功
for(Map.Entry<MasterBaseTaskExecThread,Future<Boolean>> entry: activeTaskNode.entrySet()) { Future<Boolean> future = entry.getValue(); TaskInstance task = entry.getKey().getTaskInstance(); if(!future.isDone()){ continue; } // node monitor thread complete activeTaskNode.remove(entry.getKey()); if(task == null){ this.taskFailedSubmit = true; continue; } logger.info("task :{}, id:{} complete, state is {} ", task.getName(), task.getId(), task.getState().toString()); // 如果节点成功,则继续提交任务节点 if(task.getState() == ExecutionStatus.SUCCESS){ completeTaskList.put(task.getName(), task); submitPostNode(task.getName()); continue; } // 如果节点失败,先重试,然后再继续执行失败流程 if(task.getState().typeIsFailure()){ if(task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){ this.recoverToleranceFaultTaskList.add(task); } if(task.taskCanRetry()){ addTaskToStandByList(task); }else{ completeTaskList.put(task.getName(), task); if( task.getTaskType().equals(TaskType.CONDITIONS.toString()) || haveConditionsAfterNode(task.getName())) { submitPostNode(task.getName()); }else{ errorTaskList.put(task.getName(), task); if(processInstance.getFailureStrategy() == FailureStrategy.END){ killTheOtherTasks(); } } } continue; } // other status stop/pause completeTaskList.put(task.getName(), task); } // send alert
private void executeComplementProcess() throws Exception { .... //根据调度的时间规则和补数的时间范围计算出需要补数的日期列表 int processDefinitionId = processInstance.getProcessDefinitionId(); List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId); List<Date> listDate = Lists.newLinkedList(); if(!CollectionUtils.isEmpty(schedules)){ for (Schedule schedule : schedules) { listDate.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedule.getCrontab())); } } //接下来是一个循环,用日期列表的每个日期执行一次 //以下三个方法同 2.2.1 .... prepareProcess(); .... runProcess(); ....
每30秒上报一次心跳信息,同时判断host是否在dead-servers节点下,即判断进程是否已经挂了。进程正常则更新zookeeper的/dolphinscheduler/masters/${host}/ 下的节点名称,包括以下信息:ip, port, cpuUsage, memoryUsage, loadAverage, registerTime, currentTime
private Runnable heartBeatThread(){ logger.info("start master heart beat thread..."); Runnable heartBeatThread = new Runnable() { @Override public void run() { if(Stopper.isRunning()) { // send heartbeat to zk if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) { logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server"); return; } zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX); } } }; return heartBeatThread; }
版权声明:本文由星尘阁原创出品,转载请注明出处!