diff --git a/customerWorker/1-broker.js b/customerWorker/1-broker.js new file mode 100644 index 0000000..b329ef0 --- /dev/null +++ b/customerWorker/1-broker.js @@ -0,0 +1,109 @@ +'use strict'; + +global.api = {}; +api.net = require('net'); +api.os = require('os'); +api.metasync = require('metasync'); +api.queue = require('./queue.js'); + +const workersQueue = new api.queue.Queue(); +let workerId = 0; +let taskId = 0; + +const newWorker = (socket) => { + socket.on('data', (data) => { + const connParams = JSON.parse(data); + //add new worker + workersQueue.put({ + id: workerId++, + port: connParams.port, + host: socket.remoteAddress + }); + console.dir(workersQueue.queue); + socket.end(); + }); +}; + + +const createConnection = (task) => (data, datacb) => { + workersQueue.use((workerParams, qcb) => { + const workerSocket = new api.net.Socket(); + workerSocket.on('data', (readData) => { + console.log('Data received (by broker): ' + readData); + const res = JSON.parse(readData); + data[parseInt(res.index)] = res.answer; + qcb(); + datacb(data); + }); + workerSocket.connect({ + port: workerParams.port, + host: workerParams.host + }, () => { + console.log('Data send (by broker): ' + JSON.stringify(task)); + workerSocket.write(JSON.stringify(task)); + }); + }); +}; + +const mergeResult = (data) => { + console.log('Merging - '); + console.dir(data); + //merge array of results in one + const res = data.reduce((a, b) => a.concat(b)); + // console.log(task); + console.log(res); + return res; +}; + +const createTasks = (arr, workers) => { + const tasks = []; + const elemsByTask = Math.ceil(arr.length / workers); + let i = 0; + while (arr.length > 0) { + tasks.push({ + index: i++, + funcId: 1, + task: arr.splice(0, elemsByTask), + taskId: taskId++ + }); + } + return tasks; +}; + +const sendTasks = (tasks, clientSocket) => { + api.metasync.map( + tasks, + (curr, cb) => { + cb(null, createConnection(curr)); + }, + (err, res) => { + if (err) console.error(err); + api.metasync.parallel( + res, + (data) => { + const res = mergeResult(data); + const response = { answer: res }; + clientSocket.end(JSON.stringify(response)); + }, + [] + ); + }); +}; + +const newTasks = (data, clientSocket) => { + const obj = JSON.parse(data); + sendTasks(createTasks(obj.task, workersQueue.size()), clientSocket); +}; + +//for clients +api.net.createServer((clientSocket) => { + clientSocket.on('error', (err) => { + console.error(err); + }); + clientSocket.on('data', (data) => { + newTasks(data, clientSocket); + }); +}).listen(50000); + +//for workers +api.net.createServer(newWorker).listen(51000); diff --git a/customerWorker/2-worker.js b/customerWorker/2-worker.js new file mode 100644 index 0000000..e974c60 --- /dev/null +++ b/customerWorker/2-worker.js @@ -0,0 +1,13 @@ +'use strict'; + +global.api = {}; +api.cluster = require('cluster'); +api.os = require('os'); +api.net = require('net'); + +global.application = {}; +application.master = require('./workersMaster.js'); +application.worker = require('./worker.js'); + +if (api.cluster.isMaster) application.master(); +else application.worker(); diff --git a/customerWorker/3-customer.js b/customerWorker/3-customer.js new file mode 100644 index 0000000..d68281b --- /dev/null +++ b/customerWorker/3-customer.js @@ -0,0 +1,27 @@ +'use strict'; + +global.api = {}; +api.net = require('net'); +api.os = require('os'); + +const broker = { host: '127.0.0.1', port: 50000 }; +const task = [2, 17, 3, 2, 5, 7, 15, 22, 1, 14, 15, 9, 0, 11, 2, 17, 3, 2, 5]; + +const sendTask = (task) => { + const conn = { port: broker.port, host: broker.host }; + const socket = new api.net.Socket(); + socket.on('data', (readData) => { + console.log('Data received (by client): ' + readData); + const res = JSON.parse(readData); + console.dir(task); + console.dir(res.answer); + }); + socket.connect(conn, () => { + console.log('Data send (by client): ' + JSON.stringify({ task })); + socket.write(JSON.stringify({ task })); + }); +}; + +sendTask(task); +sendTask(task); +sendTask(task); diff --git a/customerWorker/queue.js b/customerWorker/queue.js new file mode 100644 index 0000000..6a5fbe3 --- /dev/null +++ b/customerWorker/queue.js @@ -0,0 +1,48 @@ +'use strict'; + +exports.Queue = Queue; +//simple queue for workers +function Queue() { + //free workers + this.queue = []; + //functions waiting for worker + this.process = []; +} + +Queue.prototype.size = function() { + return this.queue.length; +}; + +Queue.prototype.canGet = function() { + return this.queue.length > 0; +}; + +Queue.prototype.put = function(data) { + this.queue.push(data); + if (this.process.length > 0) { + const fn = this.process.shift(); + const item = this.get(); + fn(item, () => { + this.put(item); + }); + } +}; + +Queue.prototype.get = function() { + return this.queue.shift(); +}; + +Queue.prototype.use = function(func) { + if (this.queue.length > 0) { + const worker = this.get(); + func(worker, () => { + this.put(worker); + }); + } else { + this.process.push(func); + } +}; + +Queue.prototype.clear = function() { + this.queue = []; +}; diff --git a/customerWorker/worker.js b/customerWorker/worker.js new file mode 100644 index 0000000..57b9ad2 --- /dev/null +++ b/customerWorker/worker.js @@ -0,0 +1,41 @@ +'use strict'; + +module.exports = function() { + process.on('message', (message) => { + const broker = { port: 51000, host: '127.0.0.1' }; + const funcs = { + 0: (item) => (item * 2), + 1: (item) => (item * item) + }; + const calculate = (funcId, array) => array.map(funcs[funcId]); + const createHandler = (socket) => (data) => { + const dataObj = JSON.parse(data); + const taskId = parseInt(dataObj.funcId); + console.log('Data received (by worker): ' + data); + const result = calculate(taskId, dataObj.task); + const answer = { index: dataObj.index, answer: result }; + console.log('Task complete. Send result - ' + JSON.stringify(answer)); + socket.end(JSON.stringify(answer)); + }; + const registerInBroker = (myPort, connParams) => { + const brokerConn = new api.net.Socket(); + const connParam = { port: myPort }; + brokerConn.connect(connParams, () => { + brokerConn.write(JSON.stringify(connParam)); + }); + }; + + const port = parseInt(message); + + registerInBroker(port, broker); + console.log('Listen port # ' + port); + api.net.createServer((socket) => { + console.log('Conn: ' + socket.remoteAddress + ':' + socket.remotePort); + const handler = createHandler(socket); + socket.on('error', (err) => { + console.error(err); + }); + socket.on('data', handler); + }).listen(port); + }); +}; diff --git a/customerWorker/workersMaster.js b/customerWorker/workersMaster.js new file mode 100644 index 0000000..fd74ea1 --- /dev/null +++ b/customerWorker/workersMaster.js @@ -0,0 +1,19 @@ +'use strict'; + +module.exports = function() { + const workersCount = 5; + let startPort = 52000; + + const workers = []; + for (let i = 0; i < workersCount; i++) { + const worker = api.cluster.fork(); + workers.push(worker); + } + + workers.forEach((worker) => { + worker.send(startPort++); + worker.on('exit', (code) => { + console.log('exit ' + worker.process.pid + ' ' + code); + }); + }); +};