1
- import base64
2
- import json
3
- import os
4
- import time
1
+ import re
5
2
6
- import requests
7
- from dotenv import load_dotenv
8
- from requests import RequestException
3
+ def get_column_name_and_type ( ddl ):
4
+ # Regular expression to match column name and type
5
+ column_pattern = re . compile ( r'(\w+)\s+([\w\(\)]+(?: PRIMARY KEY)?)' )
9
6
10
- load_dotenv ()
7
+ # Extract the part of DDL inside the parentheses
8
+ columns_section = re .search (r'\((.*)\);' , ddl , re .DOTALL ).group (1 )
11
9
12
- ORG_USERNAME = os .environ .get ("ORG_USERNAME" )
13
- ORG_PASSWORD = os .environ .get ("ORG_PASSWORD" )
14
- ORG_GRANT_TYPE = os .environ .get ("ORG_GRANT_TYPE" )
15
- ORG_TOKEN_URL = os .environ .get ("ORG_TOKEN_URL" )
16
- ORG_CONSUMER_KEY = os .environ .get ("ORG_CONSUMER_KEY" )
17
- ORG_CONSUMER_SECRET = os .environ .get ("ORG_CONSUMER_SECRET" )
18
- ORG_COMPANY_URL = os .environ .get ("ORG_COMPANY_URL" )
10
+ # Find all matches in the columns section
11
+ columns = column_pattern .findall (columns_section )
19
12
20
- cache = {}
13
+ # Create a dictionary with column names as keys and types as values
14
+ column_dict = {column [0 ]: column [1 ].split ('(' )[0 ] for column in columns }
21
15
16
+ return column_dict
22
17
23
- def get_token () -> dict | str :
24
- """Get authentication token and cache it."""
25
- if "org_token" in cache and cache ["org_token" ]["expires_at" ] > time .time ():
26
- return cache ["org_token" ]["token" ]
18
+ # Example DDL statement
19
+ ddl_statement = """
20
+ CREATE TABLE employees (
21
+ id SERIAL PRIMARY KEY,
22
+ first_name VARCHAR(50),
23
+ last_name VARCHAR(50),
24
+ email VARCHAR(100),
25
+ hire_date DATE,
26
+ salary NUMERIC(10, 2)
27
+ );
28
+ """
27
29
28
- payload = {
29
- "grant_type" : ORG_GRANT_TYPE ,
30
- "username" : ORG_USERNAME ,
31
- "password" : ORG_PASSWORD ,
32
- }
33
- auth_str = f"{ ORG_CONSUMER_KEY } :{ ORG_CONSUMER_SECRET } "
34
- auth_base64 = base64 .b64encode (auth_str .encode ("utf-8" )).decode ("utf-8" )
30
+ # Get column names and types as a dictionary
31
+ d = get_column_name_and_type (ddl_statement )
35
32
36
- headers = {
37
- "Content-Type" : "application/x-www-form-urlencoded" ,
38
- "Authorization" : f"Basic { auth_base64 } " ,
39
- }
40
- try :
41
- response = requests .post (ORG_TOKEN_URL , headers = headers , data = payload )
42
- response .raise_for_status ()
43
- token = response .json ().get ("access_token" )
44
- expires_in = response .json ().get ("expires_in" , 3600 ) # Standart 1 soat
45
-
46
- if token :
47
- cache ["org_token" ] = {
48
- "token" : token ,
49
- "expires_at" : time .time () + expires_in # Token muddati
50
- }
51
- return token
52
- except RequestException as e :
53
- return {"error" : "Failed to obtain token" , "details" : str (e )}
54
-
55
-
56
- def get_company (inn : int | str ) -> dict :
57
- """Fetch company details by INN from an external service."""
58
- payload = json .dumps ({"tin" : str (inn )})
59
-
60
- token = get_token ()
61
- if isinstance (token , dict ):
62
- return token
63
-
64
- headers = {
65
- "Content-Type" : "application/json" ,
66
- "Authorization" : f"Bearer { token } " ,
67
- }
68
-
69
- try :
70
- response = requests .post (ORG_COMPANY_URL , headers = headers , data = payload )
71
- if response .status_code == 401 :
72
- token = get_token ()
73
- if isinstance (token , dict ):
74
- return token
75
-
76
- headers ["Authorization" ] = f"Bearer { token } "
77
- response = requests .post (ORG_COMPANY_URL , headers = headers , data = payload )
78
-
79
- response .raise_for_status ()
80
- response_js = response .json ()
81
- return {
82
- "name" : response_js .get ("name" ),
83
- "address" : response_js .get ("address" ),
84
- "oked" : response_js .get ("nc6Code" ),
85
- "ns_code" : response_js .get ("ns10Code" ),
86
- }
87
-
88
- except RequestException as e :
89
- return {"error" : "Failed to fetch company details" , "details" : str (e )}
90
-
91
-
92
- print (get_company ("200524845" ))
33
+ # Print the result
34
+ print (d )
0 commit comments