博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【原】通过多线程分批处理派发任务
阅读量:5323 次
发布时间:2019-06-14

本文共 5389 字,大约阅读时间需要 17 分钟。

 前言:

   前几天有其他部门人员反应派发红包很慢,经常出现504或者无响应,于是由我这边进行一个优化后,发放速度由原来的超时或者1分钟变为几秒。

  发放流程:

         活动后台导入一个xls表格,大概2W左右条,经过后台的筛选处理等逻辑后会循环调用插入数据库的代码。

   优化过程:

         分析慢的原因:

                                  1.这套系统是N年前的系统,旧的功能无人维护优化,里面大量使用了jdbcTemplate。

                                  2.发放的时候循环发放,导致要循环2W次,而且每次插入都是需要新建一个对象,然后往里面set数据,然后调用 jdbcTemplate 入库。

          优化思路 :

      1. jdbcTemplate 有个batchUpdate的api,可以通过这个api完成批处理                 

      2. 测试后发现batchUpdate还是会有瓶颈,在数据大的时候还是有点慢,是否考虑通过多线程拆分大的单元,类似于 jdk的 forkJoin,然后每个线程处理一批,最后的结果通过回调统一归并。

   代码片段:

  •  #抽取出一个公共的接口,用于调用具体的处理方法。

    

public interface ITask
{ * @param e 传入对象 * @param params 其他辅助参数 * @return T
返回值类型 * @exception
* @since 2.0 T execute(List e, Map
params);}
  •     #由于发放红包需要实时展示给发放人员看,所以需要有回调处理函数,可以将不同结果的线程收集起来统一给主线程返回,所以新建一个类实现 Callable。
public class HandleCallable
implements Callable
{ private static Logger logger = LoggerFactory.getLogger(HandleCallable.class); // 线程名称 private String threadName = ""; // 需要处理的数据 private List
data; // 辅助参数 private Map
params; // 具体执行任务 private ITask
, E> task; public HandleCallable(String threadName, List
data, Map
params, ITask
, E> task) { this.threadName = threadName; this.data = data; this.params = params; this.task = task; } @Override public ResultBean
>> call() throws Exception { // 该线程中所有数据处理返回结果 ResultBean
>> resultBean = ResultBean.newInstance(); if (data != null && data.size() > 0) { logger.info("线程:{},共处理:{}个数据,开始处理......", threadName, data.size()); // 返回结果集 List
> resultList = new ArrayList<>(); resultList.add(task.execute(data, params)); /*resultList.add(task.execute(data, params));*/ // 循环处理每个数据 /* for (int i = 0; i < data.size(); i++) { // 需要执行的数据 E e = data.get(i); // 将数据执行结果加入到结果集中 resultList.add(task.execute(e, params)); logger.info("线程:{},第{}个数据,处理完成", threadName, (i + 1)); }*/ logger.info("线程:{},共处理:{}个数据,处理完成......", threadName, data.size()); resultBean.setData(resultList); resultBean.setCode(data.size()); } return resultBean; }}
 
  •        #具体的处理线程的类有了,那么还需要考虑谁来切分任务,所以新建一个 线程工具类,主要业务就是切分任务,创建具体的线程个数,统一收集结果。
package com.i2p.util;import java.util.ArrayList;import java.util.List; import java.util.Map; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.i2p.util.thread.ITask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * * MultiThreadUtils
* 创建人:dd
* 时间:2019年1月11日 14:09:38
* @version 2.0 * */ public class MultiThreadUtils
{ private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class); // 线程个数,如不赋值,默认为5 private int threadCount = 20; // 具体业务任务 // private ITask
, T> task; // 线程池管理器 private CompletionService
pool = null; /** * * 初始化线程池和线程个数
* 方法名:newInstance
* 创建人:dd
* 时间:2018年8月8日-下午8:22:00
* @param threadCount * @return MultiThreadUtils
* @exception
* @since 2.0 */ public static MultiThreadUtils newInstance(int threadCount) { MultiThreadUtils instance = new MultiThreadUtils(); threadCount = threadCount; instance.setThreadCount(threadCount); return instance; } /** * * 多线程分批执行list中的任务
* 方法名:execute
* 创建人:dd
* 时间:2019年1月10日 09:00:24
* @param data 线程处理的大数据量list * @param params 处理数据是辅助参数传递 * @param task 具体执行业务的任务接口 * @return ResultBean
* @exception
* @since 2.0 */ @SuppressWarnings("rawtypes") public ResultBean execute(List
data, Map
params, ITask
, T> task) { // 创建线程池 int num = 0; ExecutorService threadpool = Executors.newFixedThreadPool(threadCount); // 根据线程池初始化线程池管理器 pool = new ExecutorCompletionService
(threadpool); // 开始时间(ms) long l = System.currentTimeMillis(); // 数据量大小 int length = data.size(); // 每个线程处理的数据个数 int taskCount = length / threadCount; // 划分每个线程调用的数据 for (int i = 0; i < threadCount; i++) { // 每个线程任务数据list List
subData = null; if (i == (threadCount - 1)) { subData = data.subList(i * taskCount, length); } else { subData = data.subList(i * taskCount, (i + 1) * taskCount); } // 将数据分配给各个线程 HandleCallable execute = new HandleCallable
(String.valueOf(i), subData, params, task); // 将线程加入到线程池 pool.submit(execute); } // 总的返回结果集 List
> result = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { // 每个线程处理结果集 ResultBean
>> threadResult; try { threadResult = pool.take().get(); if(threadResult!=null && threadResult.getData()!=null){ System.out.println("======线程" + i + "执行完毕,返回结果数据:" + threadResult.getCode()); List
> list = threadResult.getData(); num+=threadResult.getCode(); } System.out.println("每个线程处理结果集"+threadResult.getData()); result.addAll(threadResult.getData()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 关闭线程池 threadpool.shutdownNow(); // 执行结束时间 long end_l = System.currentTimeMillis(); logger.info("总耗时:{}ms", (end_l - l)); logger.info("总数量:{}num:", num); return ResultBean.newInstance().setData(num); } public int getThreadCount() { return threadCount; } public void setThreadCount(int threadCount) { this.threadCount = threadCount; } }

转载于:https://www.cnblogs.com/zdd-java/p/10306194.html

你可能感兴趣的文章
如何将域名部署到Tomcat中,用域名访问服务器
查看>>
08.08 web字体 :语法 兼容性写法 字体格式 工具 字体颜图标 多列布局:相关属性 伸缩盒:概念 相关属性...
查看>>
南阳737----石子合并(一)
查看>>
js、jquery中全局替换replace
查看>>
一次U9身份验证http数据对接
查看>>
使用WCF进行跨平台开发之一(WCF的实现、控制台托管与.net平台的调用)
查看>>
Android 发展思路
查看>>
Pythonic
查看>>
contentprovider的学习实例总结
查看>>
Sharepoint 自定义字段
查看>>
TQ2440之中断
查看>>
MySQL 触发器简单实例
查看>>
codeforces 712A. Memory and Crow
查看>>
Latex Undefined control sequence. ...\bm
查看>>
MySQL------报错Access denied for user 'root'@'localhost' (using password:NO)解决方法
查看>>
车牌识别LPR(三)-- LPR系统整体结构
查看>>
log4j异常:WARN No appenders could be found for logger
查看>>
新手村之顺序与分支
查看>>
4.秋招复习简单整理之java支持多继承吗?
查看>>
BZOJ2002: [Hnoi2010]Bounce 弹飞绵羊(LCT)
查看>>