The MemoryTracker module lives under ClickHouse/dbms/src/Common, in the files
MemoryTracker.h and MemoryTracker.cpp.
As described at the top of MemoryTracker.h:
/** Tracks memory consumption.
* It throws an exception if amount of consumed memory become greater than certain limit.
* The same memory tracker could be simultaneously used in different threads.
*/
In short: it tracks memory consumption, throws an exception once the amount of consumed memory exceeds a given limit, and the same memory tracker can be used from multiple threads concurrently.
class MemoryTracker
{
std::atomic<Int64> amount {0};
std::atomic<Int64> peak {0};
std::atomic<Int64> hard_limit {0};
std::atomic<Int64> profiler_limit {0};
Int64 profiler_step = 0;
/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0;
/// Singly-linked list. All information will be passed to subsequent memory trackers also (it allows to implement trackers hierarchy).
/// In terms of tree nodes it is the list of parents. Lifetime of these trackers should "include" lifetime of current tracker.
std::atomic<MemoryTracker *> parent {};
/// You could specify custom metric to track memory usage.
CurrentMetrics::Metric metric = CurrentMetrics::end();
/// This description will be used as prefix into log messages (if isn't nullptr)
const char * description = nullptr;
public:
MemoryTracker(VariableContext level_ = VariableContext::Thread) : level(level_) {}
MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread) : parent(parent_), level(level_) {}
~MemoryTracker();
VariableContext level;
/** Call the following functions before calling of corresponding operations with memory allocators.
*/
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
if (addition > 0)
alloc(addition);
else
free(-addition);
}
/** This function should be called after memory deallocation.
*/
void free(Int64 size);
Int64 get() const
{
return amount.load(std::memory_order_relaxed);
}
Int64 getPeak() const
{
return peak.load(std::memory_order_relaxed);
}
/** Set limit if it was not set.
* Otherwise, set limit to new value, if new value is greater than previous limit.
*/
void setOrRaiseHardLimit(Int64 value);
void setOrRaiseProfilerLimit(Int64 value);
void setFaultProbability(double value)
{
fault_probability = value;
}
void setProfilerStep(Int64 value)
{
profiler_step = value;
}
/// next should be changed only once: from nullptr to some value.
/// NOTE: It is not true in MergeListElement
void setParent(MemoryTracker * elem)
{
parent.store(elem, std::memory_order_relaxed);
}
MemoryTracker * getParent()
{
return parent.load(std::memory_order_relaxed);
}
/// The memory consumption could be shown in realtime via CurrentMetrics counter
void setMetric(CurrentMetrics::Metric metric_)
{
metric = metric_;
}
void setDescription(const char * description_)
{
description = description_;
}
/// Reset the accumulated data
void resetCounters();
/// Reset the accumulated data and the parent.
void reset();
/// Prints info about peak memory consumption into log.
void logPeakMemoryUsage() const;
/// To be able to temporarily stop memory tracker
DB::SimpleActionBlocker blocker;
};
/// Convenience methods, that use current thread's memory_tracker if it is available.
namespace CurrentMemoryTracker
{
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
}
/// Holding this object will temporarily disable memory tracking.
DB::SimpleActionLock getCurrentMemoryTrackerActionLock();
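Before moving to the implementation, here is a minimal, hypothetical usage sketch. It is not code from the ClickHouse sources; it only assumes that MemoryTracker.h and Common/Exception.h are on the include path, and exercises the interface declared above: set a hard limit, account for some allocations, and catch the MEMORY_LIMIT_EXCEEDED exception once the limit would be exceeded.
#include "MemoryTracker.h"
#include <Common/Exception.h>
#include <iostream>
int main()
{
    MemoryTracker tracker;                            /// defaults to VariableContext::Thread
    tracker.setDescription("(example)");
    tracker.setOrRaiseHardLimit(10 * 1024 * 1024);    /// 10 MiB hard limit
    tracker.alloc(4 * 1024 * 1024);                   /// bookkeeping only, no real allocation happens here
    std::cout << "current: " << tracker.get() << ", peak: " << tracker.getPeak() << std::endl;
    try
    {
        tracker.alloc(8 * 1024 * 1024);               /// 4 + 8 = 12 MiB > 10 MiB: throws MEMORY_LIMIT_EXCEEDED
    }
    catch (const DB::Exception & e)
    {
        std::cout << e.what() << std::endl;           /// alloc() rolled back its own increment before throwing
    }
    tracker.free(4 * 1024 * 1024);                    /// must mirror the successful alloc
}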
#include "MemoryTracker.h"
#include <IO/WriteHelpers.h>
#include "Common/TraceCollector.h"
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <common/logger_useful.h>
#include <atomic>
#include <cmath>
#include <cstdlib>
namespace DB
{
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
}
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
/// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;
MemoryTracker::~MemoryTracker()
{
if (static_cast<int>(level) < static_cast<int>(VariableContext::Process) && peak)
{
try
{
logPeakMemoryUsage();
}
catch (...)
{
/// Exception in Logger, intentionally swallow.
}
}
/** This is needed for next memory tracker to be consistent with sum of all referring memory trackers.
* Sometimes, memory tracker could be destroyed before memory was freed, and on destruction, amount > 0.
* For example, a query could allocate some data and leave it in cache.
* If memory will be freed outside of context of this memory tracker,
* but in context of one of the 'next' memory trackers,
* then memory usage of 'next' memory trackers will be underestimated,
* because amount will be decreased twice (first - here, second - when real 'free' happens).
*/
if (auto value = amount.load(std::memory_order_relaxed))
free(value);
}
void MemoryTracker::logPeakMemoryUsage() const
{
LOG_DEBUG(&Logger::get("MemoryTracker"),
"Peak memory usage" << (description ? " " + std::string(description) : "")
<< ": " << formatReadableSizeWithBinarySuffix(peak) << ".");
}
static void logMemoryUsage(Int64 amount)
{
LOG_DEBUG(&Logger::get("MemoryTracker"),
"Current memory usage: " << formatReadableSizeWithBinarySuffix(amount) << ".");
}
void MemoryTracker::alloc(Int64 size)
{
if (blocker.isCancelled())
return;
/** Using memory_order_relaxed means that if allocations are done simultaneously,
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* So, we allow over-allocations.
*/
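/// fetch_add returns the previous value, so will_be is the tracker's total after accounting for this allocation.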
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
if (metric != CurrentMetrics::end())
CurrentMetrics::add(metric, size);
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);
/// Using non-thread-safe random number generator. Joint distribution in different threads would not be uniform.
/// In this case, it doesn't matter.
if (unlikely(fault_probability && drand48() < fault_probability))
{
free(size);
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
auto untrack_lock = blocker.cancel(); // NOLINT
std::stringstream message;
message << "Memory tracker";
if (description)
message << " " << description;
message << ": fault injected. Would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
auto no_track = blocker.cancel();
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
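/// Round will_be up to a multiple of profiler_step: e.g. with profiler_step = 4 MiB and will_be = 9 MiB,
/// the new profiler_limit becomes 12 MiB, so another trace is collected only after that limit is crossed.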
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
}
if (unlikely(current_hard_limit && will_be > current_hard_limit))
{
free(size);
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
auto untrack_lock = blocker.cancel(); // NOLINT
std::stringstream message;
message << "Memory limit";
if (description)
message << " " << description;
message << " exceeded: would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
auto peak_old = peak.load(std::memory_order_relaxed);
if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth.
{
peak.store(will_be, std::memory_order_relaxed);
if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
logMemoryUsage(will_be);
}
if (auto loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size);
}
void MemoryTracker::free(Int64 size)
{
if (blocker.isCancelled())
return;
if (level == VariableContext::Thread)
{
/// Could become negative if memory allocated in this thread is freed in another one
amount.fetch_sub(size, std::memory_order_relaxed);
}
else
{
Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;
/** Sometimes, query could free some data, that was allocated outside of query context.
* Example: cache eviction.
* To avoid negative memory usage, we "saturate" amount.
* Memory usage will be calculated with some error.
* NOTE: The code is not atomic. Not worth to fix.
*/
if (unlikely(new_amount < 0))
{
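/// new_amount is negative here: the fetch_sub adds it back so 'amount' saturates at zero,
/// and 'size' is reduced by the same overshoot before being propagated to the parent.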
amount.fetch_sub(new_amount);
size += new_amount;
}
}
if (auto loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);
if (metric != CurrentMetrics::end())
CurrentMetrics::sub(metric, size);
}
void MemoryTracker::resetCounters()
{
amount.store(0, std::memory_order_relaxed);
peak.store(0, std::memory_order_relaxed);
hard_limit.store(0, std::memory_order_relaxed);
profiler_limit.store(0, std::memory_order_relaxed);
}
void MemoryTracker::reset()
{
if (metric != CurrentMetrics::end())
CurrentMetrics::sub(metric, amount.load(std::memory_order_relaxed));
resetCounters();
}
void MemoryTracker::setOrRaiseHardLimit(Int64 value)
{
/// This is just atomic set to maximum.
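/// compare_exchange_weak writes the current stored value back into old_value on failure,
/// so the loop stops as soon as either the CAS succeeds or another thread has already set a limit >= value.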
Int64 old_value = hard_limit.load(std::memory_order_relaxed);
while (old_value < value && !hard_limit.compare_exchange_weak(old_value, value))
;
}
void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
{
Int64 old_value = profiler_limit.load(std::memory_order_relaxed);
while (old_value < value && !profiler_limit.compare_exchange_weak(old_value, value))
;
}
namespace CurrentMemoryTracker
{
void alloc(Int64 size)
{
if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
{
Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
untracked += size;
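/// Small allocations only bump this thread-local counter; the shared tracker (and its limits)
/// is consulted only once more than untracked_memory_limit bytes have accumulated.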
if (untracked > untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = untracked;
untracked = 0;
memory_tracker->alloc(tmp);
}
}
}
void realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
addition > 0 ? alloc(addition) : free(-addition);
}
void free(Int64 size)
{
if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
{
Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
untracked -= size;
if (untracked < -untracked_memory_limit)
{
memory_tracker->free(-untracked);
untracked = 0;
}
}
}
}
DB::SimpleActionLock getCurrentMemoryTrackerActionLock()
{
auto memory_tracker = DB::CurrentThread::getMemoryTracker();
if (!memory_tracker)
return {};
return memory_tracker->blocker.cancel();
}
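To round this out, here is a hypothetical sketch of how the thread-level hooks in CurrentMemoryTracker are meant to be used: an allocation wrapper reports the size before the real allocation and reports the release afterwards, so memory allocated on query threads ends up in the tracker hierarchy. In ClickHouse itself this wiring happens in its allocation paths (allocators and operator new/delete overrides); trackedAlloc/trackedFree below are illustrative names only, not part of the sources.
#include "MemoryTracker.h"
#include <cstdlib>
#include <new>
/// Hypothetical wrapper, for illustration only.
void * trackedAlloc(size_t size)
{
    CurrentMemoryTracker::alloc(size);        /// may throw MEMORY_LIMIT_EXCEEDED before any real allocation
    void * ptr = std::malloc(size);
    if (!ptr)
    {
        CurrentMemoryTracker::free(size);     /// roll the accounting back if the real allocation failed
        throw std::bad_alloc();
    }
    return ptr;
}
void trackedFree(void * ptr, size_t size) noexcept
{
    std::free(ptr);
    CurrentMemoryTracker::free(size);
}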