爆款云主机2核4G限时秒杀,88元/年起!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心

      并发队列-无界阻塞队列LinkedBlockingQueue原理探究

      首页 知识中心 软件开发 文章详情页

      并发队列-无界阻塞队列LinkedBlockingQueue原理探究

      2023-03-21 10:31:48 阅读次数:594

      多线程同步

      一、前言

      前面介绍了使用CAS实现的非阻塞队列ConcurrentLinkedQueue,下面就来介绍下使用独占锁实现的阻塞队列LinkedBlockingQueue的实现。

      二、 LinkedBlockingQueue类图结构

      并发队列-无界阻塞队列LinkedBlockingQueue原理探究

      如图LinkedBlockingQueue中也有两个Node分别用来存放首尾节点,并且里面有个初始值为0的原子变量count用来记录队列元素个数,另外里面有两个ReentrantLock的独占锁,分别用来控制元素入队和出队加锁,其中takeLock用来控制同时只有一个线程可以从队列获取元素,其他线程必须等待,putLock控制同时只能有一个线程可以获取锁去添加元素,其他线程必须等待。另外notEmpty和notFull用来实现入队和出队的同步。 另外由于出入队是两个非公平独占锁,所以可以同时又一个线程入队和一个线程出队,其实这个是个生产者-消费者模型。

      /** Lock held by take, poll, etc */
      
      private final ReentrantLock takeLock = new ReentrantLock();
      
      /** Wait queue for waiting takes */
      
      private final Condition notEmpty = takeLock.newCondition();
      
      /** Lock held by put, offer, etc */
      
      private final ReentrantLock putLock = new ReentrantLock();
      
      /** Wait queue for waiting puts */
      
      private final Condition notFull = putLock.newCondition();
      

      / ** Current number of elements * /

      private final AtomicInteger count = new AtomicInteger(0);
      
      
      public static final int   MAX_VALUE = 0x7fffffff;
      
      public LinkedBlockingQueue() {
      
          this(Integer.MAX_VALUE);
      
      }
      
        public LinkedBlockingQueue(int capacity) {
      
          if (capacity <= 0) throw new IllegalArgumentException();
      
          this.capacity = capacity;
      
          //初始化首尾节点
      
          last = head = new Node<E>(null);
      
      }
      

      如图默认队列容量为0x7fffffff;用户也可以自己指定容量。

      三、必备基础

      3.1 ReentrantLock

      可以参考https://www.ctyun.cn/portal/link.html?target=https%3A%2F%2Fwww.atatech.org%2Farticles%2F80539%3Fflag_data_from%3Dactive

      3.2 条件变量(Condition)

      条件变量这里使用的是takeLock.newCondition()获取也就是说调用ReentrantLock的方法获取的,那么可预见Condition使用了ReentrantLock的state。上面的参考没有提到所以这里串串讲下

      • 首先看下类图结构

      并发队列-无界阻塞队列LinkedBlockingQueue原理探究

      如图ConditionObject中两个node分别用来存放条件队列的首尾节点,条件队列就是调用条件变量的await方法被阻塞后的节点组成的单向链表。另外ConditionObject还要依赖AQS的state,ConditionObject是AQS类的一个内部类。

      • awaitNanos操作
      
      public final long awaitNanos(long nanosTimeout)
      
              throws InterruptedException {
      
          //如果中断标志被设置了,则抛异常
      
          if (Thread.interrupted())
      
              throw new InterruptedException();
      
          //添加当前线程节点到条件队列,
      
          Node node = addConditionWaiter();
      
          //当前线程释放独占锁
      
          int savedState = fullyRelease(node);
      
          long lastTime = System.nanoTime();
      
          int interruptMode = 0;
      
          while (!isOnSyncQueue(node)) {
      
              if (nanosTimeout <= 0L) {
      
                  transferAfterCancelledWait(node);
      
                  break;
      
              }
      
              //挂起当前线程直到超时
      
              LockSupport.parkNanos(this, nanosTimeout);
      
              if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      
                  break;
      
              long now = System.nanoTime();
      
              nanosTimeout -= now - lastTime;
      
              lastTime = now;
      
          }
      
          //unpark后,当前线程重新获取锁,有可能获取不到被放到AQS的队列
      
          if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      
              interruptMode = REINTERRUPT;
      
          if (node.nextWaiter != null)
      
              unlinkCancelledWaiters();
      
          if (interruptMode != 0)
      
              reportInterruptAfterWait(interruptMode);
      
          return nanosTimeout - (System.nanoTime() - lastTime);
      
      }
      
          final int fullyRelease(Node node) {
      
              boolean failed = true;
      
              try {
      
                  int savedState = getState();
      
                  //释放锁,如果失败则抛异常
      
                  if (release(savedState)) {
      
                      failed = false;
      
                      return savedState;
      
                  } else {
      
                      throw new IllegalMonitorStateException();
      
                  }
      
              } finally {
      
                  if (failed)
      
                      node.waitStatus = Node.CANCELLED;
      
              }
      
          }
      

      首先如果当前线程中断标志被设置了,直接抛出异常。添加当前线程节点(状态为:-2)到条件队列。

      然后尝试释放当前线程拥有的锁并保存当前计数,可知如果当前线程调用awaitNano前没有使用当前条件变量所在的Reetenlock变量调用lock或者lockInterruptibly获取到锁,会抛出IllegalMonitorStateException异常。

      然后调用park挂起当前线程直到超时或者其他线程调用了当前线程的unpark方法,或者调用了当前线程的interupt方法(这时候会抛异常)。

      如果超时或者其他线程调用了当前线程的unpark方法,则当前线程从挂起变为激活,获取cpu资源后会继续执行,会重新获取锁。

      • signal操作
      
      public final void signal() {
      
          //如果当前线程没有持有锁,抛异常
      
          if (!isHeldExclusively())
      
              throw new IllegalMonitorStateException();
      
          //从条件队列找第一个状态为CONDITION的,然后把状态变为0
      
          Node first = firstWaiter;
      
          if (first != null)
      
              doSignal(first);
      
      }
      
      private void doSignal(Node first) {
      
          do {
      
              if ( (firstWaiter = first.nextWaiter) == null)
      
                  lastWaiter = null;
      
              first.nextWaiter = null;
      
          } while (!transferForSignal(first) &&
      
                   (first = firstWaiter) != null);
      
      }
      
      final boolean transferForSignal(Node node) {
      
      
      
          //状态为CONDITION的,然后把状态变为0
      
          if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
      
              return false;
      
      
      
          //把条件队列的上面状态为0的节点放入AQS阻塞队列
      
          Node p = enq(node);
      
          int ws = p.waitStatus;
      
          //调用unpark激活挂起的线程
      
          if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
      
              LockSupport.unpark(node.thread);
      
          return true;
      
      }
      

      首先看调用signal的线程是不是持有了独占锁,没有则抛出异常。

      然后获取在条件队列里面待的时间最长的node,把它移动到线程持有的锁所在的AQS队列。

      其中enq方法就是把当前节点放入了AQS队列,但是这时候该节点还是在条件队列里面那,那么什么时候从条件队列移除那?其实在await里面的unlinkCancelledWaiters方法。

      总结: 无论是条件变量的await和singal都是需要先获取独占锁才能调用,因为条件变量使用的就是独占锁里面的state管理状态,否者会报异常。

      四 、带超时时间的offer操作-生产者

      在队尾添加元素,如果队列满了,那么等待timeout时候,如果时间超时则返回false,如果在超时前队列有空余空间,则插入后返回true。

      
      public boolean offer(E e, long timeout, TimeUnit unit)
      
          throws InterruptedException {
      
          //空元素抛空指针异常
      
          if (e == null) throw new NullPointerException();
      
          long nanos = unit.toNanos(timeout);
      
          int c = -1;
      
          final ReentrantLock putLock = this.putLock;
      
          final AtomicInteger count = this.count;
      
          //获取可被中断锁,只有一个线程克获取
      
          putLock.lockInterruptibly();
      
          try {
      
              //如果队列满则进入循环
      
              while (count.get() == capacity) {
      
                  //nanos<=0直接返回
      
                  if (nanos <= 0)
      
                      return false;
      
                  //否者调用await进行等待,超时则返回<=0(1)
      
                  nanos = notFull.awaitNanos(nanos);
      
              }
      
              //await在超时时间内返回则添加元素(2)
      
              enqueue(new Node<E>(e));
      
              c = count.getAndIncrement();
      
              //队列不满则激活其他等待入队线程(3)
      
              if (c + 1 < capacity)
      
                  notFull.signal();
      
          } finally {
      
              //释放锁
      
              putLock.unlock();
      
          }
      
          //c==0说明队列里面有一个元素,这时候唤醒出队线程(4)
      
          if (c == 0)
      
              signalNotEmpty();
      
          return true;
      
      }
      
      private void enqueue(Node<E> node) {   
      
          last = last.next = node;
      
      }
      
          private void signalNotEmpty() {
      
              final ReentrantLock takeLock = this.takeLock;
      
              takeLock.lock();
      
              try {
      
                  notEmpty.signal();
      
              } finally {
      
                  takeLock.unlock();
      
              }
      
          }
      

      如果获取锁前面有线程调用了putLock. interrupt(),并且后面没有调用interrupted()重置中断标志,调用lockInterruptibly时候会抛出InterruptedException异常。

      队列满的时候调用notFull.awaitNanos阻塞当前线程,当前线程会释放获取的锁,然后等待超时或者其他线程调用了notFull.signal()才会返回并重新获取锁,或者其他线程调用了该线程的interrupt方法设置了中断标志,这时候也会返回但是会抛出InterruptedException异常。

      如果超时则直接返回false,如果超时前调用了notFull.signal()则会退出循环,执行(2)添加元素到队列,然后执行(3),(3)的目的是为了激活其他入队等待线程。(4)的话c==0说明队列里面已经有一个元素了,这时候就可以激活等待出队线程了。

      另外signalNotEmpty函数是先获取独占锁,然后在调用的signal这也证明了3.2节的结论。

      五、 带超时时间的poll操作-消费者

      获取并移除队首元素,在指定的时间内去轮询队列看有没有首元素有则返回,否者超时后返回null

      
      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
      
          E x = null;
      
          int c = -1;
      
          long nanos = unit.toNanos(timeout);
      
          final AtomicInteger count = this.count;
      
          final ReentrantLock takeLock = this.takeLock;
      
          //出队线程获取独占锁
      
          takeLock.lockInterruptibly();
      
          try {
      
              //循环直到队列不为空
      
              while (count.get() == 0) {
      
                  //超时直接返回null
      
                  if (nanos <= 0)
      
                      return null;
      
                  nanos = notEmpty.awaitNanos(nanos);
      
              }
      
              //出队,计数器减一
      
              x = dequeue();
      
              c = count.getAndDecrement();
      
              //如果出队前队列不为空则发送信号,激活其他阻塞的出队线程
      
              if (c > 1)
      
                  notEmpty.signal();
      
          } finally {
      
              //释放锁
      
              takeLock.unlock();
      
          }
      
          //当前队列容量为最大值-1则激活入队线程。
      
          if (c == capacity)
      
              signalNotFull();
      
          return x;
      
      }
      

      首先获取独占锁,然后进入循环当当前队列有元素才会退出循环,或者超时了,直接返回null。

      超时前退出循环后,就从队列移除元素,然后计数器减去一,如果减去1前队列元素大于1则说明当前移除后队列还有元素,那么就发信号激活其他可能阻塞到当前条件信号的线程。

      最后如果减去1前队列元素个数=最大值,那么移除一个后会腾出一个空间来,这时候可以激活可能存在的入队阻塞线程。

      六、put操作-生产者

      与带超时时间的poll类似不同在于put时候如果当前队列满了它会一直等待其他线程调用notFull.signal才会被唤醒。

      七、 take操作-消费者

      与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒。

      八、 size操作

      当前队列元素个数,如代码直接使用原子变量count获取。

      
          public int size() {
      
              return count.get();
      
          }
      

      九、peek操作

      获取但是不移除当前队列的头元素,没有则返回null

      
          public E peek() {
      
              //队列空,则返回null
      
              if (count.get() == 0)
      
                  return null;
      
              final ReentrantLock takeLock = this.takeLock;
      
              takeLock.lock();
      
              try {
      
                  Node<E> first = head.next;
      
                  if (first == null)
      
                      return null;
      
                  else
      
                      return first.item;
      
              } finally {
      
                  takeLock.unlock();
      
              }
      
          }
      

      十、 remove操作

      删除队列里面的一个元素,有则删除返回true,没有则返回false,在删除操作时候由于要遍历队列所以加了双重锁,也就是在删除过程中不允许入队也不允许出队操作

      
      public boolean remove(Object o) {
      
          if (o == null) return false;
      
          //双重加锁
      
          fullyLock();
      
          try {
      
              //遍历队列找则删除返回true
      
              for (Node<E> trail = head, p = trail.next;
      
                   p != null;
      
                   trail = p, p = p.next) {
      
                  if (o.equals(p.item)) {
      
                      unlink(p, trail);
      
                      return true;
      
                  }
      
              }
      
              //找不到返回false
      
              return false;
      
          } finally {
      
              //解锁
      
              fullyUnlock();
      
          }
      
      }
      
      void fullyLock() {
      
          putLock.lock();
      
          takeLock.lock();
      
      }
      
      void fullyUnlock() {
      
          takeLock.unlock();
      
          putLock.unlock();
      
      }
      
      void unlink(Node<E> p, Node<E> trail) {
      
      
      
          p.item = null;
      
          trail.next = p.next;
      
          if (last == p)
      
              last = trail;
      
          //如果当前队列满,删除后,也不忘记最快的唤醒等待的线程
      
          if (count.getAndDecrement() == capacity)
      
              notFull.signal();
      
      }
      

      十一、开源框架中使用

      首先线程池Executors的newFixedThreadPool和newSingleThreadExecutor的工作队列就是使用的这个。

      然后tomcat中任务队列TaskQueue是继承并扩展了的,下面看看TaskQueue

      11.1 类图结构

      并发队列-无界阻塞队列LinkedBlockingQueue原理探究

      可知TaskQueue继承了LinkedBlockingQueue并且泛化类型固定了为Runnalbe.重写了offer,poll,take方法。

      11.2 TaskQueue

      tomcat中有个线程池ThreadPoolExecutor,在NIOEndPoint中当acceptor线程接受到请求后,会把任务放入队列,然后poller 线程从队列里面获取任务,然后就吧任务放入线程池执行。这个ThreadPoolExecutor中的的一个参数就是TaskQueue。

      先看看ThreadPoolExecutor的参数如果是普通LinkedBlockingQueue是怎么样的执行逻辑:

      当调用线程池方法 execute() 方法添加一个任务时:

      • 如果当前运行的线程数量小于 corePoolSize,则创建新线程运行该任务
      • 如果当前运行的线程数量大于或等于 corePoolSize,则将这个任务放入阻塞队列。
      • 如果当前队列满了,并且当前运行的线程数量小于 maximumPoolSize,则创建新线程运行该任务;
      • 如果当前队列满了,并且当前运行的线程数量大于或等于 maximumPoolSize,那么线程池将会抛出RejectedExecutionException异常。

      如果线程执行完了当前任务,那么会去队列里面获取一个任务来执行,如果任务执行完了,并且当前线程数大于corePoolSize,那么会根据线程空闲时间keepAliveTime回收一些线程保持线程池corePoolSize个线程。

      首先看下线程池中exectue添加任务时候的逻辑:

      
      public void execute(Runnable command) {
      
          if (command == null)
      
              throw new NullPointerException();
      
      
      
          //当前工作线程个数小于core个数则开新线程执行(1)
      
          int c = ctl.get();
      
          if (workerCountOf(c) < corePoolSize) {
      
              if (addWorker(command, true))
      
                  return;
      
              c = ctl.get();
      
          }
      
          //放入队列(2)
      
          if (isRunning(c) && workQueue.offer(command)) {
      
              int recheck = ctl.get();
      
              if (! isRunning(recheck) && remove(command))
      
                  reject(command);
      
              else if (workerCountOf(recheck) == 0)
      
                  addWorker(null, false);
      
          }
      
          //如果队列满了则开新线程,但是个数要不超过最大值,超过则返回false
      
          //然后执行reject handler(3)
      
          else if (!addWorker(command, false))
      
              reject(command);
      
      }
      

      可知当当前工作线程个数为corePoolSize后,如果在来任务会把任务添加到队列,队列满了或者入队失败了则开启新线程。

      然后看看TaskQueue中重写的offer方法的逻辑:

      
      public boolean offer(Runnable o) {
      
          // 如果parent为null则直接调用父类方法
      
          if (parent==null) return super.offer(o);
      
          //如果当前线程池中线程个数达到最大,则无条件调用父类方法
      
          if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
      
          //如果当前提交的任务小于当前线程池线程数,说明线程用不完,没必要重新开线程
      
          if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
      
          //如果当前线程池线程个数>core个数但是小于最大个数,则开新线程代替放入队列
      
          if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
      
          //到了这里,无条件调用父类
      
          return super.offer(o);
      
      }
      

      可知parent.getPoolSize()<parent.getMaximumPoolSize()普通队列会把当前任务放入队列,TAskQueue则是返回false,因为这会开启新线程执行任务,当然前提是当前线程个数没有达到最大值。

      然后看下Worker线程中如果从队列里面获取任务执行的:

      
       final void runWorker(Worker w) {
      
                     ...
      
              try {
      
                  while (task != null || (task = getTask()) != null) {
      
                     ...
      
                  }
      
                  completedAbruptly = false;
      
              } finally {
      
                    ...
      
              }
      
      }
      
      private Runnable getTask() {
      
              boolean timedOut = false; // Did the last poll() time out?
      
              for (;;) {
      
                  int c = ctl.get();
      
                  int rs = runStateOf(c);
      
                  ...
      
                  int wc = workerCountOf(c);
      
                  ...
      
                  try {
      
                      //根据timed决定调用poll还是take
      
                      Runnable r = timed ?
      
                          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
      
                          workQueue.take();
      
                      if (r != null)
      
                          return r;
      
                      timedOut = true;
      
                  } catch (InterruptedException retry) {
      
                      timedOut = false;
      
                  }
      
              }
      
      }
      

      十二、总结

      12.1 并发安全总结

      仔细思考下阻塞队列是如何实现并发安全的维护队列链表的,先分析下简单的情况就是当队列里面有多个元素时候,由于同时只有一个线程(通过独占锁putLock实现)入队元素并且是操作last节点(,而同时只有一个出队线程(通过独占锁takeLock实现)操作head节点,所以不存在并发安全问题。

      并发队列-无界阻塞队列LinkedBlockingQueue原理探究

      • 考虑当队列为空的时候队列状态为:

      并发队列-无界阻塞队列LinkedBlockingQueue原理探究

      这时候假如一个线程调用了take方法,由于队列为空,所以count.get()==0所以当前线程会调用notEmpty.await()把自己挂起,并且放入notEmpty的条件队列,并且释放当前条件变量关联的通过takeLock.lockInterruptibly()获取的独占锁。由于释放了锁,所以这时候其他线程调用take时候就会通过takeLock.lockInterruptibly()获取独占锁,然后同样阻塞到notEmpty.await(),同样会被放入notEmpty的条件队列,也就说在队列为空的情况下可能会有多个线程因为调用take被放入了notEmpty的条件队列。

      这时候如果有一个线程调用了put方法,那么就会调用enqueue操作,该操作会在last节点后面添加新元素并且设置last为新节点。然后count.getAndIncrement()先获取当前队列元个数为0保存到c,然后自增count为1,由于c==0所以调用signalNotEmpty激活notEmpty的条件队列里面的阻塞时间最长的线程,这时候take中调用notEmpty.await()的线程会被激活await内部会重新去获取独占锁获取成功则返回,否者被放入AQS的阻塞队列,如果获取成功,那么count.get() >0因为可能多个线程put了,所以调用dequeue从队列获取元素(这时候一定可以获取到),然后调用c = count.getAndDecrement() 把当前计数返回后并减去1,如果c>1 说明当前队列还有其他元素,那么就调用 notEmpty.signal()去激活 notEmpty的条件队列里面的其他阻塞线程。

      • 考虑当队列满的时候:

      当队列满的时候调用put方法时候,会由于notFull.await()当前线程被阻塞放入notFull管理的条件队列里面,同理可能会有多个调用put方法的线程都放到了notFull的条件队列里面。

      这时候如果有一个线程调用了take方法,调用dequeue()出队一个元素,c = count.getAndDecrement();count值减一;c==capacity;现在队列有一个空的位置,所以调用signalNotFull()激活notFull条件队列里面等待最久的一个线程。

      12.2简单对比

      LinkedBlockingQueue与ConcurrentLinkedQueue相比前者前者是阻塞队列使用可重入独占的非公平锁来实现通过使用put锁和take锁使得入队和出队解耦可以同时进行处理,但是同时只有一个线程可以入队或者出队,其他线程必须等待,另外引入了条件变量来进行入队和出队的同步,每个条件变量维护一个条件队列用来存放阻塞的线程,要注意这个队列和AQS的队列不是一个东东。LinkedBlockingQueue的size操作通过使用原子变量count获取能够比较精确的获取当前队列的元素个数,另外remove方法使用双锁保证删除时候队列元素保持不变,另外其实这个是个生产者-消费者模型。

      而ConcurrentLinkedQueue则使用CAS非阻塞算法来实现,使用CAS原子操作保证链表构建的安全性,当多个线程并发时候CAS失败的线程不会被阻塞,而是使用cpu资源去轮询CAS直到成功,size方法先比LinkedBlockingQueue的获取的个数是不精确的,因为获取size的时候是通过遍历队列进行的,而遍历过程中可能进行增加删除操作,remove方法操作时候也没有对整个队列加锁,remove时候可能进行增加删除操作,这就可能删除了一个刚刚新增的元素,而不是删除的想要位置的。

      欢迎看官们拍砖,让我们共同进步!

      版权声明:本文内容来自第三方投稿或授权转载,原文地址:http://ifeve.com/%e5%b9%b6%e5%8f%91%e9%98%9f%e5%88%97-%e6%97%a0%e7%95%8c%e9%98%bb%e5%a1%9e%e9%98%9f%e5%88%97linkedblockingqueue%e5%8e%9f%e7%90%86%e6%8e%a2%e7%a9%b6/,作者:并发编程网,版权归原作者所有。本网站转在其作品的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如因作品内容、版权等问题需要同本网站联系,请发邮件至ctyunbbs@chinatelecom.cn沟通。

      上一篇:并发队列-无界阻塞延迟队列DelayQueue原理探究

      下一篇:LockSupport 源码阅读

      相关文章

      2024-09-24 06:30:42

      JDK的sql设计不合理导致的驱动类初始化死锁问题

      JDK的sql设计不合理导致的驱动类初始化死锁问题

      2024-09-24 06:30:42
      java在线 , 多线程同步
      2024-06-27 09:21:24

      泥瓦匠聊并发编程基础篇:线程中断和终止

      线程中断是线程的标志位属性。而不是真正终止线程,和线程的状态无关。线程中断过程表示一个运行中的线程,通过其他线程调用了该线程的 interrupt() 方法,使得该线程中断标志位属性改变。

      2024-06-27 09:21:24
      多线程同步
      2024-06-27 09:21:24

      泥瓦匠聊并发编程:线程与多线程必知必会(基础篇)

      线程(Thread)是一个对象(Object)。Java 线程(也称 JVM 线程)是 Java 进程内允许多个同时进行的任务。该进程内并发的任务成为线程(Thread),一个进程里至少一个线程。

      2024-06-27 09:21:24
      多线程 , 多线程同步
      2023-03-21 10:32:27

      非阻塞算法

      在并发上下文中,非阻塞算法是一种允许线程在阻塞其他线程的情况下访问共享状态的算法。在绝大多数项目中,在算法中如果一个线程的挂起没有导致其它的线程挂起,我们就说这个算法是非阻塞的。为了

      2023-03-21 10:32:27
      数据结构 , 多线程同步
      2023-03-21 10:32:10

      Java内存模型

      Java内存模型规范了Java虚拟机与计算机内存是如何协同工作的。Java虚拟机是一个完整的计算机的一个模型,因此这个模型自然也包含一个内存模型——又称为Java内存模型。如果你想设计表

      2023-03-21 10:32:10
      java在线 , 多线程同步
      2023-03-21 10:32:09

      LockSupport 源码阅读

      在java中,要让线程等待最普通的方法是调用Object.wait()方法,Causes the current thread to wait until another thread invokes the notify() method

      2023-03-21 10:32:09
      多线程同步
      2023-03-21 10:31:48

      并发队列-无界阻塞延迟队列DelayQueue原理探究

      一、前言DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。二、 DelayQueue类图结构如图DelayQueue中内部使用的是PriorityQueue存放数据,使用

      2023-03-21 10:31:48
      多线程同步
      2023-03-21 10:31:48

      《 Java并发编程从入门到精通》 Java线程池的监控

      本文是《 Java并发编程从入门到精通》第9章 线程的监控及其日常工作中如何分析的9.1节 Java线程池的监控。 看不到不等于不存在!让我们来看看工作中是如何找问题解决问题的。

      2023-03-21 10:31:48
      多线程同步
      2023-03-21 10:31:48

      Java构造器必知必会

      在面向对象编程中,编程人员应该在意“资源”。比如 在代码中,我们很在意在内存中String类型的hello,它是有一个生命周期的。在它生命周期中,初始化(initializ

      2023-03-21 10:31:48
      the public , 多线程同步
      2023-03-21 10:31:48

      《Java并发编程从入门到精通》显示锁Lock和ReentrantLock

      显示锁Lock和ReentrantLockLock是一个接口提供了无条件的、可轮询的、定时的、可中断的锁获取操作,所有加锁和解锁的方法都是显式的。包路径是:java.util.

      2023-03-21 10:31:48
      多线程同步 , the public
      查看更多
      推荐标签

      作者介绍

      东哥玩云
      天翼云用户

      文章

      10

      阅读量

      4470

      查看更多

      最新文章

      JDK的sql设计不合理导致的驱动类初始化死锁问题

      2024-09-24 06:30:42

      泥瓦匠聊并发编程:线程与多线程必知必会(基础篇)

      2024-06-27 09:21:24

      泥瓦匠聊并发编程基础篇:线程中断和终止

      2024-06-27 09:21:24

      非阻塞算法

      2023-03-21 10:32:27

      Java内存模型

      2023-03-21 10:32:10

      LockSupport 源码阅读

      2023-03-21 10:32:09

      查看更多

      热门文章

      Java内存模型

      2023-03-21 10:32:10

      Java IO: FileReader和FileWriter

      2022-11-08 07:33:31

      Oracle官方并发教程之活跃度

      2022-11-08 07:33:31

      Java IO: 流

      2022-11-08 07:35:02

      Java踩坑记系列之线程池

      2023-03-16 08:48:22

      非阻塞算法

      2023-03-21 10:32:27

      查看更多

      热门标签

      java Java python 编程开发 代码 开发语言 算法 线程 Python html 数组 C++ 元素 javascript c++
      查看更多

      相关产品

      弹性云主机

      随时自助获取、弹性伸缩的云服务器资源

      天翼云电脑(公众版)

      便捷、安全、高效的云电脑服务

      对象存储

      高品质、低成本的云上存储服务

      云硬盘

      为云上计算资源提供持久性块存储

      查看更多

      随机文章

      Java并发之AQS详解

      泥瓦匠聊并发编程基础篇:线程中断和终止

      Java Volatile关键字

      Java IO: 流

      并发队列-无界阻塞延迟队列DelayQueue原理探究

      浅谈AutoCloseable接口

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号