标签:length virtual xxxxxx tar col second code lse gets
#pragma once

#include <atomic>
#include <condition_variable>  // required by the std::condition_variable member below
#include <mutex>

/// Fixed-capacity circular (ring) byte buffer.
///
/// Bytes are appended with write() and consumed in FIFO order with read().
/// Because the read and write positions coincide both when the buffer is empty
/// and when it is full, the two states are tracked by separate flags.
class CCycleBuffer
{
public:
    /// Returns true when no more bytes can be written.
    bool isFull();

    /// Returns true when there are no bytes to read.
    bool isEmpty();

    /// Discards all stored data and resets both positions to 0.
    void empty();

    /// Returns the number of bytes currently stored.
    int getReadableLength();

    /// Returns the number of bytes that can still be written.
    int getWriteableLength();

    /// Creates a buffer with a capacity of `size` bytes.
    CCycleBuffer(int size);
    virtual ~CCycleBuffer();

    // The buffer owns a raw allocation and synchronisation primitives, so it
    // is neither copyable nor assignable (the mutex member already implies this).
    CCycleBuffer(const CCycleBuffer&) = delete;
    CCycleBuffer& operator=(const CCycleBuffer&) = delete;

    /// Copies up to `count` bytes from `buf` into the buffer.
    /// @return the number of bytes actually written; this is smaller than
    ///         `count` when there is not enough free space, and 0 when the
    ///         buffer is full or `count` is not positive.
    int write(char* buf, int count);

    /// Copies up to `count` bytes from the buffer into `buf`.
    /// When the buffer is empty the call waits on the condition variable.
    /// @return the number of bytes actually read.
    int read(char* buf, int count);

    /// Returns the current read position (index of the oldest stored byte).
    int getStart() const { return m_nReadPos.load(); }

    /// Returns the current write position (index where the next byte goes).
    int getEnd() const { return m_nWritePos.load(); }

private:
    std::atomic_bool m_bEmpty, m_bFull;  // distinguish empty from full when positions are equal
    char* m_pBuf;                        // backing storage of m_nBufSize bytes
    int m_nBufSize;                      // capacity in bytes
    std::atomic_int m_nReadPos;          // next index to read from
    std::atomic_int m_nWritePos;         // next index to write to
    std::mutex mtx;                      // mutex paired with cv
    std::unique_lock<std::mutex>* lck;   // lock pointer associated with mtx; assigned in the constructor
    std::condition_variable cv;          // signalled by write() when data arrives in an empty buffer
    int test;                            // scratch/debug field; set to 0 in the constructor
};
// <condition_variable> is included first so that this translation unit
// compiles even when CCycleBuffer.h does not pull it in for its
// std::condition_variable member.
#include <condition_variable>

#include "CCycleBuffer.h"

#include <assert.h>
#include <memory.h>

#include <algorithm>
#include <cstring>
#include <mutex>

namespace
{
/// Moves a ring position forward by `step` bytes, wrapping at `capacity`.
/// Expects 0 <= pos <= capacity and 0 <= step <= capacity; written with a
/// subtraction so that pos + step can never overflow.
int wrapForward(int pos, int step, int capacity)
{
    const int room = capacity - pos;
    return (step >= room) ? step - room : pos + step;
}
}  // namespace

/// Creates an empty buffer able to hold `size` bytes.
/// `size` is expected to be non-negative.
CCycleBuffer::CCycleBuffer(int size)
    : m_bEmpty(true),
      m_bFull(false),
      m_pBuf(new char[size]),
      m_nBufSize(size),
      m_nReadPos(0),
      m_nWritePos(0),
      lck(nullptr),  // no long-lived lock: every operation takes a scoped lock on mtx
      test(0)
{
}

CCycleBuffer::~CCycleBuffer()
{
    delete[] m_pBuf;
}

