核心参考网站:https://node-postgres.com/
1.pgsql-pool.js
const Pool = require('pg-pool');
const config = {
user: 'postgres',
password: 'XXXX',
host: '121.5.xx.xx',
port: 5432,
database: 'postgres',
// ssl: true,
max: 20, // set pool max size to 20
idleTimeoutMillis: 1000, // close idle clients after 1 second
connectionTimeoutMillis: 1000, // return an error after 1 second if connection could not be established
maxUses: 7500,
};
const pool = new Pool(config);
var async = require("async");
var datiConfig = require('./datiConfig');
function query(sql, options, callback) {
if (datiConfig.debug) {
console.log("sql:" + sql + " === " + options);
}
pool.connect(function (err, conn) {
if (err) {
callback(err, null, null);
} else {
conn.query(sql, options, function (err, results, fields) {
console.error('err:::', err)
// console.error('result:::',results)
//释放连接
conn.release();
//事件驱动回调
callback(err, results ? results.rows : null, fields);
});
}
});
};
function getNewSqlParamEntity(sql, params, callback) {
if (callback) {
return callback(null, {
sql: sql,
params: params
});
}
return {
sql: sql,
params: params
};
}
function execTrans(sqlparamsEntities, callback) {
pool.connect(function (err, connection) {
//封装
const shouldAbort = err => {
if (err) {
console.error('Error in transaction', err.stack)
connection.query('ROLLBACK', err => {
if (err) {
console.error('Error rolling back client', err.stack)
// return callback(err, null);
}
// release the client back to the pool
connection.release()
})
}
return err
}
connection.query('BEGIN', err => {
if (shouldAbort(err) != null) callback(err, null);
console.log("开始执行transaction,共执行" + sqlparamsEntities.length + "条数据");
var funcAry = [];
sqlparamsEntities.forEach(function (sql_param) {
var temp = function (cb) {
var sql = sql_param.sql;
var param = sql_param.params;
connection.query(sql, param, function (tErr, rows, fields) {
if (tErr) {
if (shouldAbort(tErr) != null) {
console.log("事务失败," + sql_param + ",ERROR:" + tErr);
throw tErr;
}
} else {
return cb(null, 'ok');
}
})
};
funcAry.push(temp);
});
async.series(funcAry, function (err, result) {
console.log("transaction error: " + err);
if (err) {
if (shouldAbort(err) != null) callback(err, null);
} else {
connection.query('COMMIT', (err,info) => {
if (err) {
console.error('Error committing transaction', err.stack)
return callback(err, null);
} else {
return callback(null, info);
}
connection.release();
})
}
})
});
});
}
exports.query = query;
exports.getNewSqlParamEntity = getNewSqlParamEntity;
exports.execTrans = execTrans;
2.测试test.js
############query###########################################
var sql = "INSERT INTO sys_m_menus(name,icon,url,weight,parentId,i18n) VALUES ($1,$2,$3,$4,$5,$6)";
var options = [func.name, func.icon, func.url, func.weight, parentId, func.i18n];
db.query(sql, options, function (err, results, fields) {
if (err) {
console.log('error', err);
if (err.code === 'ER_DUP_ENTRY') {
apiUtil.responseResult(response, false, 301, "URL已存在", null);
} else {
apiUtil.responseResult(response, false, 301, "系统异常:" + err, null);
}
} else {
var obj = new Object();
obj.data = results;
apiUtil.responseResult(response, true, 0, "", obj);//查询成功
}
});
############事务###########################################
var sqlParamsEntity = [];
var sql1 = "delete from sys_m_org_menus where orgcode=?";
var param1 = [orgcode];
sqlParamsEntity.push(db.getNewSqlParamEntity(sql1, param1));
var sql2 = "delete from sys_m_org_menus_auth where orgcode=?";
var param2 = [orgcode];
sqlParamsEntity.push(db.getNewSqlParamEntity(sql2, param2));
db.execTrans(sqlParamsEntity, function (err, info) {
if (err) {
apiUtil.responseResult(response, false, 301, "系统异常:" + err, null);
} else {
apiUtil.responseResult(response, true, 0, "", null);
}
});