怎样写一个类似ROS的易用的android机器人框架(4)
怎样写一个类似ROS的易用的android机器人框架(3)
机器人任务框架的工作流程
为避免机器人执行多任务时对传感器,执行机构的占用冲突,同时又有满足机器人响应突发任务的需求,设计这样的任务框架:
1.当前任务可打断可恢复时
多个任务是排队执行的,即同一时间只有一个任务处于运行状态,任务执行过程中如果有新任务到来并允许允许,当前任务会暂停,保存任务进度和状态后,再执行新任务,新任务结束后,再恢复执行暂停保存的任务。 每个任务都有onStart
,onStop
,onPause
,onResume
四个生命周期,以便根据任务的不同状态进行相应的任务参数设置。 当没有任务时,系统插入一个用户定义的空闲任务,用于通知系统进入待机模式或者控制显示待机界面。 完整的流程如下
IdelTask onStart
-> IdleTask onStop
-> Task onStart
-> Task onPause
-> NewTask onStart
-> NewTask onStop
-> Task onResume
-> Task onStop
-> IdleTask onStart
2. 当前任务可打断不可恢复时
当前任务不可恢复时,不会走 onPause
,onResume
,流程如下: IdelTask onStart
-> IdleTask onStop
-> Task onStart
-> Task onStop
-> NewTask onStart
-> NewTask onStop
-> IdleTask onStart
3.当前任务可恢复但不可打断时
当前任务可恢复但不可打断,但是用户又想要立即执行新任务时,通过接口通知用户选择立刻结束或暂停当前任务还是保持现状。保持现状则新任务只能等到当前任务自行结束后自行,否则,将暂停或者结束当前任务,然后走如上的流程
机器人任务框架的实现过程
代码位于 ai.easy.robot.task
包名下
先定义任务接口,以便实现不同的扩展,接口定义如下:
// 机器人任务接口,由于处理其行为逻辑 interface ITask { // 任務名稱 val name: String // 任务正在运行 val isAlive: Boolean // 任务是否可以恢复 fun canResume(): Boolean //任务开始时触发 fun onStart(ctx: TaskContext) //任务结束时触发 fun onStop(ctx: TaskContext) //任务暂停时触发 fun onPause(ctx: TaskContext) //任务恢复时触发 fun onResume(ctx: TaskContext) }
接着定义TaskPool, TaskPool是实现任务调度的类,其用过一个等待任务队列,保存等待执行的任务,还有一个任务栈,保存已经执行的和被暂停保存起来的任务
private val waitingTask: LinkedList<TaskTableItem> = LinkedList() private val taskStack: Stack<TaskTableItem> = Stack()
taskStack的栈顶元素即为当前任务
TaskTableItem
为任务的信息类 定义如下:
internal data class TaskTableItem( val name: String, val task: ITask, val ctx: TaskContext, val startData: Bundle? = null, val addedTime: Long, var startTime: Long = -1, //开始时间,用于延时任务 var pauseTime: Long = -1, var canInterrupted: Boolean = true, var forceStop: Boolean = false , var priority: Int = 0 )
每个任务都有独立的与之绑定的 TaskContext , 随着任务 onStart 时创建,onStop 时释放。
通过调用 TaskPool 的 mainLoop()
执行任务列表的调度流程,具体流程见代码,为了避免多线程开销,这里用了kotlin的协程实现:
val idleCtx = TaskContext(this) idleCtx.cc = context while (looping) { //是否有新任务 if (waitingTask.size > 0) { val ti = waitingTask.peek() //检测任务添加时间是否失效 if (checkOutDate(ti!!)) { waitingTask.poll() continue } val wanna_task = ti.task // // if (isIdle) { myIdleTask.onStop(idleCtx) log("Idle任务停止") isIdle = false val ctx = ti.ctx ctx.startData = ti.startData ?: idleCtx.startData ctx.cc = context // idleCtx.release() log("${wanna_task.name}任务开始") waitingTask.poll() wanna_task.onStart(ctx) } else if (taskStack.size > 0) { //当前有任务在运行 val cur = taskStack.peek() val cur_task = cur.task val cur_ctx = cur.ctx //当前任务不可打断 if (!cur.canInterrupted && cur_task.isAlive) { delay(20) continue } // if (cur_task.canResume() && cur_task.isAlive && !cur.forceStop) { cur_task.onPause(cur_ctx) log("${cur_task.name}任务暂停") } else { // cur_task.onStop(cur_ctx) log("${cur_task.name}任务停止") cur_ctx.release() taskStack.pop() // } // ti.ctx.startData = ti.startData ?: cur_ctx.startData ti.ctx.cc = context log("${wanna_task.name}任务开始") waitingTask.poll() wanna_task.onStart(ti.ctx) } //将可恢复任务压入堆栈 taskStack.push(ti) // continue } //没有新任务时 //检查当前是否还有任务,弹出失效的任务 if (taskStack.size > 0) { val cur = taskStack.peek() val cur_task = cur.task val cur_ctx = cur.ctx if (!cur_task.isAlive) { cur_task.onStop(cur_ctx) log("${cur_task.name}任务停止") //任务已停止,弹出 taskStack.pop() cur_ctx.release() // if (taskStack.size > 0) { taskStack.peek().let { log("${it.task.name}任务恢复") it.task.onResume(it.ctx) } } // continue } } //无任何任务,转入空闲状态 if (taskStack.size == 0) { if (!isIdle) { log("Idle任务开始") idleTask?.onStart(idleCtx) isIdle = true } } // delay(25) //挂起防止线程堵塞 }
通过调用 TaskPool 的 addNewTask()
通知TaskPool执行新的任务。
需要注意
当 waitingTask含有多个任务时
需要根据新任务的优先级插入到合适的位置。
当前任务不可打断时
通过 InterruptComingSelector
的实现类显示UI通知用户进一步的操作,InterruptComingSelector
的定义如下:
/** * 询问用户是否打断任务的通知器 */ abstract class InterruptComingSelector { companion object { const val CHOICE_STOP = 1 const val CHOICE_PAUSE = 2 const val CHOICE_DO_NOTHING = 0 } /** * 开始询问用户时 * @param taskName 当前任务名 * @param canResume 当前任务可恢复 */ abstract fun makeSelection(taskName: String, canResume: Boolean) /** * 当前任务已结束或者用户取消选择时 */ abstract fun onFinishOrCancel() internal var isFinished: Boolean = false internal var userChoice: Int = -1 /** * 设置用户的选择结果 * @param choice * @see CHOICE_STOP * @see CHOICE_PAUSE * @see CHOICE_DO_NOTHING */ fun finish(choice: Int) { userChoice = choice isFinished = true } }
InterruptComingSelector
的生命周期是由 TaskContext 管理的,以便任务结束而用户还未做出选择时 销毁其显示的UI
任务上下文的实现
TaskContext 实现提供任务内资源的管理,提供与TaskPool的通讯,提供任务定时器等功能。
同时TaskContext通过一个子任务队列功能。子任务是比任务更小的任务,没有暂停和恢复选项,子任务只能按顺序一个个执行,但是当个子任务的run()
执行函数内可以并行运行多个函数,这些其实是用协程实现的。子任务定义如下:
interface ISubTask { /** * 设置执行任务超时,一旦超时,将结束子任务run()并触发onCancel(), 单位: ms */ fun timeout(): Int /** * 子任务的主要工作 * @return 决定下个子任务是否继续 */ fun run(paraMgr: SubTaskParallelManager): Boolean /** * 子任务正常结束或者运行超时时触发 * @param isTimeout * @return 决定下个子任务是否继续 */ fun onCancel(isTimeout: Boolean): Boolean }
TaskContext.createSubTaskQueue()
创建一个子任务队列,该队列会在TaskContext释放时释放。
add()
往队列了添加子任务 start()
开始运行队列中的子任务 forceCancel()
是取消剩下的子任务,清空队列 pauseSubTasks()
是在任务 onPause时暂停队列运行 resumeSubTasks()
是在任务 onResume时恢复队列运行
start()的实现如下:
fun start(whenFinish: () -> Unit) { work = async(ctx.cctx) { // var ok = true while (isActive && queue.size > 0 && ok) { //暂停时挂起 if (isPause) { delay(100) continue } val sub = queue.poll() val t = sub.timeout().toLong() val timeout = if (t < 100L) 100L else t var is_timeout = false try { withTimeout(t) { currJob = async(context) { sub.run(SubTaskParallelManager(ctx.cctx)) } ok = currJob?.await() ?: false } } catch (e: Throwable) { e.printStackTrace() is_timeout = true } finally { val r = sub.onCancel(is_timeout) ok = if (is_timeout) r else ok } } // currJob = null isFinished = true whenFinish() true } }
需要注意的
这个任务框架是通过检查 ITask的 isAlive 的值判断任务结束的,用户扩展ITask时需要在任务结束时将isAlive设为false
任务的onStart
,onStop
,onPause
,onResume
不应该堵塞线程,如果是长时间运行的任务,可通过TaskContext.doMainWork()
来执行, 配合TaskContext.delayMs()
来延时。如果doMainWork()
中有循环, 需要在循环体中调用 suspendMainWorkWhenPaused()
, 这样任务暂停是调用pauseMainWork()
, 就能在suspendMainWorkWhenPaused()
处挂起。