Clickhouse负载均衡客户端BalancedClickhouseDataSource源码分析

文章目录

BalancedClickhouseDataSource源码分析

BalancedClickhouseDataSource的完整路径是ru.yandex.clickhouse.BalancedClickhouseDataSource,源码主要包括三部分,构造方法、获取连接、以及生成可用的地址列表。

BalancedClickhouseDataSource实现了javax.sql.DataSource,参数中allUrls是构造方法中传入的地址列表,enabledUrls是可用的地址列表。

public class BalancedClickhouseDataSource implements javax.sql.DataSource {
    private final ThreadLocal<Random> randomThreadLocal = new ThreadLocal<Random>();
    private final List<String> allUrls;
    private volatile List<String> enabledUrls;
}

BalancedClickhouseDataSource的构造方法,有多个,但是最终调用的都是BalancedClickhouseDataSource(final List urls, ClickHouseProperties properties),如果像jdbc:clickhouse://10.170.4.81:8123,10.170.4.82:8123,10.170.4.83:8123,10.170.4.84:8123/datasets这样配置多个地址,则会先进行切分。拆分成像jdbc:clickhouse://10.170.4.81:8123/datasets、jdbc:clickhouse://10.170.4.82:8123/datasets多个地址。

public BalancedClickhouseDataSource(final String url, Properties properties) {
        this(splitUrl(url), new ClickHouseProperties(properties));
    }
    
    static List<String> splitUrl(final String url) {
        Matcher m = URL_TEMPLATE.matcher(url);
        if (!m.matches()) {
            throw new IllegalArgumentException("Incorrect url");
        }
        String database = m.group(2);
        if (database == null) {
            database = "";
        }
        String[] hosts = m.group(1).split(",");
        final List<String> result = new ArrayList<String>(hosts.length);
        for (final String host : hosts) {
            result.add(JDBC_CLICKHOUSE_PREFIX + "//" + host + database);
        }
        return result;
    }

    private BalancedClickhouseDataSource(final List<String> urls, ClickHouseProperties properties) {
        if (urls.isEmpty()) {
            throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. It must be not empty");
        }

        try {
            ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse(urls.get(0), properties.asProperties());
            localProperties.setHost(null);
            localProperties.setPort(-1);

            this.properties = localProperties;
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }


        List<String> allUrls = new ArrayList<String>(urls.size());
        for (final String url : urls) {
            try {
                if (driver.acceptsURL(url)) {
                    allUrls.add(url);
                } else {
                    log.error("that url is has not correct format: {}", url);
                }
            } catch (SQLException e) {
                throw new IllegalArgumentException("error while checking url: " + url, e);
            }
        }

        if (allUrls.isEmpty()) {
            throw new IllegalArgumentException("there are no correct urls");
        }

        this.allUrls = Collections.unmodifiableList(allUrls);
        this.enabledUrls = this.allUrls;
    }

初始化完成后,会提供getConnection()方法获取连接,获取连接时会通过getAnyUrl()方法从enabledUrls可用列表中随机获取一个可用的连接。

   @Override
    public ClickHouseConnection getConnection() throws SQLException {
        return driver.connect(getAnyUrl(), properties);
    }
    
    private String getAnyUrl() throws SQLException {
        List<String> localEnabledUrls = enabledUrls;
        if (localEnabledUrls.isEmpty()) {
            throw new SQLException("Unable to get connection: there are no enabled urls");
        }
        Random random = this.randomThreadLocal.get();
        if (random == null) {
            this.randomThreadLocal.set(new Random());
            random = this.randomThreadLocal.get();
        }

        int index = random.nextInt(localEnabledUrls.size());
        return localEnabledUrls.get(index);
    }

最后说一下可用地址列表的获取,scheduleActualization()方法会启动一个线程定时去调用actualize()方法检测可用列表。actualize()方法时通过执行SELECT查询SELECT 1去测试节点是否可用。

/**
     * set time period for checking availability connections
     *
     * @param delay    value for time unit
     * @param timeUnit time unit for checking
     * @return this datasource with changed settings
     */
    public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
        ClickHouseDriver.ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    actualize();
                } catch (Exception e) {
                    log.error("Unable to actualize urls", e);
                }
            }
        }, 0, delay, timeUnit);

        return this;
    }
    
    /**
     * Checks if clickhouse on url is alive, if it isn't, disable url, else enable.
     *
     * @return number of avaliable clickhouse urls
     */
    public synchronized int actualize() {
        List<String> enabledUrls = new ArrayList<String>(allUrls.size());

        for (String url : allUrls) {
            log.debug("Pinging disabled url: {}", url);
            if (ping(url)) {
                log.debug("Url is alive now: {}", url);
                enabledUrls.add(url);
            } else {
                log.debug("Url is dead now: {}", url);
            }
        }

        this.enabledUrls = Collections.unmodifiableList(enabledUrls);
        return enabledUrls.size();
    }
    
    private boolean ping(final String url) {
        try {
            driver.connect(url, properties).createStatement().execute("SELECT 1");
            return true;
        } catch (Exception e) {
            return false;
        }
    }

结论

Clickhouse-jdbc是使用负载均衡客户端ru.yandex.clickhouse.BalancedClickhouseDataSource来保证的,本质上是通过后台启动一个线程定时去探测clickhouse服务端,生成可用的地址列表。然后获取连接的时候从可用地址列表中随机选择一个节点来建立连接。

但是,坑爹的是,scheduleActualization方法没有地方调用,也就是说必须手动调用,否则,即使你配置了多个地址,如果某个节点宕机,仍然后很大的概率建立连接失败。

最后,BalancedClickhouseDataSource仅仅保证大部分情况下连接可用,根据ping的频率和超时时间的不同,总有一小段时间不能保证可用地址列表中所有地址都可用。因此想实现故障转移,保证高可用,还必须有客户端的配合,最好增加重试机制。

上一篇:django 静态文件及路由分发


下一篇:linux下压力测试工具http_load