码迷,mamicode.com
首页 > 编程语言 > 详细

ring buffer 亲测好用 C++ 11 加入 条件变量

时间:2018-12-17 02:16:04      阅读:144      评论:0      收藏:0      [点我收藏+]

标签:length   virtual   xxxxxx   tar   col   second   code   lse   gets   

 

#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
/// Fixed-capacity circular (ring) byte buffer.
///
/// Bytes are stored in a heap array of `size` chars. A read cursor and a write
/// cursor chase each other around the array; because the cursors are equal both
/// when the buffer is empty and when it is full, two flags tell those states
/// apart.
///
/// NOTE(review): the cursors and flags are atomics and read() waits on a
/// condition variable, which suggests one producer thread and one consumer
/// thread are intended -- confirm before using it with any other threading
/// model.
class CCycleBuffer
{
public:
    /// Creates an empty buffer able to hold `size` bytes.
    /// `explicit` so an int is never silently converted into a buffer.
    explicit CCycleBuffer(int size);
    virtual ~CCycleBuffer();

    // The object owns raw storage and synchronization primitives; copying is
    // not meaningful, so say so explicitly.
    CCycleBuffer(const CCycleBuffer&) = delete;
    CCycleBuffer& operator=(const CCycleBuffer&) = delete;

    /// True when the "full" flag is set.
    bool isFull();
    /// True when the "empty" flag is set.
    bool isEmpty();
    /// Resets both cursors to 0 and marks the buffer empty.
    void empty();
    /// Number of bytes currently stored.
    int getReadableLength();
    /// Number of bytes that can still be stored.
    int getWriteableLength();

    /// Copies up to `count` bytes from `buf` into the buffer.
    /// @return the number of bytes actually stored.
    int write(char* buf, int count);
    /// Copies up to `count` bytes out of the buffer into `buf`; waits on the
    /// condition variable when the buffer is marked empty.
    /// @return the number of bytes actually copied.
    int read(char* buf, int count);

    /// Current read cursor (index into the storage array).
    int getStart() const
    {
        return m_nReadPos.load();
    }
    /// Current write cursor (index into the storage array).
    int getEnd() const
    {
        return m_nWritePos.load();
    }

private:
    std::atomic_bool m_bEmpty, m_bFull;   // disambiguate "cursors equal": empty vs. full
    char* m_pBuf;                         // storage, allocated by the constructor
    int m_nBufSize;                       // capacity of m_pBuf in bytes
    std::atomic_int m_nReadPos;           // index of the next byte to read
    std::atomic_int m_nWritePos;          // index of the next byte to write
    //std::mutex m_mutex;
    std::mutex mtx;                       // mutex paired with cv
    std::unique_lock <std::mutex> *lck;   // lock on mtx allocated by the constructor; handed to cv by read()
    std::condition_variable cv;           // signalled by write() when data arrives in an empty buffer
    int test;                             // set to 0 by the constructor; no other use in this file
};
    

 

