标签:
本节将研究日至组件的后端,并给出C++实现;
说明几点:
(1)首先线程安全性,即多个线程可以并发的写日至到后端,而多个线程的日至不会出现交织,比如在多线程中使用cout时,经常会发现数据出现交叉的情况,这是因为cout << x << y在输出cout << x 和cout << y并不是一个原子操作,可被其他的线程打断的;
(2)在如下示意图中,日至后端库维护3个缓冲池,一个是empty buffer缓冲池,前端可以将数据写入到该缓冲池中;另一个是full buffer缓冲池,后端负责要将该缓冲池的内容写入到日至文件中;当empty buffer的一个buffer满时,将会把它放入到full buffer中;最后一个缓冲池是swap buffers,它是空的,在进行写日至之前,首先和full buffers进行交换一下,这样full buffers就变为空,这样后端对日至的写就针对swap buffers来了(耗时时间可能很长,如果使用full buffers将会阻塞current buffer放入到full buffers),而full buffers可以继续来放入current buffer,因此不会阻塞前端写日至的操作;
(3)当前端出现将大量的数据写入到empty buffer中时,empty buffer可能已经为空,此时empty buffer是可以动态增长的;万一前端出现死循环,将大量的数据写入到empty buffer,进而full buffers也会出现大量日至内容,此时程序仅仅是丢弃full buffers多余的日至内容;最后将swapBuffers的内容回收到empty buffer时,empty buffers可能会有较多的buffer内存,empty buffers也是会受限制的;
class AsyncLogger final { public: AsyncLogger(const AsyncLogger&) = delete; AsyncLogger& operator=(const AsyncLogger&) = delete; explicit AsyncLogger(size_t maxEmptySmallBuffersSize = 10, size_t maxfullSmallBuffersSize = 20, size_t fflushInterval = 5); ~AsyncLogger(); void start(); void stop(); void append(const char* buf, size_t len); bool running() const { return _running; } public: void _takeEmptyBufferUnlock(); void _asyncLogger(); Base::Thread _thread; bool _running; std::vector<std::shared_ptr<SmallBuffer>> _emptyBuffers; std::vector<std::shared_ptr<SmallBuffer>> _fullBuffers; std::shared_ptr<SmallBuffer> _currentBuffer; Base::Mutex _mutex; Base::Condition _cond; const size_t cMaxEmptySmallBuffersSize; const size_t cMaxFullSmallBuffersSize; const size_t _fflushInterval; };
说明几点:
(1)append(const char* buf, size_t len)是提供给日至前端写数据的接口,在前一篇博客描述日至前端的时候介绍过日至前端的默认输出是通往控制台的,而如今通过该接口,日至前端就可以将日至内容写入到该后端;
(2)void _asyncLogger()就是日至后端写出数据至日至文件的主要函数;
(3) const size_t cMaxEmptySmallBuffersSize; 和const size_t cMaxFullSmallBuffersSize;分别是empty buffers和full buffers维护的缓冲池的最大大小,而 const size_t _fflushInterval;表示当full buffers为空时,后端异步写入将等待的时间,超过该时间后端会将日至内容写入,即使当前的full buffers为空,也会将current buffer写入;
AsyncLogger::AsyncLogger(size_t maxEmptySmallBuffersSize, size_t maxFullSmallBuffersSize, size_t fflushInterval) : _thread(std::bind(&AsyncLogger::_asyncLogger, this)), _running(false), _currentBuffer(new SmallBuffer), _mutex(), _cond(_mutex), cMaxEmptySmallBuffersSize(maxEmptySmallBuffersSize), cMaxFullSmallBuffersSize(maxFullSmallBuffersSize), _fflushInterval(fflushInterval) { _emptyBuffers.reserve(10); } AsyncLogger::~AsyncLogger() { if (_running) stop(); } void AsyncLogger::start() { assert(!_running); _running = true; _emptyBuffers.push_back(shared_ptr<SmallBuffer>(new SmallBuffer)); _emptyBuffers.push_back(shared_ptr<SmallBuffer>(new SmallBuffer)); _thread.start(); } void AsyncLogger::stop() { assert(_running); _running = false; //important _thread.join(); }
void AsyncLogger::append(const char* buf, size_t len) { assert(_running); { MutexLockGuard lock(_mutex); if ( _currentBuffer->avail() < len) { _takeEmptyBufferUnlock(); } _currentBuffer->append(buf, len); } _cond.wake(); } void AsyncLogger::_takeEmptyBufferUnlock() { _fullBuffers.push_back(_currentBuffer); if (_emptyBuffers.empty()) { _currentBuffer.reset(new SmallBuffer); } else { _currentBuffer = _emptyBuffers.back(); _emptyBuffers.pop_back(); } }
说明几点:
(1)设置后端的输出类型如下:AsyncLogger async; setDefualtFunc(std::bind(&AsyncLogger::append, &async, std::placeholders::_1, std::placeholders::_2));(2)在append函数,如果发现current buffer的缓冲已经不足存放当前日至内容时,就需要重新获得新的空缓冲了,主要是通过_takeEmptyBufferUnlock()来获得;
void AsyncLogger::_asyncLogger() { assert(_running); vector<shared_ptr<SmallBuffer>> swapBuffers; LogFile output; while (_running) { { MutexLockGuard lock(_mutex); if (_fullBuffers.empty()) { _cond.waitForSeconds(_fflushInterval); //important } _takeEmptyBufferUnlock(); swapBuffers.swap(_fullBuffers); assert(_fullBuffers.empty()); } if (swapBuffers.size() > cMaxFullSmallBuffersSize) { std::copy(swapBuffers.end() - cMaxFullSmallBuffersSize , swapBuffers.end() , swapBuffers.begin()); swapBuffers.resize(cMaxFullSmallBuffersSize); } for (auto& buffer : swapBuffers) { output.output(buffer->buffer(), buffer->used()); //刷入 buffer->reset(); //impotant } { MutexLockGuard lock(_mutex); //将full buffer回收 std::copy(swapBuffers.begin(), swapBuffers.end(), back_inserter(_emptyBuffers)); if (_emptyBuffers.size() > cMaxEmptySmallBuffersSize) _emptyBuffers.resize(cMaxEmptySmallBuffersSize); } swapBuffers.clear(); output.fflush(); } output.fflush(); }说明几点:
(2)如下代码就是将日至内容写入到日至文件,日至文件LogFile output将在下文介绍;
for (auto& buffer : swapBuffers) { output.output(buffer->buffer(), buffer->used()); //写入 buffer->reset(); //impotant }(3)if (swapBuffers.size() > cMaxFullSmallBuffersSize)和 if (_emptyBuffers.size() > cMaxEmptySmallBuffersSize)两个判断主要就是针对full buffers和empty buffers的最大大小的限制做出的处理,对于full buffers将会丢弃多余的日至内容,对于empty buffers是只要维持到一定的缓冲量,多余的内存也将会回收给系统;
class File final { public: File(const File&) = delete; File& operator=(const File&) = delete; explicit File(const std::string& name); ~File(); void append(const char* buf, size_t len); std::string name() const { return _name; } void fflush(); private: typedef StreamBuffer<cLargeBufferSize> Buffer; std::string _name; FILE* _file; Buffer _buffer; };
File::File(const std::string& filename): _name(filename), _file(::fopen(_name.c_str(), "a+")) { if (!_file) { LOG_SYSERR << "fopen error"; } else { if (setvbuf(_file, const_cast<char *>(_buffer.buffer()), _IONBF, _buffer.avail()) != 0) LOG_SYSERR << "setvbuf error"; } } File::~File() { if (_file) ::fclose(_file); } void File::append(const char* buf, size_t len) { if (::fwrite(buf, 1, len, _file) != len) { LOG_WARN << "fwrite complete error"; } } void File::fflush() { if (_file) ::fflush(_file); }
说明几点:
(1)在File的构造函数中,将会设置File的缓冲大小,是一个StreamBuffer;
class File; class LogFile final { public: LogFile(const LogFile&) = delete; LogFile& operator=(const LogFile&) = delete; explicit LogFile(size_t fflushInterval = 5 * 60 * 60); void output(const SmallBuffer& buffer); void output(const char* buf, size_t size); void fflush(); size_t fflushInterval() const { return _fflushInterval; } private: std::string _getLogFileName(); std::unique_ptr<File> _file; const size_t _fflushInterval; size_t _totalWriteBytes; Base::TimeStamp _last; const size_t cLargestFileSize = 1000 * 1024 * 1024; //C++ 11 };说明几点:
(2)_getLogFileName()为如何产生日至文件的文件名;
LogFile::LogFile(size_t interval): _file(new File(_getLogFileName())), _fflushInterval(interval), _totalWriteBytes(0), _last(TimeStamp::now()) { } void LogFile::output(const SmallBuffer& buffer) { output(buffer.buffer(), buffer.avail()); } void LogFile::fflush() { _file->fflush(); } string LogFile::_getLogFileName() { char buf[32]; snprintf(buf, sizeof(buf), "-[pid:%d].log", ::getpid()); return TimeStamp::now().toFormattedString() + buf; }
void LogFile::output(const char* buf, size_t size) { if (!_file) { _file.reset(new File(_getLogFileName())); _last = TimeStamp::now(); _totalWriteBytes = 0; } _file->append(buf, size); _totalWriteBytes += size; TimeStamp now = TimeStamp::now(); if (static_cast<size_t>(now - _last) >= _fflushInterval) { _file->fflush(); _last = now; } if (_totalWriteBytes > cLargestFileSize) { _file.reset(new File(_getLogFileName())); //impotant, maybe not a new file (Time will influnce filename) _totalWriteBytes = 0; } }
说明几点:
(1)当_file为空时,产生一个新的日至文件;_getLogFileName()为如何产生日至文件的文件名;
(2)当日至文件超过cLargestFileSize大小时,将会更换日至文件;fflushInterval为日至内容写入到磁盘的时间间隔;
(3)_file->append(buf, size);为具体的将日至内容写入到磁盘文件的过程,此时日至文件内容已经至少在在File的缓冲中;
#include <Log/AsyncLogger.h> #include <Log/LogStream.h> #include <Log/Logger.h> #include <Base/Thread.h> #include <Base/Mutex.h> #include <unistd.h> using namespace Log; using namespace Base; int main(void) { AsyncLogger async; setDefualtFunc(std::bind(&AsyncLogger::append, &async, std::placeholders::_1, std::placeholders::_2)); Logger::setCurrentLogLevel(Logger::LogLevel::DEBUG); async.start(); int count = 0; while (count++ < 1000) { LOG_SYSERR << ++count << "tid " << CurrentThread::tid(); LOG_ERROR << ++count << "tid " << CurrentThread::tid();; LOG_WARN << ++count << "tid " << CurrentThread::tid(); LOG_INFO << ++count << "tid " << CurrentThread::tid(); LOG_DEBUG << ++count << "tid " << CurrentThread::tid(); LOG_TRACE << ++count << "tid " << CurrentThread::tid(); } return 0; }
0150328 08:21:35.743687102 16918 WARN AsyncLoggerTest.cc:29 main - 988tid 16918 20150328 08:21:35.743695832 16918 INFO AsyncLoggerTest.cc:31 main - 989tid 16918 20150328 08:21:35.743704563 16918 DEBUG AsyncLoggerTest.cc:33 main - 990tid 16918 20150328 08:21:35.743713363 16918 SYSERR AsyncLoggerTest.cc:25 main - 992tid 16918 20150328 08:21:35.743722372 16918 ERROR AsyncLoggerTest.cc:27 main - 993tid 16918 20150328 08:21:35.743731382 16918 WARN AsyncLoggerTest.cc:29 main - 994tid 16918 20150328 08:21:35.743740321 16918 INFO AsyncLoggerTest.cc:31 main - 995tid 16918 20150328 08:21:35.743749401 16918 DEBUG AsyncLoggerTest.cc:33 main - 996tid 16918 20150328 08:21:35.743758340 16918 SYSERR AsyncLoggerTest.cc:25 main - 998tid 16918 20150328 08:21:35.743775172 16918 ERROR AsyncLoggerTest.cc:27 main - 999tid 16918 20150328 08:21:35.743784251 16918 WARN AsyncLoggerTest.cc:29 main - 1000tid 16918 20150328 08:21:35.743793401 16918 INFO AsyncLoggerTest.cc:31 main - 1001tid 16918 20150328 08:21:35.743808207 16918 DEBUG AsyncLoggerTest.cc:33 main - 1002tid 16918
标签:
原文地址:http://blog.csdn.net/skyuppour/article/details/44701437