RTOS中断内核对象(一)
在上一篇文章https://www.nowcoder.com/discuss/479716637294235648中,概述了RTOS的调度模型,谈到了task中如何主动发起调度让渡CPU。但实际使用中,task通常不会显式主动去调用yield让渡CPU,而是通过内核对象,当获取内核对象造成阻塞,由内核对象去调用yield,即调度对于task而言是封闭的。从task的角度看,内核对象是进行了自旋还是调度没有区别。这里内核对象包括了Message Queue,Event Group,Semaphore,Mutex等。本文主要会谈到Message Queue,因为实际上Semaphore和Mutex都可以通过Message Queue实现。
Message Queue概述
什么是Message Queue
Message Queue,即消息队列,本质上来说只是一段内存,和裸机开发中全局变量的作用相同。只不过在裸机开发中,代码是线性流程,编程者需要负责所有程序流程的设计,相当于站在上帝视角上;而在RTOS语境下,任务被拆分成了不同的task,每个task都focus在自己的任务上,此时对于全局变量的访问就需要注意,是不是只有我一个线程回去访问这个全局变量,如果不是的话,就需要解决多task同时访问的问题。另一方面,如果我希望使用这段内存来做不同task之间的消息传递,就需要机制保持访问次序,即生产者应该早于消费者。如果消费者访问早于生成者,那它就应该等待,直到生产者完成访问。消息队列便是RTOS提供给所有task的,能够解决上述问题的内核对象。
RTOS下的实现
在多task的情况下,对于共有内存的访问,我们希望实现的是,当task1正在访问时,task2如果也要访问,那么它应当等待task1访问完毕后再访问,在等待的期间,为了效率最大化,可以主动调度出去,待task1访问结束后再调度回来。顺着这个思路,可以做如下实现:
struct XLIST_ITEM {
struct XLIST_ITEM *pxPrevious;
struct XLIST_ITEM *pxNext;
uint32 value;
};
typedef struct XLIST_ITEM xListItem_t;
typedef struct {
uint32 itemNum; //链表元素个数
xListItem* pxFirstItem; //链表首个元素
xListItem* pxLastItem; //链表末尾元素
}xList_t;
typedef struct
{
xListItem_t list_item //用于将task结构体挂在不同的链表上
uint32 priority; //task优先级
uint32 r13; //用于调度时保存栈顶指针
uint32 stack_base; //指示当前task栈的基地址
uint32 stack_size; //指示当前task栈的大小;
task_func func; //当前task的回调;
}xTaskHandler_t;
typedef struct {
uint32* sharemem; //共享内存
xList_t receiveWaitList; //接收等待链表
xList_t sendWaitList; //发送等待链表
bool haveValidValue; //当前队列中是否有有效数据
}xMsgQueue_t;
xTaskHandler_t* pxCurrentTask = NULL; //当前正在执行的task结构体指针
static xList_t xReadyList[MAX_PRIORITY]; //可以被调度的task的链表,即ready链表
//将item插入链表list尾部
extern void vInsertToEnd(xList_t *list, xListItem_t* item);
//将指定item并从链表list中去掉
extern void vRemoveItem(xList_t *list, xListItem_t* item);
//判断列表是否为空
extern bool isListEmpty(xList_t *list);
//进入临界区,在临界区内不会产生调度
extern void enterCritical();
//退出临界区,在临界区内不会产生调度
extern void exitCritical();
//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能
extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item);
void vMsgQueueReceive(xMsgQueue_t* queue, uint32* data) {
xListItem* temp;
while(1) {
enterCritical();
//有有效数据
if (haveValidValue == 1) {
haveValidValue = 0; //消息已被读走
memcpy(data, queue->sharemem); //获取sharemem中的内容
if (!isListEmpty(queue->sendWaitList)) {
//目前消息队列以空,将发送等待链表中第一个task重新加入ready链表参与调度(如果有的话)
temp = queue->sendWaitList->pxFirstItem;
if (temp != NULL) {
vRemoveItem(queue->sendWaitList, temp);
vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp);
}
exitCritical();
break;
}
} else { //没有有效数据,挂起自己等待其他task写入
//将当前task从ready链表去掉
vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item));
//将当前task加到queue的接收等待链表
vInsertToEnd(queue->receiveWaitList, &(pxCurrentTask->list_item));
exitCritical();
//触发调度
yield();
}
}
//如果有task挂在发送等待链表,则调度一次给予该task运行机会
if (temp != NULL)
yield();
}
void vMsgQueueSend(xMsgQueue_t* queue, uint32* data) {
xListItem* temp;
while(1) {
enterCritical();
//无有效数据
if (queue->haveValidValue == 0)) {
queue->haveValidValue = 1; //消息已写入
memcpy(data, queue->sharemem); //获取sharemem中的内容
if (!isListEmpty(queue->receiveWaitList)) {
//目前消息队列已写入数据,将接收等待链表中第一个task重新加入ready链表参与调度
temp = queue->receiveWaitList->pxFirstItem;
if (temp != NULL) {
vRemoveItem(queue->receiveWaitList, temp);
vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp);
}
exitCritical();
break;
}
} else { //有有效数据,挂起自己等待数据被读走
//将当前task从ready链表去掉
vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item));
//将当前task加到queue的发送等待链表
vInsertToEnd(queue->sendWaitList, &(pxCurrentTask->list_item));
exitCritical();
//触发调度
yield();
}
}
if (temp != NULL)
yield();
}
注意,再对内核对象或全局变量操作时,需要防止产生调度,即进入临界区。当task调用vMsgQueueSend时,如果此时队列已满/为空,则会产生阻塞,task被从ready链表中拿去,内核对象中主动调度;当其他task将队列中消息读走,队列中可以写入新消息时,则将task重新加入ready链表,内核对象中主动调度,原先的task继续执行,对于原先的task而言,它对于中间的调度过程并无感知。反过来task调用vMsgQueueReceive时也是同理。
如何扩展
上面的实现其实不能称为队列,因为实际上只能存储一个消息。我们可以将其扩展为多个,只需要将sharemem改为数组,对应其他的修改如下:
struct XLIST_ITEM {
struct XLIST_ITEM *pxPrevious;
struct XLIST_ITEM *pxNext;
uint32 value;
};
typedef struct XLIST_ITEM xListItem_t;
typedef struct {
uint32 itemNum; //链表元素个数
xListItem* pxFirstItem; //链表首个元素
xListItem* pxLastItem; //链表末尾元素
}xList_t;
typedef struct
{
xListItem_t list_item //用于将task结构体挂在不同的链表上
uint32 priority; //task优先级
uint32 r13; //用于调度时保存栈顶指针
uint32 stack_base; //指示当前task栈的基地址
uint32 stack_size; //指示当前task栈的大小;
task_func func; //当前task的回调;
}xTaskHandler_t;
#define MSG_NUM 16
typedef struct {
uint32* msg[MSG_NUM]; //消息队列
uint32 writePtr; //消息队列写指针位置
uint32 readPtr; //消息队列读指针位置
xList_t receiveWaitList; //接收等待链表
xList_t sendWaitList; //发送等待链表
uint32 msgNum; //当前队列中有效消息个数
}xMsgQueue_t;
xTaskHandler_t* pxCurrentTask = NULL; //当前正在执行的task结构体指针
static xList_t xReadyList[MAX_PRIORITY]; //可以被调度的task的链表,即ready链表
//将item插入链表list尾部
extern void vInsertToEnd(xList_t *list, xListItem_t* item);
//将指定item并从链表list中去掉
extern void vRemoveItem(xList_t *list, xListItem_t* item);
//判断列表是否为空
extern bool isListEmpty(xList_t *list);
//进入临界区,在临界区内不会产生调度
extern void enterCritical();
//退出临界区,在临界区内不会产生调度
extern void exitCritical();
//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能
extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item);
void vReadFromQueue(xMsgQueue_t* queue, uint32** data) {
*data = queue->msg[queue->readPtr];
queue->readPtr++; //读指针加1
queue->readPtr %= MSG_NUM; //越界从头开始
queue->msgNum--; //消息数减1
}
void vWriteToQueue(xMsgQueue_t* queue, uint32* data) {
queue->msg[queue->writePtr] = data;
queue->writePtr++; //写指针加1
queue->writePtr %= MSG_NUM; //越界从头开始
queue->msgNum++; //消息数减1
}
void vMsgQueueReceive(xMsgQueue_t* queue, uint32** data) {
xListItem* temp;
while(1) {
enterCritical();
//队列非空
if (queue->msgNum > 0) {
vReadFromQueue(queue, data); //获取消息
if (!isListEmpty(queue->sendWaitList)) {
//目前消息队列未满,将发送等待链表中第一个task重新加入ready链表参与调度(如果有的话)
temp = queue->sendWaitList->pxFirstItem;
if (temp != NULL) {
vRemoveItem(queue->sendWaitList, temp);
vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp);
}
exitCritical();
break;
}
} else { //队列为空,挂起自己等待其他task写入
//将当前task从ready链表去掉
vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item));
//将当前task加到queue的接收等待链表
vInsertToEnd(queue->receiveWaitList, &(pxCurrentTask->list_item));
exitCritical();
//触发调度
yield();
}
}
//如果有task挂在发送等待链表,则调度一次给予该task运行机会
if (temp != NULL)
yield();
}
void vMsgQueueSend(xMsgQueue_t* queue, uint32* data) {
xListItem* temp;
while(1) {
enterCritical();
//队列未满
if (queue->msgNum < MSG_NUM)) {
vWriteToQueue(queue, &data); //获取sharemem中的内容
if (!isListEmpty(queue->receiveWaitList)) {
//目前消息队列已写入消息,将接收等待链表中第一个task重新加入ready链表参与调度
temp = queue->receiveWaitList->pxFirstItem;
if (temp != NULL) {
vRemoveItem(queue->receiveWaitList, temp);
vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp);
}
exitCritical();
break;
}
} else { //队列已满,挂起自己等待消息被读走
//将当前task从ready链表去掉
vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item));
//将当前task加到queue的发送等待链表
vInsertToEnd(queue->sendWaitList, &(pxCurrentTask->list_item));
exitCritical();
//触发调度
yield();
}
}
if (temp != NULL)
yield();
}
此外,我们也可以为消息队列添加一个超时机制,此时如果task由于发送/接收消息队列时产生了阻塞,那么将该task从ready链表中拿掉的后,同时加入dealy链表和消息队列的发送/接收等待链表,当超时或队列运行发送/接收两条件任意一个得到满足,将task从这两个链表中同时去掉,并加入ready链表。
未完待续
#嵌入式##嵌入式工程师#21届985微电子硕士,校招SSP进入紫光展锐任嵌入式工程师,8个月后跳槽至ASR工作至今。本专栏记录上学/工作时学到的一些知识,debug的一些工具使用等,希望有所帮助。
查看6道真题和解析