電子產(chǎn)業(yè)一站式賦能平臺

PCB聯(lián)盟網(wǎng)

搜索
查看: 56|回復: 0
收起左側(cè)

巧奪天工的kfifo竟然把MCU搞掛了?

[復制鏈接]

190

主題

190

帖子

1272

積分

三級會員

Rank: 3Rank: 3

積分
1272
跳轉(zhuǎn)到指定樓層
樓主
發(fā)表于 2024-11-11 20:00:00 | 只看該作者 |只看大圖 回帖獎勵 |倒序瀏覽 |閱讀模式

關注、星標公眾號,直達精彩內(nèi)容
一 問題背景 最近部門時刻準備刪庫跑路的小兄弟調(diào)試CAN通訊的時候,遇到個MCU死機的問題,而負責這塊代碼的哥們已經(jīng)跑路好幾年了,而這個設備也已經(jīng)用了好幾年了,也沒有反饋過什么問題。但是在他那里測試MCU 100%會死機,進入hardfault,一時間CPU都被干燒了,祖?zhèn)鞯拇a這么不靠譜嗎....
最終問題定位在kfifo這部分出了問題,而kfifo是linux kernal里面非常經(jīng)典的一段代碼,應該不至于吧,對此進行了分析驗證。
二 什么是kfifo 釋義摘自:https://blog.csdn.net/linyt/article/details/53355355  講解的很詳細,如有侵權,聯(lián)系刪除~
kfifo是內(nèi)核里面的一個First In First Out數(shù)據(jù)結(jié)構,它采用環(huán)形循環(huán)隊列的數(shù)據(jù)結(jié)構來實現(xiàn),提供一個無邊界的字節(jié)流服務,并且使用并行無鎖編程技術,即當它用于只有一個入隊線程和一個出隊線程的場情時,兩個線程可以并發(fā)操作,而不需要任何加鎖行為,就可以保證kfifo的線程安全。
kfifo代碼既然肩負著這么多特性,那我們先一敝它的代碼:
2.1 kfifo數(shù)據(jù)結(jié)構 struct kfifo {
    unsigned char *buffer;    /* the buffer holding the data */
    unsigned int size;    /* the size of the allocated buffer */
    unsigned int in;    /* data is added at offset (in % size) */
    unsigned int out;    /* data is extracted from off. (out % size) */
    spinlock_t *lock;    /* protects concurrent modifications */
};
這是kfifo的數(shù)據(jù)結(jié)構,kfifo主要提供了兩個操作,__kfifo_put(入隊操作)和__kfifo_get(出隊操作)。 它的各個數(shù)據(jù)成員如下:
buffer: 用于存放數(shù)據(jù)的緩存
size: buffer空間的大小,在初化時,將它向上擴展成2的冪
lock: 如果使用不能保證任何時間最多只有一個讀線程和寫線程,需要使用該lock實施同步。
in, out: 和buffer一起構成一個循環(huán)隊列。 in指向buffer中隊頭,而且out指向buffer中的隊尾,它的結(jié)構如示圖如下:
+--------------------------------------------------------------+
|            ||                      |
+--------------------------------------------------------------+
             ^                          ^                      ^
             |                          |                      |
            out                        in                     size
