码迷,mamicode.com
首页 > 系统相关 > 详细

从零系列--node爬虫利用进程池写数据

时间:2018-09-03 22:58:30      阅读:264      评论:0      收藏:0      [点我收藏+]

标签:lists   ports   next   function   fun   immediate   nbsp   hpa   dia   

1、主进程

const http = require(‘http‘);
const fs = require(‘fs‘);
const cheerio = require(‘cheerio‘);
const request = require(‘request‘);
const makePool = require(‘./pooler‘)
const runJob = makePool(‘./worker‘)
var i = 0;
var url = "http://xxx.com/articles/"; 
//初始url 
let g = ‘‘;
function fetchPage(x) {     //封装了一层函数
  console.log(x)
  if(!x || x==‘‘){
    g.next()
    return
  }
    startRequest(x); 
}


function startRequest(x) {
     //采用http模块向服务器发起一次get请求      
    return http.get(x, function (res) {     
        var html = ‘‘;        //用来存储请求网页的整个html内容
        var titles = [];        
        res.setEncoding(‘utf-8‘); //防止中文乱码
     //监听data事件,每次取一块数据
        res.on(‘data‘, function (chunk) {   
            html += chunk;
        });
     //监听end事件,如果整个网页内容的html都获取完毕,就执行回调函数
        res.on(‘end‘, function () {
          var $ = cheerio.load(html); //采用cheerio模块解析html

          var time = new Date();
          var p =  $(‘.content p‘)
          p.each((index,item)=>{
                if($(item).find(‘strong‘).length) {
                  var fex_item = {
                    //获取文章的标题
                      title: $(item).find(‘strong‘).text().trim(),
                  //获取文章发布的时间
                      time: time,   
                  //获取当前文章的url
                      link: $($(item).children(‘a‘).get(0)).attr(‘href‘),
                      des:$(item).children().remove()&&$(item).text(),
                  //i是用来判断获取了多少篇文章
                      i: index+1     
      
                  };
                  runJob(fex_item,(err,data)=>{
                    if(err) console.error(‘get link error‘)
                    console.log(‘get link ok‘)
                  })
                }
                
          })
          g.next()
        })         

    }).on(‘error‘, function (err) {
        console.log(err);
        g.next()
    });

}
function* gen(urls){
  let len = urls.length;
  for(var i=0;i<len;i++){
    yield fetchPage(urls[i])
  }
}

function getUrl(x){
    //采用http模块向服务器发起一次get请求      
    http.get(x, function (res) {     
      var html = ‘‘;        //用来存储请求网页的整个html内容
      var titles = [];        
      res.setEncoding(‘utf-8‘); //防止中文乱码
   //监听data事件,每次取一块数据
      res.on(‘data‘, function (chunk) {   
          html += chunk;
      });
   //监听end事件,如果整个网页内容的html都获取完毕,就执行回调函数
      res.on(‘end‘, function () {
        var $ = cheerio.load(html); //采用cheerio模块解析html

        var time = new Date();
        var lists =  $(‘.articles .post-list li‘)
        var urls = [];
        lists.each(function(index,item){
          if($(item).find(‘a‘).length) {
              var url = ‘http://xxxx.com‘+$($(item).children(‘a‘).get(0)).attr(‘href‘);
              if(url)
              urls.push(url);      //主程序开始运行
          }
       })
        g = gen(urls)
        g.next()
      })         

  }).on(‘error‘, function (err) {
      console.log(err);
  });
}

getUrl(url)

2、创建进程池

const cp = require(‘child_process‘)
const cpus = require(‘os‘).cpus().length;

module.exports =  function pooler(workModule){
  let awaiting = [],readyPool = [],poolSize = 0;
  return function doWork(job,cb){
    if(!readyPool.length&&poolSize>cpus)
      return awaiting.push([doWork,job,cb])

    let child = readyPool.length ? readyPool.shift():(poolSize++,cp.fork(workModule))
    let cbTriggered = false;
    child.removeAllListeners()
    .once(‘error‘,function(err){
      if(!cbTriggered){
        cb(err)
        cbTriggered = true
      }
      child.kill()
    })
    .once(‘eixt‘,function(){
      if(!cbTriggered)
      cb(new Error(‘childe exited with code:‘+code))
      poolSize--;
      let childIdx = readyPool.indexOf(child)
      if(childIdx > -1)readyPool.splice(childIdx,1)
    })
    .once(‘message‘,function(msg){
      cb(null,msg)
      cbTriggered = true
      readyPool.push(child)
      if(awaiting.length)setImmediate.apply(null,awaiting.shift())
    })
    .send(job)
  }
}

3、工作进程接受消息并处理内容

const fs = require(‘fs‘)
process.on(‘message‘,function(job){
  let _job = job
  let x = ‘TITLE:‘+_job.title+‘\n‘ + ‘LINK:‘+_job.link + ‘\n DES:‘+_job.des+‘\n SAVE-TIME:‘+_job.time
  
  fs.writeFile(‘../xx/data/‘ + _job.title + ‘.txt‘, x, ‘utf-8‘, function (err) {
      if (err) {
          console.log(err);
      }
  });
  process.send(‘finish‘)
})

 

从零系列--node爬虫利用进程池写数据

标签:lists   ports   next   function   fun   immediate   nbsp   hpa   dia   

原文地址:https://www.cnblogs.com/sjptech/p/9581595.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!