关于
深度域适应中,有一类方法是实现目标域和源域的特征对齐,特征对齐的衡量函数主要包括MMD,MK-MMD,A-distance,CORAL loss, Wasserstein distance等等。本文总结了常用的特征变换对齐的函数定义。
工具
Python
方法实现
MMD 多核函数定义
# Compute MMD (maximum mean discrepancy) using numpy and scikit-learn.
import numpy as np
from sklearn import metrics
def mmd_linear(X, Y):
"""MMD using linear kernel (i.e., k(x,y) = <x,y>)
Note that this is not the original linear MMD, only the reformulated and faster version.
The original version is:
def mmd_linear(X, Y):
XX = np.dot(X, X.T)
YY = np.dot(Y, Y.T)
XY = np.dot(X, Y.T)
return XX.mean() + YY.mean() - 2 * XY.mean()
Arguments:
X {[n_sample1, dim]} -- [X matrix]
Y {[n_sample2, dim]} -- [Y matrix]
Returns:
[scalar] -- [MMD value]
"""
delta = X.mean(0) - Y.mean(0)
return delta.dot(delta.T)
def mmd_rbf(X, Y, gamma=1.0):
"""MMD using rbf (gaussian) kernel (i.e., k(x,y) = exp(-gamma * ||x-y||^2 / 2))
Arguments:
X {[n_sample1, dim]} -- [X matrix]
Y {[n_sample2, dim]} -- [Y matrix]
Keyword Arguments:
gamma {float} -- [kernel parameter] (default: {1.0})
Returns:
[scalar] -- [MMD value]
"""
XX = metrics.pairwise.rbf_kernel(X, X, gamma)
YY = metrics.pairwise.rbf_kernel(Y, Y, gamma)
XY = metrics.pairwise.rbf_kernel(X, Y, gamma)
return XX.mean() + YY.mean() - 2 * XY.mean()
def mmd_poly(X, Y, degree=2, gamma=1, coef0=0):
"""MMD using polynomial kernel (i.e., k(x,y) = (gamma <X, Y> + coef0)^degree)
Arguments:
X {[n_sample1, dim]} -- [X matrix]
Y {[n_sample2, dim]} -- [Y matrix]
Keyword Arguments:
degree {int} -- [degree] (default: {2})
gamma {int} -- [gamma] (default: {1})
coef0 {int} -- [constant item] (default: {0})
Returns:
[scalar] -- [MMD value]
"""
XX = metrics.pairwise.polynomial_kernel(X, X, degree, gamma, coef0)
YY = metrics.pairwise.polynomial_kernel(Y, Y, degree, gamma, coef0)
XY = metrics.pairwise.polynomial_kernel(X, Y, degree, gamma, coef0)
return XX.mean() + YY.mean() - 2 * XY.mean()
if __name__ == '__main__':
a = np.arange(1, 10).reshape(3, 3)
b = [[7, 6, 5], [4, 3, 2], [1, 1, 8], [0, 2, 5]]
b = np.array(b)
print(a)
print(b)
print(mmd_linear(a, b)) # 6.0
print(mmd_rbf(a, b)) # 0.5822
print(mmd_poly(a, b)) # 2436.5
A-distance 函数定义
# Compute A-distance using numpy and sklearn
# Reference: Analysis of representations in domain adaptation, NIPS-07.
import numpy as np
from sklearn import svm
def proxy_a_distance(source_X, target_X, verbose=False):
"""
Compute the Proxy-A-Distance of a source/target representation
"""
nb_source = np.shape(source_X)[0]
nb_target = np.shape(target_X)[0]
if verbose:
print('PAD on', (nb_source, nb_target), 'examples')
C_list = np.logspace(-5, 4, 10)
half_source, half_target = int(nb_source/2), int(nb_target/2)
train_X = np.vstack((source_X[0:half_source, :], target_X[0:half_target, :]))
train_Y = np.hstack((np.zeros(half_source, dtype=int), np.ones(half_target, dtype=int)))
test_X = np.vstack((source_X[half_source:, :], target_X[half_target:, :]))
test_Y = np.hstack((np.zeros(nb_source - half_source, dtype=int), np.ones(nb_target - half_target, dtype=int)))
best_risk = 1.0
for C in C_list:
clf = svm.SVC(C=C, kernel='linear', verbose=False)
clf.fit(train_X, train_Y)
train_risk = np.mean(clf.predict(train_X) != train_Y)
test_risk = np.mean(clf.predict(test_X) != test_Y)
if verbose:
print('[ PAD C = %f ] train risk: %f test risk: %f' % (C, train_risk, test_risk))
if test_risk > .5:
test_risk = 1. - test_risk
best_risk = min(best_risk, test_risk)
return 2 * (1. - 2 * best_risk)
CORAL loss函数定义
# Compute CORAL loss using pytorch
# Reference: DCORAL: Correlation Alignment for Deep Domain Adaptation, ECCV-16.
import torch
def CORAL_loss(source, target):
d = source.data.shape[1]
ns, nt = source.data.shape[0], target.data.shape[0]
# source covariance
xm = torch.mean(source, 0, keepdim=True) - source
xc = xm.t() @ xm / (ns - 1)
# target covariance
xmt = torch.mean(target, 0, keepdim=True) - target
xct = xmt.t() @ xmt / (nt - 1)
# frobenius norm between source and target
loss = torch.mul((xc - xct), (xc - xct))
loss = torch.sum(loss) / (4*d*d)
return loss
# Another implementation:
# Two implementations are the same. Just different formulation format.
# def CORAL(source, target):
# d = source.size(1)
# ns, nt = source.size(0), target.size(0)
# # source covariance
# tmp_s = torch.ones((1, ns)).to(DEVICE) @ source
# cs = (source.t() @ source - (tmp_s.t() @ tmp_s) / ns) / (ns - 1)
# # target covariance
# tmp_t = torch.ones((1, nt)).to(DEVICE) @ target
# ct = (target.t() @ target - (tmp_t.t() @ tmp_t) / nt) / (nt - 1)
# # frobenius norm
# loss = torch.norm(cs - ct, p='fro').pow(2)
# loss = loss / (4 * d * d)
# return loss
Wasserstein loss函数定义
import math
import torch
import torch.linalg as linalg
def calculate_2_wasserstein_dist(X, Y):
'''
Calulates the two components of the 2-Wasserstein metric:
The general formula is given by: d(P_X, P_Y) = min_{X, Y} E[|X-Y|^2]
For multivariate gaussian distributed inputs z_X ~ MN(mu_X, cov_X) and z_Y ~ MN(mu_Y, cov_Y),
this reduces to: d = |mu_X - mu_Y|^2 - Tr(cov_X + cov_Y - 2(cov_X * cov_Y)^(1/2))
Fast method implemented according to following paper: https://arxiv.org/pdf/2009.14075.pdf
Input shape: [b, n] (e.g. batch_size x num_features)
Output shape: scalar
'''
if X.shape != Y.shape:
raise ValueError("Expecting equal shapes for X and Y!")
# the linear algebra ops will need some extra precision -> convert to double
X, Y = X.transpose(0, 1).double(), Y.transpose(0, 1).double() # [n, b]
mu_X, mu_Y = torch.mean(X, dim=1, keepdim=True), torch.mean(Y, dim=1, keepdim=True) # [n, 1]
n, b = X.shape
fact = 1.0 if b < 2 else 1.0 / (b - 1)
# Cov. Matrix
E_X = X - mu_X
E_Y = Y - mu_Y
cov_X = torch.matmul(E_X, E_X.t()) * fact # [n, n]
cov_Y = torch.matmul(E_Y, E_Y.t()) * fact
# calculate Tr((cov_X * cov_Y)^(1/2)). with the method proposed in https://arxiv.org/pdf/2009.14075.pdf
# The eigenvalues for M are real-valued.
C_X = E_X * math.sqrt(fact) # [n, n], "root" of covariance
C_Y = E_Y * math.sqrt(fact)
M_l = torch.matmul(C_X.t(), C_Y)
M_r = torch.matmul(C_Y.t(), C_X)
M = torch.matmul(M_l, M_r)
S = linalg.eigvals(M) + 1e-15 # add small constant to avoid infinite gradients from sqrt(0)
sq_tr_cov = S.sqrt().abs().sum()
# plug the sqrt_trace_component into Tr(cov_X + cov_Y - 2(cov_X * cov_Y)^(1/2))
trace_term = torch.trace(cov_X + cov_Y) - 2.0 * sq_tr_cov # scalar
# |mu_X - mu_Y|^2
diff = mu_X - mu_Y # [n, 1]
mean_term = torch.sum(torch.mul(diff, diff)) # scalar
# put it together
代码获取
相关项目和问题,欢迎沟通交流。
参考文献
Yan H, Ding Y, Li P, Wang Q, Xu Y, Zuo W. Mind the class weight bias: Weighted maximum mean discrepancy for unsupervised domain adaptation. InProceedings of the IEEE conference on computer vision and pattern recognition 2017 (pp. 2272-2281).
Sun B, Saenko K. Deep coral: Correlation alignment for deep domain adaptation. InComputer Vision–ECCV 2016 Workshops: Amsterdam, The Netherlands, October 8-10 and 15-16, 2016, Proceedings, Part III 14 2016 (pp. 443-450). Springer International Publishing.
Shen J, Qu Y, Zhang W, Yu Y. Wasserstein distance guided representation learning for domain adaptation. InProceedings of the AAAI conference on artificial intelligence 2018 Apr 29 (Vol. 32, No. 1).
Ben-David S, Blitzer J, Crammer K, Pereira F. Analysis of representations for domain adaptation. Advances in neural information processing systems. 2006;19.
Kramer O, Kramer O. Scikit-learn. Machine learning for evolution strategies. 2016:45-53.