當然,內(nèi)核開發(fā)者使用了一種更好的技術處理了in, out和buffer的關系,我們將在下面進行詳細分析。
2.2 kfifo功能描述 kfifo提供如下對外功能規(guī)格:
  • 1 只支持一個讀者和一個讀者并發(fā)操作;
  • 2 無阻塞的讀寫操作,如果空間不夠,則返回實際訪問空間;kfifo_alloc 分配kfifo內(nèi)存和初始化工作
    struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
    {
        unsigned char *buffer;
        struct kfifo *ret;
        /*
         * round up to the next power of 2, since our 'let the indices
         * wrap' tachnique works only in this case.
         */
        if (size & (size - 1)) {
            BUG_ON(size > 0x80000000);
            size = roundup_pow_of_two(size);
        }
        buffer = kmalloc(size, gfp_mask);
        if (!buffer)
            return ERR_PTR(-ENOMEM);
        ret = kfifo_init(buffer, size, gfp_mask, lock);
        if (IS_ERR(ret))
            kfree(buffer);
        return ret;
    }
    這里值得一提的是,kfifo->size的值總是在調(diào)用者傳進來的size參數(shù)的基礎上向2的冪擴展,這是內(nèi)核一貫的做法。這樣的好處不言而喻——對kfifo->size取模運算可以轉(zhuǎn)化為與運算,如下:
    kfifo->in % kfifo->size
    可以轉(zhuǎn)化為
    kfifo->in & (kfifo->size – 1)
    在kfifo_alloc函數(shù)中,使用size & (size – 1)來判斷size 是否為2冪,如果條件為真,則表示size不是2的冪,然后調(diào)用roundup_pow_of_two將之向上擴展為2的冪。
    這都是常用的技巧,只不過大家沒有將它們結(jié)合起來使用而已,下面要分析的__kfifo_put和__kfifo_get則是將kfifo->size的特點發(fā)揮到了極致。
    __kfifo_put和__kfifo_get巧妙的入隊和出隊
    _kfifo_put是入隊操作,它先將數(shù)據(jù)放入buffer里面,最后才修改in參數(shù);__kfifo_get是出隊操作,它先將數(shù)據(jù)從buffer中移走,最后才修改out。你會發(fā)現(xiàn)in和out兩者各司其職。
    下面是__kfifo_put和__kfifo_get的代碼
    unsigned int __kfifo_put(struct kfifo *fifo,
                 unsigned char *buffer, unsigned int len)
    {
        unsigned int l;
        len = min(len, fifo->size - fifo->in + fifo->out);
        /*
         * Ensure that we sample the fifo->out index -before- we
         * start putting bytes into the kfifo.
         */
        smp_mb();
        /* first put the data starting from fifo->in to buffer end */
        l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
        memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
        /* then put the rest (if any) at the beginning of the buffer */
        memcpy(fifo->buffer, buffer + l, len - l);
        /*
         * Ensure that we add the bytes to the kfifo -before-
         * we update the fifo->in index.
         */
        smp_wmb();
        fifo->in += len;
        return len;
    }
    奇怪嗎?代碼完全是線性結(jié)構,沒有任何if-else分支來判斷是否有足夠的空間存放數(shù)據(jù)。內(nèi)核在這里的代碼非常簡潔,沒有一行多余的代碼。
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    這個表達式計算當前寫入的空間,換成人可理解的語言就是:
    l = kfifo可寫空間和預期寫入空間的最小值
    使用min宏來代if-else分支
    __kfifo_get也應用了同樣技巧,代碼如下:
    unsigned int __kfifo_get(struct kfifo *fifo,
                 unsigned char *buffer, unsigned int len)
    {
        unsigned int l;
        len = min(len, fifo->in - fifo->out);
        /*
         * Ensure that we sample the fifo->in index -before- we
         * start removing bytes from the kfifo.
         */
        smp_rmb();
        /* first get the data from fifo->out until the end of the buffer */
        l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
        memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
        /* then get the rest (if any) from the beginning of the buffer */
        memcpy(buffer + l, fifo->buffer, len - l);
        /*
         * Ensure that we remove the bytes from the kfifo -before-
         * we update the fifo->out index.
         */
        smp_mb();
        fifo->out += len;
        return len;
    }
    原來,kfifo每次入隊或出隊,kfifo->in或kfifo->out只是簡單地kfifo->in/kfifo->out += len,并沒有對kfifo->size 進行取模運算。因此kfifo->in和kfifo->out總是一直增大,直到unsigned in最大值時,又會繞回到0這一起始端。但始終滿足:
    kfifo->in - kfifo->out size
    即使kfifo->in回繞到了0的那一端,這個性質(zhì)仍然是保持的。
    對于給定的kfifo:
    數(shù)據(jù)空間長度為:kfifo->in - kfifo->out
    而剩余空間(可寫入空間)長度為:kfifo->size - (kfifo->in - kfifo->out)
    盡管kfifo->in和kfofo->out一直超過kfifo->size進行增長,但它對應在kfifo->buffer空間的下標卻是如下:
    kfifo->in % kfifo->size (i.e. kfifo->in & (kfifo->size - 1))
    kfifo->out % kfifo->size (i.e. kfifo->out & (kfifo->size - 1))
    往kfifo里面寫一塊數(shù)據(jù)時,數(shù)據(jù)空間、寫入空間和kfifo->size的關系如果滿足:
    kfifo->in % size + len > size
    那就要做寫拆分了,見下圖:
                                                        kfifo_put(寫)空間開始地址
                                                        |
                                                       \_/
                                                        |XXXXXXXXXX
    XXXXXXXX|                                                   
    +--------------------------------------------------------------+
    |                        ||          |
    +--------------------------------------------------------------+
                             ^                          ^          ^
                             |                          |          |
                           out%size                   in%size     size
            ^
            |
          寫空間結(jié)束地址                     
    第一塊當然是: [kfifo->in % kfifo->size, kfifo->size]
    第二塊當然是:[0, len - (kfifo->size - kfifo->in % kfifo->size)]
    下面是代碼,細細體味吧:
    /* first put the data starting from fifo->in to buffer end */   
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));   
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);   
    /* then put the rest (if any) at the beginning of the buffer */   
    memcpy(fifo->buffer, buffer + l, len - l);  
    對于kfifo_get過程,也是類似的,請各位自行分析。
    kfifo_get和kfifo_put無鎖并發(fā)操作
    計算機科學家已經(jīng)證明,當只有一個讀經(jīng)程和一個寫線程并發(fā)操作時,不需要任何額外的鎖,就可以確保是線程安全的,也即kfifo使用了無鎖編程技術,以提高kernel的并發(fā)。
    kfifo使用in和out兩個指針來描述寫入和讀取游標,對于寫入操作,只更新in指針,而讀取操作,只更新out指針,可謂井水不犯河水,示意圖如下:
                                                   ||
    +--------------------------------------------------------------+
    |                        ||               |
    +--------------------------------------------------------------+
                             ||
                             ^                     ^               ^
                             |                     |               |
                            out                   in              size
    為了避免讀者看到寫者預計寫入,但實際沒有寫入數(shù)據(jù)的空間,寫者必須保證以下的寫入順序:
    往[kfifo->in, kfifo->in + len]空間寫入數(shù)據(jù)
    更新kfifo->in指針為 kfifo->in + len
    在操作1完成時,讀者是還沒有看到寫入的信息的,因為kfifo->in沒有變化,認為讀者還沒有開始寫操作,只有更新kfifo->in之后,讀者才能看到。
    那么如何保證1必須在2之前完成,秘密就是使用內(nèi)存屏障:smp_mb(),smp_rmb(), smp_wmb(),來保證對方觀察到的內(nèi)存操作順序。
    三 kfifo在工程中的移植應用上面已經(jīng)詳細介紹了kfifo的實現(xiàn)原理以及讓人驚嘆的代碼簡潔之道,接下來在MCU工程中實際去應用,看看效果如何。
    直接上代碼:
    kfifo.h
    #ifndef __KFIFO_H
    #define __KFIFO_H
      
    #include "stdint.h"
    #include "stdlib.h"
    #include "string.h"
    #include "stdio.h"
      
    #define MAX(a, b) (((a) > (b)) ? (a) : (b))
    #define MIN(a, b) (((a) (b)) ? (a) : (b))
      
    #define is_power_of_2(x) ((x) != 0 && (((x) & ((x)-1)) == 0))
      
    typedef struct {
    unsigned char *buffer; /* the buffer holding the data*/
    unsigned int size; /* the size of the allocated buffer*/
    unsigned int in; /* data is added at offset (in % size)*/
    unsigned int out; /* data is extracted from off. (out % size)*/
    } kfifo;
      
    /**
    * @brief CAN Rx message structure definition
    */
      
    #define CAN_FIFO_SIZE (2 * 1024)
    typedef struct {
    uint8_t FifoBuf1[CAN_FIFO_SIZE];
    // uint8_t FifoBuf2[CAN_FIFO_SIZE];
      
    uint8_t FpStep;
      
    } CAN_STA_TYPE;
      
    /* USER CODE BEGIN Prototypes */
    typedef struct {
    uint32_t StdId;
    uint32_t ExtId;
    uint32_t IDE;
    uint32_t RTR;
    uint32_t DLC;
    uint8_t Data[8];
    uint32_t Timestamp;
    uint32_t FilterMatchIndex;
    } MyCanRxMsgTypeDef;
      
    extern kfifo gFifoReg1;
    extern kfifo gFifoReg2;
      
    void kfifo_init(kfifo *fifo, unsigned char *buffer, unsigned int size);
    unsigned int kfifo_put(kfifo *fifo, unsigned char *buffer, unsigned int len);
    unsigned int kfifo_get(kfifo *fifo, unsigned char *buffer, unsigned int len);
      
    unsigned int can_fifo_get(kfifo *fifo, unsigned char *buffer, unsigned int len);
      
    void kfifo_reset(kfifo *fifo);
    unsigned int kfifo_len(kfifo *fifo);
      
    #endif
    kfifo.c
    #include "kfifo.h"
    kfifo gFifoReg1;
    kfifo gFifoReg2;
    /*
    * kfifo初始化
    */
    void kfifo_init(kfifo *fifo, unsigned char *buffer, unsigned int size)
    {
        if (!is_power_of_2(size))
        {
            return;
        }
        fifo->buffer = buffer;
        fifo->size = size;
        fifo->in = fifo->out = 0u;
        memset(fifo->buffer, 0, size);
    }
    /*
    * __kfifo_put - puts some data into the FIFO, no locking version
    * @fifo: the fifo to be used.
    * @buffer: the data to be added.
    * @len: the length of the data to be added.
    *
    * This function copies at most 'len' bytes from the 'buffer' into
    * the FIFO depending on the free space, and returns the number of
    * bytes copied.
    *
    * Note that with only one concurrent reader and one concurrent
    * writer, you don't need extra locking to use these functions.
    */
    unsigned int kfifo_put(kfifo *fifo, unsigned char *buffer, unsigned int len)
    {
        unsigned int l = 0;
        len = MIN(len, fifo->size - fifo->in + fifo->out);
        /* first put the data starting from fifo->in to buffer end*/
        l = MIN(len, fifo->size - (fifo->in & (fifo->size - 1)));
        memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
        /* then put the rest (if any) at the beginning of the buffer*/
        memcpy(fifo->buffer, buffer + l, len - l);
       
        fifo->in += len;
        return len;
    }
    /*
    * __kfifo_get - gets some data from the FIFO, no locking version
    * @fifo: the fifo to be used.
    * @buffer: where the data must be copied.
    * @len: the size of the destination buffer.
    *
    * This function copies at most 'len' bytes from the FIFO into the
    * 'buffer' and returns the number of copied bytes.
    *
    * Note that with only one concurrent reader and one concurrent
    * writer, you don't need extra locking to use these functions.
    */
    unsigned int kfifo_get(kfifo *fifo, unsigned char *buffer, unsigned int len)
    {
        unsigned int l = 0;
        len = MIN(len, fifo->in - fifo->out);
        /* first get the data from fifo->out until the end of the buffer*/
        l = MIN(len, fifo->size - (fifo->out & (fifo->size - 1)));
        memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
        /* then get the rest (if any) from the beginning of the buffer*/
        memcpy(buffer + l, fifo->buffer, len - l);
        fifo->out += len;
        return len;
    }
    unsigned int can_fifo_get(kfifo *fifo, unsigned char *buffer, unsigned int len)
    {
        //__disable_irq();
       
        len = kfifo_get(fifo, buffer, len);
       
        //__enable_irq();
        return len;
    }
    /*
    * __kfifo_reset - removes the entire FIFO contents, no locking version
    * @fifo: the fifo to be emptied.
    */
    void kfifo_reset(kfifo *fifo)
    {
        fifo->in = fifo->out = 0;
    }
    /*
    * __kfifo_len - returns the number of bytes available in the FIFO, no locking version
    * @fifo: the fifo to be used.
    */
    unsigned int kfifo_len(kfifo *fifo)
    {
        return fifo->in - fifo->out;
    }
    因為是單線程讀寫,所以沒有進行加鎖操作。
    回到問題背景所說的,之前用的時候一直沒問題,現(xiàn)在測試100%把CPU干冒煙,為何會有如此大的差別?
    回顧了下測試環(huán)境,“跑路的工程師”調(diào)試的時候使用的上位機是自定義的數(shù)據(jù)發(fā)送間隔時間,對與kfifo來說,發(fā)個間隔時間較大,kfifo進出次數(shù)不會差太多,kfifo是“反應的過來”的,實際使用過的時候,上位機替換成了APP,數(shù)據(jù)發(fā)送間隔更大,所以沒有出問題。
    現(xiàn)在,“準備刪庫跑路的工程師”,用的是固定間隔的上位機,而且數(shù)據(jù)發(fā)送頻率很高,接收使用的CAN中斷接收,發(fā)送的時候是使用的任務輪詢,任務運行周期是100ms,創(chuàng)建的buffer緩沖為2048字節(jié),發(fā)送一會兒就會死機,100%復現(xiàn),增大buffer,不過是死的時間晚了點,雖遲但到~~~
    void can_nbbus_entry(void *parameter)
    {
    nbbus_can_init();
    while (1)
    {
      nbbus_poll(&nbbus_can, NB_BUS_BATDIAG_ID);
      rt_thread_mdelay(100);
    }
    }
    于是對數(shù)據(jù)幀進行了分析,can接收到的原始數(shù)據(jù)幀是沒問題的,前面沒出問題的數(shù)據(jù)一直沒問題,MCU崩潰的時候,CAN接收到的原始數(shù)據(jù)依然是沒問題的,原始數(shù)據(jù)既然沒問題,問題應該出在后面解析轉(zhuǎn)發(fā)流程了,分析最終定位在數(shù)據(jù)幀出現(xiàn)了錯位,CAN數(shù)據(jù)幀的數(shù)據(jù)出現(xiàn)了移位,導致can數(shù)據(jù)長度字節(jié)取了一個比較大的數(shù),數(shù)據(jù)拷貝的時候出現(xiàn)了越界,擦出了內(nèi)存中的一些數(shù)據(jù),進入了hardfault
    那么為什么CAN數(shù)據(jù)幀出現(xiàn)了錯位呢,并且似乎出現(xiàn)在某一固定時刻?
    can數(shù)據(jù)接收數(shù)據(jù)如下,can的結(jié)構體有36字節(jié),每次kfifo_put傳入的len為36字節(jié),剩余空間與36字節(jié)進行比較
    typedef struct
    {
        uint32_t StdId;
        uint32_t ExtId;
        uint32_t IDE;
        uint32_t RTR;
        uint32_t DLC;
        uint8_t Data[8];
        uint32_t Timestamp;
        uint32_t FilterMatchIndex;
    } MyCanRxMsgTypeDef;
    void HAL_CAN_RxFifo1MsgPendingCallback(CAN_HandleTypeDef *hcan)
    {
        HAL_StatusTypeDef HAL_RetVal;
        if (hcan == &hcan1)
        {
            HAL_RetVal = HAL_CAN_GetRxMessage(hcan,  CAN_RX_FIFO1, (CAN_RxHeaderTypeDef *)&gCanRxMsg11,  gCanRxMsg11.Data);
            if ( HAL_OK == HAL_RetVal)
            {
                kfifo_put(&gFifoReg1, (uint8_t *)&gCanRxMsg11, sizeof(gCanRxMsg11));
                __HAL_CAN_ENABLE_IT (hcan, CAN_IT_RX_FIFO1_MSG_PENDING);
            }
        }
    }
    接下來著重分析下數(shù)據(jù)是如何產(chǎn)生錯亂的,前面說到定義了2048字節(jié)的buffer緩存空間,對存儲空間結(jié)構分組如下:
  • 回復

    使用道具 舉報

    發(fā)表回復

    您需要登錄后才可以回帖 登錄 | 立即注冊

    本版積分規(guī)則


    聯(lián)系客服 關注微信 下載APP 返回頂部 返回列表