更新時(shí)間:2021-11-11 09:27:57 來源:動(dòng)力節(jié)點(diǎn) 瀏覽1035次
該多線程處理工具,只需要實(shí)現(xiàn)自己的業(yè)務(wù)邏輯就可以正常使用
主要是針對大數(shù)據(jù)量list,將list劃分多個(gè)線程處理的場景
第一步: ResultBean類,返回結(jié)果統(tǒng)一bean
package com.HM.eis.commons.multiThread;
import java.io.Serializable;
import com.alibaba.fastjson.JSON;
/**
* 返回結(jié)果統(tǒng)一bean
*/
public class ResultBean<T> implements Serializable {
private static final long serialVersionUID = 1L;
// 成功狀態(tài)
public static final int SUCCESS = 1;
// 處理中狀態(tài)
public static final int PROCESSING = 0;
// 失敗狀態(tài)
public static final int FAIL = -1;
// 描述
private String msg = "success";
// 狀態(tài)默認(rèn)成功
private int code = SUCCESS;
// 備注
private String remark;
// 返回?cái)?shù)據(jù)
private T data;
public ResultBean() {
super();
}
public ResultBean(T data) {
super();
this.data = data;
}
/**
* 使用異常創(chuàng)建結(jié)果
*/
public ResultBean(Throwable e) {
super();
this.msg = e.toString();
this.code = FAIL;
}
/**
* 實(shí)例化結(jié)果默認(rèn)成功狀態(tài)<BR>
* 方法名:newInstance<BR>
*/
public static <T> ResultBean<T> newInstance() {
ResultBean<T> instance = new ResultBean<T>();
//默認(rèn)返回信息
instance.code = SUCCESS;
instance.msg = "success";
return instance;
}
/**
* 實(shí)例化結(jié)果默認(rèn)成功狀態(tài)和數(shù)據(jù)<BR>
* 方法名:newInstance<BR>
*/
public static <T> ResultBean<T> newInstance(T data) {
ResultBean<T> instance = new ResultBean<T>();
//默認(rèn)返回信息
instance.code = SUCCESS;
instance.msg = "success";
instance.data = data;
return instance;
}
/**
* 實(shí)例化返回結(jié)果<BR>
* 方法名:newInstance<BR>
*/
public static <T> ResultBean<T> newInstance(int code, String msg) {
ResultBean<T> instance = new ResultBean<T>();
//默認(rèn)返回信息
instance.code = code;
instance.msg = msg;
return instance;
}
/**
* 實(shí)例化返回結(jié)果<BR>
* 方法名:newInstance<BR>
*/
public static <T> ResultBean<T> newInstance(int code, String msg, T data) {
ResultBean<T> instance = new ResultBean<T>();
//默認(rèn)返回信息
instance.code = code;
instance.msg = msg;
instance.data = data;
return instance;
}
/**
* 設(shè)置返回?cái)?shù)據(jù)<BR>
* 方法名:setData<BR>
*/
public ResultBean<T> setData(T data){
this.data = data;
return this;
}
/**
* 設(shè)置結(jié)果描述<BR>
* 方法名:setMsg<BR>
*/
public ResultBean<T> setMsg(String msg){
this.msg = msg;
return this;
}
/**
* 設(shè)置狀態(tài)<BR>
* 方法名:setCode<BR>
*/
public ResultBean<T> setCode(int code){
this.code = code;
return this;
}
/**
* 設(shè)置備注)<BR>
* 方法名:setRemark<BR>
*/
public ResultBean<T> setRemark(String remark){
this.remark = remark;
return this;
}
/**
* 設(shè)置成功描述和返回?cái)?shù)據(jù)<BR>
* 方法名:success<BR>
*/
public ResultBean<T> success(String msg, T data){
this.code = SUCCESS;
this.data = data;
this.msg = msg;
return this;
}
/**
* 設(shè)置成功返回結(jié)果描述<BR>
* 方法名:success<BR>
*/
public ResultBean<T> success(String msg){
this.code = SUCCESS;
this.msg = msg;
return this;
}
/**
* 設(shè)置處理中描述和返回?cái)?shù)據(jù)<BR>
* 方法名:success<BR>
*/
public ResultBean<T> processing(String msg, T data){
this.code = PROCESSING;
this.data = data;
this.msg = msg;
return this;
}
/**
* 設(shè)置處理中返回結(jié)果描述<BR>
* 方法名:success<BR>
*/
public ResultBean<T> processing(String msg){
this.code = PROCESSING;
this.msg = msg;
return this;
}
/**
* 設(shè)置失敗返回描述和返回?cái)?shù)據(jù)<BR>
* 方法名:fail<BR>
*/
public ResultBean<T> fail(String msg, T data){
this.code = FAIL;
this.data = data;
this.msg = msg;
return this;
}
/**
* 設(shè)置失敗返回描述<BR>
* 方法名:fail<BR>
*/
public ResultBean<T> fail(String msg){
this.code = FAIL;
this.msg = msg;
return this;
}
public T getData() {
return data;
}
public String getMsg() {
return msg;
}
public int getCode() {
return code;
}
public String getRemark() {
return remark;
}
/**
* 生成json字符串<BR>
* 方法名:json<BR>
*/
public String json(){
return JSON.toJSONString(this);
}
}
第二步 :ITask接口: 實(shí)現(xiàn)自己的業(yè)務(wù)
package com.HM.eis.commons.multiThread;
import java.util.Map;
/**
* 任務(wù)處理接口
* 具體業(yè)務(wù)邏輯可實(shí)現(xiàn)該接口
* T 返回值類型
* E 傳入值類型
*/
public interface ITask<T, E> {
/**
* 任務(wù)執(zhí)行方法接口<BR>
* 方法名:execute<BR>
* @param e 傳入對象
* @param params 其他輔助參數(shù)
* @return T<BR> 返回值類型
*/
T execute(E e, Map<String, Object> params);
}
第三步 : HandleCallable類: 實(shí)現(xiàn)Callable接口,來處理任務(wù)
package com.HM.eis.commons.multiThread;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @ClassName: HandleCallable.java
* @Description: 線程調(diào)用
*/
@SuppressWarnings("rawtypes")
public class HandleCallable<E> implements Callable<ResultBean> {
private static Logger logger = LoggerFactory.getLogger(HandleCallable.class);
// 線程名稱
private String threadName = "";
// 需要處理的數(shù)據(jù)
private List<E> data;
// 輔助參數(shù)
private Map<String, Object> params;
// 具體執(zhí)行任務(wù)
private ITask<ResultBean<Object>, E> task;
public HandleCallable(String threadName, List<E> data, Map<String, Object> params,
ITask<ResultBean<Object>, E> task) {
this.threadName = threadName;
this.data = data;
this.params = params;
this.task = task;
}
@Override
public ResultBean<List<ResultBean<Object>>> call() throws Exception {
// 該線程中所有數(shù)據(jù)處理返回結(jié)果
ResultBean<List<ResultBean<Object>>> resultBean = ResultBean.newInstance();
if (data != null && data.size() > 0) {
logger.info("線程:{},共處理:{}個(gè)數(shù)據(jù),開始處理......", threadName, data.size());
// 返回結(jié)果集
List<ResultBean<Object>> resultList = new ArrayList<>();
// 循環(huán)處理每個(gè)數(shù)據(jù)
for (int i = 0; i < data.size(); i++) {
// 需要執(zhí)行的數(shù)據(jù)
E e = data.get(i);
// 將數(shù)據(jù)執(zhí)行結(jié)果加入到結(jié)果集中
resultList.add(task.execute(e, params));
logger.info("線程:{},第{}個(gè)數(shù)據(jù),處理完成", threadName, (i + 1));
}
logger.info("線程:{},共處理:{}個(gè)數(shù)據(jù),處理完成......", threadName, data.size());
resultBean.setData(resultList);
}
return resultBean;
}
}
第四步: MultiThreadUtils類,多線程工具類
package com.HM.eis.commons.multiThread;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @ClassName: MultiThreadUtils.java
* @Description: 多線程工具類
*/
@SuppressWarnings({ "all" })
public class MultiThreadUtils<T> {
private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class);
// 線程個(gè)數(shù),如不賦值,默認(rèn)為5
private int threadCount = 5;
// 具體業(yè)務(wù)任務(wù)
private ITask<ResultBean<String>, T> task;
// 線程池管理器
private CompletionService<ResultBean> pool = null;
/**
* 初始化線程池和線程個(gè)數(shù)<BR>
*/
public static MultiThreadUtils newInstance(int threadCount) {
MultiThreadUtils instance = new MultiThreadUtils();
threadCount = threadCount;
instance.setThreadCount(threadCount);
return instance;
}
/**
*
* 多線程分批執(zhí)行l(wèi)ist中的任務(wù)<BR>
* 方法名:execute<BR>
*/
public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<Object>, T> task) {
// 創(chuàng)建線程池
ExecutorService threadpool = Executors.newFixedThreadPool(threadCount);
// 根據(jù)線程池初始化線程池管理器
pool = new ExecutorCompletionService<ResultBean>(threadpool);
// 開始時(shí)間(ms)
long l = System.currentTimeMillis();
// 數(shù)據(jù)量大小
int length = data.size();
// 每個(gè)線程處理的數(shù)據(jù)個(gè)數(shù)
int taskCount = length / threadCount;
// 劃分每個(gè)線程調(diào)用的數(shù)據(jù)
for (int i = 0; i < threadCount; i++) {
// 每個(gè)線程任務(wù)數(shù)據(jù)list
List<T> subData = null;
if (i == (threadCount - 1)) {
subData = data.subList(i * taskCount, length);
} else {
subData = data.subList(i * taskCount, (i + 1) * taskCount);
}
// 將數(shù)據(jù)分配給各個(gè)線程
HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task);
// 將線程加入到線程池
pool.submit(execute);
}
// 總的返回結(jié)果集
List<ResultBean<String>> result = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
// 每個(gè)線程處理結(jié)果集
ResultBean<List<ResultBean<String>>> threadResult;
try {
threadResult = pool.take().get();
result.addAll(threadResult.getData());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 關(guān)閉線程池
threadpool.shutdownNow();
// 執(zhí)行結(jié)束時(shí)間
long end_l = System.currentTimeMillis();
logger.info("總耗時(shí):{}ms", (end_l - l));
return ResultBean.newInstance().setData(result);
}
public int getThreadCount() {
return threadCount;
}
public void setThreadCount(int threadCount) {
this.threadCount = threadCount;
}
}
第五步: 測試
package com.HM.eig.thread;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.HM.eig.commons.multiThread.ITask;
import com.HM.eig.commons.multiThread.MultiThreadUtils;
import com.HM.eig.commons.multiThread.ResultBean;
/**
* 具體執(zhí)行業(yè)務(wù)任務(wù) 需要 實(shí)現(xiàn)ITask接口 在execute中重寫業(yè)務(wù)邏輯
*/
public class TestTask implements ITask<ResultBean<String>, Integer> {
@Override
public ResultBean execute(Integer e, Map<String, Object> params) {
/**
* 具體業(yè)務(wù)邏輯:將list中的元素加上輔助參數(shù)中的數(shù)據(jù)返回
*/
int addNum = Integer.valueOf(String.valueOf(params.get("addNum")));
e = e + addNum;
ResultBean<String> resultBean = ResultBean.newInstance();
resultBean.setData(e.toString());
return resultBean;
}
public static void main(String[] args) {
// 需要多線程處理的大量數(shù)據(jù)list
List<Integer> data = new ArrayList<>(10000);
for(int i = 0; i < 10000; i ++){
data.add(i + 1);
}
// 創(chuàng)建多線程處理任務(wù)
MultiThreadUtils<Integer> threadUtils = MultiThreadUtils.newInstance(5);
ITask<ResultBean<String>, Integer> task = new TestTask();
// 輔助參數(shù) 加數(shù)
Map<String, Object> params = new HashMap<>();
params.put("addNum", 4);
// 執(zhí)行多線程處理,并返回處理結(jié)果
ResultBean<List<ResultBean<String>>> resultBean = threadUtils.execute(data, params, task);
}
}
建議以內(nèi)部類的形式使用:
public class TestTask {
public static void main(String[] args) {
// 需要多線程處理的大量數(shù)據(jù)list
List<Integer> data = new ArrayList<>(100);
for(int i = 0; i < 100; i ++){
data.add(i + 1);
}
// 創(chuàng)建多線程處理任務(wù)
MultiThreadUtils<Integer> threadUtils = MultiThreadUtils.newInstance(4);
// 輔助參數(shù) 加數(shù)
Map<String, Object> params = new HashMap<>();
params.put("addNum", 4);
// 執(zhí)行多線程處理,并返回處理結(jié)果
ResultBean<List<ResultBean<Object>>> resultBean = threadUtils.execute(data, params, new ITask<ResultBean<Object>, Integer>(){
@Override
public ResultBean<Object> execute(Integer e, Map<String, Object> params) {
/**
* 具體業(yè)務(wù)邏輯:將list中的元素加上輔助參數(shù)中的數(shù)據(jù)返回
*/
int addNum = Integer.valueOf(String.valueOf(params.get("addNum")));
e = e + addNum;
ResultBean<Object> resultBean = ResultBean.newInstance();
resultBean.setData(e.toString());
return resultBean;
}
});
}
}
以上就是關(guān)于“Java多線程工具類的使用方法”的介紹,如果想了解更多信息,可以關(guān)注一下動(dòng)力節(jié)點(diǎn)的Java開發(fā)工具,里面有更多的豐富在等著大家去學(xué)習(xí),希望對大家能夠有所幫助。
初級 202925
初級 203221
初級 202629
初級 203743