Skip to content

Commit 8e94a9c

Browse files
committed
update dequeuing logic
1 parent 33f20a9 commit 8e94a9c

File tree

2 files changed

+69
-32
lines changed

2 files changed

+69
-32
lines changed

simulations/llm_ig_simulation/src/loadbalancer.py

+27-5
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ def find_target_pod(self, routing_type, input_size, output_size, target_latency
306306
latency_esimated = 0
307307

308308
active_req_target_latency_in_window = self.getActiveReqTargetLatencyInWindow()
309-
violations_present = self.getViolationsTargetLatencyInWindow()
309+
violations_present , _= self.getViolationsTargetLatencyInWindow()
310310

311311

312312

@@ -370,6 +370,27 @@ def check_if_queues_empty(self) -> bool:
370370
return True
371371

372372
import random
373+
374+
def slo_based_dequeue(self) -> Optional[Request]:
    """Dequeue one request, preferring targets with no recorded SLO activity.

    Queues whose target latency does not appear in the violation window
    (presumably: no recent traffic — TODO confirm against
    getViolationsTargetLatencyInWindow) are drained first, in queue-dict
    order. Remaining (active) targets are then drained in descending order
    of their violation fraction, so the worst violator is served first.

    :return: The dequeued request, or None if every queue is empty.
    """
    # violation_dict maps target latency -> violation fraction in the window.
    _, violation_dict = self.getViolationsTargetLatencyInWindow()

    # Active targets, worst violation fraction first.
    active_targets = sorted(violation_dict, key=violation_dict.get, reverse=True)
    # Set gives O(1) membership tests below instead of an O(n) list scan.
    active_set = set(active_targets)

    # First pass: queues for targets with no activity in the window.
    for key, q in self.queues.items():
        if key not in active_set and not q.empty():
            return q.get()

    # Second pass: active targets, highest violation fraction first.
    for key in active_targets:
        if key in self.queues and not self.queues[key].empty():
            return self.queues[key].get()

    return None
373394

374395
def weighted_dequeue(self) -> Optional[Request]:
375396
# Get active targets and their latencies
@@ -386,7 +407,7 @@ def weighted_dequeue(self) -> Optional[Request]:
386407

387408
# Use random.choices to select a target based on probabilities
388409
# Attempt to dequeue from the selected target's queue
389-
for _ in range(100): # Try up to the 100 times
410+
for _ in range(1000): # Try up to 1000 times
390411
selected_target = random.choices(list(target_probs.keys()), weights=target_probs.values(), k=1)[0]
391412

392413
# Check if the selected target's queue is non-empty
@@ -411,7 +432,7 @@ def dequeue_process(self, routing_type, drop_late_requests = False):
411432
while True:
412433
if not self.check_if_queues_empty() and self.dequeueing_signal(routing_type):
413434
# Get the request with the highest SLO violation
414-
req = self.dequeue()
435+
req = self.weighted_dequeue()
415436
if req:
416437
if (drop_late_requests == False) or (self.env.now - req.arrival_time < 100*req.target_latency): #ad-hoc
417438
target_pod, estimated_latency = self.find_target_pod(routing_type, req.input_size, req.output_size, req.target_latency, req.lora)
@@ -470,7 +491,7 @@ def getViolationsTargetLatencyInWindow(self, time_windows = 300, percentile = 0.
470491
471492
:param time_windows: Time window in which to check for latency violations.
472493
:param percentile: The violation threshold percentile.
473-
:return: Boolean indicating if violations occurred.
494+
:return: Tuple of (boolean indicating whether any violations occurred, dict mapping each target latency to its violation fraction).
474495
"""
475496
didViolate = False
476497
violation_dict = {}
@@ -493,7 +514,8 @@ def getViolationsTargetLatencyInWindow(self, time_windows = 300, percentile = 0.
493514
for target_latency in violation_dict:
494515
if violation_dict[target_latency]/req_dict[target_latency] > percentile:
495516
didViolate = True
496-
return didViolate
517+
violation_dict[target_latency] = violation_dict[target_latency]/req_dict[target_latency]
518+
return didViolate, violation_dict
497519

498520

499521
def allPodsRunningCritical(self):

simulations/llm_ig_simulation/src/main.py

+42-27
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def main():
8484
'tol_lat_time_lo': [], 'tol_lat_time_hi': [],
8585
'avg_prefill_queue_size' : [],
8686
'avg_pending_tokens_perc' : [],
87-
'avg_actual_tokens_perc' : [], 'request_count': []},
87+
'avg_actual_tokens_perc' : [], 'request_count': [], 'request_count_lo': [], 'request_count_hi': []},
8888

8989
'smart': {'latency': [], 'latency_lo': [], 'latency_hi': [],
9090
'estimated_latency': [], 'estimated_latency_lo': [], 'estimated_latency_hi': [],
@@ -100,7 +100,7 @@ def main():
100100
'tol_lat_time_lo': [], 'tol_lat_time_hi': [],
101101
'avg_prefill_queue_size' : [],
102102
'avg_pending_tokens_perc' : [],
103-
'avg_actual_tokens_perc' : [], 'request_count': []},
103+
'avg_actual_tokens_perc' : [], 'request_count': [], 'request_count_lo': [], 'request_count_hi': []},
104104

105105
'leastlatency': {'latency': [], 'latency_lo': [], 'latency_hi': [],
106106
'throughput_prefill': [], 'throughput_decode': [],
@@ -114,7 +114,7 @@ def main():
114114
'tol_lat_time_lo': [], 'tol_lat_time_hi': [],
115115
'avg_prefill_queue_size' : [],
116116
'avg_pending_tokens_perc' : [],
117-
'avg_actual_tokens_perc' : [], 'request_count': []},
117+
'avg_actual_tokens_perc' : [], 'request_count': [], 'request_count_lo': [], 'request_count_hi': []},
118118
'least': {'latency': [], 'latency_lo': [], 'latency_hi': [],
119119
'throughput_prefill': [], 'throughput_decode': [],
120120
'throughput_prefill_lo': [], 'throughput_decode_lo': [],
@@ -127,7 +127,7 @@ def main():
127127
'tol_lat_time_lo': [], 'tol_lat_time_hi': [],
128128
'avg_prefill_queue_size' : [],
129129
'avg_pending_tokens_perc' : [],
130-
'avg_actual_tokens_perc' : [], 'request_count': []},
130+
'avg_actual_tokens_perc' : [], 'request_count': [], 'request_count_lo': [], 'request_count_hi': []},
131131
'random': {'latency': [], 'latency_lo': [], 'latency_hi': [],
132132
'throughput_prefill': [], 'throughput_decode': [],
133133
'throughput_prefill_lo': [], 'throughput_decode_lo': [],
@@ -140,21 +140,21 @@ def main():
140140
'tol_lat_time_lo': [], 'tol_lat_time_hi': [],
141141
'avg_prefill_queue_size' : [],
142142
'avg_pending_tokens_perc' : [],
143-
'avg_actual_tokens_perc' : [], 'request_count': []},
143+
'avg_actual_tokens_perc' : [], 'request_count': [], 'request_count_lo': [], 'request_count_hi': []},
144144
}
145145

146146
all_routing_types = [ routing_type ]
147147
prompt_output_tuple = None
148148

149149
# Iterate over routing types
150150
for routing_type in all_routing_types:
151-
print(f'Routing Type: {routing_type}')
151+
#print(f'Routing Type: {routing_type}')
152152

153153
for i, _ in enumerate(rates_lo):
154154
req_dict = {}
155155
req_dict_prefill = {}
156156
SIM_DURATION = SIM_DURATIONS[i]
157-
print(f'Simulate with rate: for lo {rates_lo[i]} and for hi {rates_hi[i]} and routing type: {routing_type}')
157+
#print(f'Simulate with rate: for lo {rates_lo[i]} and for hi {rates_hi[i]} and routing type: {routing_type}')
158158
sys.stdout.flush()
159159
# Simpy environment and LLM actors setup
160160
env = simpy.Environment()
@@ -292,28 +292,29 @@ def main():
292292
l1 = [np.sum(list(dict(x).values())) for x in results[routing_type]['target_pods_lo']][-1]
293293
l2 = [np.sum(list(dict(x).values())) for x in results[routing_type]['target_pods_hi']][-1]
294294

295-
print(f'req count {(l1, l2)}')
295+
#print(f'req count {(l1, l2)}')
296296
sys.stdout.flush()
297-
results[routing_type]['request_count'].append(len(completed_req))
297+
results[routing_type]['request_count'].append(len(filtered_req))
298+
results[routing_type]['request_count_lo'].append(len(filtered_req_lo))
299+
results[routing_type]['request_count_hi'].append(len(filtered_req_hi))
298300

299301
if routing_type == 'smart':
300302
results[routing_type]['estimated_latency'].append(estimated_latency_cur)
301303
results[routing_type]['estimated_latency_lo'].append(estimated_latency_cur_lo)
302304
results[routing_type]['estimated_latency_hi'].append(estimated_latency_cur_hi)
303-
print(f"lo dist {Counter(target_pods_lo)} latency {latency_cur_lo} estimated_latency_lo {estimated_latency_cur_lo}")
304-
print(f"hi dist {Counter(target_pods_hi)} latency {latency_cur_hi} estimated_latency_hi {estimated_latency_cur_hi}")
305-
else:
306-
print(f"lo dist {Counter(target_pods_lo)} latency {latency_cur_lo} ")
307-
print(f"hi dist {Counter(target_pods_hi)} latency {latency_cur_hi} ")
308-
309-
# Print the results for this qps
310-
print(f'QPS: {rates_lo[i]} (lo), {rates_hi[i]} (hi)')
311-
print(f'% of lo requests below target: {pct_below_target_lo}%')
312-
print(f'% of hi requests below target: {pct_below_target_hi}%')
313-
print(f"prefill_queue_size {np.mean(prefill_queue_size)}")
314-
print(f"pending_tokens_perc {np.mean(pending_tokens_at_arrival_perc)}")
315-
print(f"actual_tokens_perc {np.mean(actual_tokens_at_arrival_perc)}")
316-
sys.stdout.flush()
305+
#print(f"lo dist {Counter(target_pods_lo)} latency {latency_cur_lo} estimated_latency_lo {estimated_latency_cur_lo}")
306+
#print(f"hi dist {Counter(target_pods_hi)} latency {latency_cur_hi} estimated_latency_hi {estimated_latency_cur_hi}")
307+
#else:
308+
#print(f"lo dist {Counter(target_pods_lo)} latency {latency_cur_lo} ")
309+
#print(f"hi dist {Counter(target_pods_hi)} latency {latency_cur_hi} ")
310+
311+
# #print the results for this qps
312+
#print(f'QPS: {rates_lo[i]} (lo), {rates_hi[i]} (hi)')
313+
#print(f'% of lo requests below target: {pct_below_target_lo}%')
314+
#print(f'% of hi requests below target: {pct_below_target_hi}%')
315+
#print(f"prefill_queue_size {np.mean(prefill_queue_size)}")
316+
#print(f"pending_tokens_perc {np.mean(pending_tokens_at_arrival_perc)}")
317+
#print(f"actual_tokens_perc {np.mean(actual_tokens_at_arrival_perc)}")
317318

318319

319320

@@ -331,8 +332,15 @@ def main():
331332
if not os.path.exists(output_dir):
332333
os.makedirs(output_dir)
333334

335+
# Write results to CSV
336+
# Ensure the output directory exists
337+
output_dir = os.path.dirname(output_file)
338+
if not os.path.exists(output_dir):
339+
os.makedirs(output_dir)
340+
341+
# Open the CSV file for writing
334342
with open(output_file, 'w', newline='') as csvfile:
335-
fieldnames = ['RoutingType', 'RateIndex', 'Latency', 'Latency_Lo', 'Latency_Hi', 'avg_prefill_queue_size', 'avg_pending_tokens_perc', 'avg_actual_tokens_perc' , 'pct_below_latency_target_lo', 'pct_below_latency_target_hi']
343+
fieldnames = ['Job', 'RoutingType', 'RateIndex', 'Latency', 'Latency_Lo', 'Latency_Hi', 'avg_prefill_queue_size', 'avg_pending_tokens_perc', 'avg_actual_tokens_perc', 'pct_below_latency_target_lo', 'pct_below_latency_target_hi', 'num_req_lo', 'num_req_hi']
336344
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
337345

338346
writer.writeheader()
@@ -341,6 +349,7 @@ def main():
341349
for routing_type in all_routing_types:
342350
for i in range(len(rates_lo)):
343351
writer.writerow({
352+
'Job' : os.path.basename(output_file),
344353
'RoutingType': routing_type,
345354
'RateIndex': rates_lo[i],
346355
'Latency': results[routing_type]['latency'][i],
@@ -349,11 +358,17 @@ def main():
349358
'avg_prefill_queue_size': results[routing_type]['avg_prefill_queue_size'][i],
350359
'avg_pending_tokens_perc': results[routing_type]['avg_pending_tokens_perc'][i],
351360
'avg_actual_tokens_perc': results[routing_type]['avg_actual_tokens_perc'][i],
352-
'pct_below_latency_target_lo': results[routing_type]['pct_below_latency_target_lo'][i],
353-
'pct_below_latency_target_hi': results[routing_type]['pct_below_latency_target_hi'][i],
361+
'pct_below_latency_target_lo': results[routing_type]['pct_below_latency_target_lo'][i]* results[routing_type]['request_count_lo'][i]/no_of_messages,
362+
'pct_below_latency_target_hi': results[routing_type]['pct_below_latency_target_hi'][i]* results[routing_type]['request_count_hi'][i]/no_of_messages,
363+
'num_req_lo': results[routing_type]['request_count_lo'][i],
364+
'num_req_hi': results[routing_type]['request_count_hi'][i]
354365
})
355366

356-
print(f"Results have been saved to {output_file}")
367+
# Print the CSV file to stdout
368+
with open(output_file, 'r') as csvfile:
369+
sys.stdout.write(csvfile.read())
370+
371+
#print(f"Results have been saved to {output_file}")
357372

358373

359374

0 commit comments

Comments
 (0)