#include "CCycleBuffer.h"
#include <assert.h>
#include <memory.h>
// Definitions
/// Builds an empty buffer with room for `size` bytes: allocates the storage,
/// puts both cursors at index 0, sets the flags to "empty, not full", and
/// allocates the lock object that read() later passes to the condition variable.
///
/// NOTE(review): constructing the unique_lock acquires `mtx` on the constructing
/// thread and nothing in this function releases it -- confirm this is intended.
CCycleBuffer::CCycleBuffer(int size)
    : m_bEmpty(true),
      m_bFull(false),
      m_pBuf(new char[size]),
      m_nBufSize(size),
      m_nReadPos(0),
      m_nWritePos(0),
      lck(new std::unique_lock <std::mutex>(mtx)),
      test(0)
{
}
/// Releases everything the constructor allocated: the lock object and the
/// byte storage.
CCycleBuffer::~CCycleBuffer()
{
    // The constructor allocates `lck` with new. Deleting it here stops the leak
    // and, because a unique_lock unlocks on destruction if it still owns the
    // mutex, also ensures `mtx` is not destroyed while locked.
    // NOTE(review): no thread may still be inside read() when this runs.
    delete lck;
    lck = nullptr;

    delete[] m_pBuf;
    m_pBuf = nullptr;
}
/************************************************************************/
/* 向缓冲区写入数据,返回实际写入的字节数                               */
/************************************************************************/
int CCycleBuffer::write(char* buf, int count)
{
    std::atomic_int m_nReadPos;
    m_nReadPos = (int)this->m_nReadPos;
    if (count <= 0)
        return 0;
    // 缓冲区已满,不能继续写入 
    if (m_bFull)
    {
        return 0;
    }
    else if (m_nReadPos == m_nWritePos)// 缓冲区为空时 
    {
        /*            == 内存模型 ==
        |←              m_nBufSize                →|
        |← (empty)→  ←                 (empty)  →|
        |------------||------------------------------|
                     ↑←       leftcount          →|           
                   m_nReadPos    
                   m_nWritePos
        */
        int leftcount = m_nBufSize - m_nWritePos;
        if (leftcount > count)
        {
            memcpy(m_pBuf + m_nWritePos, buf, count);
            //m_mutex.lock();
            m_nWritePos += count;
            m_bFull = (this->m_nReadPos == m_nWritePos);
            //m_mutex.unlock();
            if(m_bEmpty) cv.notify_all();
            m_bEmpty = false;
            return count;
        }
        else
        {
            std::atomic_int tmp;
            int leftcount2=0;
            memcpy(m_pBuf + m_nWritePos, buf, leftcount);
            tmp = count - leftcount;
            leftcount2 = (m_nReadPos > tmp) ? tmp : m_nReadPos;
            memcpy(m_pBuf, buf + leftcount, leftcount2);
            //m_mutex.lock();
            m_nWritePos = leftcount2;
            m_bFull = (this->m_nReadPos == m_nWritePos);
            //m_mutex.unlock();
            if(m_bEmpty) cv.notify_all();
            m_bEmpty = false;
            return leftcount + m_nWritePos;
        }
    }
    else if (m_nReadPos < m_nWritePos)// 有剩余空间可写入 
    {
        /*            == 内存模型 ==   
        |←              m_nBufSize                →|
        |← (empty)→  ← (data)  →  ←  (empty)  →|
        |------------||xxxxxxxxxxxxx||---------------|
                     ↑                   ↑← leftcount →|  
                   m_nReadPos    m_nWritePos       
        */
        // 剩余缓冲区大小(从写入位置到缓冲区尾) 

        int leftcount = m_nBufSize - m_nWritePos;
        int test = m_nWritePos;
        if (leftcount > count)   // 有足够的剩余空间存放 
        {
            memcpy(m_pBuf + m_nWritePos, buf, count);
            //m_mutex.lock();
            m_nWritePos += count;
            m_bFull = (this->m_nReadPos == m_nWritePos);
            //m_mutex.unlock();
            if(m_bEmpty) cv.notify_all();
            m_bEmpty = false;
            return count;
        }
        else       // 剩余空间不足 
        {
            // 先填充满剩余空间,再回头找空间存放
            std::atomic_int tmp;
            int leftcount2=0;
            memcpy(m_pBuf + test, buf, leftcount);

            tmp = count - leftcount;
            leftcount2 = (m_nReadPos > tmp) ? tmp : m_nReadPos;
            memcpy(m_pBuf, buf + leftcount, leftcount2);
            //m_mutex.lock();
            m_nWritePos = leftcount2;
            m_bFull = (this->m_nReadPos == m_nWritePos);
            //m_mutex.unlock();
            if(m_bEmpty) cv.notify_all();
            m_bEmpty = false;
            return leftcount + m_nWritePos;
        }
    }
    else
    {

        /*            == 内存模型 ==
        |←              m_nBufSize                  →|
        |← (data) →  ←  (empty)  →  ←  (data)   →|
        |xxxxxxxxxxxx||---------------||xxxxxxxxxxxxxxx|
        |             ↑← leftcount →↑               |
                   m_nWritePos    m_nReadPos    
        */
        int leftcount = m_nReadPos - m_nWritePos;
        if (leftcount > count)
        {
            // 有足够的剩余空间存放 
            memcpy(m_pBuf + m_nWritePos, buf, count);
            //m_mutex.lock();
            m_nWritePos += count;
            m_bFull = (this->m_nReadPos == m_nWritePos);
            //m_mutex.unlock();
            if(m_bEmpty) cv.notify_all();
            m_bEmpty = false;
            return count;
        }
        else
        {
            // 剩余空间不足时要丢弃后面的数据 
            memcpy(m_pBuf + m_nWritePos, buf, leftcount);
            //m_mutex.lock();
            m_nWritePos += leftcount;
            m_bFull = (this->m_nReadPos == m_nWritePos);
            //m_mutex.unlock();
            if(m_bEmpty) cv.notify_all();
            m_bEmpty = false;
            return leftcount;
        }
    }
}
/************************************************************************/
/* 从缓冲区读数据,返回实际读取的字节数                                 */
/************************************************************************/
int CCycleBuffer::read(char* buf, int count)
{
    std::atomic_int m_nWritePos;
    m_nWritePos = (int)this->m_nWritePos;
    if (count <= 0)
        return 0;
    //m_bFull = false;
    if (m_bEmpty)       // 缓冲区空,不能继续读取数据 
    {
        cv.wait(*lck);
        //return 0;
    }
    else if (m_nReadPos == m_nWritePos)   // 缓冲区满时 
    {
        /*            == 内存模型 ==
        |←              m_nBufSize                  →|
        |← (data) →  ←                   (data)   →|
        |xxxxxxxxxxxx||xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx|
        |             ↑← leftcount                  →|
                   m_nWritePos 
                   m_nReadPos
        */
        int leftcount = m_nBufSize - m_nReadPos;
        if (leftcount > count)
        {
            memcpy(buf, m_pBuf + m_nReadPos, count);
            //m_mutex.lock();
            m_nReadPos += count;
            m_bEmpty = (m_nReadPos == this->m_nWritePos);
            //m_mutex.unlock();
            m_bFull = false;
            return count;
        }
        else
        {
            std::atomic_int tmp;
            int leftcount2;
            memcpy(buf, m_pBuf + m_nReadPos, leftcount);
            tmp = count - leftcount;
            leftcount2 = (m_nWritePos > tmp) ? tmp : m_nWritePos;
            memcpy(buf + leftcount, m_pBuf, leftcount2);
            //m_mutex.lock();
            m_nReadPos = leftcount2;
            m_bEmpty = (m_nReadPos == this->m_nWritePos);
            //m_mutex.unlock();
            m_bFull = false;
            return leftcount + m_nReadPos;
        }
    }
    else if (m_nReadPos < m_nWritePos)   // 写指针在前(未读数据是连接的) 
    {
        /*            == 内存模型 ==
        |←              m_nBufSize                  →|
        |← (empty)→  ← (data)    →  ←  (empty)  →|
        |------------||xxxxxxxxxxxxxxx||---------------|
                     ↑← leftcount →↑               |
                   m_nReadPos    m_nWritePos
        */
        int leftcount = m_nWritePos - m_nReadPos;
        int c = (leftcount > count) ? count : leftcount;
        memcpy(buf, m_pBuf + m_nReadPos, c);
        //m_mutex.lock();
        m_nReadPos += c;
        m_bEmpty = (m_nReadPos == this->m_nWritePos);
        //m_mutex.unlock();
        m_bFull = false;
        return c;
    }
    else          // 读指针在前(未读数据可能是不连接的) 
    {
        /*            == 内存模型 ==
        |←              m_nBufSize                  →|
        |← (data) →  ←  (empty)  →  ←  (data)   →|
        |xxxxxxxxxxxx||---------------||xxxxxxxxxxxxxxx|
        |             ↑               ↑← leftcount →|
                   m_nWritePos    m_nReadPos
        */
        int leftcount = m_nBufSize - m_nReadPos;
        if (leftcount > count)   // 未读缓冲区够大,直接读取数据 
        {
            memcpy(buf, m_pBuf + m_nReadPos, count);
            //m_mutex.lock();
            m_nReadPos += count;
            m_bEmpty = (m_nReadPos == this->m_nWritePos);
            //m_mutex.unlock();
            m_bFull = false;
            return count;
        }
        else       // 未读缓冲区不足,需回到缓冲区头开始读 
        {
            memcpy(buf, m_pBuf + m_nReadPos, leftcount);
            std::atomic_int tmp;
            int leftcount2;
            tmp = count - leftcount;
            leftcount2 = (m_nWritePos > tmp) ? tmp : m_nWritePos;
            memcpy(buf + leftcount, m_pBuf, leftcount2);
            //m_mutex.lock();
            m_nReadPos = leftcount2;
            m_bEmpty = (m_nReadPos == this->m_nWritePos);
            //m_mutex.unlock();
            m_bFull = false;
            return leftcount + m_nReadPos;
        }
    }
}
/************************************************************************/
/* Number of bytes currently stored in the buffer                       */
/************************************************************************/
/// @return 0 when the empty flag is set, the capacity when the full flag is
///         set, otherwise the distance from the read cursor to the write
///         cursor, going around the end of the storage if necessary.
int CCycleBuffer::getReadableLength()
{
    // The flags settle the two cases in which the cursors coincide.
    if (m_bEmpty)
        return 0;
    if (m_bFull)
        return m_nBufSize;

    const int head = m_nReadPos;
    const int tail = m_nWritePos;
    // Data is one contiguous run when the read cursor trails the write
    // cursor; otherwise it wraps past the end of the storage.
    return (head < tail) ? (tail - head)
                         : (m_nBufSize - head + tail);
}
/// Free space in bytes: the capacity minus what getReadableLength() reports.
int CCycleBuffer::getWriteableLength()
{
    const int occupied = getReadableLength();
    return m_nBufSize - occupied;
}
/// Discards all content: both cursors return to index 0 and the flags are
/// reset to "empty, not full". The storage itself is not touched.
void CCycleBuffer::empty()
{
    m_nReadPos.store(0);
    m_nWritePos.store(0);
    m_bEmpty.store(true);
    m_bFull.store(false);
}
/// Reports the current value of the empty flag.
bool CCycleBuffer::isEmpty()
{
    const bool emptyNow = m_bEmpty.load();
    return emptyNow;
}
/// Reports the current value of the full flag.
bool CCycleBuffer::isFull()
{
    const bool fullNow = m_bFull.load();
    return fullNow;
}

 

