src.zip

  • mynameniubi
    了解作者
  • C/C++
    开发工具
  • 5KB
    文件大小
  • zip
    文件格式
  • 0
    收藏次数
  • 1 积分
    下载积分
  • 0
    下载次数
  • 2021-04-08 20:01
    上传日期
1.定制化线程池实现高并发数据处理。 2.线程池实现了拒绝策略,自动扩缩容,线程安全,管道队列,高并发数据处理等。 3.完全按照Java封装,继承,多态的特性进行开发,实现了多个接口,用户可自行扩展。
src.zip
  • src
  • ThreadPoolUtil
  • DenyPolicy.java
    710B
  • RunnableDenyException.java
    169B
  • RunnableQueue.java
    141B
  • ThreadPool.java
    272B
  • ThreadFactory.java
    112B
  • ThreadPool
  • BasicThreadPool.java
    5KB
  • InternalTask.java
    700B
  • LinkedRunnableQueue.java
    1.4KB
  • TestThreadPool
  • TestThreadPool.java
    551B
  • module-info.java
    22B
内容介绍
package ThreadPool; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import ThreadPoolUtil.DenyPolicy; import ThreadPoolUtil.RunnableQueue; import ThreadPoolUtil.ThreadFactory; import ThreadPoolUtil.ThreadPool; /** * 实现了ThreadPool接口的线程池 * @author 14005 * */ public class BasicThreadPool extends Thread implements ThreadPool{ /** * 工作线程的一个组合 * @author 14005 * */ private static class ThreadTask{ Thread thread; InternalTask internalTask; public ThreadTask(Thread thread,InternalTask internalTask) { this.thread = thread; this.internalTask = internalTask; } } /** * 负责命名的线程工厂 * @author 14005 * */ private static class DefaultThreadFactory implements ThreadFactory{ private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1); private static final ThreadGroup group = new ThreadGroup("MyThreadPool-"+GROUP_COUNTER.getAndIncrement()); private static final AtomicInteger COUNTER = new AtomicInteger(0); @Override public Thread createThread(Runnable runnable) { return new Thread(group,runnable,"thread-pool-"+COUNTER.getAndIncrement()); } } private final int initSize;//初始化线程数量 private final int coreSize;//期望的线程数量 private final int maxSize;//最大的线程数量 private int activeCount;//活跃的线程数量 private final ThreadFactory threadFactory; private final RunnableQueue runnableQueue; //任务队列 private volatile boolean isShutDown = false; private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();//工作线程队列 private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy(); private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory(); private final long keepAliveTime; private final TimeUnit timeUnit; public BasicThreadPool(int initSize, int coreSize, int maxSize, int queueSize) { this(initSize, coreSize, maxSize, queueSize, DEFAULT_THREAD_FACTORY, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS); } public BasicThreadPool(int initSize, int coreSize, int maxSize,int queueSize, ThreadFactory threadFactory, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) { super(); this.initSize = initSize; this.coreSize = coreSize; this.maxSize = maxSize; this.threadFactory = threadFactory; this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this); this.keepAliveTime = keepAliveTime; this.timeUnit = timeUnit; this.init(); } /** * 初始化若干个线程 */ private void init() { start(); for(int i=0;i<initSize;i++) { newThread(); } } /** * 新建一个线程 */ private void newThread() { InternalTask internalTask = new InternalTask(runnableQueue); Thread thread = this.threadFactory.createThread(internalTask); ThreadTask threadTask = new ThreadTask(thread, internalTask); threadQueue.offer(threadTask); this.activeCount++; thread.start(); } /** * 销毁一个线程 */ private void removeThread() { ThreadTask threadTask = threadQueue.remove(); threadTask.internalTask.stop(); this.activeCount--; } /** * 提交任务方法 */ @Override public void execute(Runnable runnable) { if(this.isShutDown) throw new IllegalStateException("The thread pool is destory"); this.runnableQueue.offer(runnable); } /** * 自动管理线程池,进行扩容,回收操作等 */ @Override public void run() { while(!isShutDown && !isInterrupted()) { synchronized(this){ if(isShutDown) break; if(runnableQueue.size() > 0 && activeCount < coreSize) { for(int i=activeCount; i<coreSize; i++) { newThread(); } continue; } if(runnableQueue.size() > 0 && activeCount < maxSize) { for(int i=activeCount; i<maxSize; i++) { newThread(); } } if(runnableQueue.size() == 0 && activeCount > coreSize) { for(int i=coreSize; i<activeCount; i++) { removeThread(); } } } } } /** * 销毁线程池 */ @Override public void shutDown() { synchronized(this) { if(isShutDown) return; isShutDown = true; threadQueue.forEach(threadTask - rel='nofollow' onclick='return false;'>{ threadTask.internalTask.stop(); threadTask.thread.interrupt(); }); this.interrupt(); } } /** * 未实现的方法 */ @Override public int getInitSize() { // TODO Auto-generated method stub return 0; } @Override public int getMaxSize() { // TODO Auto-generated method stub return 0; } @Override public int getCoreSize() { // TODO Auto-generated method stub return 0; } @Override public int getQueueSize() { // TODO Auto-generated method stub return 0; } @Override public int getActiveSize() { // TODO Auto-generated method stub return 0; } @Override public boolean isShutDown() { // TODO Auto-generated method stub return false; } }
评论
    相关推荐
    • 线程池代码
      线程池代码共享,使用的语言为C++,此代码符合线程池所使用的技术
    • 线程池示例
      DELPHI的线程池示例,版本1.06 ThreadPool is a abstract class framework for creating specialized pool of workers (TPoolWorker, separate threads) which are managed by a manager (TPoolManager, also a ...
    • Linux 线程池
      可实现线程池的基本功能,用多线程实现对文件的读取,可读取大文件,经实测代码没有任何bug
    • 线程池
      NULL 博文链接:https://rainyear.iteye.com/blog/1182310
    • c++ 线程池
      自己根据搜索到的资源整理的线程池写法,最精简的代码,100多行。最不喜欢故意把代码写的很多,搞的自己很厉害似得。c++版的,已经用在项目中。
    • 线程池管理
      简单实现了线程池的框架构建,并简单进行了测试。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。...
    • 线程池
      NULL 博文链接:https://duxingrufeng.iteye.com/blog/2081420
    • 线程池源码
      线程池,能极大的提高程序的效率,本附件中有源码以及实例程序。有一定C++基础的人看看就应该知道怎么用。
    • 线程池
      NULL 博文链接:https://chinaxxren.iteye.com/blog/866051
    • 线程池.zip
      linux线程池C语言编程,通过简单的例子,可以很好理解到线程池的作用