Skip to content

feat: Refactor the ipc module. #7076

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions packages/@vue/cli-shared-utils/lib/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const fs = require('fs')
const path = require('path')
const LRU = require('lru-cache')
const semver = require('semver')
const { Buffer } = require('buffer')

let _hasYarn
const _yarnProjects = new LRU({
Expand All @@ -14,6 +15,7 @@ const _gitProjects = new LRU({
max: 10,
maxAge: 1000
})
const CRLF = '\r\n'

// env detection
exports.hasYarn = () => {
Expand Down Expand Up @@ -216,3 +218,71 @@ exports.getInstalledBrowsers = () => {

return browsers
}

exports.getPipePath = (id) => {
if (exports.isWindows && !id.startsWith('\\\\.\\pipe\\')) {
id = id.replace(/^\//, '')
id = id.replace(/\//g, '-')
id = `\\\\.\\pipe\\${id}`
} else {
id = `/tmp/app.${id}`
}
return id
}

exports.encodeIpcData = (type, data) => {
const jsonstr = JSON.stringify({
data,
type
})
const massage = `Content-Length: ${Buffer.byteLength(jsonstr)}${CRLF + CRLF}${jsonstr}`
return Buffer.from(massage)
}

exports.parseIpcData = (data, reserveData) => {
let { contentLength, rawData } = reserveData
rawData += data
const messages = []
while (true) {
if (contentLength >= 0) {
if (rawData.length >= contentLength) {
const message = rawData.slice(0, contentLength)
rawData = rawData.slice(contentLength)
contentLength = -1
if (message.length > 0) {
let msg
try {
msg = JSON.parse(message)
} catch (error) {
msg = {
type: 'error',
data: `Error handling data: ${error}`
}
}
messages.push(msg)
}
continue
}
} else {
const idx = rawData.indexOf(CRLF + CRLF)
if (idx !== -1) {
const header = rawData.slice(0, idx)
const lines = header.split(CRLF)
for (let i = 0; i < lines.length; i++) {
const pair = lines[i].split(/: +/)
if (pair[0] === 'Content-Length') {
contentLength = +pair[1]
}
}
rawData = rawData.slice(idx + (CRLF + CRLF).length)
continue
}
}
break
}

reserveData.contentLength = contentLength
reserveData.rawData = rawData

return messages
}
112 changes: 83 additions & 29 deletions packages/@vue/cli-shared-utils/lib/ipc.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const ipc = require('@achrinza/node-ipc')
const { getPipePath, encodeIpcData, parseIpcData } = require('./env')
const net = require('net')

const DEFAULT_ID = process.env.VUE_CLI_IPC || 'vue-cli'
const DEFAULT_IDLE_TIMEOUT = 3000
Expand All @@ -15,12 +16,18 @@ const PROJECT_ID = process.env.VUE_CLI_PROJECT_ID
exports.IpcMessenger = class IpcMessenger {
constructor (options = {}) {
options = Object.assign({}, DEFAULT_OPTIONS, options)
ipc.config.id = this.id = options.networkId
ipc.config.retry = 1500
ipc.config.silent = true
this.id = options.networkId
this.retry = 1500
this.ipcTimer = null
this.reserveData = {
contentLength: -1,
rawData: ''
}
this.socket = null

this.connected = false
this.connecting = false
this.disconnected = false
this.disconnecting = false
this.queue = null
this.options = options
Expand All @@ -40,7 +47,7 @@ exports.IpcMessenger = class IpcMessenger {
}

checkConnection () {
if (!ipc.of[this.id]) {
if (!this.socket) {
this.connected = false
}
}
Expand All @@ -55,7 +62,8 @@ exports.IpcMessenger = class IpcMessenger {
}
}

ipc.of[this.id].emit(type, data)
const massages = encodeIpcData(type, data)
this.socket.write(massages)

clearTimeout(this.idleTimer)
if (this.options.disconnectOnIdle) {
Expand All @@ -76,14 +84,7 @@ exports.IpcMessenger = class IpcMessenger {
if (this.connected || this.connecting) return
this.connecting = true
this.disconnecting = false
ipc.connectTo(this.id, () => {
this.connected = true
this.connecting = false
this.queue && this.queue.forEach(data => this.send(data))
this.queue = null

ipc.of[this.id].on('message', this._onMessage)
})
this._connectTo()
}

disconnect () {
Expand All @@ -92,18 +93,11 @@ exports.IpcMessenger = class IpcMessenger {
this.disconnecting = true
this.connecting = false

const ipcTimer = setTimeout(() => {
this.ipcTimer = setTimeout(() => {
this._disconnect()
}, this.disconnectTimeout)

this.send({ done: true }, 'ack')

ipc.of[this.id].on('ack', data => {
if (data.ok) {
clearTimeout(ipcTimer)
this._disconnect()
}
})
}

on (listener) {
Expand All @@ -118,25 +112,85 @@ exports.IpcMessenger = class IpcMessenger {
_reset () {
this.queue = []
this.connected = false
this.socket = null
}

_disconnect () {
if (!this.socket) {
return
}
this.connected = false
this.disconnecting = false
ipc.disconnect(this.id)
this.disconnected = true
this.socket.destroy()
this._reset()
}

_onMessage (data) {
this.listeners.forEach(fn => {
if (this.options.namespaceOnProject && data._projectId) {
if (data._projectId === PROJECT_ID) {
data = data._data
_onMessage (massage) {
let { type, data } = massage
if (type === 'ack') {
if (data.ok) {
clearTimeout(this.ipcTimer)
this._disconnect()
}
} else {
this.listeners.forEach((resolve, reject) => {
if (this.options.namespaceOnProject && data._projectId) {
if (data._projectId === PROJECT_ID) {
data = data._data
} else {
return
}
}
if (type === 'error') {
reject(data)
} else {
resolve(data)
}
})
}
}

_connectTo () {
const pipPath = getPipePath(this.id)
const socket = net.createConnection({ path: pipPath })
socket.setEncoding('utf-8')

socket.on('connect', () => {
this.connected = true
this.connecting = false
this.queue && this.queue.forEach(data => this.send(data))
this.queue = null
})

socket.on('data', (massages) => {
const queue = parseIpcData(massages, this.reserveData)
queue.forEach(massage => {
this._onMessage(massage)
})
})

socket.on('close', () => {
if (this.disconnected) {
return
}
setTimeout(() => {
if (this.disconnected) {
this._disconnect()
return
}
this._connectTo()
}, this.retry)
})

socket.on('error', (error) => {
const massage = {
type: 'error',
data: error
}
fn(data)
this._onMessage(massage)
})

this.socket = socket
}
}
1 change: 0 additions & 1 deletion packages/@vue/cli-shared-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
},
"homepage": "https://github.com/vuejs/vue-cli/tree/dev/packages/@vue/cli-shared-utils#readme",
"dependencies": {
"@achrinza/node-ipc": "9.2.2",
"chalk": "^4.1.2",
"execa": "^1.0.0",
"joi": "^17.4.0",
Expand Down
100 changes: 78 additions & 22 deletions packages/@vue/cli-ui/apollo-server/util/ipc.js
Original file line number Diff line number Diff line change
@@ -1,36 +1,62 @@
const ipc = require('@achrinza/node-ipc')
const net = require('net')
const fs = require('fs')

// Utils
const { log, dumpObject } = require('../util/logger')
const { getPipePath, encodeIpcData, parseIpcData } = require('@vue/cli-shared-utils')

ipc.config.id = process.env.VUE_CLI_IPC || 'vue-cli'
ipc.config.retry = 1500
ipc.config.silent = true
const id = process.env.VUE_CLI_IPC || 'vue-cli'

const listeners = []

ipc.serve(() => {
ipc.server.on('message', (data, socket) => {
log('IPC message', dumpObject(data))
for (const listener of listeners) {
listener({
data,
emit: data => {
ipc.server.emit(socket, 'message', data)
}
})
const pipePath = getPipePath(id)

let curSocket = null

let reserveData = {
contentLength: -1,
rawData: ''
}

fs.unlink(pipePath, () => {
const server = net.createServer((socket) => {
curSocket = socket
if (socket.setEncoding) {
socket.setEncoding('utf-8')
}

socket.on('data', (massages) => {
const queue = parseIpcData(massages, reserveData)
queue.forEach(massage => {
_onMessage(massage)
})
})

socket.on('close', () => {
if (curSocket && curSocket.destroy) {
curSocket.destroy()
}
reserveData = {
contentLength: -1,
rawData: ''
}
curSocket = null
})

socket.on('error', (error) => {
const massage = {
type: 'error',
data: error
}
_onMessage(massage)
})
})

ipc.server.on('ack', (data, socket) => {
log('IPC ack', dumpObject(data))
if (data.done) {
ipc.server.emit(socket, 'ack', { ok: true })
}
server.listen({
path: pipePath
})
})

ipc.server.start()

function on (cb) {
listeners.push(cb)
return () => off(cb)
Expand All @@ -43,7 +69,37 @@ function off (cb) {

function send (data) {
log('IPC send', dumpObject(data))
ipc.server.broadcast('message', data)
const massages = encodeIpcData('message', data)
curSocket.write(massages)
}

function _onMessage (massage) {
const { type, data } = massage
if (type === 'ack') {
log('IPC ack', dumpObject(data))
if (data.done) {
curSocket.write(encodeIpcData('ack', { ok: true }))
}
} else {
log('IPC message', dumpObject(data))
listeners.forEach((resolve, reject) => {
if (type === 'error') {
reject({
data,
emit: data => {
curSocket.write(encodeIpcData('error', data))
}
})
} else {
resolve({
data,
emit: data => {
curSocket.write(encodeIpcData('message', data))
}
})
}
})
}
}

module.exports = {
Expand Down
Loading