Skip to content

Commit ea43de0

Browse files
authored
Improve shutdown handling of the UDFClient in error cases (#114)
* Centralize shutdown and error handling * Add parent process id check for UDFPlugins to watchdog thread * Replace SWIGVM_LOG_CLIENT with debug_messages macros * Fix indentation
1 parent f23622b commit ea43de0

File tree

3 files changed

+130
-98
lines changed

3 files changed

+130
-98
lines changed

exaudfclient/base/.bazelrc

+1
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ build:verbose --copt='-v' --subcommands --verbose_failures --announce_rc
1414
build:optimize --copt="-g0" --copt="-DNDEBUG" --copt=-fstack-protector-strong --copt=-fomit-frame-pointer --copt=-ffunction-sections --copt=-fdata-sections --copt="-O3" --copt="-U_FORTIFY_SOURCE" --copt="-flto"
1515
build:no-tty --curses=no --color=no
1616
build:debug-build --sandbox_debug --config=verbose
17+
build:no-symlinks --symlink_prefix=/

exaudfclient/base/debug_message.h

+23-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
#ifndef DEBUG_MESSAGE_H
2+
#define DEBUG_MESSAGE_H
3+
14

25
#ifndef NDEBUG
36

@@ -13,6 +16,11 @@
1316
(os) << "DBG: " << __FILE__ << "(" << __LINE__ << ") in " << __func__ << " " \
1417
<< (msg) << std::endl
1518

19+
// Debug-build message macro: writes "DBG: <file>(<line>) in <function> "
// followed by `text` and std::endl to `stream`.
// `text` is intentionally NOT parenthesized so that callers can pass a whole
// insertion chain such as  "value = " << x << '!'  as a single argument.
#define DBG_STREAM_MSG( stream, text ) \
    (stream) << "DBG: " << __FILE__ << "(" << __LINE__ << ") in " \
             << __func__ << " " << text << std::endl
22+
23+
1624
#define DBG_FUNC_BEGIN( os ) \
1725
(os) << "DBG: " << __FILE__ << "(" << __LINE__ << ") BEGIN FUNC: " \
1826
<< __func__ << std::endl
@@ -27,6 +35,14 @@
2735
call; \
2836
(os) << "DBG: " << __FILE__ << "(" << __LINE__ << ") " \
2937
<< "CALL END: " << #call << std::endl
38+
39+
// Debug-build variant of a *conditional* traced call:
//   1. logs "CALL BEGIN: <call text>" to `os`,
//   2. evaluates `call`,
//   3. logs "CALL END: <call text>" to `os`.
// In the release branch of this header the macro expands to nothing, i.e.
// `call` is only ever executed in debug builds.
//
// The expansion is wrapped in do { ... } while (0) so that it forms exactly
// one statement. Without the wrapper, using the macro as the body of an
// unbraced if/else/for/while would guard only the first log line and run
// `call` unconditionally. The trailing semicolon is supplied by the caller.
#define DBG_COND_FUNC_CALL( os, call ) \
    do { \
        (os) << "DBG: " << __FILE__ << "(" << __LINE__ << ") " \
             << "CALL BEGIN: " << #call << std::endl; \
        call; \
        (os) << "DBG: " << __FILE__ << "(" << __LINE__ << ") " \
             << "CALL END: " << #call << std::endl; \
    } while (0)
45+
3046
#else
3147

3248
#define DBG_EXCEPTION( os, ex )
@@ -35,10 +51,16 @@
3551

3652
#define DBGMSG( os, msg )
3753

54+
// Release build (NDEBUG defined): expands to nothing, so `msg` is never evaluated.
#define DBG_STREAM_MSG( os, msg )
55+
3856
#define DBG_FUNC_BEGIN( os )
3957

4058
#define DBG_FUNC_END( os )
4159

42-
#define DBG_FUNC_CALL( os, call ) call;
60+
#define DBG_FUNC_CALL( os, call ) call
61+
62+
// Release build (NDEBUG defined): expands to nothing, so `call` is NOT executed
// (unlike DBG_FUNC_CALL, which still expands to `call`).
#define DBG_COND_FUNC_CALL( os, call )
63+
64+
#endif
4365

4466
#endif

exaudfclient/base/exaudflib/exaudflib.cc

+106-97
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ void init_socket_name(const char* the_socket_name) {
9393

9494
static void external_process_check()
9595
{
96-
DBGVAR(cerr,&(socket_name_file));
9796
if (remote_client) return;
9897
if (::access(socket_name_file, F_OK) != 0) {
9998
::sleep(1); // give me a chance to die with my parent process
@@ -106,7 +105,30 @@ static void external_process_check()
106105
}
107106
}
108107

108+
static int first_ppid=-1;
109109

110+
void check_parent_pid(){
111+
int new_ppid=::getppid();
112+
if(first_ppid==-1){ // Initialize first_ppid
113+
first_ppid=new_ppid;
114+
}
115+
// Check if ppid has changed, if client is in own namespace,
116+
// the ppid will be forever 0 and never change.
117+
// If the client runs as udfplugin the ppid will point to the exasql process
118+
// and will change if it gets killed. Then client gets an orphaned process and
119+
// will be adopted by another process
120+
if(first_ppid!=new_ppid){
121+
::sleep(1); // give me a chance to die with my parent process
122+
cerr << "exaudfclient aborting " << socket_name_str << " ... current pid_sum " << new_ppid << " different to first_pid_sum " << first_ppid << "." << endl;
123+
#ifdef SWIGVM_LOG_CLIENT
124+
cerr << "### SWIGVM aborting with name '" << socket_name_str
125+
<< "' (" << ::getppid() << ',' << ::getpid() << ')' << endl;
126+
#endif
127+
::unlink(socket_name_file);
128+
::abort();
129+
}
130+
131+
}
110132

111133
void set_remote_client(bool value) {
112134
remote_client = value;
@@ -123,6 +145,7 @@ void *check_thread_routine(void* data)
123145
{
124146
while(keep_checking) {
125147
external_process_check();
148+
check_parent_pid();
126149
::usleep(100000);
127150
}
128151
return NULL;
@@ -142,10 +165,36 @@ void cancel_check_thread() {
142165
::pthread_cancel(check_thread);
143166
}
144167

168+
/**
 * Write every command-line argument to std::cerr, one per line, in the form
 * "zmqcontainerclient argv[<index>] = <value>".
 *
 * @param argc number of entries in argv to print
 * @param argv argument vector; entries 0 .. argc-1 are streamed as C strings
 */
void print_args(int argc,char**argv){
    int index = 0;
    while (index < argc) {
        std::cerr << "zmqcontainerclient argv[" << index << "] = " << argv[index] << std::endl;
        ++index;
    }
}
174+
175+
176+
/**
 * Destroy the VM that `vm` points to and reset the caller's pointer.
 *
 * @param vm reference to the owning pointer; it is nullptr on return.
 *           Passing a pointer that is already null is harmless.
 */
void delete_vm(SWIGVM*& vm){
    // `delete` on a null pointer is a no-op, so no explicit guard is needed.
    delete vm;
    vm = nullptr;
}
183+
184+
/**
 * Common shutdown sequence: closes `socket`, calls stop_check_thread() and
 * then finishes depending on the client mode.
 *
 * - remote client: sleeps three seconds.
 * - otherwise: calls cancel_check_thread() and unlinks socket_name_file.
 *
 * @param socket the ZeroMQ socket to close
 */
void stop_all(zmq::socket_t& socket){
    socket.close();
    stop_check_thread();

    if (get_remote_client()) {
        ::sleep(3); // give other components time to shutdown
        return;
    }

    cancel_check_thread();
    ::unlink(socket_name_file);
}
194+
145195
mutex zmq_socket_mutex;
146196
static bool use_zmq_socket_locks = false;
147197

148-
149198
void socket_send(zmq::socket_t &socket, zmq::message_t &zmsg)
150199
{
151200
DBG_FUNC_BEGIN(std::cerr);
@@ -1451,6 +1500,7 @@ class SWIGResultHandler_Impl : public SWIGRAbstractResultHandler, SWIGGeneralIte
14511500

14521501
} // namespace SWIGVMContainers
14531502

1503+
14541504
extern "C" {
14551505

14561506
SWIGVMContainers::SWIGMetadata* create_SWIGMetaData() {
@@ -1465,8 +1515,6 @@ SWIGVMContainers::SWIGRAbstractResultHandler* create_SWIGResultHandler(SWIGVMCon
14651515
return new SWIGVMContainers::SWIGResultHandler_Impl(table_iterator);
14661516
}
14671517

1468-
1469-
14701518
int exaudfclient_main(std::function<SWIGVM*()>vmMaker,int argc,char**argv)
14711519
{
14721520
assert(SWIGVM_params_ref != nullptr);
@@ -1485,12 +1533,7 @@ int exaudfclient_main(std::function<SWIGVM*()>vmMaker,int argc,char**argv)
14851533

14861534
zmq::context_t context(1);
14871535

1488-
#ifdef SWIGVM_LOG_CLIENT
1489-
for (int i = 0; i<argc; i++)
1490-
{
1491-
cerr << "zmqcontainerclient argv[" << i << "] = " << argv[i] << endl;
1492-
}
1493-
#endif
1536+
DBG_COND_FUNC_CALL(cerr, print_args(argc,argv));
14941537

14951538
if (socket_name.length() > 4 ) {
14961539
#ifdef PROTEGRITY_PLUGIN_CLIENT
@@ -1531,12 +1574,7 @@ int exaudfclient_main(std::function<SWIGVM*()>vmMaker,int argc,char**argv)
15311574
socket_name_file = &(socket_name_file[6]);
15321575
}
15331576

1534-
#ifdef SWIGVM_LOG_CLIENT
1535-
cerr << "### SWIGVM starting " << argv[0] << " with name '" << socket_name
1536-
<< " (" << ::getppid() << ',' << ::getpid() << "): '"
1537-
<< argv[1]
1538-
<< '\'' << endl;
1539-
#endif
1577+
DBG_STREAM_MSG(cerr,"### SWIGVM starting " << argv[0] << " with name '" << socket_name << " (" << ::getppid() << ',' << ::getpid() << "): '" << argv[1] << '\'');
15401578

15411579
start_check_thread();
15421580

@@ -1551,6 +1589,7 @@ int exaudfclient_main(std::function<SWIGVM*()>vmMaker,int argc,char**argv)
15511589
}
15521590

15531591
reinit:
1592+
15541593
DBGMSG(cerr,"Reinit");
15551594
zmq::socket_t socket(context, ZMQ_REQ);
15561595

@@ -1564,12 +1603,15 @@ int exaudfclient_main(std::function<SWIGVM*()>vmMaker,int argc,char**argv)
15641603
SWIGVM_params_ref->sock = &socket;
15651604
SWIGVM_params_ref->exch = &exchandler;
15661605

1606+
SWIGVM*vm=nullptr;
1607+
15671608
if (!send_init(socket, socket_name)) {
15681609
if (!get_remote_client() && exchandler.exthrowed) {
15691610
send_close(socket, exchandler.exmsg);
1570-
return 1;
1611+
goto error;
1612+
}else{
1613+
goto reinit;
15711614
}
1572-
goto reinit;
15731615
}
15741616

15751617
SWIGVM_params_ref->dbname = (char*) g_database_name.c_str();
@@ -1587,13 +1629,11 @@ int exaudfclient_main(std::function<SWIGVM*()>vmMaker,int argc,char**argv)
15871629
SWIGVM_params_ref->vm_id = g_vm_id;
15881630
SWIGVM_params_ref->singleCallMode = g_singleCallMode;
15891631

1590-
SWIGVM*vm=nullptr;
1591-
15921632
try {
15931633
vm = vmMaker();
15941634
if (vm == nullptr) {
15951635
send_close(socket, "Unknown or unsupported VM type");
1596-
return 1;
1636+
goto error;
15971637
}
15981638
if (vm->exception_msg.size()>0) {
15991639
throw SWIGVM::exception(vm->exception_msg.c_str());
@@ -1608,49 +1648,49 @@ int exaudfclient_main(std::function<SWIGVM*()>vmMaker,int argc,char**argv)
16081648
// EXASolution responds with a CALL message that specifies
16091649
// the single call function to be made
16101650
if (!send_run(socket)) {
1611-
break;
1612-
}
1651+
break;
1652+
}
1653+
16131654
assert(g_singleCallFunction != single_call_function_id_e::SC_FN_NIL);
1614-
try {
1615-
const char* result = nullptr;
1655+
try {
1656+
const char* result = nullptr;
16161657
switch (g_singleCallFunction)
16171658
{
1618-
case single_call_function_id_e::SC_FN_NIL:
1619-
break;
1620-
case single_call_function_id_e::SC_FN_DEFAULT_OUTPUT_COLUMNS:
1621-
result = vm->singleCall(g_singleCallFunction,noArg);
1622-
break;
1623-
case single_call_function_id_e::SC_FN_GENERATE_SQL_FOR_IMPORT_SPEC:
1624-
assert(!g_singleCall_ImportSpecificationArg.isEmpty());
1625-
result = vm->singleCall(g_singleCallFunction,g_singleCall_ImportSpecificationArg);
1626-
g_singleCall_ImportSpecificationArg = ExecutionGraph::ImportSpecification(); // delete the last argument
1627-
break;
1628-
case single_call_function_id_e::SC_FN_GENERATE_SQL_FOR_EXPORT_SPEC:
1629-
assert(!g_singleCall_ExportSpecificationArg.isEmpty());
1630-
result = vm->singleCall(g_singleCallFunction,g_singleCall_ExportSpecificationArg);
1631-
g_singleCall_ExportSpecificationArg = ExecutionGraph::ExportSpecification(); // delete the last argument
1632-
break;
1633-
case single_call_function_id_e::SC_FN_VIRTUAL_SCHEMA_ADAPTER_CALL:
1634-
assert(!g_singleCall_StringArg.isEmpty());
1635-
result = vm->singleCall(g_singleCallFunction,g_singleCall_StringArg);
1636-
break;
1659+
case single_call_function_id_e::SC_FN_NIL:
1660+
break;
1661+
case single_call_function_id_e::SC_FN_DEFAULT_OUTPUT_COLUMNS:
1662+
result = vm->singleCall(g_singleCallFunction,noArg);
1663+
break;
1664+
case single_call_function_id_e::SC_FN_GENERATE_SQL_FOR_IMPORT_SPEC:
1665+
assert(!g_singleCall_ImportSpecificationArg.isEmpty());
1666+
result = vm->singleCall(g_singleCallFunction,g_singleCall_ImportSpecificationArg);
1667+
g_singleCall_ImportSpecificationArg = ExecutionGraph::ImportSpecification(); // delete the last argument
1668+
break;
1669+
case single_call_function_id_e::SC_FN_GENERATE_SQL_FOR_EXPORT_SPEC:
1670+
assert(!g_singleCall_ExportSpecificationArg.isEmpty());
1671+
result = vm->singleCall(g_singleCallFunction,g_singleCall_ExportSpecificationArg);
1672+
g_singleCall_ExportSpecificationArg = ExecutionGraph::ExportSpecification(); // delete the last argument
1673+
break;
1674+
case single_call_function_id_e::SC_FN_VIRTUAL_SCHEMA_ADAPTER_CALL:
1675+
assert(!g_singleCall_StringArg.isEmpty());
1676+
result = vm->singleCall(g_singleCallFunction,g_singleCall_StringArg);
1677+
break;
16371678
}
16381679
if (vm->exception_msg.size()>0) {
1639-
send_close(socket, vm->exception_msg); socket.close();
1680+
send_close(socket, vm->exception_msg);
16401681
goto error;
16411682
}
16421683

16431684
if (vm->calledUndefinedSingleCall.size()>0) {
1644-
send_undefined_call(socket, vm->calledUndefinedSingleCall);
1645-
} else {
1646-
send_return(socket,result);
1647-
}
1685+
send_undefined_call(socket, vm->calledUndefinedSingleCall);
1686+
} else {
1687+
send_return(socket,result);
1688+
}
16481689

16491690
if (!send_done(socket)) {
16501691
break;
16511692
}
1652-
} catch(...) {
1653-
}
1693+
} catch(...) {}
16541694
}
16551695
} else {
16561696
for(;;) {
@@ -1660,7 +1700,7 @@ int exaudfclient_main(std::function<SWIGVM*()>vmMaker,int argc,char**argv)
16601700
while(!vm->run_())
16611701
{
16621702
if (vm->exception_msg.size()>0) {
1663-
send_close(socket, vm->exception_msg); socket.close();
1703+
send_close(socket, vm->exception_msg);
16641704
goto error;
16651705
}
16661706
}
@@ -1673,66 +1713,35 @@ int exaudfclient_main(std::function<SWIGVM*()>vmMaker,int argc,char**argv)
16731713
{
16741714
vm->shutdown();
16751715
if (vm->exception_msg.size()>0) {
1676-
send_close(socket, vm->exception_msg); socket.close();
1716+
send_close(socket, vm->exception_msg);
16771717
goto error;
16781718
}
1679-
delete vm;
1680-
vm = NULL;
16811719
}
16821720
send_finished(socket);
16831721
} catch (SWIGVM::exception &err) {
1684-
keep_checking = false;
1685-
send_close(socket, err.what()); socket.close();
1686-
1687-
#ifdef SWIGVM_LOG_CLIENT
1688-
cerr << "### SWIGVM crashing with name '" << socket_name
1689-
<< " (" << ::getppid() << ',' << ::getpid() << "): " << err.what() << endl;
1690-
#endif
1722+
DBG_STREAM_MSG(cerr,"### SWIGVM crashing with name '" << socket_name << " (" << ::getppid() << ',' << ::getpid() << "): " << err.what());
1723+
send_close(socket, err.what());
16911724
goto error;
16921725
} catch (std::exception &err) {
1693-
send_close(socket, err.what()); socket.close();
1694-
#ifdef SWIGVM_LOG_CLIENT
1695-
cerr << "### SWIGVM crashing with name '" << socket_name
1696-
<< " (" << ::getppid() << ',' << ::getpid() << "): " << err.what() << endl;
1697-
#endif
1726+
DBG_STREAM_MSG(cerr,"### SWIGVM crashing with name '" << socket_name << " (" << ::getppid() << ',' << ::getpid() << "): " << err.what());
1727+
send_close(socket, err.what());
16981728
goto error;
16991729
} catch (...) {
1700-
send_close(socket, "Internal/Unknown error"); socket.close();
1701-
#ifdef SWIGVM_LOG_CLIENT
1702-
cerr << "### SWIGVM crashing with name '" << socket_name
1703-
<< " (" << ::getppid() << ',' << ::getpid() << ')' << endl;
1704-
#endif
1730+
DBG_STREAM_MSG(cerr,"### SWIGVM crashing with name '" << socket_name << " (" << ::getppid() << ',' << ::getpid() << ')');
1731+
send_close(socket, "Internal/Unknown error");
17051732
goto error;
17061733
}
17071734

1708-
#ifdef SWIGVM_LOG_CLIENT
1709-
cerr << "### SWIGVM finishing with name '" << socket_name
1710-
<< " (" << ::getppid() << ',' << ::getpid() << ')' << endl;
1711-
#endif
1712-
stop_check_thread();
1713-
socket.close();
1714-
if (!get_remote_client()) {
1715-
cancel_check_thread();
1716-
::unlink(socket_name_file);
1717-
}
1735+
1736+
DBG_STREAM_MSG(cerr,"### SWIGVM finishing with name '" << socket_name << " (" << ::getppid() << ',' << ::getpid() << ')');
1737+
1738+
delete_vm(vm);
1739+
stop_all(socket);
17181740
return 0;
17191741

17201742
error:
1721-
keep_checking = false;
1722-
if (vm != NULL)
1723-
{
1724-
vm->shutdown();
1725-
delete vm;
1726-
vm = NULL;
1727-
}
1728-
1729-
socket.close();
1730-
if (!get_remote_client()) {
1731-
cancel_check_thread();
1732-
::unlink(socket_name_file);
1733-
} else {
1734-
::sleep(3); // give other components time to shutdown
1735-
}
1743+
delete_vm(vm);
1744+
stop_all(socket);
17361745
return 1;
17371746
}
17381747

0 commit comments

Comments
 (0)