线程安全的环形缓冲区实现
睿丰德科技 专注RFID识别技术和条码识别技术与管理软件的集成项目。质量追溯系统、MES系统、金蝶与条码系统对接、用友与条码系统对接
来源:http://blog.csdn.net/lezhiyong
应用背景:线程1将每次数量不一的音频采样点(PCM音频数据)写入环形缓冲区,线程2每次取固定数量采样点送音频编码器,线程1线程2在平均时间内的读写数据量相等。(倒入桶中的水量有时大有时小,但每次取一瓢喝:)
该环形缓冲区借鉴CoolPlayer音频播放器中的环形缓冲区代码实现,在读写操作函数中加了锁,允许多线程同时操作。CPs_CircleBuffer基于内存段的读写,比用模板实现的环形缓冲队列适用的数据类型更广些, CPs_CircleBuffer修改成C++中基于对象的实现,加上详细注释,m_csCircleBuffer锁变量为自用的lock类型(将CRITICAL_SECTION封装起来),调用lock()加锁,调用unlock()解锁。使用效果良好,分享出来。
CPs_CircleBuffer环形缓冲还不具备当待写数据量超出空余缓冲时自动分配内存的功能,这个将在后续进行优化。
CPs_CircleBuffer使用步骤:
[cpp] view plaincopy
- 1、创建对象
- CPs_CircleBuffer* m_pCircleBuffer;
- m_pCircleBuffer = new CPs_CircleBuffer(bufsize);
- 2、写
- if (m_pCircleBuffer->GetFreeSize() < CIC_READCHUNKSIZE)
- {
- Sleep(20);
- continue;
- }
- m_pCircleBuffer->Write(internetbuffer.lpvBuffer,internetbuffer.dwBufferLength);
- 3、读
- m_pCircleBuffer->Read(pDestBuffer,iBytesToRead, piBytesRead);
- 4、其他调用
- if(m_pCircleBuffer->IsComplete())
- break;
- iUsedSpace =m_pCircleBuffer->GetUsedSize();
- m_pCircleBuffer->SetComplete();
CPs_CircleBuffer修改为类的定义:
[cpp] view plaincopy- class CPs_CircleBuffer
- {
- public:
- CPs_CircleBuffer(const unsigned int iBufferSize);
- ~CPs_CircleBuffer();
- public:
- // Public functions
- void Uninitialise();
- void Write(const void* pSourceBuffer, const unsigned int iNumBytes);
- bool Read(void* pDestBuffer, const size_t iBytesToRead, size_t* pbBytesRead);
- void Flush();
- unsigned int GetUsedSize();
- unsigned int GetFreeSize();
- void SetComplete();
- bool IsComplete();
- private:
- unsigned char* m_pBuffer;
- unsigned int m_iBufferSize;
- unsigned int m_iReadCursor;
- unsigned int m_iWriteCursor;
- HANDLE m_evtDataAvailable;
- Vlock m_csCircleBuffer;
- bool m_bComplete;
- };
CPs_CircleBuffer修改为类的实现:
[cpp] view plaincopy- #define CIC_WAITTIMEOUT 3000
- CPs_CircleBuffer::CPs_CircleBuffer(const unsigned int iBufferSize)
- {
- m_iBufferSize = iBufferSize;
- m_pBuffer = (unsigned char*)malloc(iBufferSize);
- m_iReadCursor = 0;
- m_iWriteCursor = 0;
- m_bComplete = false;
- m_evtDataAvailable = CreateEvent(NULL, FALSE, FALSE, NULL);
- }
- CPs_CircleBuffer::~CPs_CircleBuffer()
- {
- Uninitialise();
- }
- // Public functions
- void CPs_CircleBuffer::Uninitialise()//没有必要public这个接口函数,long120817
- {
- CloseHandle(m_evtDataAvailable);
- free(m_pBuffer);
- }
- //Write前一定要调用m_pCircleBuffer->GetFreeSize(),如果FreeSize不够需要等待,long120817
- void CPs_CircleBuffer::Write(const void* _pSourceBuffer, const unsigned int _iNumBytes)
- {
- unsigned int iBytesToWrite = _iNumBytes;
- unsigned char* pSourceReadCursor = (unsigned char*)_pSourceBuffer;
- //CP_ASSERT(iBytesToWrite <= GetFreeSize());//修改为没有足够空间就返回,write前一定要加GetFreeSize判断,否则进入到这里相当于丢掉数据, // long120817
- if (iBytesToWrite > GetFreeSize())
- {
- return;
- }
- _ASSERT(m_bComplete == false);
- m_csCircleBuffer.Lock();
- if (m_iWriteCursor >= m_iReadCursor)
- {
- // 0 m_iBufferSize
- // |-----------------|===========|--------------|
- // pR-> pW->
- // 计算尾部可写空间iChunkSize,long120817
- unsigned int iChunkSize = m_iBufferSize - m_iWriteCursor;
- if (iChunkSize > iBytesToWrite)
- {
- iChunkSize = iBytesToWrite;
- }
- // Copy the data
- memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iChunkSize);
- pSourceReadCursor += iChunkSize;
- iBytesToWrite -= iChunkSize;
- // 更新m_iWriteCursor
- m_iWriteCursor += iChunkSize;
- if (m_iWriteCursor >= m_iBufferSize)//如果m_iWriteCursor已经到达末尾
- m_iWriteCursor -= m_iBufferSize;//返回到起点0位置,long120817
- }
- //剩余数据从Buffer起始位置开始写
- if (iBytesToWrite)
- {
- memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iBytesToWrite);
- m_iWriteCursor += iBytesToWrite;
- _ASSERT(m_iWriteCursor < m_iBufferSize);//这个断言没什么意思,应该_ASSERT(m_iWriteCursor <= m_iReadCursor);long20120817
- }
- SetEvent(m_evtDataAvailable);//设置数据写好信号量
- m_csCircleBuffer.UnLock();
- }
- bool CPs_CircleBuffer::Read(void* pDestBuffer, const size_t _iBytesToRead, size_t* pbBytesRead)
- {
- size_t iBytesToRead = _iBytesToRead;
- size_t iBytesRead = 0;
- DWORD dwWaitResult;
- bool bComplete = false;
- while (iBytesToRead > 0 && bComplete == false)
- {
- dwWaitResult = WaitForSingleObject(m_evtDataAvailable, CIC_WAITTIMEOUT);//等待数据写好,long120817
- if (dwWaitResult == WAIT_TIMEOUT)
- {
- //TRACE_INFO2("Circle buffer - did not fill in time!");
- *pbBytesRead = iBytesRead;
- return FALSE;//等待超时则返回
- }
- m_csCircleBuffer.Lock();
- if (m_iReadCursor > m_iWriteCursor)
- {
- // 0 m_iBufferSize
- // |=================|-----|===========================|
- // pW-> pR->
- unsigned int iChunkSize = m_iBufferSize - m_iReadCursor;
- if (iChunkSize > iBytesToRead)
- iChunkSize = (unsigned int)iBytesToRead;
- //读取操作
- memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);
- iBytesRead += iChunkSize;
- iBytesToRead -= iChunkSize;
- m_iReadCursor += iChunkSize;
- if (m_iReadCursor >= m_iBufferSize)//如果m_iReadCursor已经到达末尾
- m_iReadCursor -= m_iBufferSize;//返回到起点0位置,long120817
- }
- if (iBytesToRead && m_iReadCursor < m_iWriteCursor)
- {
- unsigned int iChunkSize = m_iWriteCursor - m_iReadCursor;
- if (iChunkSize > iBytesToRead)
- iChunkSize = (unsigned int)iBytesToRead;
- //读取操作
- memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);
- iBytesRead += iChunkSize;
- iBytesToRead -= iChunkSize;
- m_iReadCursor += iChunkSize;
- }
- //如果有更多的数据要写
- if (m_iReadCursor == m_iWriteCursor)
- {
- if (m_bComplete)//跳出下一个while循环,该值通过SetComplete()设置,此逻辑什么意思?long120817
- bComplete = true;
- }
- else//还有数据可以读,SetEvent,在下一个while循环开始可以不用再等待,long120817
- SetEvent(m_evtDataAvailable);
- m_csCircleBuffer.UnLock();
- }
- *pbBytesRead = iBytesRead;
- return bComplete ? false : true;
- }
- // 0 m_iBufferSize
- // |------------------------------------------------|
- // pR
- // pW
- //读写指针归零
- void CPs_CircleBuffer::Flush()
- {
- m_csCircleBuffer.Lock();
- m_iReadCursor = 0;
- m_iWriteCursor = 0;
- m_csCircleBuffer.UnLock();
- }
- //获取已经写的内存
- unsigned int CPs_CircleBuffer::GetUsedSize()
- {
- return m_iBufferSize - GetFreeSize();
- }
- unsigned int CPs_CircleBuffer::GetFreeSize()
- {
- unsigned int iNumBytesFree;
- m_csCircleBuffer.Lock();
- if (m_iWriteCursor < m_iReadCursor)
- {
- // 0 m_iBufferSize
- // |=================|-----|===========================|
- // pW-> pR->
- iNumBytesFree = (m_iReadCursor - 1) - m_iWriteCursor;
- }
- else if (m_iWriteCursor == m_iReadCursor)
- {
- iNumBytesFree = m_iBufferSize;
- }
- else
- {
- // 0 m_iBufferSize
- // |-----------------|=====|---------------------------|
- // pR-> pW->
- iNumBytesFree = (m_iReadCursor - 1) + (m_iBufferSize - m_iWriteCursor);
- }
- m_csCircleBuffer.UnLock();
- return iNumBytesFree;
- }
- //该函数什么时候调用?long120817
- void CPs_CircleBuffer::SetComplete()
- {
- m_csCircleBuffer.Lock();
- m_bComplete = true;
- SetEvent(m_evtDataAvailable);
- m_csCircleBuffer.UnLock();
- }
附自动初始化和摧毁的锁对象Vlock的实现:
[cpp] view plaincopy- #ifdef WIN32
- #include <windows.h>
- #define V_MUTEX CRITICAL_SECTION //利用临界区实现的锁变量
- #define V_MUTEX_INIT(m) InitializeCriticalSection(m)
- #define V_MUTEX_LOCK(m) EnterCriticalSection(m)
- #define V_MUTEX_UNLOCK(m) LeaveCriticalSection(m)
- #define V_MUTEX_DESTORY(m) DeleteCriticalSection(m)
- #else
- #define V_MUTEX pthread_mutex_t
- #define V_MUTEX_INIT(m) pthread_mutex_init(m,NULL)
- #define V_MUTEX_LOCK(m) pthread_mutex_Lock(m)
- #define V_MUTEX_UNLOCK(m) pthread_mutex_unLock(m)
- #define V_MUTEX_DESTORY(m) pthread_mutex_destroy(m)
- #endif
- class Vlock
- {
- public:
- Vlock(void)
- {
- V_MUTEX_INIT(&m_Lock);
- }
- ~Vlock(void)
- {
- V_MUTEX_DESTORY(&m_Lock);
- }
- public:
- void Lock(){V_MUTEX_LOCK(&m_Lock);}
- void UnLock(){V_MUTEX_UNLOCK(&m_Lock);}
- private:
- V_MUTEX m_Lock;
- };