码迷,mamicode.com
首页 > 其他好文 > 详细

logstash-forward源码分析

时间:2015-08-10 13:43:08      阅读:902      评论:0      收藏:0      [点我收藏+]

标签:

logstash-forward 源码核心思想包括以下几个角色(模块):

prospector: 找到配文件中paths/globs下面文件, 并且启动harvesters,提交文件给harvesters

 harvester: 读取scan文件, 并提交相应的event给spooler

spooler: 做为一个buffer缓冲池,达到大小或者计数器时间到flush池子里面的event信息给publisher

publisher: 连接网络(connect是通过ssl方式认证),传输event数据到指定地点, 并通知registrar传输成功

registrar: 记录文件的records,包括日志现在读取offset等信息,存放在.logstash-forward文件下


主要源码赏析:

prospector------------

主要功能函数 遍历文件属性
func (p *Prospector) scan(path string, output chan *FileEvent, resume *ProspectorResume)
if fileinfo.ModTime().Before(p.lastscan) && time.Since(fileinfo.ModTime()) > p.FileConfig.deadtime {
				var offset int64 = 0
				var is_resuming bool = false

				if resume != nil {
					// Call the calculator - it will process resume state if there is one
					offset, is_resuming = p.calculate_resume(file, fileinfo, resume)
				}

				// Are we resuming a dead file? We have to resume even if dead so we catch any old updates to the file
				// This is safe as the harvester, once it hits the EOF and a timeout, will stop harvesting
				// Once we detect changes again we can resume another harvester again - this keeps number of go routines to a minimum
				if is_resuming {
					emit("Resuming harvester on a previously harvested file: %s\n", file)
					harvester := &Harvester{Path: file, FileConfig: p.FileConfig, Offset: offset, FinishChan: newinfo.harvester}
					go harvester.Harvest(output)
				} else {
					// Old file, skip it, but push offset of file size so we start from the end if this file changes and needs picking up
					emit("Skipping file (older than dead time of %v): %s\n", p.FileConfig.deadtime, file)
					newinfo.harvester <- fileinfo.Size()
				}
			}
			
主要执行到 go harvester.Harvest(output)

harvester------------

主要函数func (h *Harvester) Harvest(output chan *FileEvent),扫面文件

for {
		text, bytesread, err := h.readline(reader, buffer, read_timeout)

		if err != nil {
			if err == io.EOF {
				// timed out waiting for data, got eof.
				// Check to see if the file was truncated
				info, _ := h.file.Stat()
				if info.Size() < h.Offset {
					emit("File truncated, seeking to beginning: %s\n", h.Path)
					h.file.Seek(0, os.SEEK_SET)
					h.Offset = 0
				} else if age := time.Since(last_read_time); age > h.FileConfig.deadtime {
					// if last_read_time was more than dead time, this file is probably
					// dead. Stop watching it.
					emit("Stopping harvest of %s; last change was %v ago\n", h.Path, age)
					return
				}
				continue
			} else {
				emit("Unexpected state reading from %s; error: %s\n", h.Path, err)
				return
			}
		}
		last_read_time = time.Now()

		line++
		event := &FileEvent{
			Source:   &h.Path,
			Offset:   h.Offset,
			Line:     line,
			Text:     text,
			Fields:   &h.FileConfig.Fields,
			fileinfo: &info,
		}
		h.Offset += int64(bytesread)

		output <- event // ship the new event downstream
	} /* forever */
	
主要执行到 output <- event

spooler------------

func Spool(input chan *FileEvent,
  output chan []*FileEvent,
  max_size uint64,
  idle_timeout time.Duration) {
  // heartbeat periodically. If the last flush was longer than
  // ‘idle_timeout‘ time ago, then we‘ll force a flush to prevent us from
  // holding on to spooled events for too long.

  ticker := time.NewTicker(idle_timeout / 2)

  // slice for spooling into
  // TODO(sissel): use container.Ring?
  spool := make([]*FileEvent, max_size)

  // Current write position in the spool
  var spool_i int = 0

  next_flush_time := time.Now().Add(idle_timeout)
  for {
    select {
    case event := <-input:
      //append(spool, event)
      spool[spool_i] = event
      spool_i++

      // Flush if full
      if spool_i == cap(spool) {
        //spoolcopy := make([]*FileEvent, max_size)
        var spoolcopy []*FileEvent
        //fmt.Println(spool[0])
        spoolcopy = append(spoolcopy, spool[:]...)
        output <- spoolcopy
        next_flush_time = time.Now().Add(idle_timeout)

        spool_i = 0
      }
    case <-ticker.C:
      //fmt.Println("tick")
      if now := time.Now(); now.After(next_flush_time) {
        // if current time is after the next_flush_time, flush!
        //fmt.Printf("timeout: %d exceeded by %d\n", idle_timeout,
        //now.Sub(next_flush_time))

        // Flush what we have, if anything
        if spool_i > 0 {
          var spoolcopy []*FileEvent
          spoolcopy = append(spoolcopy, spool[0:spool_i]...)
          output <- spoolcopy
          next_flush_time = now.Add(idle_timeout)
          spool_i = 0
        }
      } /* if ‘now‘ is after ‘next_flush_time‘ */
      /* case ... */
    } /* select */
  } /* for */
} /* spool */

