内部manager.js定义了目前所需要的http对外接口
主要是为了演示和测试的方便,定义了MongoDB里面的数据关系
JSONWebToken.js是登录app用户的token存放集合和关系定义
User.js是登录app用户的存放集合和关系定义
后期转换为正式项目应该直接使用MySql
不再需要这部分代码了
内部manager.js定义了目前所需要的mq处理代码,里面主要包括了MQ的初始化连接和建立需要的交换器和队列
内部包含了你所需要的protobuf的协议文件,今后的协议文件都可以放在这里面
内部包含了一个AngularJS的Client示例代码
内部主要都是证书文件和初始化证书文件的代码,后期证书修改在此处做,用于HTTPS通讯
app.js: 启动文件
conf.js: 程序配置文件
convertor.js: ProtoBuf协议编解码文件
resource.js: 程序所需资源文件,如数据库 MQ HTTP等
var sc = require("smartacjs"); var app = sc.app(); function main(){...} sc.runMain(main);
function main(){ var resouce = require("./resource"); if( !!!resouce ){ console.error("load resource failed!"); return false; }else app.resource = resouce; var httpInterface = require("./http_interface/manager"); if( !!!httpInterface ){ console.error("load resource failed!"); return false; }else app.httpInterface = httpInterface; var mqInterface = require("./mq_interface/manager"); if( !!!mqInterface.init() ){ console.error("load resource failed!"); return false; } else app.mqInterface = mqInterface; }
var Resource = function () { } Resource.prototype.init = function(){ //... } module.exports=new Resource(); if( !module.exports.init() ){ module.exports = null; }
var ssl = require('./ssl/sslkey.js'); //加载证书 this._webapp = express(); // 建立express实例 var concat = require('concat-stream'); // 加载stream连接组件,用于组合post数据 // 组合post数据,主要是由于protobuf的关系,不使用express原生功能 this._webapp.use(function(req, res, next){ req.pipe(concat(function(data){ req.raw_body = data.toString(); next(); })); });
// 创建中间件,在HTTP的发送头部加入必要的信息 // 主要是加入需要认证的信息和允许跨域访问 this._webapp.use(function (req, res, next) { res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'GET, POST'); res.setHeader('Access-Control-Allow-Headers', 'X-Requested-With,content-type, Authorization'); next(); }); // 静态资源主要是为了演示, 后期也可以用于管理界面或者申请界面 this._webapp.use('/static', express.static(__dirname + '/public')); // 启动HTTP服务 this._webapp.listen(app.conf.http_listen_port, function () { console.log("HTTP server listening on port " + app.conf.http_listen_port); });
// 启动HTTPS服务 https.createServer(ssl.options,this._webapp).listen(app.conf.https_listen_port, function(){ console.log("HTTPS server listening on port " + app.conf.https_listen_port); }); }); // 启动MQ连接 this._mqConnection=sc.RabbitMQ().createConnect(app.conf.mqUrl); if (!!!this._mqConnection){ return false; } this._mqConnection.start(); // 启动MongoDB服务, 同时把Mongoose的Promise换成When的 this._mongoose = require("mongoose"); this._mongoose.Promise = Promise; this._mongoose.connect(app.conf.mongoUrl); // 如果有数据库服务, 也在这里启动
conf.options = { //定义一个funciton的主名字,在外部调用时,需要再URL中指定该名字,我们以支付为例 pay: { notify: // 定义了一个function的副名字, 在URL中表现位pay.notify { // 使用哪个mq, 在conf文件中可以定义多个mq, 可以为每个mq指定一个名字 mq: "default", type: "exchange",// 配置类型:exchange/rpc_client exchange_type: "direct",// 交换器类型:topic/fanout/direct exchange_name: "pay.notify", // 因为是交换机类型, 指定交换机名字 durable: true, // 交换机的属性, 是否永久 autoDelete: false // 交换机的属性, 是否自动删除 } //... }, }
conf.options = { //定义一个funciton的主名字,在外部调用时,需要再URL中指定该名字,我们以支付为例 pay: { //... server: // 定义了一个function的副名字, 在URL中表现位pay.server { // 使用哪个mq, 在conf文件中可以定义多个mq, 可以为每个mq指定一个名字 mq: "default", type: "rpc_client", // 配置类型:exchange/rpc_client rpc_name: "cf.pay.rpc" // 因为是rpc_client, 指定rpc_client名字 // 指定协议编解码文件, 以及协议的一些参数, 非常重要哦 convertor: convertor.protobuff(path.resolve(__dirname, 'protocol/sr_customer_coupon.proto.js'), 'sr.customer.coupon','Message') } }, }
exports.default=function(content){ try{ if (content instanceof Buffer){ return content.toString(); } else{ return new Buffer(content); } } catch(e) { console.error("convertor_default crash!error=%s,content=%s", e,content); } return null; }
exports.protobuff=function(file,ns,msg) { var encoder = sc.createPBEncoder(); encoder.init(file,ns,msg); // 返回真正的转换函数 return function(content) { if (content instanceof Buffer) { return encoder.decode(content); } else { return encoder.encode(content); } return null; } }
// 定义jwt的密码,可以在配置文件中指定 var jwtsecret = (typeof app.conf.jwtsecret===typeof '')?app.conf.jwtsecret :'1234567890abcdefghijklmnopq@#$'; // 定义认证检测中间件函数 httpInterface.prototype.ensureAuthorized = function(){//...} // 初始化函数 httpInterface.prototype.init = function(){ // 定义授权接口 app.resource._webapp.post('/authenticate',function (req, res) {//...}); // 定义其他接口,需要认证中间件, 如果是公共资源,就不需要加入此中间件 app.resource._webapp.all('/notify',this.ensureAuthorized, function(request, response){//...}); }); module.exports = new httpInterface(); // 调用初始化 if( !module.exports.init() ){ module.exports = null; }
// 如果是OPTION方法,先回复可以操作的HTTP方法, 如果有特殊需求可以修改此处 if(req.method=='OPTIONS'){ res.setHeader('Allow','GET, POST, DELETE, HEAD, OPTIONS'); res.end(); return ; } var bearerToken; // 其他请求就需要获取头部认证信息 var bearerHeader = req.headers["authorization"]; if (typeof bearerHeader !== 'undefined') { var bearer = bearerHeader.split(" "); // 得到 Token bearerToken = bearer[1]; req.token = bearerToken; //... } else { res.sendStatus(403); // 没有头部信息,返回403 }
var j2a = null; // 从mongoDB中查询 Token jwt2thirdapp.findOne({token: req.token}).exec().then(function (ret) { if (j2a) { // 解码 Token var tokendata = jwt.decode(j2a.token, jwtsecret); // 查询Token是否过期, Token本身就含有此信息,所以其实不需要数据库存储 if (tokendata.expire < Math.floor(new Date() / 1000)) { throw({errcode: 1001, errmessage: "token is timeout."}); } else { //.... next(); }
// 从conf中获取function的配置,大家还记得那堆繁琐的配置吗,这里有用哦 var option=sc.getSubObject(app.conf.options,request.query.fun); // 如果没有配置就返回了 if (!!!option){ response.sendStatus(400); response.end("fun error!"); return; } // 分析参数,进行MQ消息的装配 if (!!!request.query.routekey) request.query.routekey=""; if (!!!request.query.msgid) request.query.msgid=""; var msgOption=undefined; if (request.query.msgid!="") msgOption={messageId:request.query.msgid};
// 获取编码转换器,如果没有配置,就是默认的 if (!!!option.convertor) option.convertor=convertor.default; postData=option.convertor(request.raw_body); // 发送MQ消息通知 option.object.publishMessage(option.exchange_name,request.query.routekey, postData,msgOption); // 发送HTTP回应 response.end("");
// 获取编码转换器, 这里有配置的哦,要具体看协议文件 if (!!!option.convertor) option.convertor=convertor.default; postData=option.convertor(request.raw_body); if (postData!=null){ // 发送RPC消息,记得设置超时时间哦 option.object.sendRequest(postData,request.query.msgid,request.query.timeout) .then(function(res){ // RPC回应 response.writeHead(200,'OK'); // 解码回复的消息,如果不需要返回直接的协议数据体, 应该在这里处理下再发送 var content=option.convertor(res.content); response.end(JSON.stringify(content),'utf8'); }).catch(function(err){ response.statusCode=500; response.end(err); }); }
req.body = JSON.parse(req.raw_body);// 解析数据体 // 从MongoDB中查询是否有效 thirdapp.findOne({app_id: req.body.app_id, app_secret: req.body.app_secret}).exec() .then(function (ret) { if (ret) { _thirdapp = ret; var j2a = new jwt2thirdapp(); j2a.app_id = _thirdapp._id; j2a.issudate = new Date(); // 设置过期时间 j2a.expiredate = Math.floor(j2a.issudate) / 1000 + app.conf.token_timeout; // 进行加密 j2a.token = jwt.sign( {app_id: _thirdapp.app_id, expire: j2a.expiredate}, jwtsecret); // 存入MongoDB, 正式应该是存到Redis return j2a.save(); //...
res.json({ type: true, data: {account_id: _thirdapp.app_id}, token: j2a1.token });
app.mq={};// 全局MQ连接字典 // 创建MQ连接 var createConnection=function(){ // 从配置文件中的获取所有的mq连接串设置, 建立所有的连接 try{ for (var key in app.conf.mq){ var url=app.conf.mq[key]; var conn=scRabbit.createConnect(url); if (conn){ conn.start(); app.mq[key]=conn; } } return true; } catch(e){} return false; }
for (var key in config) { var confObject=config[key]; if (typeof(confObject)=="object") { if(confObject.type=="rpc_client"){ // 创建为RPC if (!confObject.mq) confObject.mq="default"; if (!confObject.rpc_name || confObject.rpc_name==""){ return false; } confObject.object=scRabbit.createRPCClient( app.mq[confObject.mq],confObject.rpc_name); } else if (confObject.type=="rpc_server"){//...} else if (confObject.type=="exchange"){//...} } }