Skip to content

Commit 09d478e

Browse files
Allow 'airflow variables export' to print to stdout (#33279)
Co-authored-by: vedantlodha <[email protected]>
1 parent bfa09da commit 09d478e

File tree

4 files changed

+92
-66
lines changed

4 files changed

+92
-66
lines changed

airflow/cli/cli_config.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,11 @@ def string_lower_type(val):
543543
ARG_DESERIALIZE_JSON = Arg(("-j", "--json"), help="Deserialize JSON variable", action="store_true")
544544
ARG_SERIALIZE_JSON = Arg(("-j", "--json"), help="Serialize JSON variable", action="store_true")
545545
ARG_VAR_IMPORT = Arg(("file",), help="Import variables from JSON file")
546-
ARG_VAR_EXPORT = Arg(("file",), help="Export all variables to JSON file")
546+
ARG_VAR_EXPORT = Arg(
547+
("file",),
548+
help="Export all variables to JSON file",
549+
type=argparse.FileType("w", encoding="UTF-8"),
550+
)
547551

548552
# kerberos
549553
ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?")
@@ -1521,6 +1525,10 @@ class GroupCommand(NamedTuple):
15211525
ActionCommand(
15221526
name="export",
15231527
help="Export all variables",
1528+
description=(
1529+
"All variables can be exported in STDOUT using the following command:\n"
1530+
"airflow variables export -\n"
1531+
),
15241532
func=lazy_load_command("airflow.cli.commands.variable_command.variables_export"),
15251533
args=(ARG_VAR_EXPORT, ARG_VERBOSE),
15261534
),

airflow/cli/commands/connection_command.py

+24-30
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
"""Connection sub-commands."""
1818
from __future__ import annotations
1919

20-
import io
2120
import json
2221
import os
2322
import sys
@@ -30,6 +29,7 @@
3029
from sqlalchemy.orm import exc
3130

3231
from airflow.cli.simple_table import AirflowConsole
32+
from airflow.cli.utils import is_stdout
3333
from airflow.compat.functools import cache
3434
from airflow.configuration import conf
3535
from airflow.exceptions import AirflowNotFoundException
@@ -138,10 +138,6 @@ def _format_connections(conns: list[Connection], file_format: str, serialization
138138
return json.dumps(connections_dict)
139139

140140

141-
def _is_stdout(fileio: io.TextIOWrapper) -> bool:
142-
return fileio.name == "<stdout>"
143-
144-
145141
def _valid_uri(uri: str) -> bool:
146142
"""Check if a URI is valid, by checking if scheme (conn_type) provided."""
147143
return urlsplit(uri).scheme != ""
@@ -171,32 +167,30 @@ def connections_export(args):
171167
if args.format or args.file_format:
172168
provided_file_format = f".{(args.format or args.file_format).lower()}"
173169

174-
file_is_stdout = _is_stdout(args.file)
175-
if file_is_stdout:
176-
filetype = provided_file_format or default_format
177-
elif provided_file_format:
178-
filetype = provided_file_format
179-
else:
180-
filetype = Path(args.file.name).suffix
181-
filetype = filetype.lower()
182-
if filetype not in file_formats:
183-
raise SystemExit(
184-
f"Unsupported file format. The file must have the extension {', '.join(file_formats)}."
185-
)
186-
187-
if args.serialization_format and not filetype == ".env":
188-
raise SystemExit("Option `--serialization-format` may only be used with file type `env`.")
189-
190-
with create_session() as session:
191-
connections = session.scalars(select(Connection).order_by(Connection.conn_id)).all()
192-
193-
msg = _format_connections(
194-
conns=connections,
195-
file_format=filetype,
196-
serialization_format=args.serialization_format or "uri",
197-
)
198-
199170
with args.file as f:
171+
if file_is_stdout := is_stdout(f):
172+
filetype = provided_file_format or default_format
173+
elif provided_file_format:
174+
filetype = provided_file_format
175+
else:
176+
filetype = Path(args.file.name).suffix.lower()
177+
if filetype not in file_formats:
178+
raise SystemExit(
179+
f"Unsupported file format. The file must have the extension {', '.join(file_formats)}."
180+
)
181+
182+
if args.serialization_format and not filetype == ".env":
183+
raise SystemExit("Option `--serialization-format` may only be used with file type `env`.")
184+
185+
with create_session() as session:
186+
connections = session.scalars(select(Connection).order_by(Connection.conn_id)).all()
187+
188+
msg = _format_connections(
189+
conns=connections,
190+
file_format=filetype,
191+
serialization_format=args.serialization_format or "uri",
192+
)
193+
200194
f.write(msg)
201195

202196
if file_is_stdout:

airflow/cli/commands/variable_command.py

+26-35
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020

2121
import json
2222
import os
23+
import sys
2324
from json import JSONDecodeError
2425

2526
from sqlalchemy import select
2627

2728
from airflow.cli.simple_table import AirflowConsole
29+
from airflow.cli.utils import is_stdout
2830
from airflow.models import Variable
2931
from airflow.utils import cli as cli_utils
3032
from airflow.utils.cli import suppress_logs_and_warning
@@ -76,44 +78,30 @@ def variables_delete(args):
7678
@providers_configuration_loaded
7779
def variables_import(args):
7880
"""Imports variables from a given file."""
79-
if os.path.exists(args.file):
80-
_import_helper(args.file)
81-
else:
81+
if not os.path.exists(args.file):
8282
raise SystemExit("Missing variables file.")
83+
with open(args.file) as varfile:
84+
try:
85+
var_json = json.load(varfile)
86+
except JSONDecodeError:
87+
raise SystemExit("Invalid variables file.")
88+
suc_count = fail_count = 0
89+
for k, v in var_json.items():
90+
try:
91+
Variable.set(k, v, serialize_json=not isinstance(v, str))
92+
except Exception as e:
93+
print(f"Variable import failed: {repr(e)}")
94+
fail_count += 1
95+
else:
96+
suc_count += 1
97+
print(f"{suc_count} of {len(var_json)} variables successfully updated.")
98+
if fail_count:
99+
print(f"{fail_count} variable(s) failed to be updated.")
83100

84101

85102
@providers_configuration_loaded
86103
def variables_export(args):
87104
"""Exports all the variables to the file."""
88-
_variable_export_helper(args.file)
89-
90-
91-
def _import_helper(filepath):
92-
"""Helps import variables from the file."""
93-
with open(filepath) as varfile:
94-
data = varfile.read()
95-
96-
try:
97-
var_json = json.loads(data)
98-
except JSONDecodeError:
99-
raise SystemExit("Invalid variables file.")
100-
else:
101-
suc_count = fail_count = 0
102-
for k, v in var_json.items():
103-
try:
104-
Variable.set(k, v, serialize_json=not isinstance(v, str))
105-
except Exception as e:
106-
print(f"Variable import failed: {repr(e)}")
107-
fail_count += 1
108-
else:
109-
suc_count += 1
110-
print(f"{suc_count} of {len(var_json)} variables successfully updated.")
111-
if fail_count:
112-
print(f"{fail_count} variable(s) failed to be updated.")
113-
114-
115-
def _variable_export_helper(filepath):
116-
"""Helps export all the variables to the file."""
117105
var_dict = {}
118106
with create_session() as session:
119107
qry = session.scalars(select(Variable))
@@ -126,6 +114,9 @@ def _variable_export_helper(filepath):
126114
val = var.val
127115
var_dict[var.key] = val
128116

129-
with open(filepath, "w") as varfile:
130-
varfile.write(json.dumps(var_dict, sort_keys=True, indent=4))
131-
print(f"{len(var_dict)} variables successfully exported to {filepath}")
117+
with args.file as varfile:
118+
json.dump(var_dict, varfile, sort_keys=True, indent=4)
119+
if is_stdout(varfile):
120+
print("\nVariables successfully exported.", file=sys.stderr)
121+
else:
122+
print(f"Variables successfully exported to {varfile.name}.")

airflow/cli/utils.py

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import io
21+
import sys
22+
23+
24+
def is_stdout(fileio: io.IOBase) -> bool:
25+
"""Check whether a file IO is stdout.
26+
27+
The intended use case for this helper is to check whether an argument parsed
28+
with argparse.FileType points to stdout (by setting the path to ``-``). This
29+
is why there is no equivalent for stderr; argparse does not allow using it.
30+
31+
.. warning:: *fileio* must be open for this check to be successful.
32+
"""
33+
return fileio.fileno() == sys.stdout.fileno()

0 commit comments

Comments
 (0)