并发无锁环形队列的达成
发布时间:2021-11-18 12:33:37 所属栏目:教程 来源:互联网
导读:前面在《Linux内核数据结构kfifo详解》一文中详细解析了 Linux 内核并发无锁环形队列kfifo的原理和实现,kfifo鬼斧神工,博大精深,让人叹为观止,但遗憾的是kfifo为内核提供服务,并未开放出来。剑不试则利钝暗,弓不试则劲挠诬,鹰不试则巧拙惑,马不试则
前面在《Linux内核数据结构kfifo详解》一文中详细解析了 Linux 内核并发无锁环形队列kfifo的原理和实现,kfifo鬼斧神工,博大精深,让人叹为观止,但遗憾的是kfifo为内核提供服务,并未开放出来。剑不试则利钝暗,弓不试则劲挠诬,鹰不试则巧拙惑,马不试则良驽疑,光说不练是不能学到精髓的,下面就动手实现自己的并发无锁队列UnlockQueue(单生产者单消费者)。 一、UnlockQueue声明 1: #ifndef _UNLOCK_QUEUE_H 2: #define _UNLOCK_QUEUE_H 3: 4: class UnlockQueue 5: { 6: public: 7: UnlockQueue(int nSize); 8: virtual ~UnlockQueue(); 9: 10: bool Initialize(); 11: 12: unsigned int Put(const unsigned char *pBuffer, unsigned int nLen); 13: unsigned int Get(unsigned char *pBuffer, unsigned int nLen); 14: 15: inline void Clean() { m_nIn = m_nOut = 0; } 16: inline unsigned int GetDataLen() const { return m_nIn - m_nOut; } 17: 18: private: 19: inline bool is_power_of_2(unsigned long n) { return (n != 0 && ((n & (n - 1)) == 0)); }; 20: inline unsigned long roundup_power_of_two(unsigned long val); 21: 22: private: 23: unsigned char *m_pBuffer; /* the buffer holding the data */ 24: unsigned int m_nSize; /* the size of the allocated buffer */ 25: unsigned int m_nIn; /* data is added at offset (in % size) */ 26: unsigned int m_nOut; /* data is extracted from off. (out % size) */ 27: }; 28: 29: #endif UnlockQueue与kfifo 结构相同相同,也是由一下变量组成: UnlockQueue kfifo 作用 m_pBuffer buffer 用于存放数据的缓存 m_nSize size 缓冲区空间的大小,圆整为2的次幂 m_nIn in 指向buffer中队头 m_nOut out 指向buffer中的队尾 UnlockQueue的设计是用在单生产者单消费者情况下,所以不需要锁 lock 如果使用不能保证任何时间最多只有一个读线程和写线程,必须使用该lock实施同步。 二、UnlockQueue构造函数和初始化 1: UnlockQueue::UnlockQueue(int nSize) 2: :m_pBuffer(NULL) 3: ,m_nSize(nSize) 4: ,m_nIn(0) 5: ,m_nOut(0) 6: { 7: //round up to the next power of 2 8: if (!is_power_of_2(nSize)) 9: { 10: m_nSize = roundup_power_of_two(nSize); 11: } 12: } 13: 14: UnlockQueue::~UnlockQueue() 15: { 16: if(NULL != m_pBuffer) 17: { 18: delete[] m_pBuffer; 19: m_pBuffer = NULL; 20: } 21: } 22: 23: bool UnlockQueue::Initialize() 24: { 25: m_pBuffer = new unsigned char[m_nSize]; 26: if (!m_pBuffer) 27: { 28: return false; 29: } 30: 31: m_nIn = m_nOut = 0; 32: 33: return true; 34: } 35: 36: unsigned long UnlockQueue::roundup_power_of_two(unsigned long val) 37: { 38: if((val & (val-1)) == 0) 39: return val; 40: 41: unsigned long maxulong = (unsigned long)((unsigned long)~0); 42: unsigned long andv = ~(maxulong&(maxulong>>1)); 43: while((andv & val) == 0) 44: andv = andv>>1; 45: 46: return andv<<1; 47: } 1.在构造函数中,对传入的size进行2的次幂圆整,圆整的好处是可以将m_nIn % m_nSize 可以转化为 m_nIn & (m_nSize – 1),取模运算”的效率并没有 “位运算” 的效率高。 2.在构造函数中,未给buffer分配内存,而在Initialize中分配,这样做的原因是:我们知道在new UnlockQueue的时候有两步操作,第一步分配内存,第二步调用构造函数,如果将buffer的分配放在构造函数中,那么就可能 buffer 就可能分配失败,而后面用到buffer,还需要判空。 三、UnlockQueue入队和出队操作 1: unsigned int UnlockQueue::Put(const unsigned char *buffer, unsigned int len) 2: { 3: unsigned int l; 4: 5: len = std::min(len, m_nSize - m_nIn + m_nOut); 6: 7: /* 8: * Ensure that we sample the m_nOut index -before- we 9: * start putting bytes into the UnlockQueue. 10: */ 11: __sync_synchronize(); 12: 13: /* first put the data starting from fifo->in to buffer end */ 14: l = std::min(len, m_nSize - (m_nIn & (m_nSize - 1))); 15: memcpy(m_pBuffer + (m_nIn & (m_nSize - 1)), buffer, l); 16: 17: /* then put the rest (if any) at the beginning of the buffer */ 18: memcpy(m_pBuffer, buffer + l, len - l); 19: 20: /* 21: * Ensure that we add the bytes to the kfifo -before- 22: * we update the fifo->in index. 23: */ 24: __sync_synchronize(); 25: 26: m_nIn += len; 27: 28: return len; 29: } 30: 31: unsigned int UnlockQueue::Get(unsigned char *buffer, unsigned int len) 32: { 33: unsigned int l; 34: 35: len = std::min(len, m_nIn - m_nOut); 36: 37: /* 38: * Ensure that we sample the fifo->in index -before- we 39: * start removing bytes from the kfifo. 40: */ 41: __sync_synchronize(); 42: 43: /* first get the data from fifo->out until the end of the buffer */ 44: l = std::min(len, m_nSize - (m_nOut & (m_nSize - 1))); 45: memcpy(buffer, m_pBuffer + (m_nOut & (m_nSize - 1)), l); 46: 47: /* then get the rest (if any) from the beginning of the buffer */ 48: memcpy(buffer + l, m_pBuffer, len - l); 49: 50: /* 51: * Ensure that we remove the bytes from the kfifo -before- 52: * we update the fifo->out index. 53: */ 54: __sync_synchronize(); 55: 56: m_nOut += len; 57: 58: return len; 59: } 入队和出队操作与kfifo相同,用到的技巧也完全相同,有不理解的童鞋可以参考前面一篇文章《Linux内核数据结构kfifo详解》。这里需要指出的是__sync_synchronize()函数,由于linux并未开房出内存屏障函数,而在gcc4.2以上版本提供This builtin issues a full memory barrier,有兴趣同学可以参考Built-in functions for atomic memory access。 四、测试程序 如图所示,我们设计了两个线程,一个生产者随机生成学生信息放入队列,一个消费者从队列中取出学生信息并打印,可以看到整个代码是无锁的。 image 1: #include "UnlockQueue.h" 2: #include <iostream> 3: #include <algorithm> 4: #include <pthread.h> 5: #include <time.h> 6: #include <stdio.h> 7: #include <errno.h> 8: #include <string.h> 9: 10: struct student_info 11: { 12: long stu_id; 13: unsigned int age; 14: unsigned int score; 15: }; 16: 17: void print_student_info(const student_info *stu_info) 18: { 19: if(NULL == stu_info) 20: return; 21: 22: printf("id:%ldt",stu_info->stu_id); 23: printf("age:%ut",stu_info->age); 24: printf("score:%un",stu_info->score); 25: } 26: 27: student_info * get_student_info(time_t timer) 28: { 29: student_info *stu_info = (student_info *)malloc(sizeof(student_info)); 30: if (!stu_info) 31: { 32: fprintf(stderr, "Failed to malloc memory.n"); 33: return NULL; 34: } 35: srand(timer); 36: stu_info->stu_id = 10000 + rand() % 9999; 37: stu_info->age = rand() % 30; 38: stu_info->score = rand() % 101; 39: //print_student_info(stu_info); 40: return stu_info; 41: } 42: 43: void * consumer_proc(void *arg) 44: { 45: UnlockQueue* queue = (UnlockQueue *)arg; 46: student_info stu_info; 47: while(1) 48: { 49: sleep(1); 50: unsigned int len = queue->Get((unsigned char *)&stu_info, sizeof(student_info)); 51: if(len > 0) 52: { 53: printf("------------------------------------------n"); 54: printf("UnlockQueue length: %un", queue->GetDataLen()); 55: printf("Get a studentn"); 56: print_student_info(&stu_info); 57: printf("------------------------------------------n"); 58: } 59: } 60: return (void *)queue; 61: } 62: 63: void * producer_proc(void *arg) 64: { 65: time_t cur_time; 66: UnlockQueue *queue = (UnlockQueue*)arg; 67: while(1) 68: { 69: time(&cur_time); 70: srand(cur_time); 71: int seed = rand() % 11111; 72: printf("******************************************n"); 73: student_info *stu_info = get_student_info(cur_time + seed); 74: printf("put a student info to queue.n"); 75: queue->Put( (unsigned char *)stu_info, sizeof(student_info)); 76: free(stu_info); 77: printf("UnlockQueue length: %un", queue->GetDataLen()); 78: printf("******************************************n"); 79: sleep(1); 80: } 81: return (void *)queue; 82: } 83: 84: 85: int main() 86: { 87: UnlockQueue unlockQueue(1024); 88: if(!unlockQueue.Initialize()) 89: { 90: return -1; 91: } 92: 93: pthread_t consumer_tid, producer_tid; 94: 95: printf("multi thread test.......n"); 96: 97: if(0 != pthread_create(&producer_tid, NULL, producer_proc, (void*)&unlockQueue)) 98: { 99: fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%sn", 100: errno, strerror(errno)); 101: return -1; 102: } 103: 104: if(0 != pthread_create(&consumer_tid, NULL, consumer_proc, (void*)&unlockQueue)) 105: { 106: fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%sn", 107: errno, strerror(errno)); 108: return -1; 109: } 110: 111: pthread_join(producer_tid, NULL); 112: pthread_join(consumer_tid, NULL); 113: 114: return 0; 115: } (编辑:开发网_开封站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |