多线程设置
package cloud.app.prod.home.utils; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; /** * Author : YongBo Xie </br> * File Name: DSHMessageAndSmsSendThreadUtil.java </br> * Created Date: 2018年3月26日 下午2:59:46 </br> * Modified Date: 2018年3月26日 下午2:59:46 </br> * Version: 1.0 </br> */ public class DSHWechatMsgAndSmsSendThreadUtil { private static Logger logger = Logger.getLogger(DSHWechatMsgAndSmsSendThreadUtil.class); public static void main(String[] args) { List<String> mobileList = new ArrayList<>(); mobileList.add("17721111111"); mobileList.add("15223333333"); String content = "SCRM 多线程发送短信测试"; try { sendSMS(mobileList, content); } catch (Exception e) { e.printStackTrace(); } } /** * 发送短信 * * @param mobileList * 手机号码 * @param content * 短信内容 * @throws Exception */ public static void sendSMS(List<String> mobileList, String content) throws Exception { try { logger.info("send message start..."); long startTime = System.currentTimeMillis(); BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(20000); ThreadPoolExecutor executors = new ThreadPoolExecutor(5, 10, 60000, TimeUnit.SECONDS, queue); // 要推送的用户总数 int count = mobileList.size(); logger.info("message all count=" + count); // 初始每个线程处理的用户数量 final int eveLength = 5; // 计算处理所有用户需要的线程数量 int eveBlocks = count / eveLength + (count % eveLength != 0 ? 1 : 0); logger.info("need thread‘s count=" + eveBlocks); // 线程计数器 CountDownLatch doneSignal = new CountDownLatch(eveBlocks); // 开启线程处理 int doneCount = 0; for (int page = 0; page < eveBlocks; page++) { /* blocks太大可以再细分重新调度 */ content = content + ",线程" + (page + 1); SmsSendThread ms = new SmsSendThread(mobileList, content, page, eveLength, doneSignal); executors.execute(ms); // logger.info("start thread =>{}",page+1); doneCount++; } doneSignal.await();// 等待所有计数器线程执行完 long endTime = System.currentTimeMillis(); logger.info("send message all thread ends!time(s)=" + (startTime - endTime) / 1000); logger.info("all thread count=" + doneCount); } catch (Exception e) { logger.error("send message error=>{}", e); } } /** * 微信公众号推送消息 * @param accessToken 公众号token * @param openIdList 微信OpenID列表 * @param content 消息内容 * @throws Exception */ public static void sendWechatMsg(String accessToken, List<String> openIdList, String content) throws Exception { try { logger.info("send message start..."); long startTime = System.currentTimeMillis(); BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(20000); ThreadPoolExecutor executors = new ThreadPoolExecutor(5, 10, 60000, TimeUnit.SECONDS, queue); // 要推送的用户总数 int count = openIdList.size(); logger.info("message all count=" + count); // 初始每个线程处理的用户数量 final int eveLength = 2000; // 计算处理所有用户需要的线程数量 int eveBlocks = count / eveLength + (count % eveLength != 0 ? 1 : 0); logger.info("need thread‘s count=" + eveBlocks); // 线程计数器 CountDownLatch doneSignal = new CountDownLatch(eveBlocks); // 开启线程处理 int doneCount = 0; for (int page = 0; page < eveBlocks; page++) { /* blocks太大可以再细分重新调度 */ WachatMagSendThread ms = new WachatMagSendThread(accessToken, openIdList, content, page, eveLength, doneSignal); executors.execute(ms); // logger.info("start thread =>{}",page+1); doneCount++; } doneSignal.await();// 等待所有计数器线程执行完 long endTime = System.currentTimeMillis(); logger.info("send message all thread ends!time(s)=" + (startTime - endTime) / 1000); logger.info("all thread count=" + doneCount); } catch (Exception e) { logger.error("send message error=>{}", e); } } }
发送短信线程
package cloud.app.prod.home.utils; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; /** * Author : YongBo Xie </br> * File Name: SmsSendThreadUtils.java </br> * Created Date: 2018年3月26日 下午2:17:42 </br> * Modified Date: 2018年3月26日 下午2:17:42 </br> * Version: 1.0 </br> */ public class SmsSendThread implements Runnable { private static Logger logger = Logger.getLogger(SmsSendThread.class); private List<String> mobileList;//手机号码 private String content;//短信内容 private int currentIndex;//当前索引 private int rows;//处理数据条数 private CountDownLatch doneSignal;//处理线程条数 public SmsSendThread(List<String> mobileList, String content, int currentIndex, int rows, CountDownLatch doneSignal) { this.mobileList = mobileList; this.content = content; this.currentIndex = currentIndex; this.rows = rows; this.doneSignal = doneSignal; } @Override public void run() { // 查询当前的block范围内的发送的手机号=>筛选目标客户群手机号--------- String mobiles = filterPhones(mobileList, currentIndex, rows); try { DSHSendMessageUtil.postSendMsg(content, mobiles); } catch (Exception e) { logger.error("send message thread exception=>{" + mobiles + "}{" + e.getMessage() + "}"); e.printStackTrace(); } finally { doneSignal.countDown();//工人完成工作,计数器减一 } } /** * 筛选目标客户群手机号 * @param mobileList 手机号码 * @param currentIndex 当前索引 * @param rows 处理数据条数 * @return */ private String filterPhones(List<String> mobileList, int currentIndex, int rows) { int startIndex = currentIndex * rows; int endIndex = (currentIndex + 1) * rows; if (endIndex > mobileList.size()) { endIndex = mobileList.size(); } String mobiles = ""; for (int i = startIndex; i < endIndex; i++) { mobiles = mobileList.get(i) + "," + mobiles; } return mobiles; } }
短信发送
package cloud.app.prod.home.utils; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; /** * File Name: DSHSendMessageUtil.java<br> * Created Date: 2018年3月15日 上午9:54:12<br> * Modified Date: 2018年3月15日 上午9:54:12<br> * Version: 1.0<br> */ public class DSHSendMessageUtil { private static final String OPER_ID = "ceshfj2"; private static final String OPER_PASS = "jcUiROHM"; private static final String SEND_URL = "http://qxtsms.guodulink.net:8000/QxtSms/QxtFirewall"; private static final String SEND_TIME = ""; private static final String VALID_TIME = ""; private static final String APPEND_ID = "888"; /** * Test * @param args * @throws Exception */ public static void main(String args[]) throws Exception { try { String content = "http://news.sina.com.cn/o/2018-03-15/doc-ifyshvuy1243084.shtml"; String mobiles = "13918159104,15216865591"; /* post方式发送消息 */ boolean postResponse = DSHSendMessageUtil.postSendMsg(content, mobiles); System.out.println("post方式返回的响应为:" + postResponse); } catch (Exception e) { // TODO: handle exception } } /** * 发送短信 * @param content 短信内容 * @param mobiles 手机号,多个手机号可用“,”隔开 * @return boolean */ public static boolean postSendMsg(String content, String mobiles) throws Exception { boolean flag = false; try { /* 将内容用URLEncoder编一次GBK */ String encoderContent = ""; encoderContent = URLEncoder.encode(content, "GBK"); /* 消息参数 */ StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("OperID=").append(OPER_ID) .append("&OperPass=").append(OPER_PASS) .append("&SendTime=").append(SEND_TIME) .append("&ValidTime=").append(VALID_TIME) .append("&AppendID=").append(APPEND_ID) .append("&DesMobile=").append(mobiles.trim()) .append("&Content=").append(encoderContent) .append("&ContentType=").append("8"); /* 使用post方式发送消息 */ String response = postURL(stringBuilder.toString(), SEND_URL); if (response.indexOf("<code>01</code>") >= 0) { flag = true; } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return flag; } /** * @param commString 需要发送的url参数串 * @param address 需要发送的url地址 * @return 国都返回的XML格式的串 * @catch Exception */ public static String postURL(String commString, String address) { String rec_string = ""; URL url = null; HttpURLConnection urlConn = null; try { /* 得到url地址的URL类 */ url = new URL(address); /* 获得打开需要发送的url连接 */ urlConn = (HttpURLConnection) url.openConnection(); /* 设置连接超时时间 */ urlConn.setConnectTimeout(30000); /* 设置读取响应超时时间 */ urlConn.setReadTimeout(30000); /* 设置post发送方式 */ urlConn.setRequestMethod("POST"); /* 发送commString */ urlConn.setDoOutput(true); urlConn.setDoInput(true); OutputStream out = urlConn.getOutputStream(); out.write(commString.getBytes()); out.flush(); out.close(); /* 发送完毕 获取返回流,解析流数据 */ BufferedReader rd = new BufferedReader(new InputStreamReader(urlConn.getInputStream(), "GBK")); StringBuffer sb = new StringBuffer(); int ch; while ((ch = rd.read()) > -1) { sb.append((char) ch); } rec_string = sb.toString().trim(); rd.close(); } catch (Exception e) { rec_string = "-107"; } finally { if (urlConn != null) { urlConn.disconnect(); } } return rec_string; } }
微信公众号推送消息线程
package cloud.app.prod.home.utils; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; import cloud.app.prod.home.wechat.DSHWechatAPIHander; import net.sf.json.JSONArray; import net.sf.json.JSONObject; /** * Author : YongBo Xie </br> * File Name: WachatMagSendThread.java </br> * Created Date: 2018年3月26日 下午4:11:23 </br> * Modified Date: 2018年3月26日 下午4:11:23 </br> * Version: 1.0 </br> */ public class WachatMagSendThread implements Runnable { private static Logger logger = Logger.getLogger(WachatMagSendThread.class); private String accessToken;//公众号token private List<String> openIdList;//微信OpenID列表 private String content;//消息内容 private int currentIndex;//当前索引 private int rows;//处理数据条数 private CountDownLatch doneSignal;//处理线程条数 public WachatMagSendThread(String accessToken, List<String> openIdList, String content, int currentIndex, int rows, CountDownLatch doneSignal) { this.accessToken = accessToken; this.openIdList = openIdList; this.content = content; this.currentIndex = currentIndex; this.rows = rows; this.doneSignal = doneSignal; } @Override public void run() { // 查询当前的block范围内的发送的OpenID--------- JSONArray openIdArray = filterOpenIds(openIdList, currentIndex, rows); try { // 设置发送消息的参数 JSONObject msgJson = new JSONObject(); msgJson.put("touser", openIdArray); msgJson.put("msgtype", "text"); JSONObject contentJson = new JSONObject(); contentJson.put("content", content); msgJson.put("text", contentJson); DSHWechatAPIHander.sendMessage(accessToken, msgJson.toString()); } catch (Exception e) { logger.error("send message thread exception=>{" + openIdArray.toString() + "}{" + e.getMessage() + "}"); e.printStackTrace(); } finally { doneSignal.countDown();//工人完成工作,计数器减一 } } /** * 筛选目标客户群OpenID * @param openIdList OpenID * @param currentIndex 当前索引 * @param rows 处理数据条数 * @return */ private JSONArray filterOpenIds(List<String> openIdList, int currentIndex, int rows) { int startIndex = currentIndex * rows; int endIndex = (currentIndex + 1) * rows; if (endIndex > openIdList.size()) { endIndex = openIdList.size(); } JSONArray openIdArray = new JSONArray(); for (int i = startIndex; i < endIndex; i++) { openIdArray.add(openIdList.get(i)); } return openIdArray; } }
微信公众号推送消息
package cloud.app.prod.home.wechat; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.methods.RequestBuilder; import org.apache.http.conn.routing.HttpRoute; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.apache.log4j.Logger; /** * Author : YongBo Xie </br> * File Name: WechatAPIHander.java </br> * Created Date: 2018年3月13日 下午6:29:00 </br> * Modified Date: 2018年3月13日 下午6:29:00 </br> * Version: 1.0 </br> */ public class DSHWechatAPIHander { private static Logger logger = Logger.getLogger(DSHWechatAPIHander.class); /** * 主动推送信息接口(群发) */ private static String SEND_MSG_URL = "https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token={0}"; private static PoolingHttpClientConnectionManager connectionManager = null; private static HttpClientBuilder httpBuilder = null; private static RequestConfig requestConfig = null; private static int MAXCONNECTION = 10; private static int DEFAULTMAXCONNECTION = 5; private static String IP = "127.0.0.1"; private static int PORT = 8888; static { // 设置http的状态参数 requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).setConnectionRequestTimeout(5000).build(); HttpHost target = new HttpHost(IP, PORT); connectionManager = new PoolingHttpClientConnectionManager(); connectionManager.setMaxTotal(MAXCONNECTION);// 客户端总并行链接最大数 connectionManager.setDefaultMaxPerRoute(DEFAULTMAXCONNECTION);// 每个主机的最大并行链接数 connectionManager.setMaxPerRoute(new HttpRoute(target), 20); httpBuilder = HttpClients.custom(); httpBuilder.setConnectionManager(connectionManager); } public static CloseableHttpClient getConnection() { CloseableHttpClient httpClient = httpBuilder.build(); return httpClient; } public static HttpUriRequest getRequestMethod(Map<String, String> map, String url, String method) { List<NameValuePair> params = new ArrayList<NameValuePair>(); Set<Map.Entry<String, String>> entrySet = map.entrySet(); for (Map.Entry<String, String> e : entrySet) { String name = e.getKey(); String value = e.getValue(); NameValuePair pair = new BasicNameValuePair(name, value); params.add(pair); } HttpUriRequest reqMethod = null; if ("post".equals(method)) { reqMethod = RequestBuilder.post().setUri(url).addParameters(params.toArray(new BasicNameValuePair[params.size()])).setConfig(requestConfig).build(); } else if ("get".equals(method)) { reqMethod = RequestBuilder.get().setUri(url).addParameters(params.toArray(new BasicNameValuePair[params.size()])).setConfig(requestConfig).build(); } return reqMethod; } /** * @desc 推送信息 * @param token * @param msg * @return */ public static void sendMessage(String token, String msg) { try { logger.info("\n\nsendMessage start.token:" + token + ",msg:" + msg); String url = MessageFormat.format(SEND_MSG_URL, token); // 创建默认的httpClient实例. CloseableHttpClient httpclient = HttpClients.createDefault(); // 创建httppost HttpPost httppost = new HttpPost(url); // 设置发送消息的参数 // 这里必须是一个合法的json格式数据,每个字段的意义可以查看上面连接的说明,content后面的test是要发送给用户的数据,这里是群发给所有人 // msg = "{\"filter\":{\"is_to_all\":true},\"text\":{\"content\":\"test\"},\"msgtype\":\"text\"}\""; StringEntity sEntity; try { sEntity = new StringEntity(msg); // 解决中文乱码的问题 sEntity.setContentEncoding("UTF-8"); sEntity.setContentType("application/json"); httppost.setEntity(sEntity); System.out.println("executing request " + httppost.getURI()); // 发送请求 CloseableHttpResponse response = httpclient.execute(httppost); try { HttpEntity hEntity = response.getEntity(); if (hEntity != null) { System.out.println("--------------------------------------"); System.out.println("Response content: " + EntityUtils.toString(hEntity, "UTF-8")); System.out.println("--------------------------------------"); } } finally { response.close(); } } catch (ClientProtocolException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e1) { e1.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { // 关闭连接,释放资源 try { httpclient.close(); } catch (IOException e) { e.printStackTrace(); } } } catch (Exception e) { logger.error("get user info exception", e); } } public static void main(String args[]) throws IOException { Map<String, String> map = new HashMap<String, String>(); map.put("account", ""); map.put("password", ""); HttpClient client = getConnection(); HttpUriRequest post = getRequestMethod(map, "http://baidu.com", "post"); HttpResponse response = client.execute(post); if (response.getStatusLine().getStatusCode() == 200) { HttpEntity entity = response.getEntity(); String message = EntityUtils.toString(entity, "utf-8"); System.out.println(message); } else { System.out.println("请求失败"); } } }