# Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Provides specialization of ABClusterHost for remote hosts using Paramiko."""
import errno
import stat
import time
import ntpath
import logging
import os.path
import socket
import contextlib
import posixpath
import threading
import paramiko
import util
from util import bcolors
import clusterhost
from clusterhost import ABClusterHost
CONN_LOCK = threading.Lock()
# Module-level runtime registry. The top-level dict object itself never changes,
# so functions may mutate RUNTIME["CON_AR"] without declaring it global.
RUNTIME = {}
RUNTIME["CON_AR"] = []  # Array of established permanent SSH connections.
_logger = logging.getLogger(__name__)
def quote_if_contains_space(s):
    """Wrap s in double quotes if it contains a space; otherwise return it unchanged."""
    if ' ' in s:
        return '"' + s + '"'
    return s
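# Illustrative behavior:
#   quote_if_contains_space('mysqld')            -> mysqld
#   quote_if_contains_space('C:\Program Files')  -> "C:\Program Files"
# Note: embedded quotes are not escaped; callers pass plain paths and arguments.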
class RemoteExecException(clusterhost.ExecException):
"""Exception type thrown whenever os-command execution fails on
a remote host. """
def __init__(self, hostname, cmd, exitstatus, out):
self.hostname = hostname
self.cmd = cmd
self.exitstatus = exitstatus
self.out = out.read()
def __str__(self):
return 'Command `{self.cmd}\', running on {self.hostname} exited with \
{self.exitstatus}:{self.out}'.format(self=self)
def cleanup_connections():
"""Purge array of established SSH connections."""
_logger.warning(bcolors.WARNING + "RCH--> Cleaning up connArray." + bcolors.ENDC)
with CONN_LOCK:
if len(RUNTIME["CON_AR"]) >= 1:
try:
for x in range(len(RUNTIME["CON_AR"])):
if RUNTIME["CON_AR"][x] is not None:
RUNTIME["CON_AR"][x].close()
except (paramiko.SSHException, IndexError): # Nothing. It is closed.
_logger.warning(bcolors.WARNING + "RCH--> SSHExc or IndexError" + bcolors.ENDC)
del RUNTIME["CON_AR"][:]
def delete_connection(host):
"""Remove SSH connection from array of active connections."""
_logger.warning(bcolors.WARNING + "\nRCH--> Deleting remote conn. to %s." + bcolors.ENDC, host)
with CONN_LOCK:
_logger.debug("\nRCH--> Deleting remote conn. to %s, Len ConnArr is %s.", host,
len(RUNTIME["CON_AR"]))
if len(RUNTIME["CON_AR"]) >= 1:
try:
RUNTIME["CON_AR"][:] = [x for x in RUNTIME["CON_AR"] if not x.host == host]
RUNTIME["CON_AR"] = [x for x in RUNTIME["CON_AR"] if x != '']
except IndexError: # Nothing. It is closed.
_logger.warning(bcolors.WARNING + "RCH--> IndexError" + bcolors.ENDC)
wrn_msg = "\nRCH--> Deleting remote conn. to %s, NEW Len ConnArr is %s."
_logger.warning(bcolors.WARNING + wrn_msg + bcolors.ENDC, host, len(RUNTIME["CON_AR"]))
def check_connected(sshcl):
"""Check if SSH connection is newly/still connected."""
if sshcl is not None:
_logger.debug("RCH--> Is %s connected?", sshcl.host)
try:
transport = sshcl.get_transport()
if transport is not None:
transport.send_ignore()
_logger.debug("RCH--> %s is connected", sshcl.host)
return True
return False
except Exception:
_logger.debug("RCH--> NOT connected")
return False
else:
_logger.debug("RCH--> sshcl1 was None")
return False
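# Typical liveness probe before reusing a cached connection (sketch; sshcl is a
# paramiko.SSHClient taken from RUNTIME["CON_AR"]):
#   if not check_connected(sshcl):
#       delete_connection(sshcl.host)  # discard it so a fresh one can be opened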
def list_remote_hosts():
"""Provide FE with list of hosts we're permanently connected to."""
_logger.debug("RCH--> Listing hosts in connArray.")
host_arr = []
if len(RUNTIME["CON_AR"]) < 1:
return host_arr #Do not wait.
try:
for x in range(len(RUNTIME["CON_AR"])):
if RUNTIME["CON_AR"][x] is not None:
host_arr.append(RUNTIME["CON_AR"][x].host)
except IndexError:
_logger.warning(bcolors.WARNING + "RCH--> IndexError" + bcolors.ENDC)
# Let the calling code sort out what to do next depending on configuration.
return host_arr
def add_permconn(host):
"""Add new permanent SSH connection to remote host to array."""
with CONN_LOCK:
RUNTIME["CON_AR"].append(paramiko.SSHClient())
x = len(RUNTIME["CON_AR"])-1
_logger.debug('\nRCH--> Adding perm. conn. %s for Host=%s', x, host)
RUNTIME["CON_AR"][x].host = host
RUNTIME["CON_AR"][x].system = None
RUNTIME["CON_AR"][x].processor = None
RUNTIME["CON_AR"][x].osver = None
RUNTIME["CON_AR"][x].osflavor = None
RUNTIME["CON_AR"][x].load_system_host_keys()
RUNTIME["CON_AR"][x].set_missing_host_key_policy(paramiko.AutoAddPolicy())
_logger.debug('\nRCH--> Opening new permanent conn. %s to host %s.', x,
RUNTIME["CON_AR"][x].host)
#Not really necessary to return index as we use Host member to synchronize.
return x
def connect_permconn(host, user, pwd, pkey, keybased, index):
"""Connect permanent SSH connection to remote host."""
    # The JSON encoder cannot serialize some of the errors below because their keys
    # are tuples rather than strings, e.g.
    #   TypeError: key ('129.146.115.208', 22) is not a string.
    # We must find a better way to handle this, either here or in request_handler (#1761).
    # The error we are most concerned with here is:
    # NoValidConnectionsError: [Errno None] Unable to connect to port 22 on 129.146.115.208
    # First, check whether there is an SSH service we can connect a socket to at all.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((host, 22))
    # We need a socket we can manipulate inside the SSHClient object; this one was
    # just a probe, so close it.
    sock.close()
    if result != 0:
_logger.error(bcolors.FAIL + "\nRCH--> No SSH service at %s." + bcolors.ENDC, host)
return False
if keybased is False:
try:
_logger.debug('RCH--> ' + "PERMConnEnter NO KEY, NO PASSWORD")
RUNTIME["CON_AR"][index].connect(hostname=host, username=user, password=pwd, pkey=None,
look_for_keys=False, banner_timeout=4, timeout=5)
return True
        except paramiko.ssh_exception.BadAuthenticationType:
            _logger.error(bcolors.FAIL + "\nRCH--> Unsupported authentication type." + bcolors.ENDC)
        except paramiko.ssh_exception.AuthenticationException:
            _logger.error(bcolors.FAIL + "\nRCH--> Bad credentials provided." + bcolors.ENDC)
        except paramiko.ssh_exception.NoValidConnectionsError:
            _logger.error(bcolors.FAIL + "\nRCH--> No valid connections..." + bcolors.ENDC)
        except paramiko.ssh_exception.SSHException as e:  # any other SSH-level failure
            _logger.error(bcolors.FAIL + "\nRCH--> %s" + bcolors.ENDC, str(e))
        except socket.error:
            _logger.error(bcolors.FAIL + "\nRCH--> Socket error." + bcolors.ENDC)
    else:
        # Key-based authentication: use the key file pointed out for us, or fall
        # back to the default identity file.
        privatekeyfile = pkey or os.path.expanduser('~/.ssh/id_rsa')
        # Is it a valid file?
if not os.path.isfile(privatekeyfile):
_logger.error(bcolors.FAIL + "RCH--> %s doesn't exist!" + bcolors.ENDC, privatekeyfile)
return False
try:
mykey = paramiko.RSAKey.from_private_key_file(privatekeyfile, password=pwd)
        except paramiko.ssh_exception.SSHException as e:
            _logger.error(bcolors.FAIL + "\nRCH--> %s" + bcolors.ENDC, str(e))
return False
try:
RUNTIME["CON_AR"][index].connect(hostname=host, username=user, password=pwd, pkey=mykey,
look_for_keys=False, banner_timeout=4, timeout=5)
return True
        except paramiko.ssh_exception.BadAuthenticationType:
            _logger.error(bcolors.FAIL + "\nRCH--> Unsupported authentication type." + bcolors.ENDC)
        except paramiko.ssh_exception.AuthenticationException:
            _logger.error(bcolors.FAIL + "\nRCH--> Bad credentials provided." + bcolors.ENDC)
        except paramiko.ssh_exception.NoValidConnectionsError:
            _logger.error(bcolors.FAIL + "\nRCH--> No valid connections..." + bcolors.ENDC)
        except paramiko.ssh_exception.SSHException as e:  # any other SSH-level failure
            _logger.error(bcolors.FAIL + "\nRCH--> %s" + bcolors.ENDC, str(e))
        except socket.error:
            _logger.error(bcolors.FAIL + "\nRCH--> Socket error." + bcolors.ENDC)
return False
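# Sketch of the add/connect sequence RemoteClusterHost uses below (host, user, pwd,
# key_file and key_based are caller-supplied credentials):
#   idx = add_permconn(host)
#   if connect_permconn(host, user, pwd, key_file, key_based, idx) and \
#           check_connected(RUNTIME["CON_AR"][idx]):
#       sftp = RUNTIME["CON_AR"][idx].open_sftp()
#   else:
#       delete_connection(host)  # never cache a connection we cannot verify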
class RemoteClusterHost(ABClusterHost):
"""Implements the ABClusterHost interface for remote hosts. Wraps a paramiko.SSHClient and uses
this to perform tasks on the remote host."""
def __init__(self, host, key_based=None, username=None, password=None, key_file=None):
        super(RemoteClusterHost, self).__init__()
self.host = host
_logger.debug('RCH--> ' + "Host="+host)
self.user = username
self.pwd = password
_logger.debug('RCH--> ' + "Set self.pwd")
# Create new or reuse existing connection:
ConIdx = -1
if RUNTIME["CON_AR"]:
for x in range(len(RUNTIME["CON_AR"])):
if hasattr(RUNTIME["CON_AR"][x], "host"):
if RUNTIME["CON_AR"][x].host == host:
_logger.debug('RCH--> Reusing PERMConn[%s].', x)
ConIdx = x
if check_connected(RUNTIME["CON_AR"][ConIdx]):
try:
self.__client = RUNTIME["CON_AR"][ConIdx]
self.__sftp = RUNTIME["CON_AR"][ConIdx].open_sftp()
_logger.debug('RCH--> PERMConn[%s] is alive.', ConIdx)
except Exception:
# We definitely have failed connect attempt.
_logger.warning(bcolors.WARNING + "\nRCH--> PERMConn[%s] is "
"NOT alive." + bcolors.ENDC, ConIdx)
# Then try getting new one.
delete_connection(self.host)
ConIdx = add_permconn(self.host)
if connect_permconn(self.host, self.user, self.pwd, key_file,
key_based, ConIdx):
if check_connected(RUNTIME["CON_AR"][ConIdx]):
self.__client = RUNTIME["CON_AR"][ConIdx]
self.__sftp = RUNTIME["CON_AR"][ConIdx].open_sftp()
else:
# delete dead connection as we can not confirm it's alive
delete_connection(self.host)
else:
# delete dead connection as there was error connecting
delete_connection(self.host)
else:
_logger.debug("\nRCH--> PERMConn[%s] is not alive. Recreating.", ConIdx)
delete_connection(self.host)
ConIdx = add_permconn(self.host)
_logger.debug("RCH--> New PERMConn[%s] created.", ConIdx)
if connect_permconn(self.host, self.user, self.pwd, key_file,
key_based, ConIdx):
if check_connected(RUNTIME["CON_AR"][ConIdx]):
self.__client = RUNTIME["CON_AR"][ConIdx]
self.__sftp = RUNTIME["CON_AR"][ConIdx].open_sftp()
else:
# delete dead connection as we can not confirm it's alive
delete_connection(self.host)
else:
# delete dead connection as there was error connecting
delete_connection(self.host)
break
if not RUNTIME["CON_AR"] or ConIdx <= -1:
#ConnArr is empty OR no connection found for host. Add new in protected code.
ConIdx = add_permconn(self.host)
if connect_permconn(self.host, self.user, self.pwd, key_file,
key_based, ConIdx):
if check_connected(RUNTIME["CON_AR"][ConIdx]):
self.__client = RUNTIME["CON_AR"][ConIdx]
self.__sftp = RUNTIME["CON_AR"][ConIdx].open_sftp()
_logger.debug('\nRCH--> ' + "PERMConn[%s] is alive (new conn).", ConIdx)
else:
# delete dead connection as we can not confirm it's alive
_logger.warning(bcolors.WARNING + "\nRCH--> PERMConn[%s] is NOT alive "
"(new conn)." + bcolors.ENDC, ConIdx)
delete_connection(self.host)
raise Exception("No valid connection!")
else:
# delete dead connection as there was error connecting
_logger.warning(bcolors.WARNING + "\nRCH--> PERMConn[%s] is NOT alive (new conn)."
+ bcolors.ENDC, ConIdx)
delete_connection(self.host)
raise Exception("No valid connection!")
self.system = None
self.processor = None
self.osver = None
self.osflavor = None
def close(self):
"""-"""
self.drop()
@property
def client(self):
"""Returning Paramiko.SSHClient()"""
return self.__client
@property
def sftp(self):
"""Returning Paramiko.SSHClient().sftp connection"""
return self.__sftp
def _get_system_tuple(self):
_logger.debug('RCH--> remote_ch, system tuple')
if RUNTIME["CON_AR"]:
for x in range(len(RUNTIME["CON_AR"])):
                same_host = hasattr(RUNTIME["CON_AR"][x], "host") and \
                    RUNTIME["CON_AR"][x].host == self.host
                has_system = hasattr(RUNTIME["CON_AR"][x], "system") and \
                    RUNTIME["CON_AR"][x].system
                if same_host and has_system:
_logger.debug("RCH--> Reusing system tuple.")
return (RUNTIME["CON_AR"][x].system, RUNTIME["CON_AR"][x].processor, \
RUNTIME["CON_AR"][x].osver, RUNTIME["CON_AR"][x].osflavor)
preamble = None
system = None
processor = None
osver = None
osflavor = None
hlp_str = None
try:
preamble = self.exec_blocking(['uname', '-sp']) #9009 on Win10
except RemoteExecException:
_logger.warning(bcolors.WARNING + '\nRCH--> executing uname failed - assuming '
'Windows.' + bcolors.ENDC)
system = "Windows"
res = self.exec_blocking(['cmd.exe', '/c', 'wmic', 'os', 'get',
'Caption,OSArchitecture,Version', '/value']).split('\r')
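            # On a typical Windows host, wmic with /value prints KEY=value lines, e.g.
            # (illustrative values):
            #   Caption=Microsoft Windows 10 Pro
            #   OSArchitecture=64-bit
            #   Version=10.0.17763
            # The loop below picks these apart.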
for t in res:
if t.strip() != "":
if t.find("Caption") != -1:
osflavor = t.split("Caption=")[1]
elif t.find("OSArchitecture") != -1:
processor = t.split("OSArchitecture=")[1]
if processor.find("64") != -1:
processor = "AMD64"
elif t.find("Version") != -1:
osver = t.split("Version=")[1].split(".")[0]
else: # no error executing uname -sp
uname = preamble
_logger.debug('RCH--> uname=%s', uname)
(system, processor) = uname.split(' ')
            # When processing the output of cat looking for a specific key (Linux),
            # this is not relevant.
if 'CYGWIN' in system:
system = 'CYGWIN'
elif 'Darwin' in system:
system = "Darwin"
osver = str(self.exec_blocking(['uname', '-r']))
osflavor = "MacOSX"
elif "SunOS" in system:
system = "SunOS"
osver = str(self.exec_blocking(['uname', '-v']))
osflavor = "Solaris"
elif "Linux" in system:
system = "Linux"
                # We assume all Linux flavors have an /etc/os-release file.
if self.file_exists('/etc/os-release'):
hlp_str = self.exec_blocking(['cat', '/etc/os-release'])
matched_lines = [line for line in hlp_str.split('\n') if "ID=" in line]
hlp = (str(matched_lines[0]).split("ID=", 1)[1]).strip('"')
osflavor = hlp
matched_lines = [line for line in hlp_str.split('\n') if "VERSION_ID=" in line]
hlp = (str(matched_lines[0]).split("VERSION_ID=", 1)[1]).strip('"')
osver = hlp
else:
#Bail out, no file
_logger.warning(bcolors.WARNING + '\nRCH--> OS version (Linux) does not '
'have /etc/os-release file!' + bcolors.ENDC)
_logger.debug('\nRCH--> remote_ch, system tuple, returning')
if RUNTIME["CON_AR"]:
for x in range(len(RUNTIME["CON_AR"])):
if hasattr(RUNTIME["CON_AR"][x], "host") and RUNTIME["CON_AR"][x].host == self.host:
RUNTIME["CON_AR"][x].system = system
RUNTIME["CON_AR"][x].processor = processor.strip()
RUNTIME["CON_AR"][x].osver = osver
RUNTIME["CON_AR"][x].osflavor = osflavor
return (system, processor.strip(), osver, osflavor)
def _exec_pkg_cmdv(self, cmdv):
"""For remote hosts the binary is fist copied over using sftp."""
_logger.debug('RCH--> remote_ch, exec pkg')
hi = os.path.basename(cmdv[0])
self.sftp.put(cmdv[0], hi)
self.sftp.chmod(hi, stat.S_IRWXU)
return self.exec_cmdv([self.path_module.join('.', hi)] + cmdv[1:-1])
def _execfast(self, cmdv):
return self._exec_cmdv(cmdv, None, None)
def _sftpify(self, path):
"""Since sftp treats all path names as relative to its root we must convert absolute paths
before using them with sftp. As quick-fix we assume that the sftp root is equal to the drive
letter of all absolute paths used. If it isn't the sftp operations will fail."""
# self.path_module.splitdrive(path)[1] essentially does NOTHING on *NIX and messes up Win.
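        # For reference, standard splitdrive behavior (why the quick fix works):
        #   ntpath.splitdrive('C:\\mcc')  -> ('C:', '\\mcc')
        #   posixpath.splitdrive('/mcc')  -> ('', '/mcc')
        # i.e. it is a no-op on *NIX but would strip the drive letter on Windows.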
return path
def open(self, filename, mode='r'):
"""Forward to paramiko.SFTPClient.open for remote hosts. Wrap in contextlib.closing so that
clients can use with-statements on it."""
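        # Illustrative use (self is a connected RemoteClusterHost):
        #   with self.open('/etc/os-release') as f:
        #       data = f.read()
        # The contextlib.closing wrapper is what makes the returned SFTPFile usable
        # in a with-statement here.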
_logger.debug('RCH--> remote_ch, open')
return contextlib.closing(self.sftp.open(self._sftpify(filename), mode))
    def drop(self, paths=()):
"""Close open connections and remove files.
paths - list of files to remove from host before closing connection
"""
_logger.debug('RCH--> remote_ch, drop')
map(self.rm_r1, paths)
def get(self, filename, localname):
"""Get a file on ABClusterHost and save to ~/.mcc."""
#File names are complete with path.
_logger.warning(bcolors.WARNING + "RCH--> sftp-ing log file" + bcolors.ENDC)
self.sftp.get(filename, localname)
def put(self, localname, filename):
"""Put file on ABClusterHost."""
#File names are complete with path.
_logger.warning(bcolors.WARNING + "RCH--> sftp-ing file to remote host" + bcolors.ENDC)
self.sftp.put(localname, filename)
def file_exists(self, path):
"""Test for the existence of a file on the remote host. If the file actually exists, its
stat object is returned, otherwise None.
path - file to check the existence of
"""
_logger.debug('\nRCH--> remote_ch, file exists, %s', path)
try:
return self.sftp.stat(self._sftpify(path))
except IOError as ioerr:
_logger.warning(bcolors.WARNING + '\nRCH--> remote_ch file_exists, exception, %s' +
bcolors.ENDC, path)
if ioerr.errno == errno.ENOENT:
log_what = (bcolors.WARNING + '\nRCH--> remote_ch file_exists, exception, %s, '
'returning NONE.' + bcolors.ENDC)
_logger.warning(log_what, path)
return None
_logger.warning(bcolors.WARNING + '\nRCH--> stat failure on %s' + bcolors.ENDC, path)
raise
def list_dir(self, path):
"""List the files in a directory on the remote host. Forwards to SFTPClient.listdir(), but
also warns about empty results that may be caused by paramiko not reporting missing execute
permission on the directory correctly.
path - directory to list
"""
_logger.debug('\nRCH--> ' + 'remote_ch, list dir, %s', path)
content = self.sftp.listdir(self._sftpify(path))
if not content:
m = stat.S_IMODE(self.sftp.stat(path).st_mode)
for role in ['USR', 'GRP', 'OTH']:
mask = util.get_fmask('R', role)|util.get_fmask('X', role)
if (m & mask) != mask:
log_what = ('\nRCH--> Directory %s does not have both read and execute '
'permission for %s.\nIf you depend on %s for access, the empty '
'directory listing may not be correct')
_logger.debug(log_what, path, role, role)
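                    # For the USR role this test is equivalent to (sketch, assuming
                    # util.get_fmask maps to the standard stat permission bits):
                    #   mask = stat.S_IRUSR | stat.S_IXUSR
                    #   (m & mask) == mask  # True only when read and execute are both set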
return content
def mkdir_p(self, path):
"""Provides mkdir -p type functionality on the remote host. That is, all missing parent
directories are also created. If the directory we are trying to create already exists, we
do nothing. If path or any of its parents is not a directory an exception is raised.
path - directory to create on remote host
"""
_logger.debug('\nRCH--> remote_ch, mkdir_p(%s)', path)
path = self._sftpify(path)
pa = self.file_exists(path)
        if pa is not None:
if not util.is_dir(pa):
raise Exception(self.host+':'+path+' is not a directory')
return
        # Need to use normpath here since dirname of a directory with a trailing slash
        # is the directory without a slash (a dirname bug?).
sd = ntpath.splitdrive(path)
_logger.debug('\nRCH--> sd=%s', sd)
if sd[1] == '':
_logger.warning(bcolors.WARNING + '\nRCH--> path=%s is a drive letter. Returning...' +
bcolors.ENDC, path)
return
np = self.path_module.normpath(path)
parent = self.path_module.dirname(np)
assert parent != path
_logger.debug('\nRCH--> remote_ch, making dir %s', path)
self.mkdir_p(parent)
self.sftp.mkdir(np)
def rm_r1(self, path):
"""Provides rm -r type functionality on the remote host. That is, all
files and directories are removed recursively.
path - file or directory to remove
needed for self.drop()...
"""
path = self._sftpify(path)
if util.is_dir(self.sftp.stat(path)):
for f in self.sftp.listdir(path):
self.rm_r1(posixpath.join(path, f))
self.sftp.rmdir(path)
else:
self.sftp.remove(path)
def rm_r(self, path):
"""Provides rm -r type functionality on the remote host. That is, all
files and directories are removed recursively.
path - file or directory to remove
"""
_logger.debug('\nRCH--> remote_ch, deleting %s', path)
try:
self.exec_blocking(['uname', '-sp']) #9009 on Win10
except RemoteExecException:
            # return self._exec_cmdln("rmdir /s " + str(path), {'waitForCompletion': True,
            #                         'daemonWait': 2}, None)
            # Unfortunately, some Windows versions lack DEL, some RMTREE and some RMDIR,
            # and on some the command is not in PATH etc., so fall back to the SFTP walk:
return self.rm_r1(path)
else:
return self._exec_cmdln("rm -Rf " + str(path),
{'waitForCompletion': True, 'daemonWait': 1}, None)
    def _exec_cmdln(self, cmdln, procCtrl, stdInFile):
        """Execute an OS command line (as a single string) on the remote host.
        cmdln - complete command line of the OS command
        procCtrl - procCtrl object from message which controls how the process
        is started (blocking vs non-blocking and output reporting)
        stdInFile - name of a remote file whose contents are sent to the command's stdin
        """
_logger.debug('\nRCH--> remote_ch, exec cmdln %s', cmdln)
check_connected(self.client)
contents = None
        if stdInFile is not None:
with self.open(stdInFile) as stdin:
contents = stdin.read()
with contextlib.closing(self.client.get_transport().open_session()) as chan:
chan.set_combine_stderr(True)
_logger.debug('\nRCH--> cmdln=%s', cmdln)
chan.exec_command(cmdln)
            if contents is not None:
_logger.debug('\nRCH--> Using supplied stdin from %s:', stdInFile)
_logger.debug('\nRCH--> %s ...', contents[0:50])
chan.sendall(contents)
chan.shutdown_write()
if util.get_val(procCtrl, 'waitForCompletion'):
_logger.debug("RCH--> Has waitForCompletion")
output = chan.makefile('rb')
                if procCtrl is not None and 'daemonWait' in procCtrl:
procDW = procCtrl['daemonWait']
_logger.warning(bcolors.WARNING + '\nRCH--> Waiting for command %s seconds.'
+ bcolors.ENDC, procDW)
time.sleep(procDW)
for t in xrange(180): #Wait MAX 180s for remote execution to return.
time.sleep(0.5)
if chan.exit_status_ready():
_logger.warning(bcolors.WARNING + '\nRCH--> Waited %s sec (on top '
'of %s sec) for %s' + bcolors.ENDC, t/2, procDW, cmdln)
break
if chan.exit_status_ready():
exitstatus = chan.recv_exit_status()
if exitstatus != 0 and exitstatus != util.get_val(procCtrl, 'noRaise'):
raise RemoteExecException(self.host, cmdln, exitstatus, output)
out_ret = output.read()
if exitstatus != 0:
out_ret += ' errcode:' + str(exitstatus)
return out_ret
else:
#Timeout. RemoteExecException will not work as it assumes command completed.
return 'Command `{0}\', running on {1} timed out.'.format(cmdln, self.host)
else:
# Block.
_logger.debug('\nRCH--> Waiting for %s...', cmdln)
exitstatus = chan.recv_exit_status()
if exitstatus != 0 and exitstatus != util.get_val(procCtrl, 'noRaise'):
raise RemoteExecException(self.host, cmdln, exitstatus, output)
out_ret = output.read()
if exitstatus != 0:
out_ret += ' errcode:' + str(exitstatus)
return out_ret
else:
_logger.debug("RCH--> NO waitForCompletion")
dbg_msg = '\nRCH--> chan.exit_status_ready(1) %s'
_logger.debug(dbg_msg, str(chan.exit_status_ready()))
                if not chan.exit_status_ready() and procCtrl is not None \
                        and 'daemonWait' in procCtrl:
procDW = procCtrl['daemonWait']
#Let's not waste user's time:
_logger.debug('\nRCH--> Waiting for command %s seconds.', procDW)
time.sleep(procCtrl['daemonWait'])
for t in xrange(180):
time.sleep(0.5)
if chan.exit_status_ready():
_logger.warning(bcolors.WARNING + '\nRCH--> Waited %s sec for %s'
+ bcolors.ENDC, t/2, cmdln)
break
_logger.debug('\nRCH--> chan.exit_status_ready(2) %s',
str(chan.exit_status_ready()))
if chan.exit_status_ready():
output = chan.makefile('rb')
                    #Collect the exit status once and reuse it below.
                    exitstatus = chan.recv_exit_status()
                    # noRaise = 1 exclusively for installing Windows service.
                    if exitstatus != 0 and exitstatus != util.get_val(procCtrl, 'noRaise'):
                        raise RemoteExecException(self.host, cmdln, exitstatus, output)
else:
_logger.debug('\nRCH--> remote_ch, exec cmdln, returning')
out_ret = output.read()
if exitstatus != 0:
out_ret += ' errcode:' + str(exitstatus)
return out_ret
else:
# this is for commands that we collect status for differently, say mysqld start
return 'Command `{0}\', running on {1} timed out.'.format(cmdln, self.host)
def _exec_cmdv(self, cmdv, procCtrl, stdin_file):
"""Execute an OS command vector on the remote host.
cmdv - complete command vector (argv) of the OS command
procCtrl - procCtrl object from message which controls how the process
is started (blocking vs non-blocking and output reporting)
"""
_logger.debug('RCH--> remote_ch, exec cmdv')
if isinstance(cmdv, list):
return self._exec_cmdln(' '.join([quote_if_contains_space(a) for a in cmdv]),
procCtrl, stdin_file)
return self._exec_cmdln(cmdv, procCtrl, stdin_file)
    def execute_command(self, cmdv, inFile=None):
        """Execute an OS command blocking on the remote host, over the SSH
        connection. Returns a dict with the exit status and output of the process.
        cmdv - complete command vector (argv) of the OS command.
        inFile - file-like object providing stdin to the command.
        """
cmdln = ' '.join([quote_if_contains_space(a) for a in cmdv])
_logger.debug('\nRCH--> remote_ch, execute command %s', cmdln)
check_connected(self.client)
with contextlib.closing(self.client.get_transport().open_session()) as chan:
chan.exec_command(cmdln)
if inFile:
chan.sendall(inFile.read())
chan.shutdown_write()
result = {
'exitstatus': chan.recv_exit_status()
}
with contextlib.closing(chan.makefile('rb')) as out_file:
result['out'] = out_file.read()
            with contextlib.closing(chan.makefile_stderr('rb')) as err_file:
                result['err'] = err_file.read()
        return result