标签:notify code condition current swa 后端 trace levels implicit
LOG_INFO << "AAA";
LOG_INFO是一个宏,展开后为:
muduo::Logger(__FILE__, __LINE__).stream() << "AAA";
构造了一个匿名对象Logger,在这个对象构造的时候其实已经写入了文件名和行号。
匿名对象调用.stream()函数拿到一个LogStream对象,由这个LogStream对象重载<<将“AAA”写入LogStream的数据成员FixBuffer对象的data_缓冲区内。
匿名对象在这条语句执行完毕以后会被销毁,因此会调用~muduo::Logger()函数将日志消息输出至目的地(标准输出或者磁盘的日志文件);
Logger——Impl——LogStream——operator<<——LogStream的FixBuffer内——g_output——g_flush
由于磁盘IO是移动磁头的方式来记录文件的,其速度与CPU运行速度并不在一个数量级上。因此业务线程中应该避免进行磁盘IO以防止业务得不到及时的处理。
在多线程程序中,业务线程应该专注与其业务逻辑的运算,用另外一个独立的线程来将日志消息写入磁盘。
在muduo的日志系统中,分为前端和后端。前端是业务线程产生一条条的日志消息。后端是日志线程,将日志消息写入文件。
业务线程有多个,日志线程只有一个。这是一个典型地多生产者单消费者模型。
多个线程会互斥地往A缓冲区写数据。
有可能是日志线程没有及时分配到时间片,所以条件变量notify()的时候并没有及时将日志线程备用的两个缓冲区拿来给前端顶替A和B缓冲区的位置;
这个时候就在互斥锁内分配内存。不过这种情况很少发生。
高性能日志系统其高性能所在:
1.业务线程与日志线程独立,保证了业务线程能及时处理业务。
2.多个线程其实是争用一个全局锁往缓冲区A拷贝写数据。这了拷贝数据是否是性能杀手,引用作者的一段话:
这个传递指针的方案似乎会更加高效,但是据我测试,直接拷贝日志数据的做法比传递指针快3倍(在每条消息不大于4KB的时候),估计是内存分配开销所致。因此muduo日志库只提供了这一种异步日志机制。这再次说明“性能”不能凭感觉说了算,一定要有典型场景的测试数据作为支撑。
所有加锁的临界区足够小,但是如果线程数据较多,依旧会是性能瓶颈。
3.四缓冲机制避免了在临界区分配空间的时间消耗(见上图分析)。
4.将Buffers_的数据写磁盘是先进行swap交换到一个临时变量上,待互斥锁释放以后才进行磁盘IO操作。减少在临界区操作的时间,提升性能。
class Logger { public: enum LogLevel { TRACE, DEBUG, INFO, WARN, ERROR, FATAL, NUM_LOG_LEVELS, }; // compile time calculation of basename of source file class SourceFile { public: template<int N> SourceFile(const char (&arr)[N]): data_(arr), size_(N-1) { const char* slash = strrchr(data_, ‘/‘); // builtin function if (slash) { data_ = slash + 1; size_ -= static_cast<int>(data_ - arr); } } explicit SourceFile(const char* filename): data_(filename) { const char* slash = strrchr(filename, ‘/‘); if (slash) { data_ = slash + 1; } size_ = static_cast<int>(strlen(data_)); } const char* data_; int size_; }; Logger(SourceFile file, int line); Logger(SourceFile file, int line, LogLevel level); Logger(SourceFile file, int line, LogLevel level, const char* func); Logger(SourceFile file, int line, bool toAbort); ~Logger(); //返回内部类对象的数据成员LogStream对象。 LogStream& stream() { return impl_.stream_; } static LogLevel logLevel(); static void setLogLevel(LogLevel level); typedef void (*OutputFunc)(const char* msg, int len); typedef void (*FlushFunc)(); static void setOutput(OutputFunc); static void setFlush(FlushFunc); static void setTimeZone(const TimeZone& tz); private: class Impl { public: typedef Logger::LogLevel LogLevel; Impl(LogLevel level, int old_errno, const SourceFile& file, int line); void formatTime(); void finish(); Timestamp time_; //LogStream对象会重载<<运算符,将日志消息写到他的数据成员FixBuffer内 LogStream stream_; LogLevel level_; int line_; SourceFile basename_; }; Impl impl_; };
LogStream主要是重载了<<运算符,将各种类型当作字符串类型存入缓冲区内;
class LogStream : noncopyable { typedef LogStream self; public: typedef detail::FixedBuffer<detail::kSmallBuffer> Buffer; //将bool类型当作字符串0或者1存入缓冲区 self& operator<<(bool v) { buffer_.append(v ? "1" : "0", 1); return *this; } self& operator<<(short); self& operator<<(unsigned short); self& operator<<(int); self& operator<<(unsigned int); self& operator<<(long); self& operator<<(unsigned long); self& operator<<(long long); self& operator<<(unsigned long long); self& operator<<(const void*); self& operator<<(float v) { *this << static_cast<double>(v); return *this; } self& operator<<(double); // self& operator<<(long double); self& operator<<(char v) { buffer_.append(&v, 1); return *this; } // self& operator<<(signed char); // self& operator<<(unsigned char); self& operator<<(const char* str) { if (str) { buffer_.append(str, strlen(str)); } else { buffer_.append("(null)", 6); } return *this; } self& operator<<(const unsigned char* str) { return operator<<(reinterpret_cast<const char*>(str)); } self& operator<<(const string& v) { buffer_.append(v.c_str(), v.size()); return *this; } self& operator<<(const StringPiece& v) { buffer_.append(v.data(), v.size()); return *this; } self& operator<<(const Buffer& v) { *this << v.toStringPiece(); return *this; } void append(const char* data, int len) { buffer_.append(data, len); } const Buffer& buffer() const { return buffer_; } void resetBuffer() { buffer_.reset(); } private: void staticCheck(); template<typename T> void formatInteger(T); //日志消息会存入到这个FixedBuffer对象的数据成员中; //typedef detail::FixedBuffer<detail::kSmallBuffer> Buffer; //detail::kSmallBuffe为4000个字节 Buffer buffer_; static const int kMaxNumericSize = 32; };
LogStream的主要数据成员就是:FixBuffer对象,一条日志消息会暂时存在FixBuffer对象的数据成员data_内;
//非类型模板: //typedef detail::FixedBuffer<detail::kSmallBuffer> Buffer; //detail::kSmallBuffer传入了一个值,用于设置char data_[SIZE]的大小; template<int SIZE> class FixedBuffer : noncopyable { public: FixedBuffer(): cur_(data_) { setCookie(cookieStart); } ~FixedBuffer() { setCookie(cookieEnd); } void append(const char* /*restrict*/ buf, size_t len) { // FIXME: append partially if (implicit_cast<size_t>(avail()) > len) { //这里做了一次拷贝,将日志消息从栈内存拷到了类对象的空间上。 //这里其实是汇集数据的作用,方便后续将数据输出。 memcpy(cur_, buf, len); cur_ += len; } } const char* data() const { return data_; } int length() const { return static_cast<int>(cur_ - data_); } // write to data_ directly char* current() { return cur_; } int avail() const { return static_cast<int>(end() - cur_); } void add(size_t len) { cur_ += len; } void reset() { cur_ = data_; } void bzero() { memZero(data_, sizeof data_); } // for used by GDB const char* debugString(); void setCookie(void (*cookie)()) { cookie_ = cookie; } // for used by unit test string toString() const { return string(data_, length()); } StringPiece toStringPiece() const { return StringPiece(data_, length()); } private: const char* end() const { return data_ + sizeof data_; } // Must be outline function for cookies. static void cookieStart(); static void cookieEnd(); void (*cookie_)(); //存日志消息的数据成员; char data_[SIZE]; char* cur_; };
匿名对象执行析构函数:
Logger::~Logger() { //最后往这条日志消息里面插入:-文件名:行数\n impl_.finish(); //这里拿到了数据的引用,并没有拷贝复制 const LogStream::Buffer& buf(stream().buffer()); //buf.data()返回的是数据成员data_数组的首地址; //g_output和g_flush都是一个可调用对象 //typedef void (*OutputFunc)(const char* msg, int len); //typedef void (*FlushFunc)(); g_output(buf.data(), buf.length()); if (impl_.level_ == FATAL) { g_flush(); abort(); } }
不同的调用对象直接导致了日志消息的输出的目的地。
例如默认的:
//将buffer内的数据输出到标准输出 void defaultOutput(const char* msg, int len) { size_t n = fwrite(msg, 1, len, stdout); //FIXME check n (void)n; }
//将数据刷新到标准输出 void defaultFlush() { fflush(stdout); }
在异步日志中,将日志消息输出到日志线程中,由日志线程将数据输出到磁盘的日志文件中。
使用方法:
muduo::AsyncLogging* g_asyncLog = NULL; void asyncOutput(const char *msg, int len) { //将日志消息数据存入异步日志类对象的缓冲区内 g_asyncLog->append(msg, len); } //创建一个异步日志类对象,超过kRollSize后会滚动日志 muduo::AsyncLogging log(::basename(name), kRollSize); //异步日志启动 log.start(); //设置Logger类的全局变量g_output为asynOutput,析构的时候调用asyncOutput对象 muduo::Logger::setOutput(asyncOutput);
先新建异步日志对象并运行起来,这样是为了??(创建好缓冲区吗??)
再设置Logger对象析构时调用的函数asyncOutput;
数据成员:
private: //日志线程会运行这个函数用于将日志消息写入磁盘的日志文件;与业务线程分开; void threadFunc(); typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; typedef std::vector<std::unique_ptr<Buffer>> BufferVector; //容器内元素的类型 typedef BufferVector::value_type BufferPtr; //条件变量等待超时的时间,用于3秒超时将日志消息的数据写入磁盘 const int flushInterval_; //日志线程是否在运行; std::atomic<bool> running_; const string basename_; //rollSize_大小后滚动日志 const off_t rollSize_; //用于创建日志线程 muduo::Thread thread_; //用于确保日志线程已经启动 muduo::CountDownLatch latch_; //与条件变量一起用的互斥器,用于对缓冲区加锁。各个业务线程互斥写入下面两个缓冲区 muduo::MutexLock mutex_; muduo::Condition cond_ GUARDED_BY(mutex_); //当前缓冲区和业务缓冲区供前端(业务线程)将数据存放的地方 BufferPtr currentBuffer_ GUARDED_BY(mutex_); BufferPtr nextBuffer_ GUARDED_BY(mutex_); //在这里的都是即将要写入磁盘日志文件的数据; BufferVector buffers_ GUARDED_BY(mutex_);
AsyncLogging类对象内有两个缓冲区数据成员:currentBuffer和nextBuffer;
每个线程在调用void asyncOutput(const char * msg, int len)的时候会调用AsyncLogging对象的append()成员函数,在append()函数内会加锁将日志消息写入到currentBuffer内。
标签:notify code condition current swa 后端 trace levels implicit
原文地址:https://www.cnblogs.com/jialin0x7c9/p/12347800.html