diff --git a/Java/src/JobInitializer.java b/Java/src/JobInitializer.java new file mode 100644 index 0000000..2ef4783 --- /dev/null +++ b/Java/src/JobInitializer.java @@ -0,0 +1,125 @@ +import java.util.Random; +import job.Job; + + +public class JobInitializer { + final Random random; + private Job readConfig; + private Job readFromFile; + private Job readFromDb; + private Job httpReq; + + public JobInitializer() { + this.random = new Random(); + initJobs(); + } + + private int timeToWait() { + return random.nextInt(2000) + 1000; + } + + private void initJobs() { + readConfig = (Job) params -> { + if (params != null) { + System.out.println("reading config...with param " + params[0]); + } else { + System.out.println("reading config..."); + } + try { + Thread.currentThread().sleep(timeToWait()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "some configuration"; + }; + readFromFile = (Job) params -> { + System.out.println("reading File..."); + try { + Thread.currentThread().sleep(timeToWait()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "some configuration"; + }; + readFromDb = (Job) params -> { + System.out.println("accessing db..."); + try { + Thread.currentThread().sleep(timeToWait()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "100 rows from db"; + }; + httpReq = (Job) params -> { + if (params != null) { + System.out.println("trying to send request...Data = " + params[0]); + } else { + System.out.println("trying to send request..."); + } + try { + Thread.currentThread().sleep(timeToWait()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "http://vk.com"; + }; + } + + + private String readConfig() { + System.out.println("reading config..."); + try { + Thread.currentThread().sleep(timeToWait()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "some configuration"; + } + + private String readFromFile(String str) { + System.out.println("reading file." + str + "..."); + try { + Thread.currentThread().sleep(timeToWait()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "file content"; + } + + + private String readFromDb(String arg) { + System.out.println("accessing db." + arg + "..."); + try { + Thread.currentThread().sleep(timeToWait()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "100 rows from db"; + } + + private String httpReq(String str) { + System.out.println("trying to send request." + str + "..."); + try { + Thread.currentThread().sleep(timeToWait()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "http://vk.com"; + } + + public Job getReadConfig() { + return readConfig; + } + + public Job getReadFromFile() { + return readFromFile; + } + + public Job getReadFromDb() { + return readFromDb; + } + + public Job getHttpReq() { + return httpReq; + } +} diff --git a/Java/src/Main.java b/Java/src/Main.java new file mode 100644 index 0000000..17d2cb1 --- /dev/null +++ b/Java/src/Main.java @@ -0,0 +1,131 @@ +import java.io.PrintStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import job.CallbackTask; +import job.Job; +import utils.CustomPrintStream; +import utils.SameThreadExecutorService; + +public class Main { + + ExecutorService service; + JobInitializer initializer; + private Job readConfig; + private Job readFromFile; + private Job readFromDb; + private Job httpReq; + + public static void main(String[] args) { + Main test = new Main(); + test.setup(); + + test.sequentialExecutionsWithCallbacks(); + // test.asyncCalls(); + //test.orderedCalls(); + } + + private void setup() { + //change implementation of standard output stream. + PrintStream stream = new CustomPrintStream(System.out); + System.setOut(stream); + //initialize jobs + this.initializer = new JobInitializer(); + this.readConfig = initializer.getReadConfig(); + this.readFromFile = initializer.getReadFromFile(); + this.readFromDb = initializer.getReadFromDb(); + this.httpReq = initializer.getHttpReq(); + } + + //Sequential calls and sequential execution + private void sequentialExecutionsWithCallbacks() { + service = new SameThreadExecutorService(); + CallbackTask readConfigTask = new CallbackTask(initializer.getReadConfig(), + arg -> System.out.println("Successfully read from config: " + arg)); + CallbackTask readFileTask = new CallbackTask(initializer.getReadFromFile(), + arg -> System.out.println("Successfully read from file: " + arg)); + CallbackTask readFromDbTask = new CallbackTask(initializer.getReadFromDb(), + arg -> System.out.println("Successfully read from DB: " + arg)); + CallbackTask httpReqTask = new CallbackTask(initializer.getHttpReq(), + arg -> System.out.println("Successfully read from URL: " + arg)); + + executeAsync(service, readConfigTask); + executeAsync(service, readFileTask); + executeAsync(service, readFromDbTask); + executeAsync(service, httpReqTask); + + System.out.println("End of src.Main thread in [sequentialExecutionsWithCallbacks]."); + service.shutdown(); + } + + //async calls. Notify when all jobs are done + private void asyncCalls() { + service = Executors.newFixedThreadPool(4, r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + }); + CountDownLatch latch = new CountDownLatch(4); + CallbackTask readConfigTask = new CallbackTask(readConfig, arg -> { + latch.countDown(); + System.out.println("Successfully read from config: " + arg); + }); + CallbackTask readFileTask = new CallbackTask(readFromFile, arg -> { + latch.countDown(); + System.out.println("Successfully read from file: " + arg); + }); + CallbackTask readFromDbTask = new CallbackTask(readFromDb, arg -> { + latch.countDown(); + System.out.println("Successfully read from DB: " + arg); + }); + CallbackTask httpReqTask = new CallbackTask(httpReq, arg -> { + latch.countDown(); + System.out.println("Successfully read from URL: " + arg); + }); + executeAsync(service, readConfigTask); + executeAsync(service, readFileTask); + executeAsync(service, readFromDbTask); + executeAsync(service, httpReqTask); + try { + latch.await(); + service.shutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("End of src.Main thread in [asyncCalls]."); + } + + //Asynchronous calls with jobs rely on results of some previous jobs + public void orderedCalls() { + service = Executors.newFixedThreadPool(4, r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + }); + CallbackTask httpReqTask = new CallbackTask(httpReq, + arg -> System.out.println("Successfully read from URL: " + arg)); + CallbackTask readFromDbTask = new CallbackTask(readFromDb).callback(arg -> { + System.out.println("Successfully read from DB. It`s time to send req with data we found"); + executeAsync(service, httpReqTask.args(arg)); + }); + CallbackTask readFileTask = new CallbackTask(readFromFile).callback(arg -> { + System.out.println("Successfully read from file. Init connection with DB"); + executeAsync(service, readFromDbTask.args(arg)); + }); + + CallbackTask readConfigTask = new CallbackTask(readConfig).callback(arg -> { + executeAsync(service, readFileTask.args(arg)); + service.shutdown(); + }).args("Config file"); + + executeAsync(service, readConfigTask); + System.out.println("End of src.Main thread in [orderedCallsWithFutures]."); + } + + //wrap src.job to be executed asynchronously by ExecutorService + private Future executeAsync(ExecutorService service, CallbackTask func) { + return service.submit(func); + + } +} \ No newline at end of file diff --git a/Java/src/job/Callback.java b/Java/src/job/Callback.java new file mode 100644 index 0000000..764c4ae --- /dev/null +++ b/Java/src/job/Callback.java @@ -0,0 +1,10 @@ +package job; + +/** + * Callback interface with input data. + * + * @param + */ +public interface Callback { + void complete(T object); +} diff --git a/Java/src/job/CallbackTask.java b/Java/src/job/CallbackTask.java new file mode 100644 index 0000000..5636572 --- /dev/null +++ b/Java/src/job/CallbackTask.java @@ -0,0 +1,48 @@ +package job; + +import java.util.concurrent.Callable; + +/** + * wrapper around Job interface to make it Callable. + * + * @param - return type + */ +public class CallbackTask implements Callable { + + private final Job job; + private T[] args; + private Callback callback; + + public CallbackTask(Job job) { + this.job = job; + } + + public CallbackTask(Job job, Callback callback) { + this.job = job; + this.callback = callback; + } + + public CallbackTask callback(Callback callback) { + this.callback = callback; + return this; + } + + public CallbackTask args(T... args) { + this.args = args; + return this; + } + + @Override + public T call() throws Exception { + T res = null; + try { + res = job.processTask(args); + if (callback != null) { + callback.complete(res); + } + } catch (Exception ex) { + ex.printStackTrace(); + } + return res; + } +} diff --git a/Java/src/job/Job.java b/Java/src/job/Job.java new file mode 100644 index 0000000..bd3e898 --- /dev/null +++ b/Java/src/job/Job.java @@ -0,0 +1,9 @@ +package job; +/** + * Callable analogue with input arguments. + * + * @param - return type + */ +public interface Job { + T processTask(Object... params); +} diff --git a/Java/src/utils/CustomPrintStream.java b/Java/src/utils/CustomPrintStream.java new file mode 100644 index 0000000..4d7eb72 --- /dev/null +++ b/Java/src/utils/CustomPrintStream.java @@ -0,0 +1,17 @@ +package utils; + +import java.io.OutputStream; +import java.io.PrintStream; + +public class CustomPrintStream extends PrintStream { + public CustomPrintStream(OutputStream out) { + super(out); + } + + @Override + public void println(String x) { + super.print("[" + Thread.currentThread().getId() + "]"); + super.println(x); + } + +} diff --git a/Java/src/utils/SameThreadExecutorService.java b/Java/src/utils/SameThreadExecutorService.java new file mode 100644 index 0000000..40aac6d --- /dev/null +++ b/Java/src/utils/SameThreadExecutorService.java @@ -0,0 +1,43 @@ +package utils; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.TimeUnit; + +public class SameThreadExecutorService extends AbstractExecutorService { + //volatile because can be viewed by other threads + private volatile boolean terminated; + + @Override + public void shutdown() { + terminated = true; + } + + @Override + public boolean isShutdown() { + return terminated; + } + + @Override + public boolean isTerminated() { + return terminated; + } + + @Override + public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException { + shutdown(); + return terminated; + } + + @Override + public List shutdownNow() { + return Collections.emptyList(); + } + + @Override + public void execute(Runnable theCommand) { + theCommand.run(); + } + +} \ No newline at end of file diff --git a/JavaScript/1-SequentianCallbacks.js b/JavaScript/1-sequential-callbacks.js similarity index 100% rename from JavaScript/1-SequentianCallbacks.js rename to JavaScript/1-sequential-callbacks.js diff --git a/JavaScript/2-EmulateAsyncCalls.js b/JavaScript/2-emulate-async-calls.js similarity index 100% rename from JavaScript/2-EmulateAsyncCalls.js rename to JavaScript/2-emulate-async-calls.js diff --git a/JavaScript/3-BackToOrder.js b/JavaScript/3-back-to-order.js similarity index 100% rename from JavaScript/3-BackToOrder.js rename to JavaScript/3-back-to-order.js diff --git a/JavaScript/4-NoHierarchy.js b/JavaScript/4-no-hierarchy.js similarity index 100% rename from JavaScript/4-NoHierarchy.js rename to JavaScript/4-no-hierarchy.js