1
+ import hmac
2
+ import hashlib
3
+ import json
4
+ import logging
5
+ import os
6
+ import base64
7
+ import requests
8
+ from fastapi import APIRouter , Request , Header , HTTPException
9
+ from Crypto .Cipher import AES
10
+ from Crypto .Util .Padding import unpad
11
+
12
+ from dotenv import load_dotenv
13
+
14
+ load_dotenv ()
15
+
16
+ router = APIRouter ()
17
+ WEBHOOK_SECRET = os .getenv ("WEBHOOK_SECRET" )
18
+
19
+ logger = logging .getLogger (__name__ )
20
+ logging .basicConfig (level = logging .INFO )
21
+
22
+
23
+ def verify_signature (secret , body , signature ):
24
+ """Verify GitHub webhook signature using HMAC-SHA256."""
25
+ mac = hmac .new (secret .encode (), body , hashlib .sha256 ).hexdigest ()
26
+ expected_signature = f"sha256={ mac } "
27
+ return hmac .compare_digest (expected_signature , signature )
28
+
29
+
30
+ def decrypt_token (encrypted_token , iv ):
31
+ """Decrypt API token using WEBHOOK_SECRET as the key."""
32
+ try :
33
+ # Generate the key from WEBHOOK_SECRET in the same way as the GitHub Action
34
+ key = hashlib .sha256 (WEBHOOK_SECRET .encode ()).hexdigest ()
35
+ key_bytes = bytes .fromhex (key )
36
+ iv_bytes = bytes .fromhex (iv )
37
+
38
+ # Base64 decode the encrypted token
39
+ encrypted_data = base64 .b64decode (encrypted_token )
40
+
41
+ # Create cipher and decrypt
42
+ cipher = AES .new (key_bytes , AES .MODE_CBC , iv_bytes )
43
+ decrypted_bytes = cipher .decrypt (encrypted_data )
44
+
45
+ # Handle padding properly
46
+ try :
47
+ unpadded = unpad (decrypted_bytes , AES .block_size )
48
+ except ValueError :
49
+ # If unpadding fails, try to find the null termination
50
+ if b'\x00 ' in decrypted_bytes :
51
+ unpadded = decrypted_bytes [:decrypted_bytes .index (b'\x00 ' )]
52
+ else :
53
+ unpadded = decrypted_bytes
54
+
55
+ return unpadded .decode ('utf-8' )
56
+ except Exception as e :
57
+ logger .error (f"Token decryption error: { str (e )} " )
58
+ raise HTTPException (status_code = 500 , detail = "Failed to decrypt token" )
59
+
60
+
61
+ def get_pr_commits (repo_full_name , pr_number , github_token ):
62
+ """Fetch the list of commits for a PR from GitHub API."""
63
+ url = f"https://api.github.com/repos/{ repo_full_name } /pulls/{ pr_number } /commits"
64
+ print (url )
65
+ headers = {"Authorization" : f"{ github_token } " , "Accept" : "application/vnd.github.v3+json" }
66
+
67
+ response = requests .get (url , headers = headers )
68
+
69
+ if response .status_code != 200 :
70
+ logger .error (f"Failed to fetch commits: { response .text } " )
71
+ raise HTTPException (status_code = 500 , detail = "Error fetching PR commits" )
72
+
73
+ return response .json ()
74
+
75
+
76
+ @router .post ("/github-webhook" )
77
+ async def github_webhook (
78
+ request : Request ,
79
+ x_hub_signature_256 : str = Header (None ),
80
+ x_encrypted_token : str = Header (None , alias = "X-Encrypted-Token" ),
81
+ x_token_iv : str = Header (None , alias = "X-Token-IV" )
82
+ ):
83
+ """Receives GitHub webhook payload and fetches PR commits if applicable."""
84
+ body = await request .body ()
85
+
86
+ # Verify webhook signature
87
+ if WEBHOOK_SECRET and x_hub_signature_256 :
88
+ if not verify_signature (WEBHOOK_SECRET , body , x_hub_signature_256 ):
89
+ logger .error ("Signature verification failed" )
90
+ raise HTTPException (status_code = 403 , detail = "Invalid signature" )
91
+
92
+ # Validate encrypted token headers
93
+ if not x_encrypted_token or not x_token_iv :
94
+ logger .error ("Missing encryption headers" )
95
+ raise HTTPException (status_code = 403 , detail = "Missing token encryption headers" )
96
+
97
+ # Decrypt the token
98
+ try :
99
+ github_token = decrypt_token (x_encrypted_token , x_token_iv )
100
+ except Exception as e :
101
+ logger .error (f"Token decryption failed: { str (e )} " )
102
+ raise HTTPException (status_code = 403 , detail = "Token decryption failed" )
103
+
104
+ payload = await request .json ()
105
+ # save this locally
106
+ with open ("samples/payload.json" , "w" ) as f :
107
+ json .dump (payload , f )
108
+ event_type = payload .get ("action" , "" )
109
+
110
+ logger .info (f"Received GitHub event: { event_type } " )
111
+
112
+ if event_type == "synchronize" :
113
+ action = payload .get ("action" , "" )
114
+ if action in ["opened" , "synchronize" , "reopened" ]:
115
+ repo_full_name = payload ["repository" ]["full_name" ]
116
+ pr_number = payload ["pull_request" ]["number" ]
117
+ commits = get_pr_commits (repo_full_name , pr_number , github_token )
118
+
119
+ logger .info (f"Fetched { len (commits )} commits for PR #{ pr_number } " )
120
+ return {"message" : "PR processed" , "pr_number" : pr_number , "commits_count" : len (commits )}
121
+
122
+ return {"message" : "Webhook received" , "event" : event_type }
0 commit comments