A Node.js Microservice Framework Solution


Preface

Seneca is a Node.js microservices toolkit that gives a system the ability to be built and updated continuously. Below we will walk through the relevant techniques, from getting started to practical use.

A quick plug: after some light integration work I put together the vastify framework, a lightweight Node.js microservice framework. Feel free to take a look and star it if it is useful to you; if you have questions or spot problems in the code, please leave a comment below this post.

Environment

  • Base environment
"node": "^10.0.0"
"npm": "^6.0.0"
"pm2": "^2.10.3"
"rabbitmq": "^3.7.5"
"consul": "^1.1.0"
"mongodb": "^3.6"
  • Microservice project dependencies
"bluebird": "^3.5.1"
"koa": "^2.5.1"
"koa-router": "^7.4.0"
"seneca": "^3.4.3"
"seneca-web": "^2.2.0"
"seneca-web-adapter-koa2": "^1.1.0"
"amqplib": "^0.5.2"
"winston": "^2.4.2"
"mongoose": "^5.1.2"

FEATURES

  • Pattern matching for inter-service calls: slightly different from Spring Cloud's service discovery (HTTP protocol, IP + PORT); calls between microservices are made through the more flexible pattern-matching principle (the Patrun module)
  • Koa2 integration to expose RESTful APIs to clients
  • Plugins: a more flexible way to write small, reusable micro-modules
  • Seneca's built-in log output
  • A comparison of third-party logging libraries: winston (the one chosen), bunyan, log4js
  • RabbitMQ message queues
  • PM2: Node service deployment (service clusters), management and monitoring
  • PM2: automated deployment
  • PM2 with Docker
  • Request tracing (reconstructing the flow of a user request)
  • Walking through the basics of Consul service registration and discovery
  • Integrating node-consul into the framework
  • MongoDB persistent storage
  • A routing-service middleware combining seneca and consul (multiple clusters registered under the same service name can be routed, distinguished by $$version)
  • Stream support (file upload/download, etc.)
  • Jenkins automated deployment
  • Nginx load balancing
  • A continuous-integration scheme
  • Redis caching
  • GraphQL APIs via Prisma

Pattern matching (the Patrun module)

index.js (account-server/src/index.js)

const seneca = require('seneca')()
seneca.add('cmd:login', (msg, done) => {
  const { username, pass } = msg
  if (username === 'asd' && pass === '123') {
    return done(null, { code: 1000 })
  }
  return done(null, { code: 2100 })
})

const Promise = require('bluebird')
const act = Promise.promisify(seneca.act, { context: seneca })
act({
  cmd: 'login',
  username: 'asd',
  pass: '123'
}).then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})

After running:

{ code: 1000 }
{"kind":"notice","notice":"hello seneca k5i8j1cvw96h/1525589223364/10992/3.4.3/-","level":"info","seneca":"k5i8j1cvw96h/1525589223364/10992/3.4.3/-","when":1525589223563}

seneca.add adds an action pattern to the Seneca instance. It takes three arguments:

  1. pattern: the pattern used to match JSON messages inside Seneca, as an object or a formatted string
  2. sub_pattern: a sub-pattern, with lower priority than the main pattern (optional)
  3. action: the function to run when the pattern is matched

seneca.act executes the matching action in the Seneca instance. It likewise takes three arguments:

  1. msg: the JSON message
  2. sub_pattern: a sub-message, with lower priority than the main message (optional)
  3. response: a callback that receives the result of the service call

seneca.use adds a plugin to the Seneca instance. It takes two arguments (plugins here work a little differently from middleware; a small sketch follows this list):

  1. func: the plugin function
  2. options: the options the plugin needs (optional)
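As a small illustration (my own sketch, not from the original post), a plugin is simply a function that receives the options object and registers patterns on this; the pattern and option names below are made up:

const seneca = require('seneca')()

// the plugin function: seneca calls it with the options object passed to use()
function greetPlugin (options) {
  this.add('role:greet,cmd:hello', (msg, done) => {
    done(null, { text: `${options.prefix} ${msg.name || 'world'}` })
  })
}

// first argument: the plugin function; second argument: its options
seneca.use(greetPlugin, { prefix: 'hello,' })

seneca.act('role:greet,cmd:hello,name:seneca', (err, out) => console.log(out))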

The core idea is pattern matching against a JSON object. That JSON object carries both the characteristics one microservice uses to address another and the call's parameters. It is similar to service discovery in the Java microservice world, except that patterns take the place of IP + port. So far patterns are perfectly capable of implementing service discovery; whether they are actually more flexible remains to be explored.

Points to note

  • The patterns of different microservices must be kept distinct by design (see the sketch below)
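For example, one common convention (a naming sketch of my own, not something seneca enforces) is to namespace every action under a per-service key, so that patterns from different microservices can never collide:

const seneca = require('seneca')()

// account-server actions, namespaced under role:account
seneca.add('role:account,cmd:login', (msg, done) => done(null, { ok: true }))
seneca.add('role:account,cmd:register', (msg, done) => done(null, { ok: true }))

// config-server actions: a different role value keeps these patterns distinct
seneca.add('role:config,cmd:get', (msg, done) => done(null, { db: 'mongodb://127.0.0.1:27017' }))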

Starting the first microservice

index.js(config-server/src/index.js)

const seneca = require('seneca')()
const config = {
  SUCCESS_NORMAL_RES: {
    code: 1000,
    desc: 'normal server response'
  }
}
seneca.add('$$target:config-server', (msg, done) => {
  return done(null, config)
}).listen(10011)

After running this script, you can open http://localhost:10011/act?cmd=config in a browser to issue a request for the global configuration.

OR

const seneca = require('seneca')()
const Promise = require('bluebird')
const act = Promise.promisify(seneca.act, { context: seneca })
seneca.client(10011)
act('$$target:config-server, default$:{msg:404}').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})
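For a quick check from the command line, here is a sketch of an equivalent request. It assumes the default seneca HTTP transport, which accepts a JSON message POSTed to /act; the exact query format may differ between transport versions:

curl -H 'Content-Type: application/json' \
  -d '{"$$target":"config-server"}' \
  http://localhost:10011/act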

Internal: multiple microservices calling each other (key)

noname-server

const seneca = require('seneca')()
seneca.add('$$target:account-server', (msg, done) => {
  done(null, { seneca: '666' })
})
seneca.listen(10015)

config-server (same as above)

call

const seneca = require('seneca')()
const Promise = require('bluebird')
const act = Promise.promisify(seneca.act, { context: seneca })
seneca.client({
  port: '10011',
  pin: '$$target:config-server'
})
seneca.client({
  port: '10015',
  pin: '$$target:account-server'
})
act('$$target:config-server').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})
act('$$target:account-server').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})

External: exposing REST services (key)

Integrating Koa

const seneca = require('seneca')()
const Promise = require('bluebird')
const SenecaWeb = require('seneca-web')
const Koa = require('koa')
const Router = require('koa-router')
const app = new Koa()
const userModule = require('./modules/user.js')

// initialize the user module
seneca.use(userModule.init)

// initialize the seneca-web plugin and adapt it to koa
seneca.use(SenecaWeb, {
  context: Router(),
  adapter: require('seneca-web-adapter-koa2'),
  routes: [...userModule.routes]
})

// export the routes to the koa app
seneca.ready(() => {
  app.use(seneca.export('web/context')().routes())
})

app.listen(3333)

The user module

const $module = 'module:user'
let userCount = 3

const REST_Routes = [
  {
    prefix: '/user',
    pin: `${$module},if:*`,
    map: {
      list: {
        GET: true,
        name: ''
      },
      load: {
        GET: true,
        name: '',
        suffix: '/:id'
      },
      edit: {
        PUT: true,
        name: '',
        suffix: '/:id'
      },
      create: {
        POST: true,
        name: ''
      },
      delete: {
        DELETE: true,
        name: '',
        suffix: '/:id'
      }
    }
  }
]

const db = {
  users: [{
    id: 1,
    name: '甲'
  }, {
    id: 2,
    name: '乙'
  }, {
    id: 3,
    name: '丙'
  }]
}

function user (options) {
  this.add(`${$module},if:list`, (msg, done) => {
    done(null, db.users)
  })

  this.add(`${$module},if:load`, (msg, done) => {
    const { id } = msg.args.params
    done(null, db.users.find(v => Number(id) === v.id))
  })

  this.add(`${$module},if:edit`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const { name } = msg.args.body
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1, {
        id,
        name
      })
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })

  this.add(`${$module},if:create`, (msg, done) => {
    const { name } = msg.args.body
    db.users.push({
      id: ++userCount,
      name
    })
    done(null, db.users)
  })

  this.add(`${$module},if:delete`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1)
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
}

module.exports = {
  init: user,
  routes: REST_Routes
}

vscode-restclient (the REST Client extension for VS Code, used to issue RESTful requests)

### create
POST http://localhost:3333/user HTTP/1.1
Content-Type: application/json

{
  "name": "test: create a user"
}

### delete
DELETE http://localhost:3333/user/2 HTTP/1.1

### update
PUT http://localhost:3333/user/2 HTTP/1.1
Content-Type: application/json

{
  "name": "test: update user info"
}

### list
GET http://localhost:3333/user HTTP/1.1

### load
GET http://localhost:3333/user/3 HTTP/1.1

Seneca built-in log output

Configuration can be passed to the constructor; the log property controls the logging level.

Example 1: passing a string

require('seneca')({
  // quiet silent any all print standard test
  log: 'all'
})

Example 2: passing an object

require('seneca')({
  log: {
    // none debug+ info+ warn+
    level: 'debug+'
  },
  // when set to true, seneca abbreviates fields such as senecaId, senecaTag and actId
  // in its log output (usually to two characters)
  short: true
})

The second example is recommended, because the seneca-web-adapter-koa2 plugin logs at the debug level, which is handy for recording web API access logs.

The winston logging module

Link

Logger.js

const { createLogger, format, transports } = require('winston')
const { combine, timestamp, label, printf } = format

const logger = createLogger({
  level: 'info',
  format: combine(
    label({ label: 'microservices' }),
    timestamp(),
    printf(info => {
      return `${info.timestamp} [${info.label}] ${info.level}: ${info.message}`
    })
  ),
  transports: [new transports.Console()]
})

// levels, from highest to lowest priority
const levels = {
  error: 0,
  warn: 1,
  info: 2,
  verbose: 3,
  debug: 4,
  silly: 5
}

module.exports = logger
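For reference, a minimal usage sketch that produces lines in the format shown below (assuming the Logger.js module above sits next to the calling script):

const logger = require('./Logger')

logger.info('received an RPC call from the client')
logger.warn('warn message')
logger.error('error message')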

Log output format

2018-05-17T14:43:28.330Z [microservices] info: received an RPC call from the client
2018-05-17T14:43:28.331Z [microservices] warn: warn message
2018-05-17T14:43:28.331Z [microservices] error: error message

RabbitMQ message queue service

1. Single task, single consumer: the producer/consumer pattern

producer.js

// create an amqp peer
const amqp = require('amqplib/callback_api')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'
    const msg = process.argv.slice(2).join(' ') || 'hello world'
    // to keep the queue from being lost when RabbitMQ exits or crashes and restarts,
    // declare it with durable: true (the consumer script must also use durable: true)
    ch.assertQueue(q, { durable: true })
    // persistent: true — my reading of the official docs is that messages not yet fully sent
    // are resumed after a restart (this is not fully reliable, though: RabbitMQ does not do
    // synchronous I/O for every message; messages may sit in a cache and be written to disk at a suitable moment)
    ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 100)
  })
})

consumer.js

// create an amqp peer
const amqp = require('amqplib/callback_api')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'
    // declare the queue with durable: true, matching the producer
    ch.assertQueue(q, { durable: true })
    ch.prefetch(1)
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q)
    ch.consume(q, msg => {
      const secs = msg.content.toString().split('.').length - 1
      console.log(" [x] Received %s", msg.content.toString())
      setTimeout(() => {
        console.log(" [x] Done")
        ch.ack(msg)
      }, secs * 1000)
    })
    // the noAck option (false by default) says whether the consumer must ack after processing.
    // If true, RabbitMQ does not care about the outcome once the task is sent to this consumer
    // and marks it done immediately; otherwise it only marks the task done after receiving an ack,
    // keeps waiting if none arrives, and re-dispatches the task if it gets a nack or the consumer process exits.
  })
})

Verification

  • Run rabbitmqctl list_queues to check the current queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
  • node producer.js (under the hood RabbitMQ first creates the default (nameless) exchange and the named queue, then binds the queue to that default exchange)

  • rabbitmqctl list_bindings

Listing bindings for vhost /...
exchange        taskQueue1      queue   taskQueue1      []
  • rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      1
  • node consumer.js
Waiting for messages in taskQueue1. To exit press CTRL+C
[x] Received hello world
[x] Done
  • rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      0

Key points

  • The producer/consumer pattern (at any given time a producer's message is handled by only one consumer)
  • The ACK mechanism (RabbitMQ's acknowledgement mechanism)
  • Declaring the queue with {durable: true} and sending messages with {persistent: true} (persists messages, though without a full guarantee: a message is lost, for example, if the program crashes before it has been written from cache to disk)
  • Round-robin dispatch (fair distribution of work)
  • Controlling the processing window (prefetch controls the dispatch window)
  • Asynchronous multi-task processing (e.g. splitting a large task into pieces, divide and conquer)
  • The overall message flow (a producer process -> default exchange -> via the binding -> the named queue -> one consumer process)

2. Single task, multiple consumers: the publish/subscribe pattern (broadcast model)

publisher.js

const amqp = require('amqplib/callback_api')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'
    const msg = process.argv.slice(2).join(' ') || 'Hello World!'
    // ex is the (unique) exchange name
    // the exchange type is fanout
    // messages are not persisted
    ch.assertExchange(ex, 'fanout', { durable: false })
    // the second argument is the routing key; a fanout exchange ignores it
    ch.publish(ex, '', Buffer.from(msg))
    console.log(' [x] Send %s', msg)
  })
  setTimeout(() => {
    conn.close()
    process.exit(0)
  }, 100)
})

subscriber.js

const amqp = require('amqplib/callback_api')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'
    // ex -> the exchange is the carrier of published/subscribed messages
    // fanout -> the distribution mode; one of fanout, direct, topic, headers
    // durable: false trades a little reliability for performance, since no disk I/O is needed to persist messages
    ch.assertExchange(ex, 'fanout', { durable: false })
    // use an anonymous queue (RabbitMQ generates a random name)
    // exclusive: true means the queue is automatically deleted when the connection it lives on is closed
    ch.assertQueue('', { exclusive: true }, (err, q) => {
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue)
      // bind the queue to the exchange (listen for that exchange's messages)
      // the third argument is the binding key
      ch.bindQueue(q.queue, ex, '')
      // consuming subscribes to the exchange's messages and registers the handler
      // publish/subscribe is inherently best-effort: a subscriber only receives messages while it is subscribed,
      // and the publisher does not care who the subscribers are or how they handle the message, so noAck is true here
      ch.consume(q.queue, (msg) => {
        console.log(' [x] %s', msg.content.toString())
      }, { noAck: true })
    })
  })
})

Verification

rabbitmqctl stop_app; rabbitmqctl reset; rabbitmqctl start_app (clears the queues, exchanges and bindings used by the previous tests)

node subscriber.js

[*] Waiting for messages in amq.gen-lgNW51IeEfj9vt1yjMUuaw. To exit press CTRL+C

rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
logs    fanout

rabbitmqctl list_bindings

Listing bindings for vhost /...
exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue   amq.gen-jDbfwJR8TbSNJT2a2a83Og  []
logs    exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue           []

node publisher.js tasks.........

[x] Send tasks......... // publisher.js
[x] tasks......... // subscriber.js

Key points

  • The publish/subscribe pattern (a publisher sends messages to subscribers in a one-to-many fashion)
  • noAck (the non-ack mode is recommended in this pattern, because the publisher usually does not care how subscribers process the message or what the result is)
  • durable: false (no persistence is recommended in this pattern, for the same reason as above)
  • The exchange's working modes (i.e. routing types: fanout, direct, topic, headers, etc.; covered in the next sections)
  • The overall message flow (a publisher process -> the named exchange -> via bindings and the exchange type -> one or more anonymous queues, i.e. subscriber processes)

3. Direct Routing

exchange.js

module.exports = {
  name: 'ex1',
  type: 'direct',
  option: {
    durable: false
  },
  ranks: ['info', 'error', 'warning', 'severity']
}

direct-routing.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    ch.assertExchange(ex.name, ex.type, ex.option)
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

subscriber.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks
    ranks.forEach(rank => {
      // declare a named (non-anonymous) queue
      ch.assertQueue(`${rank}-queue`, { exclusive: false }, (err, q) => {
        ch.bindQueue(q.queue, ex.name, rank)
        ch.consume(q.queue, msg => {
          console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString())
        }, { noAck: true })
      })
    })
  })
})

publisher.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks
    ranks.forEach(rank => {
      ch.publish(ex.name, rank, Buffer.from(`${rank} logs...`))
    })
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

Verification

rabbitmqctl stop_app; rabbitmqctl reset; rabbitmqctl start_app (clears the queues, exchanges and bindings used by the previous tests)

node direct-routing.js
rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
amq.headers	headers
ex1	direct
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.topic	topic
direct
amq.direct	direct
amq.match	headers

node subscriber.js
rabbitmqctl list_queues
rabbitmqctl list_bindings

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
severity-queue	0
error-queue	0
info-queue	0
warning-queue	0
Listing bindings for vhost /...
exchange	error-queue	queue	error-queue	[]
exchange	info-queue	queue	info-queue	[]
exchange	severity-queue	queue	severity-queue	[]
exchange	warning-queue	queue	warning-queue	[]
ex1	exchange	error-queue	queue	error	[]
ex1	exchange	info-queue	queue	info	[]
ex1	exchange	severity-queue	queue	severity	[]
ex1	exchange	warning-queue	queue	warning	[]

node publisher.js

 [x] info: 'info logs...'
[x] error: 'error logs...'
[x] severity: 'severity logs...'
[x] warning: 'warning logs...'

Key points

  • The routing key, used to route messages when the exchange works in direct mode
  • Whenever a queue is asserted, it is also bound to the default (nameless) exchange with the queue name as the routing key (as illustrated right after this list)
  • Useful for handling logs of different levels
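To illustrate the second point: publishing to the default (nameless) exchange with the queue name as the routing key is equivalent to sendToQueue. A small sketch of my own, reusing the error-queue declared above:

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    ch.assertQueue('error-queue', { exclusive: false })
    // the empty string names the default exchange; the routing key is the queue name
    ch.publish('', 'error-queue', Buffer.from('straight to the queue'))
    // equivalent shortcut
    ch.sendToQueue('error-queue', Buffer.from('straight to the queue'))
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})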

4. Topic Routing

exchange.js

module.exports = {
  name: 'ex2',
  type: 'topic',
  option: {
    durable: false
  },
  ranks: ['info', 'error', 'warning', 'severity']
}

topic-routing.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    ch.assertExchange(exchangeConfig.name, exchangeConfig.type, exchangeConfig.option)
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

subscriber.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const keys = (args.length > 0) ? args : ['anonymous.info']
    console.log(' [*] Waiting for logs. To exit press CTRL+C')
    keys.forEach(key => {
      ch.assertQueue('', { exclusive: true }, (err, q) => {
        console.log(` [x] Listen by routingKey ${key}`)
        ch.bindQueue(q.queue, exchangeConfig.name, key)
        ch.consume(q.queue, msg => {
          console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString())
        }, { noAck: true })
      })
    })
  })
})

publisher.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const key = (args.length > 1) ? args[0] : 'anonymous.info'
    const msg = args.slice(1).join(' ') || 'hello world'
    ch.publish(exchangeConfig.name, key, Buffer.from(msg))
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

Verification

rabbitmqctl stop_app; rabbitmqctl reset; rabbitmqctl start_app (clears the queues, exchanges and bindings used by the previous tests)

node topic-routing.js
rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.headers	headers
amq.match	headers
ex2	topic
direct
amq.topic	topic
amq.direct	direct

node subscriber.js "#.info" "*.error"

[*] Waiting for logs. To exit press CTRL+C
[x] Listen by routingKey #.info
[x] Listen by routingKey *.error
  • node publisher.js "account-server.info" "user service test"
  • node publisher.js "config-server.info" "config service test"
  • node publisher.js "config-server.error" "config service error"
[x] account-server.info:'user service test'
[x] config-server.info:'config service test'
[x] config-server.error:'config service error'

Key points

  • A routing key can be at most 255 bytes long
  • # matches zero or more words, while * matches exactly one word (see the example below)
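For example, with the subscriber above started with "#.info" and "*.error": a message published with routing key account-server.info matches "#.info" (zero or more words before .info); config-server.error matches "*.error"; but a longer key such as a.b.error matches neither, because "*" stands for exactly one word and only "#" can cover several.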

5. RPC

rpc_server.js

const amqp = require('amqplib/callback_api')
const logger = require('./Logger')

let connection = null

amqp.connect('amqp://localhost', (err, conn) => {
  connection = conn
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'
    ch.assertQueue(q, { durable: true })
    ch.prefetch(2)
    ch.consume(q, msg => {
      let data = {}
      let primitiveContent = msg.content.toString()
      try {
        data = JSON.parse(primitiveContent)
      } catch (e) {
        logger.error(new Error(e))
      }
      logger.info('received an RPC call from the client')
      if (msg.properties.correlationId === '10abc') {
        logger.info(primitiveContent)
        const uid = Number(data.uid) || -1
        let r = getUserById(uid)
        ch.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(r)), { persistent: true })
        ch.ack(msg)
      } else {
        logger.info('unmatched call request')
      }
    })
  })
})

function getUserById (uid) {
  let result = ''
  if (uid === +uid && uid > 0) {
    result = {
      state: 1000,
      msg: 'success',
      data: {
        uid: uid,
        name: '小強',
        sex: 1
      }
    }
  } else {
    result = {
      state: 2000,
      msg: 'invalid argument format'
    }
  }
  return result
}

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})

rpc_client.js

const amqp = require('amqplib/callback_api')
amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'
    const callback = 'callback_queue'
    ch.assertQueue(callback, { durable: true })
    ch.consume(callback, msg => {
      const result = msg.content.toString()
      console.log('received the callback message!')
      console.log(result)
      ch.ack(msg)
      setTimeout(() => {
        conn.close()
        process.exit(0)
      }, 0)
    })
    ch.assertQueue(q, { durable: true })
    const msg = {
      uid: 2
    }
    ch.sendToQueue(q, Buffer.from(JSON.stringify(msg)), {
      persistent: true,
      correlationId: '10abc',
      replyTo: 'callback_queue'
    })
  })
})

Verification

node rpc_server.js

rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
account_rpc_queue	0

node rpc_client.js

Output printed by rpc_client

received the callback message!
{"state":1000,"msg":"success","data":{"uid":2,"name":"小強","sex":1}}

Output printed by rpc_server

received an RPC call from the client
{ uid: 2 }

PM2: Node service deployment (clustering), management and monitoring

PM2 official site

Starting

pm2 start app.js

  • -w --watch: watch the directory for changes and restart the application automatically when they occur
  • --ignore-watch: files to ignore while watching, e.g. pm2 start rpc_server.js --watch --ignore-watch="rpc_client.js"
  • -n --name: set the application name, useful for telling applications apart
  • -i --instances: set the number of application instances; 0 is equivalent to max
  • -f --force: force-start an application, typically when one with the same name is already running
  • -o --output <path>: path of the stdout log file
  • -e --error <path>: path of the error log file
  • --env <env_name>: select which environment block (e.g. env_production) to load

pm2 start rpc_server.js -w -i max -n s1 --ignore-watch="rpc_client.js" -e ./server_error.log -o ./server_info.log

In cluster mode (i.e. -i max), the log file names automatically get a -${index} suffix so that they do not collide.

Other simple, commonly used commands

pm2 stop app_name|app_id
pm2 restart app_name|app_id
pm2 delete app_name|app_id
pm2 show app_name|app_id OR pm2 describe app_name|app_id
pm2 list
pm2 monit
pm2 logs app_name|app_id --lines <n> --err

Graceful Stop

pm2 stop app_name|app_id

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})

Before the process ends, the program intercepts the SIGINT signal so that, just before the process would be killed, it can close database connections and release other resources and then call process.exit() to exit gracefully. (If the process has still not ended after 1.6 s, PM2 follows up with SIGKILL to force it to terminate.)

Process File

ecosystem.config.js

const appCfg = {
  args: '',
  max_memory_restart: '150M',
  env: {
    NODE_ENV: 'development'
  },
  env_production: {
    NODE_ENV: 'production'
  },
  // source map
  source_map_support: true,
  // do not merge log output; useful for clustered services
  merge_logs: false,
  // startup timeout, commonly used to catch applications that fail while starting
  listen_timeout: 5000,
  // time limit for handling SIGINT: the process must exit within this time after receiving the signal
  kill_timeout: 2000,
  // do not auto-restart after an abnormal exit; let the operator find the cause and retry
  autorestart: false,
  // do not allow starting another process from the same script
  force: false,
  // commands to run after a pull/upgrade operation in the Keymetrics dashboard
  post_update: ['npm install'],
  // watch for file changes
  watch: false,
  // paths to ignore while watching
  ignore_watch: ['node_modules']
}

function GeneratePM2AppConfig ({ name = '', script = '', error_file = '', out_file = '', exec_mode = 'fork', instances = 1, args = '' }) {
  if (name) {
    return Object.assign({
      name,
      script: script || `${name}.js`,
      error_file: error_file || `${name}-err.log`,
      out_file: out_file || `${name}-out.log`,
      instances,
      exec_mode: instances > 1 ? 'cluster' : 'fork',
      args
    }, appCfg)
  } else {
    return null
  }
}

module.exports = {
  apps: [
    GeneratePM2AppConfig({
      name: 'client',
      script: './rpc_client.js'
    }),
    GeneratePM2AppConfig({
      name: 'server',
      script: './rpc_server.js',
      instances: 1
    })
  ]
}

pm2 start ecosystem.config.js

Pitfall guide: name the process file in the *.config.js format. Otherwise you are on your own.

Monitoring

Head over to app.keymetrics.io

PM2: automated deployment

SSH preparation

  1. ssh-keygen -t rsa -C 'qingf deployment' -b 4096
  2. If you have multiple keys or multiple users, it is best to configure the ~/.ssh/config file, roughly as follows
# choose which private key to use when a given user connects to a given remote host
Host qingf.me
  User deploy
  IdentityFile ~/.ssh/qf_deployment_rsa
  # set to no to skip the (yes/no) prompt on first login
  StrictHostKeyChecking no
# alias usage
Host deployment
  User deploy
  Hostname qingf.me
  IdentityFile ~/.ssh/qingf_deployment_rsa
  StrictHostKeyChecking no
  3. Copy the public key into the corresponding user's directory on the remote host (usually the deployment server), e.g. into the /home/deploy/.ssh/authorized_keys file (set the permissions of authorized_keys to 600)

Configuring ecosystem.config.js

Add a deploy property at the same level as the apps property above, as follows

deploy: {
  production: {
    'user': 'deploy',
    'host': 'qingf.me',
    'ref': 'remotes/origin/master',
    'repo': 'https://github.com/Cecil0o0/account-server.git',
    'path': '/home/deploy/apps/account-server',
    // lifecycle hook, run after ssh-ing to the remote and before the setup step
    'pre-setup': '',
    // lifecycle hook, run after the initial setup, i.e. after the repository has been cloned/pulled
    'post-setup': 'ls -la',
    // lifecycle hook, run on the remote before git fetch origin
    'pre-deploy': '',
    // lifecycle hook, run after git has moved HEAD to the given ref on the remote
    'post-deploy': 'npm install && pm2 startOrRestart deploy/ecosystem.config.js --env production',
    // the following environment variables are injected into every app
    'env': {
      'NODE_ENV': 'test'
    }
  }
}

Tip: please make sure the git working directory is clean first!

If anything here is unclear, please consult the Demo.

Then run the following two commands, one after the other (mind the path of the config file):

  1. pm2 deploy deploy/ecosystem.config.js production setup
  2. pm2 deploy deploy/ecosystem.config.js production

Other commands

pm2 deploy <configuration_file>

  Commands:
setup                run remote setup commands
update               update deploy to the latest release
revert [n]           revert to [n]th last deployment or 1
curr[ent]            output current release commit
prev[ious]           output previous release commit
exec|run <cmd>       execute the given <cmd>
list                 list previous deploy commits
[ref]                deploy to [ref], the "ref" setting, or latest tag

Recommended shell toolkit

oh my zsh

Request tracing

How?

  • Inside seneca.add and seneca.act, use the value of seneca.fixedargs['tx$'] as a trace ID marking which request flow a call belongs to. Seneca's built-in log system also prints this value.

Open question

How do you do custom log output with seneca's built-in log system?

Friendly reminder: start from a normal HTTP request; testing shows that when a microservice initiates an act on its own, its seneca.fixedargs['tx$'] value is different.
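As a rough illustration of the idea (a minimal sketch based on the note above, not code from the framework), the trace ID can be read inside an action and attached to custom log lines:

const seneca = require('seneca')()
const logger = require('./Logger') // the winston logger defined earlier

seneca.add('module:user,if:list', function (msg, done) {
  // assumption: the action's `this` delegate exposes fixedargs, as the note above suggests
  const traceId = this.fixedargs && this.fixedargs['tx$']
  logger.info(`[tx$ ${traceId}] handling module:user,if:list`)
  done(null, { ok: true })
})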

Consul service registration and discovery

Consul is a distributed service registration and discovery tool for clusters, with advanced features such as health checks, a hierarchical key/value store and multi-datacenter support.

Installation

  • Use a pre-compiled package
  • Or clone the source and build it yourself

Basic usage

  • Quickly start a server-mode agent in development mode and enable the web UI at http://localhost:8500

consul agent -dev -ui

  • Write a service definition file (one way to load it is shown after the query example below)
{
  "service": {
    // service name, used later to query the service
    "name": "account-server",
    // service tags
    "tags": ["account-server"],
    // service metadata
    "meta": {
      "meta": "for my service"
    },
    // service port
    "port": 3333,
    // do not allow tags to be overridden
    "enable_tag_override": false,
    // health checks; script checks require -enable-script-checks=true; script, TCP, HTTP and TTL modes are available
    "checks": [
      {
        "http": "http://localhost:3333/user",
        "interval": "10s"
      }
    ]
  }
}
  • Query the account-server service defined above

curl http://localhost:8500/v1/catalog/service/account-server

[
  {
    "ID": "e66eb1ff-460c-e63f-b4ac-0cb42daed19c",
    "Node": "haojiechen.local",
    "Address": "127.0.0.1",
    "Datacenter": "dc1",
    "TaggedAddresses": {
      "lan": "127.0.0.1",
      "wan": "127.0.0.1"
    },
    "NodeMeta": {
      "consul-network-segment": ""
    },
    "ServiceID": "account-server",
    "ServiceName": "account-server",
    "ServiceTags": [
      "account-server"
    ],
    "ServiceAddress": "",
    "ServiceMeta": {
      "meta": "for my service"
    },
    "ServicePort": 3333,
    "ServiceEnableTagOverride": false,
    "CreateIndex": 6,
    "ModifyIndex": 6
  }
]
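For completeness, one way to make the agent pick up the service definition above (an assumption on my part, the post does not spell it out): save it as a .json file in a directory passed via -config-dir, then either start the dev agent with that directory or ask a running agent to re-read its configuration:

consul agent -dev -ui -config-dir=./consul.d
# or, if the agent is already running with that config directory:
consul reload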

Production-grade usage (a distributed cluster)

Start a server-mode agent on one node, as follows

consul agent -server -bootstrap-expect=1 \
  -data-dir=/tmp/consul -node=agent-one -bind=<valid extranet IP> \
  -enable-script-checks=true -config-dir=/usr/local/etc/consul.d

List the cluster members

consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-one  <valid extranet IP>:8301  alive   server  1.1.0  2         dc1  <all>

Start a client-mode agent on another node, as follows

consul agent \
-data-dir=/tmp/consul -node=agent-two -bind=139.129.5.228 \
-enable-script-checks=true -config-dir=/usr/local/etc/consul.d

List the cluster members

consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  <all>

Joining the cluster

consul join 139.129.5.228
consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-one  <valid extranet IP>:8301  alive   server  1.1.0  2         dc1  <all>
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  <all>

Integrating node-consul

config.js

// service registration & discovery
// https://github.com/silas/node-consul#catalog-node-services
'serverR&D': {
  consulServer: {
    type: 'consul',
    host: '127.0.0.1',
    port: 8500,
    secure: false,
    ca: [],
    defaults: {
      token: ''
    },
    promisify: true
  },
  bizService: {
    name: 'defaultName',
    id: 'defaultId',
    address: '127.0.0.1',
    port: 1000,
    tags: [],
    meta: {
      version: '',
      description: 'registered cluster'
    },
    check: {
      http: '',
      // check interval (e.g. 15s)
      interval: '10s',
      // check timeout (e.g. 10s)
      timeout: '2s',
      // automatically deregister the service after it has been in the critical state for this long
      deregistercriticalserviceafter: '30s',
      // the initial status is passing
      status: 'passing',
      // notes
      notes: '{"version":"111","microservice-port":1115}'
    }
  }
}

server-register.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 11:26:49
 * @Description Microservice registration helper
 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')
const logger = new (require('./logger'))().generateLogger()

// register a service
function register ({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
  if (!bizService.meta.$$version) throw new Error('meta.$$version is invalid!')
  if (!bizService.meta.$$microservicePort) throw new Error('meta.$$microservicePort is invalid!')
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)
  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // check whether the host + port is already taken
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`The endpoint [${service.address}, ${service.port}] of this service cluster is already taken!`)
        }
      })
      // register the cluster service
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name} service registered`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor () {
    this.register = register
  }
}

Verification

After making sure consul and mongodb are available in your runtime, clone the repository Demo, cd into the project root and run node src.

Integrating node-consul into the framework

server-register.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 13:58:22
 * @Description Microservice registration helper
 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')
const logger = new (require('./logger'))().generateLogger()

// definition of the service registration method
function register ({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
  if (!bizService.meta.$$version) throw new Error('meta.$$version is invalid!')
  if (!bizService.meta.$$microservicePort) throw new Error('meta.$$microservicePort is invalid!')
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)
  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // check whether the host + port is already taken
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`The endpoint [${service.address}, ${service.port}] of this service cluster is already taken!`)
        }
      })
      // register the cluster service
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name} service registered successfully`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor () {
    this.register = register
  }
}

account-server/src/index.js

const vastify = require('vastify')
const version = require('../package.json').version
const microservicePort = 10015
const httpPort = 3333

// register the service
vastify.ServerRegister.register({
  bizService: {
    name: 'account-server',
    host: '127.0.0.1',
    port: httpPort,
    meta: {
      $$version: version,
      $$microservicePort: microservicePort
    }
  }
})

MongoDB persistent storage

  • The framework uses mongoose as its Mongo client; you can of course use the native Node.js MongoDB client instead.

The earlier user module is reworked on top of it; I will be lazy and not paste the code here, please see the Demo (a rough sketch follows below).
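For readers who do not want to open the Demo, here is a minimal sketch of what a mongoose-backed user model might look like. This is my own illustration rather than the Demo's actual code; the connection URL, database name and schema fields are assumptions mirroring the in-memory db used earlier:

const mongoose = require('mongoose')

// assumed connection URL and database name
mongoose.connect('mongodb://127.0.0.1:27017/microservices')

// assumed schema, mirroring the { id, name } records used earlier
const userSchema = new mongoose.Schema({
  id: Number,
  name: String
})

const User = mongoose.model('User', userSchema)

// the seneca actions can then swap the in-memory array operations for queries, e.g.:
// this.add(`${$module},if:list`, (msg, done) => {
//   User.find({}).then(users => done(null, users)).catch(done)
// })

module.exports = User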

A routing-service middleware combining seneca and consul

microRouting.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 16:22:02
 * @Description Internal routing middleware for the microservices; custom route-matching strategies are not supported yet
 */
'use strict'
const Consul = require('consul')
const defaultConf = require('../config')
const { ObjectDeepSet, isNumber } = require('../helper/utils')
const { getServiceNameByServiceKey, getServiceIdByServiceKey } = require('../helper/consul')
const logger = new (require('../tools/logger'))().generateLogger()
const { IPV4_REGEX } = require('../helper/regex')

let services = {}
let consul = null

/**
 * @author Cecil0o0
 * @description Sync all available services and their checks from the consul service registry and assemble them into one object for easy lookup
 */
function syncCheckList () {
  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(allServices => {
      if (Object.keys(allServices).length > 0) {
        services = allServices
        consul.agent.check.list().then(checks => {
          Object.keys(checks).forEach(key => {
            allServices[getServiceIdByServiceKey(key)]['check'] = checks[key]
          })
          resolve(services)
        }).catch(err => {
          throw new Error(err)
        })
      } else {
        const errmsg = 'no available services found'
        logger.warn(errmsg)
        reject(errmsg)
      }
    }).catch(err => {
      throw new Error(err)
    })
  })
}

function syncRoutingRule (senecaInstance = {}, services = {}) {
  Object.keys(services).forEach(key => {
    let service = services[key]
    let name = getServiceNameByServiceKey(key)
    let $$addr = service.Address
    let $$microservicePort = ''
    let $$version = ''
    try {
      let base = JSON.parse(service.check.Notes)
      $$microservicePort = base.$$microservicePort
      $$version = base.$$version
    } catch (e) {
      logger.warn(`Service ${name}: its check.Notes is not standard JSON and has been ignored. Please check how the service was registered (make sure it was registered through ServerRegister's register method).`)
    }
    if (IPV4_REGEX.test($$addr) && isNumber($$microservicePort)) {
      if (service.check.Status === 'passing') {
        senecaInstance.client({
          host: $$addr,
          port: $$microservicePort,
          pin: {
            $$version,
            $$target: name
          }
        })
      } else {
        logger.warn(`${name}@${$$version || 'none'} is in the critical state and cannot be used`)
      }
    } else {
      logger.warn(`The host (${$$addr}) or microservice port (${$$microservicePort}) is invalid, please check`)
    }
  })
}

function startTimeInterval () {
  setInterval(syncCheckList, defaultConf.routing.servicesRefresh)
}

function microRouting (consulServer) {
  var self = this
  consul = Consul(ObjectDeepSet(defaultConf['serverR&D'].consulServer, consulServer))
  syncCheckList().then(services => {
    syncRoutingRule(self, services)
  })
}

module.exports = microRouting

After making sure you have consul and mongodb running, please test with the two demos, config-server and account-server.

[To be continued...]

Related articles

An overview of the core principles of the Node.js module system

A first try at Vue + GraphQL

An overview of Node.js core mechanisms

An overview of some core JavaScript mechanisms