/************************************************************************/
/* Writes data into the buffer; returns the number of bytes written.    */
/************************************************************************/
/// Copies up to `count` bytes from `buf` into the free space of the buffer.
/// Bytes that do not fit are dropped (not written).
///
/// Memory model (x = data, - = free):
///   read <= write:  |----xxxxxxxx--------|   free space is the tail plus the head
///                        ^r      ^w
///   read >  write:  |xxxx--------xxxxxxxx|   free space is the gap between w and r
///                        ^w      ^r
///
/// @param buf    source bytes; a null pointer is treated as "nothing to write".
/// @param count  number of bytes requested; non-positive values write nothing.
/// @return number of bytes actually stored (0 when the buffer is full).
int CCycleBuffer::write(char* buf, int count)
{
    if (buf == nullptr || count <= 0)
        return 0;

    std::lock_guard<std::mutex> guard(mtx);

    // Buffer already full: nothing can be stored.
    if (m_bFull)
        return 0;

    const int readPos = m_nReadPos;
    const int writePos = m_nWritePos;

    // Equal positions mean "empty" here because the full case returned above.
    const int freeBytes = (writePos >= readPos)
                              ? m_nBufSize - writePos + readPos
                              : readPos - writePos;
    const int total = std::min(count, freeBytes);

    // First chunk: from the write position towards the end of the storage.
    const int tail = std::min(total, m_nBufSize - writePos);
    if (tail > 0)
        std::memcpy(m_pBuf + writePos, buf, tail);

    // Second chunk: whatever is left goes to the start of the storage.
    const int wrapped = total - tail;
    if (wrapped > 0)
        std::memcpy(m_pBuf, buf + tail, wrapped);

    const int newWritePos = wrapForward(writePos, total, m_nBufSize);
    const bool wasEmpty = m_bEmpty;

    m_nWritePos = newWritePos;
    m_bFull = (newWritePos == readPos);
    m_bEmpty = false;

    // Notify only after the state is updated and while holding the mutex, so
    // a reader that has just evaluated its wait predicate cannot miss it.
    if (wasEmpty)
        cv.notify_all();

    return total;
}

/************************************************************************/
/* Reads data from the buffer; returns the number of bytes read.        */
/************************************************************************/
/// Copies up to `count` bytes of stored data into `buf` in FIFO order.
///
/// If the buffer is empty the call blocks until write() stores data; it does
/// not time out, so it blocks indefinitely when nothing is ever written.
///
/// @param buf    destination; a null pointer is treated as "nothing to read".
/// @param count  maximum number of bytes to copy; non-positive values read nothing.
/// @return number of bytes actually copied into `buf`.
int CCycleBuffer::read(char* buf, int count)
{
    if (buf == nullptr || count <= 0)
        return 0;

    std::unique_lock<std::mutex> lock(mtx);

    // The predicate guards against spurious wakeups and against a
    // notification that arrived before this thread started waiting.
    cv.wait(lock, [this] { return !m_bEmpty; });

    const int readPos = m_nReadPos;
    const int writePos = m_nWritePos;

    // Equal positions mean "full" here because the buffer is not empty.
    const int available = (readPos < writePos)
                              ? writePos - readPos
                              : m_nBufSize - readPos + writePos;
    const int total = std::min(count, available);

    // First chunk: from the read position towards the end of the storage.
    const int tail = std::min(total, m_nBufSize - readPos);
    if (tail > 0)
        std::memcpy(buf, m_pBuf + readPos, tail);

    // Second chunk: continue from the start of the storage.
    const int wrapped = total - tail;
    if (wrapped > 0)
        std::memcpy(buf + tail, m_pBuf, wrapped);

    const int newReadPos = wrapForward(readPos, total, m_nBufSize);

    m_nReadPos = newReadPos;
    m_bEmpty = (newReadPos == writePos);
    m_bFull = false;

    return total;
}

