java并发实战:连接池实现

栏目: 编程工具 · 发布时间: 5年前

内容简介:在我们使用数据库的过程中,我们往往使用数据库连接池而不是直接使用数据库连接进行操作,这是因为每一个数据库连接的创建和销毁的代价是昂贵的,而池化技术则预先创建了资源,这些资源是可复用的,这样就保证了在多用户情况下只能使用指定数目的资源,避免了一个用户创建一个连接资源,造成程序运行开销过大。这里只实现一个简易的连接池,更多复杂的需求可根据该连接池进行改进,该连接池主要参数如下:程序流程图如下:

池化技术简介

在我们使用数据库的过程中,我们往往使用数据库连接池而不是直接使用数据库连接进行操作,这是因为每一个数据库连接的创建和销毁的代价是昂贵的,而池化技术则预先创建了资源,这些资源是可复用的,这样就保证了在多用户情况下只能使用指定数目的资源,避免了一个用户创建一个连接资源,造成程序运行开销过大。

连接池实现原理

这里只实现一个简易的连接池,更多复杂的需求可根据该连接池进行改进,该连接池主要参数如下:

  1. 一个繁忙队列busy
  2. 一个空闲队列idle
  3. 连接池最大活动连接数maxActive
  4. 连接池最大等待时间maxWait
  5. 连接池的活动连接数activeSize

程序流程图如下:

java并发实战:连接池实现

代码实现

泛型接口ConnectionPool.java

public interface ConnectionPool<T> {

    /**
     * 初始化池资源
     * @param maxActive 池中最大活动连接数
     * @param maxWait 最大等待时间
     */
    void init(Integer maxActive, Long maxWait);

    /**
     * 从池中获取资源
     * @return 连接资源
     */
    T getResource() throws Exception;

    /**
     * 释放连接
     * @param connection 正在使用的连接
     */
    void release(T connection) throws Exception;

    /**
     * 释放连接池资源
     */
    void close();


}

以zookeeper为例,实现zookeeper连接池,ZookeeperConnectionPool.java

public class ZookeeperConnectionPool implements ConnectionPool<ZooKeeper> {
    //最大活动连接数
    private Integer maxActive; 
    //最大等待时间
    private Long maxWait; 
    //空闲队列
    private LinkedBlockingQueue<ZooKeeper> idle = new LinkedBlockingQueue<>();
    //繁忙队列
    private LinkedBlockingQueue<ZooKeeper> busy = new LinkedBlockingQueue<>();
    //连接池活动连接数
    private AtomicInteger activeSize = new AtomicInteger(0);
    //连接池关闭标记
    private AtomicBoolean isClosed = new AtomicBoolean(false);
    //总共获取的连接记数
    private AtomicInteger createCount = new AtomicInteger(0);
    //等待zookeeper客户端创建完成的计数器
    private static ThreadLocal<CountDownLatch> latchThreadLocal = ThreadLocal.withInitial(() -> new CountDownLatch(1));

    public ZookeeperConnectionPool(Integer maxActive, Long maxWait) {
        this.init(maxActive, maxWait);
    }

    @Override
    public void init(Integer maxActive, Long maxWait) {
        this.maxActive = maxActive;
        this.maxWait = maxWait;
    }

    @Override
    public ZooKeeper getResource() throws Exception {
        ZooKeeper zooKeeper;
        Long nowTime = System.currentTimeMillis();
        final CountDownLatch countDownLatch = latchThreadLocal.get();
        
        //空闲队列idle是否有连接
        if ((zooKeeper = idle.poll()) == null) {
            //判断池中连接数是否小于maxActive
            if (activeSize.get() < maxActive) {
                //先增加池中连接数后判断是否小于等于maxActive
                if (activeSize.incrementAndGet() <= maxActive) {
                    //创建zookeeper连接
                    zooKeeper = new ZooKeeper("localhost", 5000, (watch) -> {
                        if (watch.getState() == Watcher.Event.KeeperState.SyncConnected) {
                            countDownLatch.countDown();
                        }
                    });
                    countDownLatch.await();
                    System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接:" + createCount.incrementAndGet() + "条");
                    busy.offer(zooKeeper);
                    return zooKeeper;
                } else {
                    //如增加后发现大于maxActive则减去增加的
                    activeSize.decrementAndGet();
                }
            }
            //若活动线程已满则等待busy队列释放连接
            try {
                System.out.println("Thread:" + Thread.currentThread().getId() + "等待获取空闲资源");
                Long waitTime = maxWait - (System.currentTimeMillis() - nowTime);
                zooKeeper = idle.poll(waitTime, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw new Exception("等待异常");
            }
            //判断是否超时
            if (zooKeeper != null) {
                System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接:" + createCount.incrementAndGet() + "条");
                busy.offer(zooKeeper);
                return zooKeeper;
            } else {
                System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接超时,请重试!");
                throw new Exception("Thread:" + Thread.currentThread().getId() + "获取连接超时,请重试!");
            }
        }
        //空闲队列有连接,直接返回
        busy.offer(zooKeeper);
        return zooKeeper;
    }

    @Override
    public void release(ZooKeeper connection) throws Exception {
        if (connection == null) {
            System.out.println("connection 为空");
            return;
        }
        if (busy.remove(connection)){
            idle.offer(connection);
        } else {
            activeSize.decrementAndGet();
            throw new Exception("释放失败");
        }
    }

    @Override
    public void close() {
        if (isClosed.compareAndSet(false, true)) {
            idle.forEach((zooKeeper) -> {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            busy.forEach((zooKeeper) -> {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

测试用例

这里创建20个线程并发测试连接池,Test.java

public class Test {

    public static void main(String[] args) throws Exception {
        int threadCount = 20;
        Integer maxActive = 10;
        Long maxWait = 10000L;
        ZookeeperConnectionPool pool = new ZookeeperConnectionPool(maxActive, maxWait);
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                countDownLatch.countDown();
                try {
                    countDownLatch.await();
                    ZooKeeper zooKeeper = pool.getResource();
                    Thread.sleep(2000);
                    pool.release(zooKeeper);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }).start();
        }
        while (true){

        }
    }
}

以上所述就是小编给大家介绍的《java并发实战:连接池实现》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

JavaScript设计模式

JavaScript设计模式

Ross Harmes、Dustin Diaz / 谢廷晟 / 人民邮电出版社 / 2008 / 45.00元

本书共有两部分。第一部分给出了实现具体设计模式所需要的面向对象特性的基础知识,主要包括接口、封装和信息隐藏、继承、单体模式等内容。第二部分则专注于各种具体的设计模式及其在JavaScript语言中的应用,主要介绍了工厂模式、桥接模式、组合模式、门面模式等几种常见的模式。为了让每一章中的示例都尽可能地贴近实际应用,书中同时列举了一些JavaScript 程序员最常见的任务,然后运用设计模式使其解决方......一起来看看 《JavaScript设计模式》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具