标签:维护 剩余空间 左值 wap 新建 ogg include main names
这是一个异步日志类:前端多个线程只管向这个日志类的缓冲区中写入日志,后端利用一个线程把缓冲区中的日志写入文件
因此:日志数据流向过程是 [日志->缓冲区->文件]
这是一个多生产者,单消费者的任务场景,多生产者负责把日志写入缓冲区,单消费者负责把缓冲区中数据写入文件
如果只用一个缓冲区,不光要同步各个生产者,还要同步生产者和消费者,
最重要的是需要保证生产者与消费者的并发,也就是前端不断写日志到缓冲区的同时,后端可以把缓冲区写入文件
muduo利用双缓冲区,前端与后端分别维护一个vector<buffer>,里面存放多个小的buffer
前端不断地向vector<buffer>1中写入数据,后端不断地从vector<buffer>2中写日志数据到文件中
每隔一段时间(3s)或者1个buffer满了就交换vector<buffer>1和vector<buffer>2,
由此来保证前端和后端日志线程的并发执行.
前后端缓冲区结构示例:
前端缓冲区 m_buffers [m_currentBuffer][m_nextBuffer][...]
后端缓冲区 bufferToWrite [newBuffer1][newBuffer2][...]
下面是操作步骤:
多个前端线程:
获得锁,
m_currentBuffer空间可用,写入m_currentBuffer
如果m_currentBuffer空间满了,立马把m_currentBuffer写入到m_buffers中;如果m_nextBuffer没满就让
m_nextBuffer作为m_currentBuffer;也就是m_currentBuffer=std::move(m_nextBuffer);
m_nextBuffer也满了就新建一个buffer作为m_currentBuffer;m_currentBuffer.reset(new Buffer);
单个后端线程:
不断等待条件变量m_cond(唤醒条件:过了3s或者m_currentBuffer满了),
被唤醒后把m_currentBuffer中的数据加入到m_buffers;让m_newBuffer1作为m_currentBuffer;
交换m_buffers和buffersToWrite,如果需要,让m_newBuffer2作为m_nextBuffer;
把buffersToWrite中的数据写入到文件中,写完之后留两块buffer作为newBuffer1和newBuffer2
#ifndef ASYNCLOGGING_H #define ASYNCLOGGING_H #include"base/blockingqueue.h" #include"base/boundedblockingqueue.h" #include"base/countdownlatch.h" #include"base/mutex.h" #include"base/thread.h" #include"base/logstream.h" #include<atomic> #include<vector> namespace mymuduo{ class asynclogging:noncopyable { public: asynclogging(const string& basename,off_t rollSize,int flushInterval=3); ~asynclogging() { if(m_running) this->stop(); } //前端线程使用,把数据追加到缓冲区中 void append(const char* logline, int len); void start() { m_running=true; //m_thread在构造函数中被定义为thread(std::bind(&asynclogging::threadFunc,this),"Loggins") //m_thread要执行的线程函数就是threadFunc()函数 m_thread.start(); //必须等到m_latch=0时才能返回,m_latch在threadFunc()执行时变为0, //保证初始化工作都做好了,后端日志线程已经启动才可以返回. m_latch.wait(); } //停止写日志线程 void stop() { m_running=false; m_cond.notify(); m_thread.join(); } private: //后端日志线程函数,用于把缓冲区日志写到文件中去. void threadFunc(); //kLargeBuffer=4000*1000 也就是FixedBuffer内部字符数组大小为4000*1000,4MB typedef detail::FixedBuffer<detail::kLargeBuffer> Buffer; typedef std::vector<std::unique_ptr<Buffer>> BufferVector;//vector,里面存放Buffer之智能指针 typedef BufferVector::value_type BufferPtr; //std::unique_ptr<Buffer>类型 const int m_flushInterval; //超时时间,每隔一段时间写日志,3s std::atomic<bool> m_running; //是否正在运行 const string m_basename; //日志名字 const off_t m_rollSize; //预留的日志大小 mymuduo::thread m_thread; //后端线程,用于把日志写入日志文件 mymuduo::countdownlatch m_latch;//倒计时,用于指示日志记录器何时开始工作 mymuduo::mutexlock m_mutex; //互斥锁 mymuduo::condition m_cond; //条件变量 BufferPtr m_currentBuffer; //当前缓冲区 BufferPtr m_nextBuffer; //预留缓冲区 BufferVector m_buffers; //缓冲区队列,待写入文件 }; } #endif // ASYNCLOGGING_H
#include "asynclogging.h" #include"base/logfile.h" #include"base/timestamp.h" namespace mymuduo { asynclogging::asynclogging(const string& basename,off_t rollSize,int flushInterval) :m_flushInterval(flushInterval),m_running(false),m_basename(basename), m_rollSize(rollSize),m_thread(std::bind(&asynclogging::threadFunc,this),"Loggins"), m_latch(1),m_mutex(),m_cond(m_mutex),m_currentBuffer(new Buffer), m_nextBuffer(new Buffer),m_buffers() { m_currentBuffer->bzero(); m_nextBuffer->bzero(); m_buffers.reserve(16); } //所有LOG_*宏定义都会调用到这个append函数,用于把日志写到缓冲区中 //2个默认缓冲区,m_currentBuffer和m_nextBuffer //current写满了换next,next也写满了新建一个 void asynclogging::append(const char* logline,int len) { //加锁操作日志缓冲区 mutexlockguard mlg(m_mutex); //当前缓冲区剩余空间够,直接写到当前缓冲区 if(m_currentBuffer->avail()>len) m_currentBuffer->append(logline,len); else //当前缓冲区被写满日志了,换m_nextBuffer来写 { //std::move把左值强制转化成右值引用,可以通过右值引用使用该值,来用于移动语义, //例如用vector::push_back()会开辟内存拷贝元素,若是使用了std::move就不会发生复制 //在这里意思就是把m_currentBuffer对象的所有权剥夺,转移到m_buffers.back()上面 //不需要开辟内存拷贝元素,由于m_currentBuffer所有权被剥夺,因此m_currentBuffer变为空 m_buffers.push_back(std::move(m_currentBuffer)); //m_nextBuffer可用就用m_nextBuffer,否则就新建一个额外的缓冲区来写 if(m_nextBuffer) m_currentBuffer=std::move(m_nextBuffer); else m_currentBuffer.reset(new Buffer); //把日志写到m_currentBuffer中 m_currentBuffer->append(logline,len); //通知m_buffers中有数据了,可以把m_buffers中数据写到文件中去了 m_cond.notify(); } } //这个函数是后端线程函数,用于周期性地把日志写入到文件中去 //每隔m_flushInterval(3)秒 或者是 m_currentBuffer满了 , 就把m_currentBuffer加入到m_buffers中 // void asynclogging::threadFunc() { assert(m_running==true); m_latch.countDown(); //latch变为0,start函数可以返回了 logfile output(m_basename,m_rollSize,false); //logfile用于写日志到文件中 //又新建了2块缓冲区,属于后端线程的2块缓冲区 BufferPtr newBuffer1(new Buffer); BufferPtr newBuffer2(new Buffer); newBuffer1->bzero(); newBuffer2->bzero(); //这块缓冲区vector属于后端线程,用于和前端的m_buffers进行交换 BufferVector buffersToWrite; buffersToWrite.reserve(16); while(m_running) { //保证后端缓冲区newBuffer1,2和后端缓冲区队列buffersToWrite是空的 assert(newBuffer1 && newBuffer1->length() == 0); assert(newBuffer2 && newBuffer2->length() == 0); assert(buffersToWrite.empty()); //这一块主要是用于把m_buffers中的数据交换到buffersToWrite中 { mutexlockguard mlg(m_mutex); //每隔3s或是m_currentBuffer满了,就把当前m_currentBuffer加入到m_buffers中 if(m_buffers.empty()) m_cond.waitForSeconds(m_flushInterval); m_buffers.push_back(std::move(m_currentBuffer));//m_currentBuffer添加到m_buffers m_currentBuffer=std::move(newBuffer1); //m_currentBuffer现在是newBuffer1 buffersToWrite.swap(m_buffers); //交换buffersToWrite和m_buffers if(!m_nextBuffer) m_nextBuffer=std::move(newBuffer2); //m_nextBuffer现在是newBuffer2 } assert(!buffersToWrite.empty()); //保证始终有数据可以写入到文件中去 //把m_buffers中的数据转移到bufferToWrite中 //如果要写入文件的buffersToWrite中buffer数目大于25,就删除多余的数据 //删除的目的是为了解决日志消息堆积的问题:前端日志记录太快,后端来不及写到文件中去 if (buffersToWrite.size() > 25) { char buf[256]; snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n", timestamp::now().toFormattedString().c_str(), buffersToWrite.size()-2); fputs(buf, stderr); output.append(buf, static_cast<int>(strlen(buf))); //在buffersToWrite中就留两块缓冲区,其他多余的缓冲区的全部删除 buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end()); } //把bufferToWrite中的数据全部写入到文件中 for (const auto& buffer : buffersToWrite) { // FIXME: use unbuffered stdio FILE ? or use ::writev ? output.append(buffer->data(), buffer->length()); } //重新调整bufferToWrite的大小,仅保留2个buffer用于newBuffer1和newBuffer2 if (buffersToWrite.size() > 2) { // drop non-bzero-ed buffers, avoid trashing buffersToWrite.resize(2); } if (!newBuffer1) { assert(!buffersToWrite.empty()); //从bufferToWrite中弹出一个buffer作为newBuffer1 newBuffer1 = std::move(buffersToWrite.back()); buffersToWrite.pop_back(); newBuffer1->reset(); } if (!newBuffer2) { assert(!buffersToWrite.empty()); //在弹出一个buffer作为newBuffer2 newBuffer2 = std::move(buffersToWrite.back()); buffersToWrite.pop_back(); newBuffer2->reset(); } buffersToWrite.clear(); //保证buffersToWrite为空,要不然和前端m_buffers交换会出问题 output.flush(); } output.flush(); } }//namespace mymuduo
#include "base/asynclogging.h" #include "base/logging.h" #include "base/timestamp.h" #include"base/threadpool.h" #include <stdio.h> #include <sys/resource.h> #include <unistd.h> #include<string> #include<iostream> off_t kRollSize = 500*1000*1000; //全局异步日志类指针,只想主线程唯一的异步日志对象 mymuduo::asynclogging* g_asyncLog = NULL; //日志输出函数,替换掉默认的输出到stdout上,现在利用asynclogging把日志输出到文件中 void asyncOutput(const char* msg, int len) { g_asyncLog->append(msg, len); } //线程函数,每隔一段时间写一个日志 void workerThread() { mymuduo::currentthread::sleepUsec(1000*500); //LOG_WARN实质是创建一个临时变量logger,析构时会调用output函数,就是上面那个asyncOutPut LOG_WARN<<mymuduo::currentthread::tid(); } int main() { //设置日志参数 mymuduo::logger::setOutput(asyncOutput); //输出方式,改成文件 mymuduo::logger::setLogLevel(mymuduo::logger::WARN); //主线程唯一的一步日志类 mymuduo::asynclogging log("zqc",kRollSize); log.start(); g_asyncLog=&log; //typedef void (*outputFunc)(const char* msg,int len); //用线程池实现多线程打印日志 mymuduo::threadpool tp("testlog_threadpool"); tp.setMaxQueueSize(50); tp.start(5); for(int i=0;i<100;i++) tp.run(workerThread); mymuduo::currentthread::sleepUsec(1000*1000*5); tp.stop(); }
部分结果(文件:zqc.20200826-162021.master.72464.log)
20年08月27日 星期4 00:20:21.1598458821 72482 WARN 72482 - main.cpp:26
2020年08月27日 星期4 00:20:21.1598458821 72483 WARN 72483 - main.cpp:26
2020年08月27日 星期4 00:20:21.1598458821 72485 WARN 72485 - main.cpp:26
2020年08月27日 星期4 00:20:21.1598458821 72484 WARN 72484 - main.cpp:26
2020年08月27日 星期4 00:20:22.1598458822 72482 WARN 72482 - main.cpp:26
2020年08月27日 星期4 00:20:22.1598458822 72485 WARN 72485 - main.cpp:26
2020年08月27日 星期4 00:20:22.1598458822 72484 WARN 72484 - main.cpp:26
2020年08月27日 星期4 00:20:22.1598458822 72486 WARN 72486 - main.cpp:26
2020年08月27日 星期4 00:20:22.1598458822 72483 WARN 72483 - main.cpp:26
标签:维护 剩余空间 左值 wap 新建 ogg include main names
原文地址:https://www.cnblogs.com/woodineast/p/13569046.html