码迷,mamicode.com
首页 > 数据库 > 详细

Java 线程池 +生产者消费者+MySQL读取300 万条数据

时间:2017-03-22 21:36:10      阅读:434      评论:0      收藏:0      [点我收藏+]

标签:用户   ase   mode   cache   代码   log   shm   nbsp   inf   

1.1需求

    数据库300 万条用户数据 ,遍历获取所有用户, 各种组合关联, 获取到一个新的json ,存到redis 上。

1.2 难点

  数据库比较多, 不可能单线程查询所有的数据到内存。

1.3解决办法

 多线程读取, 生产者 每次获取200 条数据, 消费者去消费。(这里 主要是根据MySQL分页去获取下一个200 条数据)

1.4 代码

1.4.1 调用方法

    /**
     * 线程启动
     */
    public void update() {
//redis操作类 HashRedisUtil redisUtil
= HashRedisUtil.getInstance(); //生产者消费者 ProducerConsumer pc = new ProducerConsumer(); //数据仓库 Storage s = pc.new Storage(); ExecutorService service = Executors.newCachedThreadPool(); //一个线程进行查询 Producer p = pc.new Producer(s,userMapper); service.submit(p); System.err.println("生产线程正在生产中。。。。。。。。。"); //是个线程进行修改 for(int i=0;i<10;i++){ System.err.println("消费线程"+i+"正在消费中。。。。。。。。。。"); service.submit(pc.new Consumer( redisUtil,userMapper,s)); } }

1.4.2 主要核心类

package com.ypp.thread;


import java.math.BigDecimal;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.LocalDateTime;

import com.alibaba.fastjson.JSONObject;
import com.ypp.constants.Constants;
import com.ypp.mapper.UserMapper;
import com.ypp.model.User;
import com.ypp.model.UserAlis;
import com.ypp.model.UserBaseModel;
import com.ypp.model.UserVip;
import com.ypp.util.HashRedisUtil;
import com.ypp.util.JsonUtils;
import com.ypp.util.PHPSerializer;


public class ProducerConsumer {
    private static Logger logger = Logger.getLogger(ProducerConsumer.class);
//这个page 是核心, 全局变量, 当生产者生产一次 ,获取200 个用户, 会把这个page++, 下次获取就是后一个200 条用户了
private static Integer page = 0; //消费者
public class Consumer implements Runnable { private HashRedisUtil redisUtil; private UserMapper userMapper; private Storage s = null; public Consumer(HashRedisUtil redisUtil, UserMapper userMapper, Storage s) { super(); this.redisUtil = redisUtil; this.userMapper = userMapper; this.s = s; } public void run() { try { while (true) { User users = s.pop(); long bbb = System.currentTimeMillis(); // 获取一个用户的粉丝列表 并存到redis try { fansUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e1) { e1.printStackTrace(); } // 获取一个用户的关注列表, 并存到redis try { followUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e) { e.printStackTrace(); } // 获取一个用户的黑名单, 并存到redis try { blackUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e) { e.printStackTrace(); } // 用户基本信息 try { userbaseUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e) { e.printStackTrace(); } long ccc = System.currentTimeMillis(); System.out.println("用户:" + users.getToken() + " 全部总共耗时:" + (ccc - bbb) + "毫秒"); Thread.sleep(500); } } catch (InterruptedException e) { e.printStackTrace(); } } public List<User> getUserInfo(Integer iThread) { return userMapper.findUserInfo((iThread - 1) * 200 + 1); } /** * 用户基本信息修改 * * @param token * @param myuserId * @param redisUtil * @throws Exception */ private void userbaseUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /** * 更新一个用户的黑名单(原来的token改成userID) * * @param token * @param string * @param redisUtil * @throws Exception */ private void blackUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /** * 获取一个用户的关注 * * @param token * @param string * @param redisUtil * @throws Exception */ private void followUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /** * 获取一个用户的粉丝列表 * * @param token * @param userId * @param redisUtil * @throws Exception */ private void fansUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } //生产者 public class Producer implements Runnable { private Storage s = null; private UserMapper mapper ; public Producer( Storage s, UserMapper mapper) { this.s = s; this.mapper = mapper; } public void run() { try { while (true) { System.err.println("当前分页是:"+page+"****************************************"); List<User> list= mapper.findUserInfo(page); s.push(list); page++; } } catch (InterruptedException e1) { e1.printStackTrace(); } } }
//数据仓库
public class Storage { BlockingQueue<User> queues = new LinkedBlockingQueue<User>(200); /** * 生产 * * @param p * 产品 * @throws InterruptedException */ public void push(List<User> p) throws InterruptedException { for(User user:p){ queues.put(user); } } /** * 消费 * * @return 产品 * @throws InterruptedException */ public User pop() throws InterruptedException { return queues.take(); } } }

 

Java 线程池 +生产者消费者+MySQL读取300 万条数据

标签:用户   ase   mode   cache   代码   log   shm   nbsp   inf   

原文地址:http://www.cnblogs.com/zgghb/p/6601869.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!