/************************************************************************/
/* Returns the number of bytes of valid data held in the buffer.        */
/************************************************************************/
int CCycleBuffer::getReadableLength()
{
    std::lock_guard<std::mutex> guard(mtx);

    if (m_bEmpty)
        return 0;
    if (m_bFull)
        return m_nBufSize;

    const int readPos = m_nReadPos;
    const int writePos = m_nWritePos;
    return (readPos < writePos) ? writePos - readPos
                                : m_nBufSize - readPos + writePos;
}

/// Returns the number of bytes that can currently be written.
int CCycleBuffer::getWriteableLength()
{
    // m_nBufSize never changes after construction; getReadableLength() takes
    // the lock itself, so no lock is taken here (std::mutex is not recursive).
    return m_nBufSize - getReadableLength();
}

/// Drops all stored data and returns the buffer to its initial empty state.
void CCycleBuffer::empty()
{
    std::lock_guard<std::mutex> guard(mtx);
    m_nReadPos = 0;
    m_nWritePos = 0;
    m_bEmpty = true;
    m_bFull = false;
}

/// Returns true when there is no data to read.
bool CCycleBuffer::isEmpty()
{
    return m_bEmpty;
}

/// Returns true when there is no free space left.
bool CCycleBuffer::isFull()
{
    return m_bFull;
}
#include <iostream> // std::cout #include <thread> // std::thread #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable #include <chrono> #include "CCycleBuffer.h" //#define TESTTHREAD #define TESTRINGBUF #ifdef TESTTHREAD std::mutex mtx; // 全局互斥锁. std::condition_variable cv; // 全局条件变量. bool ready = false; // 全局标志位. #endif CCycleBuffer rbuf(10); char buf[] = "123456"; char buf4r[6] = { 0 }; int main() { std::cout<<"begin main..."<<std::endl; #ifdef TESTTHREAD std::thread* threads[10]; // spawn 10 threads: for (int i = 0; i < 10; ++i) { threads[i] = new std::thread([](int id){ std::unique_lock <std::mutex> lck(mtx); //while (!ready) // 如果标志位不为 true, 则等待... cv.wait(lck); // 当前线程被阻塞, 当全局标志位变为 true 之后, // 线程被唤醒, 继续往下执行打印线程编号id. std::cout << "thread " << id << ‘\n‘; },i); } std::cout << "10 threads ready to race...\n"; std::unique_lock <std::mutex> lck(mtx); ready = true; // 设置全局标志位为 true. cv.notify_all(); // 唤醒所有线程. for (auto & th : threads) th->join(); #endif #ifdef TESTRINGBUF int ret; std::thread* threads[2]; //bzero(buf4r,sizeof(buf4r)); //ret = rbuf.getWriteableLength(); //ret = rbuf.write(buf,4); //ret = rbuf.read(buf4r,2); threads[0] = new std::thread([]() { std::cout<<"begin write thread ..."<<std::endl; while (true) { if (5 < rbuf.getWriteableLength()) { rbuf.write(buf, 5); } std::this_thread::sleep_for(std::chrono::seconds(2)); } }); threads[1] = new std::thread([]() { int ret; static int cnt = 0; bool noData; std::cout<<"begin read thread ..."<<std::endl; while (true) { ret = rbuf.read(buf4r, 1); if (1 == ret) { noData = false; //std::cout<<"read thread ..."<<std::endl; std::cout << "Read:["<<buf4r[0]<<"]"<<std::endl; if (0 == ++cnt%5) { //std::cout << std::endl; } //std::cout << "xxx" << std::endl; } else { //if (noData == false) { //noData = true; std::cout << " read nothing " << std::endl; } } } }); #endif // TESTRINGBUF for (auto &th: threads) { th->join(); } //getchar(); return 0; }
环形缓冲区(ring buffer):亲测好用,基于 C++11,并加入了条件变量
标签:length virtual xxxxxx tar col second code lse gets
原文地址:https://www.cnblogs.com/liuguoyao514257665/p/10129175.html