package com.tuchaoshi.base.concurrent; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; public abstract class ListeningExecutorServiceJob<K, V> { private final static Logger logger = LoggerFactory .getLogger(ListeningExecutorServiceJob.class); public static final int AVAILABLE_PROCESSORS_SIZE = Runtime.getRuntime() .availableProcessors(); private static ListeningExecutorService executorService = MoreExecutors .listeningDecorator(Executors .newFixedThreadPool(AVAILABLE_PROCESSORS_SIZE * 2)); public ListeningExecutorServiceJob() { } /** * 多线程执行商品生成信息 * * @description * @return List<TbArtsGoods> * @Exception */ public List<V> executeTask(List<K> keyList) { if (keyList == null) { return null; } List<V> vList = Lists.newArrayList(); long gstartTime = System.currentTimeMillis(); List<ListenableFuture<List<V>>> futures = Lists.newArrayList(); for (K k : keyList) {// 3级类别分组 futures.add(executeTask(k)); } ListenableFuture<List<List<V>>> successfulQueries = Futures .successfulAsList(futures); try { // 获取所有线程的执行结果 List<List<V>> lists = successfulQueries.get(); if (!CollectionUtils.isEmpty(lists)) { for (List<V> list : lists) { if (!CollectionUtils.isEmpty(list)) { vList.addAll(list); } } } } catch (Exception e) { logger.error(e.getMessage(), e); } logger.info(" executeTask ! cost time:" + (System.currentTimeMillis() - gstartTime)); return vList; } /** * * @description * @return ListenableFuture<List<TbArtsGoods>> * @Exception */ private ListenableFuture<List<V>> executeTask(final K k) { ListenableFuture<List<V>> listenableFuture = executorService .submit(new Callable<List<V>>() { @Override public List<V> call() throws Exception { return createTask(k); } }); return listenableFuture; } public abstract List<V> createTask(K k); /** * 拆分任务 * * @param tasks * @param 拆分数量 * @return */ public static <T> List<List<T>> splitTask(List<T> tasks, Integer taskSize) { List<List<T>> list = Lists.newArrayList(); int baseNum = tasks.size() / taskSize; // 每个list的最小size int remNum = tasks.size() % taskSize; // 得到余数 int index = 0; for (int i = 0; i < taskSize; i++) { int arrNum = baseNum; // 每个list对应的size if (i < remNum) { arrNum += 1; } List<T> ls = Lists.newArrayList(); for (int j = index; j < arrNum + index; j++) { ls.add(tasks.get(j)); } list.add(ls); index += arrNum; } return list; } public static void main(String[] args) { ListeningExecutorServiceJob<String, String> executorServiceTask = new ListeningExecutorServiceJob<String, String>() { @Override public List<String> createTask(String k) { List<String> list = Lists.newArrayList(); list.add(k + "===="); return list; } }; List<String> keyList = Lists.newArrayList(); keyList.add("1"); keyList.add("2"); keyList.add("3"); keyList.add("4"); keyList.add("5"); keyList.add("6"); keyList.add("7"); keyList.add("8"); keyList.add("9"); List<String> valueList = executorServiceTask.executeTask(keyList); System.out.println(valueList); System.out.println(splitTask(keyList, 6)); System.out.println(splitTask(keyList, 5)); System.out.println(splitTask(keyList, 4)); System.out.println(splitTask(keyList, 3)); System.out.println(splitTask(keyList, 2)); System.out.println(splitTask(keyList, 1)); ListeningExecutorServiceJob<List<String>, String> executorServiceTask2 = new ListeningExecutorServiceJob<List<String>, String>() { @Override public List<String> createTask(List<String> k) { List<String> list = Lists.newArrayList(); for (String s : k) { list.add(s + "====="); } return list; } }; List<List<String>> strings = splitTask(keyList, 3); System.out.println(executorServiceTask2.executeTask(strings)); } }
相关推荐
java 并发包 java 并发包 java 并发包
ConcurrentSkipListMap等类的概念、原理、数据结构、使用场景描述。
Java并发包思维导图.xmind Java并发包思维导图.xmind Java并发包思维导图.xmind Java并发包思维导图.xmind Java并发包思维导图.xmind Java并发包思维导图.xmind
并发包总结 思维导图 concurrent
java并发包源码分析(3)ThreadLocal 详细配图讲解 java并发包源码分析(3)ThreadLocal 详细配图讲解
java并发包之Callable和Future java并发包之Callable和Future java并发包之Callable和Future java并发包之Callable和Future java并发包之Callable和Future java并发包之Callable和Future
java并发包讲解 可以找我要代码,qq 3341386488 ## 线程安全-并发容器JUC--原理以及分析 1.arrayList --copyonWriteArraylist 优缺点 2.HashSet,TreeSet -- CopyONWriteArraySet,ConcurrentSkipListSet 3....
Java 7并发包最新思维导图,基于《深入浅出Java concurrency》修改
本资源包含两个 pdf 文档,一本根据 Jakob Jenkov 最新博客 (http://tutorials.jenkov.com/java-util-concurrent/index.html) 整理的 java_util_concurrent_user_guide_en.pdf,一个中文翻译的 java_util_concurrent...
Java分布式应用学习笔记04JDK的并发包的集合总结
java.util.concurrent包提供了创建并发应用程序的工具,本资源主要是对该api进行详细的解读,并对api的使用做出安全高效的引用建议.
对象交换器:当两个线程处理的结果需要交换进行处理的时候,使用并发包中适合多线程情况下的数据结构:非阻塞队列:ConcurrentLinkedQueue Conc
并发包 同步容器类 Vector与ArrayList区别 1.ArrayList是最常用的List实现类,内部是通过数组实现的,它允许对元素进行快速随机访问。数组的缺点是每个元素之间不能有间隔,当数组大小不满足时需要增加存储能力,...
java实现的8583发包解包,里边是一个详细的demo,包含socket通讯和银联加密算法
1、Java并发体系-第一阶段-多线程基础知识 2、Java并发体系-第二阶段-锁与同步-[1] ...5、Java并发包-第三阶段-JUC并发包-[1] 6、Java并发包-第三阶段-JUC并发包-[2] 7、Java并发体系-第四阶段-AQS源码解读-[1]
CountDownLatch计数器闭锁是一个能...类似于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭着的,不允许任何线程通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。且当门打开了,就永远保持打开状态。
包含了java1.6 API的中文工具,能对API进行查找,查看中文描述,对API类里面的结构,构造器,方法,成员变量都能一目了然,并且打包concurrent并发包,助你在java多线程道路上一帆风顺。
Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...
Java面试通常涵盖多个...并发编程:了解Java中的线程、同步、锁等机制,以及Java并发包中的工具类。 JVM与性能调优:对Java虚拟机(JVM)有一定了解,包括内存管理、垃圾回收等方面,并知道如何进行基本的性能调优。
第三章 使用 JDK 并发包构建程序第三章使用JDK并发包构建程序 13.13.2原子量23.2.1锁同步法33.2.2比较并交换43.2.3原子变量类63.2