ClickHouse MemoryTracker Module Explained

The MemoryTracker module lives in the ClickHouse\dbms\src\Common directory, in two files:
MemoryTracker.h and MemoryTracker.cpp

As the comment in MemoryTracker.h puts it:

/** Tracks memory consumption.
* It throws an exception if amount of consumed memory become greater than certain limit.
* The same memory tracker could be simultaneously used in different threads.
*/

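In other words, MemoryTracker tracks memory consumption, throws an exception once the consumed amount exceeds a configured limit, and can be used by several threads at the same time.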
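These three properties can be illustrated with a stripped-down tracker. This is a minimal sketch, not the ClickHouse implementation: the class name SimpleMemoryTracker is hypothetical, std::runtime_error stands in for DB::Exception, and metrics, the profiler, fault injection and the blocker are left out. It keeps the core mechanics of MemoryTracker::alloc shown below: a relaxed fetch_add on an atomic counter, a hard-limit check that rolls the counter back before throwing, a best-effort peak update, and propagation of every allocation to the parent tracker. Unlike the original, it also undoes the child's accounting when the parent rejects an allocation, and it updates the peak only after the whole chain has accepted it.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical, simplified version of MemoryTracker: no metrics, profiler,
// fault injection or blocker. A hard_limit of 0 means "no limit".
class SimpleMemoryTracker
{
public:
    explicit SimpleMemoryTracker(SimpleMemoryTracker * parent_ = nullptr, std::int64_t hard_limit_ = 0)
        : parent(parent_), hard_limit(hard_limit_) {}

    // Call before allocating `size` bytes; throws if a limit in the chain would be exceeded.
    void alloc(std::int64_t size)
    {
        assert(size >= 0);
        // Relaxed ordering, as in the original: concurrent allocations may overshoot slightly.
        std::int64_t will_be = size + amount.fetch_add(size, std::memory_order_relaxed);

        if (hard_limit && will_be > hard_limit)
        {
            amount.fetch_sub(size, std::memory_order_relaxed);  // undo before throwing
            throw std::runtime_error("Memory limit exceeded: would use " + std::to_string(will_be)
                                     + " bytes, maximum: " + std::to_string(hard_limit));
        }

        if (parent)
        {
            try
            {
                parent->alloc(size);  // every ancestor accounts for the same bytes
            }
            catch (...)
            {
                amount.fetch_sub(size, std::memory_order_relaxed);  // parent refused: undo here too
                throw;
            }
        }

        // Best-effort peak update (load, then store); a race can only leave the peak slightly low.
        if (will_be > peak.load(std::memory_order_relaxed))
            peak.store(will_be, std::memory_order_relaxed);
    }

    // Call after freeing `size` bytes.
    void free(std::int64_t size)
    {
        assert(size >= 0);
        amount.fetch_sub(size, std::memory_order_relaxed);
        if (parent)
            parent->free(size);
    }

    std::int64_t get() const { return amount.load(std::memory_order_relaxed); }
    std::int64_t getPeak() const { return peak.load(std::memory_order_relaxed); }

private:
    std::atomic<std::int64_t> amount{0};
    std::atomic<std::int64_t> peak{0};
    SimpleMemoryTracker * parent;
    const std::int64_t hard_limit;
};
```

For example, with a process-level tracker limited to 1024 bytes and a query-level child with no limit of its own, a first 600-byte allocation through the child succeeds, a second one is rejected by the parent, and both counters stay at 600.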

class MemoryTracker
{
std::atomic<Int64> amount {0};
std::atomic<Int64> peak {0};
std::atomic<Int64> hard_limit {0};
std::atomic<Int64> profiler_limit {0};

Int64 profiler_step = 0;

/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0;

/// Singly-linked list. All information will be passed to subsequent memory trackers also (it allows to implement trackers hierarchy).
/// In terms of tree nodes it is the list of parents. Lifetime of these trackers should "include" lifetime of current tracker.
std::atomic<MemoryTracker *> parent {};

/// You could specify custom metric to track memory usage.
CurrentMetrics::Metric metric = CurrentMetrics::end();

/// This description will be used as prefix into log messages (if isn't nullptr)
const char * description = nullptr;

public:
MemoryTracker(VariableContext level_ = VariableContext::Thread) : level(level_) {}
MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread) : parent(parent_), level(level_) {}

~MemoryTracker();

VariableContext level;

/** Call the following functions before calling of corresponding operations with memory allocators.
*/
void alloc(Int64 size);

void realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
if (addition > 0)
alloc(addition);
else
free(-addition);
}

/** This function should be called after memory deallocation.
*/
void free(Int64 size);

Int64 get() const
{
return amount.load(std::memory_order_relaxed);
}

Int64 getPeak() const
{
return peak.load(std::memory_order_relaxed);
}

/** Set limit if it was not set.
* Otherwise, set limit to new value, if new value is greater than previous limit.
*/
void setOrRaiseHardLimit(Int64 value);
void setOrRaiseProfilerLimit(Int64 value);

void setFaultProbability(double value)
{
fault_probability = value;
}

void setProfilerStep(Int64 value)
{
profiler_step = value;
}

/// next should be changed only once: from nullptr to some value.
/// NOTE: It is not true in MergeListElement
void setParent(MemoryTracker * elem)
{
parent.store(elem, std::memory_order_relaxed);
}

MemoryTracker * getParent()
{
return parent.load(std::memory_order_relaxed);
}

/// The memory consumption could be shown in realtime via CurrentMetrics counter
void setMetric(CurrentMetrics::Metric metric_)
{
metric = metric_;
}

void setDescription(const char * description_)
{
description = description_;
}

/// Reset the accumulated data
void resetCounters();

/// Reset the accumulated data and the parent.
void reset();

/// Prints info about peak memory consumption into log.
void logPeakMemoryUsage() const;

/// To be able to temporarily stop memory tracker
DB::SimpleActionBlocker blocker;
};


/// Convenience methods, that use current thread's memory_tracker if it is available.
namespace CurrentMemoryTracker
{
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
}


/// Holding this object will temporarily disable memory tracking.
DB::SimpleActionLock getCurrentMemoryTrackerActionLock();
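setOrRaiseHardLimit and setOrRaiseProfilerLimit implement "set if unset, otherwise only raise" as an atomic maximum built on a compare-and-swap loop (see their definitions in MemoryTracker.cpp). The sketch below isolates that idiom in a hypothetical free function setOrRaise; like the tracker, it assumes a stored 0 means "no limit", so the first positive value always takes effect.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical free-function version of MemoryTracker::setOrRaiseHardLimit /
// setOrRaiseProfilerLimit: atomically store `value` into `limit` only if it is larger.
// A stored 0 means "no limit", so the first positive value always takes effect.
inline void setOrRaise(std::atomic<std::int64_t> & limit, std::int64_t value)
{
    assert(value >= 0);  // limits are byte counts
    std::int64_t old_value = limit.load(std::memory_order_relaxed);
    // When compare_exchange_weak fails (another thread changed the limit, or a
    // spurious failure), it writes the current value into old_value, so the loop
    // stops as soon as the stored limit is already >= value.
    while (old_value < value && !limit.compare_exchange_weak(old_value, value))
    {
    }
}
```

Because the loop only ever increases the stored value, concurrent callers converge on the largest value any of them passed, without taking a mutex.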

The corresponding implementation, MemoryTracker.cpp:

#include "MemoryTracker.h"

#include <IO/WriteHelpers.h>
#include "Common/TraceCollector.h"
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <common/logger_useful.h>

#include <atomic>
#include <cmath>
#include <cstdlib>


namespace DB
{
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
}


static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
/// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;


MemoryTracker::~MemoryTracker()
{
if (static_cast<int>(level) < static_cast<int>(VariableContext::Process) && peak)
{
try
{
logPeakMemoryUsage();
}
catch (...)
{
/// Exception in Logger, intentionally swallow.
}
}

/** This is needed for next memory tracker to be consistent with sum of all referring memory trackers.
* Sometimes, memory tracker could be destroyed before memory was freed, and on destruction, amount > 0.
* For example, a query could allocate some data and leave it in cache.
* If memory will be freed outside of context of this memory tracker,
* but in context of one of the 'next' memory trackers,
* then memory usage of 'next' memory trackers will be underestimated,
* because amount will be decreased twice (first - here, second - when real 'free' happens).
*/

if (auto value = amount.load(std::memory_order_relaxed))
free(value);
}


void MemoryTracker::logPeakMemoryUsage() const
{
LOG_DEBUG(&Logger::get("MemoryTracker"),
"Peak memory usage" << (description ? " " + std::string(description) : "")
<< ": " << formatReadableSizeWithBinarySuffix(peak) << ".");
}

static void logMemoryUsage(Int64 amount)
{
LOG_DEBUG(&Logger::get("MemoryTracker"),
"Current memory usage: " << formatReadableSizeWithBinarySuffix(amount) << ".");
}


void MemoryTracker::alloc(Int64 size)
{
if (blocker.isCancelled())
return;

/** Using memory_order_relaxed means that if allocations are done simultaneously,
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* So, we allow over-allocations.
*/
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);

if (metric != CurrentMetrics::end())
CurrentMetrics::add(metric, size);

Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);

/// Using non-thread-safe random number generator. Joint distribution in different threads would not be uniform.
/// In this case, it doesn't matter.
if (unlikely(fault_probability && drand48() < fault_probability))
{
free(size);

/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
auto untrack_lock = blocker.cancel(); // NOLINT

std::stringstream message;
message << "Memory tracker";
if (description)
message << " " << description;
message << ": fault injected. Would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);

throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}

if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
auto no_track = blocker.cancel();
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
}

if (unlikely(current_hard_limit && will_be > current_hard_limit))
{
free(size);

/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
auto untrack_lock = blocker.cancel(); // NOLINT

std::stringstream message;
message << "Memory limit";
if (description)
message << " " << description;
message << " exceeded: would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);

throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}

auto peak_old = peak.load(std::memory_order_relaxed);
if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth.
{
peak.store(will_be, std::memory_order_relaxed);

if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
logMemoryUsage(will_be);
}

if (auto loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size);
}


void MemoryTracker::free(Int64 size)
{
if (blocker.isCancelled())
return;

if (level == VariableContext::Thread)
{
/// Could become negative if memory allocated in this thread is freed in another one
amount.fetch_sub(size, std::memory_order_relaxed);
}
else
{
Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;

/** Sometimes, query could free some data, that was allocated outside of query context.
* Example: cache eviction.
* To avoid negative memory usage, we "saturate" amount.
* Memory usage will be calculated with some error.
* NOTE: The code is not atomic. Not worth to fix.
*/
if (unlikely(new_amount < 0))
{
amount.fetch_sub(new_amount);
size += new_amount;
}
}

if (auto loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);

if (metric != CurrentMetrics::end())
CurrentMetrics::sub(metric, size);
}


void MemoryTracker::resetCounters()
{
amount.store(0, std::memory_order_relaxed);
peak.store(0, std::memory_order_relaxed);
hard_limit.store(0, std::memory_order_relaxed);
profiler_limit.store(0, std::memory_order_relaxed);
}


void MemoryTracker::reset()
{
if (metric != CurrentMetrics::end())
CurrentMetrics::sub(metric, amount.load(std::memory_order_relaxed));

resetCounters();
}


void MemoryTracker::setOrRaiseHardLimit(Int64 value)
{
/// This is just atomic set to maximum.
Int64 old_value = hard_limit.load(std::memory_order_relaxed);
while (old_value < value && !hard_limit.compare_exchange_weak(old_value, value))
;
}


void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
{
Int64 old_value = profiler_limit.load(std::memory_order_relaxed);
while (old_value < value && !profiler_limit.compare_exchange_weak(old_value, value))
;
}


namespace CurrentMemoryTracker
{
void alloc(Int64 size)
{
if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
{
Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
untracked += size;
if (untracked > untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = untracked;
untracked = 0;
memory_tracker->alloc(tmp);
}
}
}

void realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
addition > 0 ? alloc(addition) : free(-addition);
}

void free(Int64 size)
{
if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
{
Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
untracked -= size;
if (untracked < -untracked_memory_limit)
{
memory_tracker->free(-untracked);
untracked = 0;
}
}
}
}

DB::SimpleActionLock getCurrentMemoryTrackerActionLock()
{
auto memory_tracker = DB::CurrentThread::getMemoryTracker();
if (!memory_tracker)
return {};
return memory_tracker->blocker.cancel();
}
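CurrentMemoryTracker::alloc and free do not hit the shared MemoryTracker on every call: each thread accumulates an untracked delta and flushes it only when it leaves the range (-untracked_memory_limit, untracked_memory_limit), i.e. plus or minus 4 MiB. The sketch below reproduces that batching under stated assumptions: CountingTracker is a hypothetical stand-in that only counts how often the shared counter is touched, and the per-thread delta is a plain thread_local variable, whereas ClickHouse reaches its per-thread counter through DB::CurrentThread::getUntrackedMemory().

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for MemoryTracker: a shared counter plus a count of how
// often it was touched, to make the batching visible.
struct CountingTracker
{
    std::atomic<std::int64_t> amount{0};
    std::atomic<std::int64_t> updates{0};

    void alloc(std::int64_t size)
    {
        amount.fetch_add(size, std::memory_order_relaxed);
        updates.fetch_add(1, std::memory_order_relaxed);
    }

    void free(std::int64_t size)
    {
        amount.fetch_sub(size, std::memory_order_relaxed);
        updates.fetch_add(1, std::memory_order_relaxed);
    }
};

// Same threshold as untracked_memory_limit in MemoryTracker.cpp (4 MiB).
constexpr std::int64_t untracked_memory_limit = 4 * 1024 * 1024;

// Per-thread net delta not yet reported to the shared tracker
// (assumption: a plain thread_local instead of DB::CurrentThread::getUntrackedMemory()).
thread_local std::int64_t untracked = 0;

void trackAlloc(CountingTracker & tracker, std::int64_t size)
{
    assert(size >= 0);
    untracked += size;
    if (untracked > untracked_memory_limit)
    {
        // Reset before reporting, as the original does: if the tracker throws, this
        // thread can still allocate up to untracked_memory_limit bytes (e.g. to build
        // the exception message) before touching the tracker again.
        std::int64_t tmp = untracked;
        untracked = 0;
        tracker.alloc(tmp);
    }
}

void trackFree(CountingTracker & tracker, std::int64_t size)
{
    assert(size >= 0);
    untracked -= size;
    if (untracked < -untracked_memory_limit)
    {
        tracker.free(-untracked);
        untracked = 0;
    }
}
```

With this scheme a thread touches the shared atomic counter roughly once per 4 MiB of net allocation or deallocation; the trade-off is that the tracker's view can lag by up to that much per thread, so limits are enforced with some slack.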
