1
- import hmac
2
- import hashlib
3
- import json
4
- import logging
5
- import os
6
- import base64
7
- import requests
8
- from fastapi import APIRouter , Request , Header , HTTPException
9
- from Crypto .Cipher import AES
10
- from Crypto .Util .Padding import unpad
11
-
12
- from dotenv import load_dotenv
13
-
14
- load_dotenv ()
15
-
16
- router = APIRouter ()
17
- WEBHOOK_SECRET = os .getenv ("WEBHOOK_SECRET" )
18
-
19
- logger = logging .getLogger (__name__ )
20
- logging .basicConfig (level = logging .INFO )
21
-
22
-
23
- def verify_signature (secret , body , signature ):
24
- """Verify GitHub webhook signature using HMAC-SHA256."""
25
- mac = hmac .new (secret .encode (), body , hashlib .sha256 ).hexdigest ()
26
- expected_signature = f"sha256={ mac } "
27
- return hmac .compare_digest (expected_signature , signature )
28
-
29
-
30
- def decrypt_token (encrypted_token , iv ):
31
- """Decrypt API token using WEBHOOK_SECRET as the key."""
32
- try :
33
- # Generate the key from WEBHOOK_SECRET in the same way as the GitHub Action
34
- key = hashlib .sha256 (WEBHOOK_SECRET .encode ()).hexdigest ()
35
- key_bytes = bytes .fromhex (key )
36
- iv_bytes = bytes .fromhex (iv )
37
-
38
- # Base64 decode the encrypted token
39
- encrypted_data = base64 .b64decode (encrypted_token )
40
-
41
- # Create cipher and decrypt
42
- cipher = AES .new (key_bytes , AES .MODE_CBC , iv_bytes )
43
- decrypted_bytes = cipher .decrypt (encrypted_data )
44
-
45
- # Handle padding properly
46
- try :
47
- unpadded = unpad (decrypted_bytes , AES .block_size )
48
- except ValueError :
49
- # If unpadding fails, try to find the null termination
50
- if b'\x00 ' in decrypted_bytes :
51
- unpadded = decrypted_bytes [:decrypted_bytes .index (b'\x00 ' )]
52
- else :
53
- unpadded = decrypted_bytes
54
-
55
- return unpadded .decode ('utf-8' )
56
- except Exception as e :
57
- logger .error (f"Token decryption error: { str (e )} " )
58
- raise HTTPException (status_code = 500 , detail = "Failed to decrypt token" )
59
-
60
-
61
- def get_pr_commits (repo_full_name , pr_number , github_token ):
62
- """Fetch the list of commits for a PR from GitHub API."""
63
- url = f"https://api.github.com/repos/{ repo_full_name } /pulls/{ pr_number } /commits"
64
- print (url )
65
- headers = {"Authorization" : f"{ github_token } " , "Accept" : "application/vnd.github.v3+json" }
66
-
67
- response = requests .get (url , headers = headers )
68
-
69
- if response .status_code != 200 :
70
- logger .error (f"Failed to fetch commits: { response .text } " )
71
- raise HTTPException (status_code = 500 , detail = "Error fetching PR commits" )
72
-
73
- return response .json ()
74
-
75
-
76
- @router .post ("/github-webhook" )
77
- async def github_webhook (
78
- request : Request ,
79
- x_hub_signature_256 : str = Header (None ),
80
- x_encrypted_token : str = Header (None , alias = "X-Encrypted-Token" ),
81
- x_token_iv : str = Header (None , alias = "X-Token-IV" )
82
- ):
83
- """Receives GitHub webhook payload and fetches PR commits if applicable."""
84
- body = await request .body ()
85
-
86
- # Verify webhook signature
87
- if WEBHOOK_SECRET and x_hub_signature_256 :
88
- if not verify_signature (WEBHOOK_SECRET , body , x_hub_signature_256 ):
89
- logger .error ("Signature verification failed" )
90
- raise HTTPException (status_code = 403 , detail = "Invalid signature" )
91
-
92
- # Validate encrypted token headers
93
- if not x_encrypted_token or not x_token_iv :
94
- logger .error ("Missing encryption headers" )
95
- raise HTTPException (status_code = 403 , detail = "Missing token encryption headers" )
96
-
97
- # Decrypt the token
98
- try :
99
- github_token = decrypt_token (x_encrypted_token , x_token_iv )
100
- except Exception as e :
101
- logger .error (f"Token decryption failed: { str (e )} " )
102
- raise HTTPException (status_code = 403 , detail = "Token decryption failed" )
103
-
104
- payload = await request .json ()
105
- # save this locally
106
- with open ("samples/payload.json" , "w" ) as f :
107
- json .dump (payload , f )
108
- event_type = payload .get ("action" , "" )
109
-
110
- logger .info (f"Received GitHub event: { event_type } " )
111
-
112
- if event_type == "synchronize" :
113
- action = payload .get ("action" , "" )
114
- if action in ["opened" , "synchronize" , "reopened" ]:
115
- repo_full_name = payload ["repository" ]["full_name" ]
116
- pr_number = payload ["pull_request" ]["number" ]
117
- commits = get_pr_commits (repo_full_name , pr_number , github_token )
118
-
119
- logger .info (f"Fetched { len (commits )} commits for PR #{ pr_number } " )
120
- return {"message" : "PR processed" , "pr_number" : pr_number , "commits_count" : len (commits )}
121
-
122
- return {"message" : "Webhook received" , "event" : event_type }
0 commit comments