Skip to content

Commit 61541ac

Browse files
committed
Merge branch 'master' into tremor-graphs-refactor
2 parents aca766a + abde6ee commit 61541ac

File tree

14 files changed

+292
-1093
lines changed

14 files changed

+292
-1093
lines changed

App/FeatureSet/Workers/Index.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ import AnalyticsTableManagement from "./Utils/AnalyticsDatabase/TableManegement"
5858
import RunDatabaseMigrations from "./Utils/DataMigration";
5959
import JobDictionary from "./Utils/JobDictionary";
6060
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
61-
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
61+
import Queue, { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
6262
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
6363
import FeatureSet from "Common/Server/Types/FeatureSet";
6464
import logger from "Common/Server/Utils/Logger";
@@ -69,6 +69,10 @@ import "./Jobs/Probe/UpdateConnectionStatus";
6969

7070
// Telemetry Monitors.
7171
import "./Jobs/TelemetryMonitor/MonitorTelemetryMonitor";
72+
import Express, { ExpressApplication } from "Common/Server/Utils/Express";
73+
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
74+
75+
const app: ExpressApplication = Express.getExpressApp();
7276

7377
const WorkersFeatureSet: FeatureSet = {
7478
init: async (): Promise<void> => {
@@ -97,7 +101,14 @@ const WorkersFeatureSet: FeatureSet = {
97101
await funcToRun();
98102
}
99103
},
100-
{ concurrency: 10 },
104+
{ concurrency: 100 },
105+
);
106+
107+
// attach bull board to the app
108+
app.use(
109+
Queue.getInspectorRoute(),
110+
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
111+
Queue.getQueueInspectorRouter(),
101112
);
102113
} catch (err) {
103114
logger.error("App Init Failed:");

App/FeatureSet/Workers/Jobs/IncomingRequestMonitor/CheckHeartbeat.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@ RunCron(
1717
"IncomingRequestMonitor:CheckHeartbeat",
1818
{ schedule: EVERY_THIRTY_SECONDS, runOnStartup: false },
1919
async () => {
20-
logger.debug("Checking IncomingRequestMonitor:CheckHeartbeat");
20+
logger.debug(
21+
"Checking IncomingRequestMonitor:CheckHeartbeat at " +
22+
OneUptimeDate.getDateAsLocalFormattedString(
23+
OneUptimeDate.getCurrentDate(),
24+
),
25+
);
2126

2227
const newIncomingRequestMonitors: Array<Monitor> =
2328
await MonitorService.findBy({
@@ -87,6 +92,14 @@ RunCron(
8792

8893
for (const monitor of totalIncomingRequestMonitors) {
8994
try {
95+
logger.debug(
96+
`Processing incoming request monitor: ${monitor.id?.toString()}`,
97+
);
98+
99+
if (!monitor.monitorSteps) {
100+
logger.debug("Monitor has no steps. Skipping...");
101+
continue;
102+
}
90103

91104
await MonitorService.updateOneById({
92105
id: monitor.id!,
@@ -99,10 +112,9 @@ RunCron(
99112
},
100113
});
101114

102-
if (!monitor.monitorSteps) {
103-
logger.debug("Monitor has no steps. Skipping...");
104-
continue;
105-
}
115+
logger.debug(
116+
`Updated incoming request monitor heartbeat checked at: ${monitor.id?.toString()}`,
117+
);
106118

107119
const processRequest: boolean = shouldProcessRequest(monitor);
108120

@@ -124,7 +136,15 @@ RunCron(
124136
onlyCheckForIncomingRequestReceivedAt: true,
125137
};
126138

139+
logger.debug(
140+
`Processing incoming request monitor: ${monitor.id?.toString()}`,
141+
);
142+
127143
await MonitorResourceUtil.monitorResource(incomingRequest);
144+
145+
logger.debug(
146+
`Processed incoming request monitor: ${monitor.id?.toString()}`,
147+
);
128148
} catch (error) {
129149
logger.error(
130150
`Error while processing incoming request monitor: ${monitor.id?.toString()}`,

App/Index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import Workers from "./FeatureSet/Workers/Index";
99
import Workflow from "./FeatureSet/Workflow/Index";
1010
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
1111
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
12-
import { PostgresAppInstance } from "Common/Server/Infrastructure/PostgresDatabase";
12+
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
1313
import Redis from "Common/Server/Infrastructure/Redis";
1414
import InfrastructureStatus from "Common/Server/Infrastructure/Status";
1515
import logger from "Common/Server/Utils/Logger";

Common/Server/Infrastructure/PostgresDatabase.ts

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,23 @@ export type DatabaseSourceOptions = DataSourceOptions;
88
export type DatabaseSource = DataSource;
99

1010
export default class Database {
11-
protected dataSourceOptions: DataSourceOptions | null = null;
12-
protected dataSource: DataSource | null = null;
11+
protected static dataSourceOptions: DataSourceOptions | null = null;
12+
protected static dataSource: DataSource | null = null;
1313

14-
public getDatasourceOptions(): DataSourceOptions {
14+
public static getDatasourceOptions(): DataSourceOptions {
1515
this.dataSourceOptions = DatabaseDataSourceOptions;
1616
return this.dataSourceOptions;
1717
}
1818

19-
public getDataSource(): DataSource | null {
19+
public static getDataSource(): DataSource | null {
2020
return this.dataSource;
2121
}
2222

23-
public isConnected(): boolean {
23+
public static isConnected(): boolean {
2424
return Boolean(this.dataSource);
2525
}
2626

27-
public async connect(): Promise<DataSource> {
27+
public static async connect(): Promise<DataSource> {
2828
let retry: number = 0;
2929

3030
const dataSourceOptions: DataSourceOptions = this.getDatasourceOptions();
@@ -67,14 +67,14 @@ export default class Database {
6767
}
6868
}
6969

70-
public async disconnect(): Promise<void> {
70+
public static async disconnect(): Promise<void> {
7171
if (this.dataSource) {
7272
await this.dataSource.destroy();
7373
this.dataSource = null;
7474
}
7575
}
7676

77-
public async checkConnnectionStatus(): Promise<boolean> {
77+
public static async checkConnnectionStatus(): Promise<boolean> {
7878
// check popstgres connection to see if it is still alive
7979

8080
try {
@@ -94,31 +94,29 @@ export default class Database {
9494
}
9595
}
9696

97-
public async dropDatabase(): Promise<void> {
97+
public static async dropDatabase(): Promise<void> {
9898
await dropDatabase({
9999
options: this.getDatasourceOptions(),
100100
});
101101
this.dataSource = null;
102102
this.dataSourceOptions = null;
103103
}
104104

105-
public async createDatabase(): Promise<void> {
105+
public static async createDatabase(): Promise<void> {
106106
await createDatabase({
107107
options: this.getDatasourceOptions(),
108108
ifNotExist: true,
109109
});
110110
}
111111

112-
public async createAndConnect(): Promise<void> {
112+
public static async createAndConnect(): Promise<void> {
113113
await this.createDatabase();
114114
await this.connect();
115115
}
116116

117-
public async disconnectAndDropDatabase(): Promise<void> {
117+
public static async disconnectAndDropDatabase(): Promise<void> {
118118
// Drop the database. Since this is the in-mem db, it will be destroyed.
119119
await this.disconnect();
120120
await this.dropDatabase();
121121
}
122122
}
123-
124-
export const PostgresAppInstance: Database = new Database();

Common/Server/Infrastructure/Queue.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
1-
import { RedisHostname, RedisPassword, RedisPort } from "../EnvironmentConfig";
1+
import {
2+
ClusterKey,
3+
RedisHostname,
4+
RedisPassword,
5+
RedisPort,
6+
} from "../EnvironmentConfig";
27
import Dictionary from "Common/Types/Dictionary";
38
import { JSONObject } from "Common/Types/JSON";
49
import { Queue as BullQueue, Job, JobsOptions } from "bullmq";
10+
import { ExpressAdapter } from "@bull-board/express";
11+
import { createBullBoard } from "@bull-board/api";
12+
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
13+
import { ExpressRouter } from "../Utils/Express";
514

615
export enum QueueName {
716
Workflow = "Workflow",
@@ -51,6 +60,32 @@ export default class Queue {
5160
await this.getQueue(queueName).removeRepeatableByKey(jobId);
5261
}
5362

63+
public static getInspectorRoute(): string {
64+
return "/api/inspect/queue/:clusterKey";
65+
}
66+
67+
public static getQueueInspectorRouter(): ExpressRouter {
68+
const serverAdapter = new ExpressAdapter();
69+
70+
createBullBoard({
71+
queues: [
72+
...Object.values(QueueName).map((queueName) => {
73+
return new BullMQAdapter(this.getQueue(queueName));
74+
}),
75+
],
76+
serverAdapter: serverAdapter,
77+
});
78+
79+
serverAdapter.setBasePath(
80+
this.getInspectorRoute().replace(
81+
"/:clusterKey",
82+
"/" + ClusterKey.toString(),
83+
),
84+
);
85+
86+
return serverAdapter.getRouter();
87+
}
88+
5489
public static async addJob(
5590
queueName: QueueName,
5691
jobId: string,

Common/Server/Infrastructure/Status.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// This class checks the status of all the datasources.
22
import { ClickhouseAppInstance } from "./ClickhouseDatabase";
3-
import { PostgresAppInstance } from "./PostgresDatabase";
3+
import PostgresAppInstance from "./PostgresDatabase";
44
import Redis from "./Redis";
55
import DatabaseNotConnectedException from "Common/Types/Exception/DatabaseNotConnectedException";
66

Common/Server/Services/DatabaseService.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { AppApiHostname, EncryptionSecret } from "../EnvironmentConfig";
2-
import { PostgresAppInstance } from "../Infrastructure/PostgresDatabase";
2+
import PostgresAppInstance from "../Infrastructure/PostgresDatabase";
33
import ClusterKeyAuthorization from "../Middleware/ClusterKeyAuthorization";
44
import CountBy from "../Types/Database/CountBy";
55
import CreateBy from "../Types/Database/CreateBy";

Common/Server/Utils/Telemetry.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import {
55
Histogram,
66
MetricOptions,
77
} from "@opentelemetry/api/build/src/metrics/Metric";
8-
import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node";
98
import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http";
109
import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-proto";
1110
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto";
@@ -152,7 +151,11 @@ export default class Telemetry {
152151
const nodeSdkConfiguration: Partial<opentelemetry.NodeSDKConfiguration> =
153152
{
154153
idGenerator: new AWSXRayIdGenerator(),
155-
instrumentations: hasHeaders ? [getNodeAutoInstrumentations()] : [],
154+
instrumentations: hasHeaders
155+
? [
156+
// Add instrumentations here
157+
]
158+
: [],
156159
resource: this.getResource({
157160
serviceName: data.serviceName,
158161
}),

Common/Tests/Server/TestingUtils/__mocks__/TestDatabase.mock.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import getTestDataSourceOptions from "../Postgres/TestDataSourceOptions";
2-
import {
2+
import PostgresAppInstance, {
33
DatabaseSourceOptions,
4-
PostgresAppInstance,
54
} from "../../../../Server/Infrastructure/PostgresDatabase";
65
import Redis from "../../../../Server/Infrastructure/Redis";
76
import getTestRedisConnectionOptions from "../Redis/TestRedisOptions";

0 commit comments

Comments
 (0)