管道阻塞等待input数据,然后轮询判断介绍到spool的长度达到max_size或者计数器到点
主要执行到 output <- spoolcopy

publisher------------

for events := range input {
		buffer.Truncate(0)
		compressor, _ := zlib.NewWriterLevel(&buffer, 3)

		for _, event := range events {
			sequence += 1
			writeDataFrame(event, sequence, compressor)
		}
		compressor.Flush()
		compressor.Close()

		compressed_payload := buffer.Bytes()

		// Send buffer until we‘re successful...
		oops := func(err error) {
			// TODO(sissel): Track how frequently we timeout and reconnect. If we‘re
			// timing out too frequently, there‘s really no point in timing out since
			// basically everything is slow or down. We‘ll want to ratchet up the
			// timeout value slowly until things improve, then ratchet it down once
			// things seem healthy.
			emit("Socket error, will reconnect: %s\n", err)
			time.Sleep(1 * time.Second)
			socket.Close()
			socket = connect(config)
		}

	SendPayload:
		for {
			// Abort if our whole request takes longer than the configured
			// network timeout.
			socket.SetDeadline(time.Now().Add(config.timeout))

			// Set the window size to the length of this payload in events.
			_, err = socket.Write([]byte("1W"))
			if err != nil {
				oops(err)
				continue
			}
			binary.Write(socket, binary.BigEndian, uint32(len(events)))
			if err != nil {
				oops(err)
				continue
			}

			// Write compressed frame
			socket.Write([]byte("1C"))
			if err != nil {
				oops(err)
				continue
			}
			binary.Write(socket, binary.BigEndian, uint32(len(compressed_payload)))
			if err != nil {
				oops(err)
				continue
			}
			_, err = socket.Write(compressed_payload)
			if err != nil {
				oops(err)
				continue
			}

			// read ack
			response := make([]byte, 0, 6)
			ackbytes := 0
			for ackbytes != 6 {
				n, err := socket.Read(response[len(response):cap(response)])
				if err != nil {
					emit("Read error looking for ack: %s\n", err)
					socket.Close()
					socket = connect(config)
					continue SendPayload // retry sending on new connection
				} else {
					ackbytes += n
				}
			}

			// TODO(sissel): verify ack
			// Success, stop trying to send the payload.
			break
		}

		// Tell the registrar that we‘ve successfully sent these events
		registrar <- events
	} /* for each event payload
	
socket连接发送数据,主要执行到registrar <- events

registrar------------

func Registrar(state map[string]*FileState, input chan []*FileEvent) {
	for events := range input {
		emit ("Registrar: processing %d events\n", len(events))
		// Take the last event found for each file source
		for _, event := range events {
			// skip stdin
			if *event.Source == "-" {
				continue
			}

			ino, dev := file_ids(event.fileinfo)
			state[*event.Source] = &FileState{
				Source: event.Source,
				// take the offset + length of the line + newline char and
				// save it as the new starting offset.
				// This issues a problem, if the EOL is a CRLF! Then on start it read the LF again and generates a event with an empty line
				Offset: event.Offset + int64(len(*event.Text)) + 1, // REVU: this is begging for BUGs
				Inode:  ino,
				Device: dev,
			}
			//log.Printf("State %s: %d\n", *event.Source, event.Offset)
		}

		if e := writeRegistry(state, ".logstash-forwarder"); e != nil {
			// REVU: but we should panic, or something, right?
			emit("WARNING: (continuing) update of registry returned error: %s", e)
		}
	}
}

func writeRegistry(state map[string]*FileState, path string) error {
	tempfile := path + ".new"
	file, e := os.Create(tempfile)
	if e != nil {
		emit("Failed to create tempfile (%s) for writing: %s\n", tempfile, e)
		return e
	}
	defer file.Close()

	encoder := json.NewEncoder(file)
	encoder.Encode(state)

	return onRegistryWrite(path, tempfile)
}

最后把改变以json格式记录到.logstash-forwarder文件里面,表示日志已经读取到某某offset处咯


logstash-forward源码分析

标签:

原文地址:http://my.oschina.net/u/932809/blog/490102

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!