#include <iostream>                // std::cout
#include <thread>                // std::thread
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable
#include <chrono>
#include "CCycleBuffer.h"
// Build-time demo selection for main(): TESTTHREAD enables the
// condition-variable wake-up demo, TESTRINGBUF enables the ring-buffer demo.
//#define TESTTHREAD 
#define TESTRINGBUF

#ifdef TESTTHREAD
std::mutex mtx; // Global mutex.
std::condition_variable cv; // Global condition variable.
bool ready = false; // Global flag.
#endif
// Ring buffer shared by the demo threads in main(), constructed with size 10.
CCycleBuffer rbuf(10);
// Source bytes that the demo passes to rbuf.write().
char buf[] = "123456";
// Destination that the demo passes to rbuf.read().
char  buf4r[6] = { 0 };
/// Demo entry point.
///
/// With TESTTHREAD defined: starts 10 threads that block on the global
/// condition variable until `ready` is set, then wakes them all and joins them.
///
/// With TESTRINGBUF defined: starts one thread that writes 5 bytes into `rbuf`
/// every 2 seconds (when more than 5 bytes are free) and one thread that reads
/// `rbuf` one byte at a time and prints each byte. Both loops run forever, so
/// in this configuration main() never returns.
///
/// Each demo lives in its own scope with its own thread array, so the file
/// compiles with either, both, or neither macro defined.
int main()
{
    std::cout<<"begin main..."<<std::endl;
#ifdef TESTTHREAD
    {
        std::thread racers[10];
        // Spawn 10 threads that all wait for the start signal.
        for (int i = 0; i < 10; ++i) {
            racers[i] = std::thread([](int id) {
                std::unique_lock <std::mutex> lck(mtx);
                // Predicate form: ignores spurious wake-ups and does not block
                // at all if `ready` was already set before this thread got here.
                cv.wait(lck, [] { return ready; });
                // Woken up: print this thread's id.
                std::cout << "thread " << id << '\n';
            }, i);
        }

        std::cout << "10 threads ready to race...\n";
        {
            // Hold the mutex only while flipping the flag; the waiting threads
            // need it again to leave wait(), so it must be released before join().
            std::lock_guard <std::mutex> guard(mtx);
            ready = true;
        }
        cv.notify_all(); // Wake every waiting thread.

        for (auto & th : racers)
            th.join();
    }
#endif
#ifdef TESTRINGBUF
    {
        std::thread workers[2];

        // Producer: every 2 seconds, store 5 bytes if more than 5 are free.
        workers[0] = std::thread([]() {
            std::cout<<"begin write thread ..."<<std::endl;
            while (true)
            {
                if (5 < rbuf.getWriteableLength())
                {
                    rbuf.write(buf, 5);
                }
                std::this_thread::sleep_for(std::chrono::seconds(2));
            }
        });

        // Consumer: pull one byte at a time and print it.
        workers[1] = std::thread([]() {
            std::cout<<"begin read thread ..."<<std::endl;
            while (true)
            {
                const int ret = rbuf.read(buf4r, 1);
                if (1 == ret)
                {
                    std::cout << "Read:["<<buf4r[0]<<"]"<<std::endl;
                }
                else
                {
                    std::cout << " read nothing " << std::endl;
                }
            }
        });

        for (auto & th : workers)
        {
            th.join();
        }
    }
#endif // TESTRINGBUF
    //getchar();
    return 0;
}

 

ring buffer 亲测好用 C++ 11 加入 条件变量

标签:length   virtual   xxxxxx   tar   col   second   code   lse   gets   

原文地址:https://www.cnblogs.com/liuguoyao514257665/p/10129175.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!