Merge pull request #53467 from twangboy/fix_test_win_file

Check valid username first (fixes failing symlink test)
This commit is contained in:
Daniel Wozniak 2019-06-19 10:04:29 -07:00 committed by GitHub
commit fd6cb35093
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 318 additions and 195 deletions

View file

@ -362,11 +362,14 @@ class SaltLoggingClass(six.with_metaclass(LoggingMixInMeta, LOGGING_LOGGER_CLASS
extra = None
# Let's try to make every logging message unicode
salt_system_encoding = __salt_system_encoding__
if salt_system_encoding == 'ascii':
# Encoding detection most likely failed, let's use the utf-8
# value which we defaulted before __salt_system_encoding__ was
# implemented
try:
salt_system_encoding = __salt_system_encoding__
if salt_system_encoding == 'ascii':
# Encoding detection most likely failed, let's use the utf-8
# value which we defaulted before __salt_system_encoding__ was
# implemented
salt_system_encoding = 'utf-8'
except NameError:
salt_system_encoding = 'utf-8'
if isinstance(msg, six.string_types) \

View file

@ -23,7 +23,6 @@ import re
import shutil
import stat
import string
import subprocess
import sys
import tempfile
import time
@ -2759,11 +2758,20 @@ def blockreplace(path,
backup_path = '{0}{1}'.format(path, backup)
shutil.copy2(path, backup_path)
# copy2 does not preserve ownership
check_perms(backup_path,
None,
perms['user'],
perms['group'],
perms['mode'])
if salt.utils.platform.is_windows():
# This function resides in win_file.py and will be available
# on Windows. The local function will be overridden
# pylint: disable=E1120,E1123
check_perms(path=backup_path,
ret=None,
owner=perms['user'])
# pylint: enable=E1120,E1123
else:
check_perms(name=backup_path,
ret=None,
user=perms['user'],
group=perms['group'],
mode=perms['mode'])
# write new content in the file while avoiding partial reads
try:
@ -2774,11 +2782,20 @@ def blockreplace(path,
fh_.close()
# this may have overwritten file attrs
check_perms(path,
None,
perms['user'],
perms['group'],
perms['mode'])
if salt.utils.platform.is_windows():
# This function resides in win_file.py and will be available
# on Windows. The local function will be overridden
# pylint: disable=E1120,E1123
check_perms(path=path,
ret=None,
owner=perms['user'])
# pylint: enable=E1120,E1123
else:
check_perms(path,
ret=None,
user=perms['user'],
group=perms['group'],
mode=perms['mode'])
if show_changes:
return diff
@ -5397,7 +5414,7 @@ def manage_file(name,
# on Windows. The local function will be overridden
# pylint: disable=E1120,E1121,E1123
ret = check_perms(
path=name,
path=name,
ret=ret,
owner=kwargs.get('win_owner'),
grant_perms=kwargs.get('win_perms'),

View file

@ -1161,13 +1161,11 @@ def symlink(src, link):
win32file.CreateSymbolicLink(link, src, int(is_dir))
return True
except win32file.error as exc:
privs = __salt__['cmd.run_stdout']('whoami /priv')
raise CommandExecutionError(
'Could not create \'{0}\' - [{1}] {2}\n{3}'.format(
'Could not create \'{0}\' - [{1}] {2}'.format(
link,
exc.winerror,
exc.strerror,
privs
exc.strerror
)
)

View file

@ -10,9 +10,9 @@ Much of what is here was adapted from the following:
http://stackoverflow.com/questions/29566330
'''
from __future__ import absolute_import, unicode_literals
import os
import collections
import logging
import os
import psutil
import ctypes
@ -21,7 +21,6 @@ from ctypes import wintypes
from salt.ext.six.moves import range
from salt.ext.six.moves import zip
import win32con
import win32con
import win32api
import win32process
@ -598,7 +597,7 @@ class HANDLE_IHV(wintypes.HANDLE):
def errcheck_ihv(result, func, args):
if result.value == INVALID_HANDLE_VALUE:
raise ctypes.WinError()
raise ctypes.WinError(ctypes.get_last_error())
return result.value
@ -608,13 +607,13 @@ class DWORD_IDV(wintypes.DWORD):
def errcheck_idv(result, func, args):
if result.value == INVALID_DWORD_VALUE:
raise ctypes.WinError()
raise ctypes.WinError(ctypes.get_last_error())
return result.value
def errcheck_bool(result, func, args):
if not result:
raise ctypes.WinError()
raise ctypes.WinError(ctypes.get_last_error())
return args
@ -1035,10 +1034,8 @@ def CreateProcessWithTokenW(token,
startupinfo = STARTUPINFO()
if currentdirectory is not None:
currentdirectory = ctypes.create_unicode_buffer(currentdirectory)
if environment:
environment = ctypes.pointer(
environment_string(environment)
)
if environment is not None:
environment = ctypes.pointer(environment_string(environment))
process_info = PROCESS_INFORMATION()
ret = advapi32.CreateProcessWithTokenW(
token,
@ -1174,14 +1171,23 @@ def make_inheritable(token):
)
def CreateProcessWithLogonW(username=None, domain=None, password=None,
logonflags=0, applicationname=None, commandline=None, creationflags=0,
environment=None, currentdirectory=None, startupinfo=None):
def CreateProcessWithLogonW(username=None,
domain=None,
password=None,
logonflags=0,
applicationname=None,
commandline=None,
creationflags=0,
environment=None,
currentdirectory=None,
startupinfo=None):
creationflags |= win32con.CREATE_UNICODE_ENVIRONMENT
if commandline is not None:
commandline = ctypes.create_unicode_buffer(commandline)
if startupinfo is None:
startupinfo = STARTUPINFO()
if environment is not None:
environment = ctypes.pointer(environment_string(environment))
process_info = PROCESS_INFORMATION()
advapi32.CreateProcessWithLogonW(
username,

View file

@ -6343,7 +6343,14 @@ def copy_(name,
# the filesystem we're copying to is squashed or doesn't support chown
# then we shouldn't be checking anything.
if not preserve:
__salt__['file.check_perms'](name, ret, user, group, mode)
if salt.utils.platform.is_windows():
# TODO: Add the other win_* parameters to this function
ret = __salt__['file.check_perms'](
path=name,
ret=ret,
owner=user)
else:
__salt__['file.check_perms'](name, ret, user, group, mode)
except (IOError, OSError):
return _error(
ret, 'Failed to copy "{0}" to "{1}"'.format(source, name))

View file

@ -68,10 +68,9 @@ def __virtual__():
'''
Provides zfs state
'''
if __grains__['zfs_support']:
return __virtualname__
else:
return (False, "The zfs state cannot be loaded: zfs not supported")
if not __grains__.get('zfs_support'):
return False, 'The zfs state cannot be loaded: zfs not supported'
return __virtualname__
def _absent(name, dataset_type, force=False, recursive=False):

View file

@ -88,10 +88,9 @@ def __virtual__():
'''
Provides zpool state
'''
if __grains__['zfs_support']:
return __virtualname__
else:
if not __grains__.get('zfs_support'):
return False, 'The zpool state cannot be loaded: zfs not supported'
return __virtualname__
def _layout_to_vdev(layout, device_dir=None):

View file

@ -67,6 +67,13 @@ def runas(cmdLine, username, password=None, cwd=None):
Commands are run in with the highest level privileges possible for the
account provided.
'''
# Validate the domain and sid exist for the username
username, domain = split_username(username)
try:
_, domain, _ = win32security.LookupAccountName(domain, username)
except pywintypes.error as exc:
message = win32api.FormatMessage(exc.winerror).rstrip('\n')
raise CommandExecutionError(message)
# Elevate the token from the current process
access = (
@ -88,6 +95,7 @@ def runas(cmdLine, username, password=None, cwd=None):
except WindowsError: # pylint: disable=undefined-variable
log.debug("Unable to impersonate SYSTEM user")
impersonation_token = None
win32api.CloseHandle(th)
# Impersonation of the SYSTEM user failed. Fallback to an un-privileged
# runas.
@ -95,18 +103,9 @@ def runas(cmdLine, username, password=None, cwd=None):
log.debug("No impersonation token, using unprivileged runas")
return runas_unpriv(cmdLine, username, password, cwd)
username, domain = split_username(username)
# Validate the domain and sid exist for the username
try:
_, domain, _ = win32security.LookupAccountName(domain, username)
except pywintypes.error as exc:
message = win32api.FormatMessage(exc.winerror).rstrip('\n')
raise CommandExecutionError(message)
if domain == 'NT AUTHORITY':
# Logon as a system level account, SYSTEM, LOCAL SERVICE, or NETWORK
# SERVICE.
logonType = win32con.LOGON32_LOGON_SERVICE
user_token = win32security.LogonUser(
username,
domain,
@ -173,53 +172,56 @@ def runas(cmdLine, username, password=None, cwd=None):
# Create the environment for the user
env = win32profile.CreateEnvironmentBlock(user_token, False)
# Start the process in a suspended state.
process_info = salt.platform.win.CreateProcessWithTokenW(
int(user_token),
logonflags=1,
applicationname=None,
commandline=cmdLine,
currentdirectory=cwd,
creationflags=creationflags,
startupinfo=startup_info,
environment=env,
)
try:
# Start the process in a suspended state.
process_info = salt.platform.win.CreateProcessWithTokenW(
int(user_token),
logonflags=1,
applicationname=None,
commandline=cmdLine,
currentdirectory=cwd,
creationflags=creationflags,
startupinfo=startup_info,
environment=env,
)
hProcess = process_info.hProcess
hThread = process_info.hThread
dwProcessId = process_info.dwProcessId
dwThreadId = process_info.dwThreadId
hProcess = process_info.hProcess
hThread = process_info.hThread
dwProcessId = process_info.dwProcessId
dwThreadId = process_info.dwThreadId
salt.platform.win.kernel32.CloseHandle(stdin_write.handle)
salt.platform.win.kernel32.CloseHandle(stdout_write.handle)
salt.platform.win.kernel32.CloseHandle(stderr_write.handle)
# We don't use these so let's close the handle
salt.platform.win.kernel32.CloseHandle(stdin_write.handle)
salt.platform.win.kernel32.CloseHandle(stdout_write.handle)
salt.platform.win.kernel32.CloseHandle(stderr_write.handle)
ret = {'pid': dwProcessId}
# Resume the process
psutil.Process(dwProcessId).resume()
ret = {'pid': dwProcessId}
# Resume the process
psutil.Process(dwProcessId).resume()
# Wait for the process to exit and get it's return code.
if win32event.WaitForSingleObject(hProcess, win32event.INFINITE) == win32con.WAIT_OBJECT_0:
exitcode = win32process.GetExitCodeProcess(hProcess)
ret['retcode'] = exitcode
# Wait for the process to exit and get it's return code.
if win32event.WaitForSingleObject(hProcess, win32event.INFINITE) == win32con.WAIT_OBJECT_0:
exitcode = win32process.GetExitCodeProcess(hProcess)
ret['retcode'] = exitcode
# Read standard out
fd_out = msvcrt.open_osfhandle(stdout_read.handle, os.O_RDONLY | os.O_TEXT)
with os.fdopen(fd_out, 'r') as f_out:
stdout = f_out.read()
ret['stdout'] = stdout
# Read standard out
fd_out = msvcrt.open_osfhandle(stdout_read.handle, os.O_RDONLY | os.O_TEXT)
with os.fdopen(fd_out, 'r') as f_out:
stdout = f_out.read()
ret['stdout'] = stdout
# Read standard error
fd_err = msvcrt.open_osfhandle(stderr_read.handle, os.O_RDONLY | os.O_TEXT)
with os.fdopen(fd_err, 'r') as f_err:
stderr = f_err.read()
ret['stderr'] = stderr
salt.platform.win.kernel32.CloseHandle(hProcess)
win32api.CloseHandle(user_token)
if impersonation_token:
win32security.RevertToSelf()
win32api.CloseHandle(impersonation_token)
# Read standard error
fd_err = msvcrt.open_osfhandle(stderr_read.handle, os.O_RDONLY | os.O_TEXT)
with os.fdopen(fd_err, 'r') as f_err:
stderr = f_err.read()
ret['stderr'] = stderr
finally:
salt.platform.win.kernel32.CloseHandle(hProcess)
win32api.CloseHandle(th)
win32api.CloseHandle(user_token)
if impersonation_token:
win32security.RevertToSelf()
win32api.CloseHandle(impersonation_token)
return ret
@ -228,6 +230,14 @@ def runas_unpriv(cmd, username, password, cwd=None):
'''
Runas that works for non-priviledged users
'''
# Validate the domain and sid exist for the username
username, domain = split_username(username)
try:
_, domain, _ = win32security.LookupAccountName(domain, username)
except pywintypes.error as exc:
message = win32api.FormatMessage(exc.winerror).rstrip('\n')
raise CommandExecutionError(message)
# Create a pipe to set as stdout in the child. The write handle needs to be
# inheritable.
c2pread, c2pwrite = salt.platform.win.CreatePipe(
@ -251,22 +261,21 @@ def runas_unpriv(cmd, username, password, cwd=None):
hStdError=errwrite,
)
username, domain = split_username(username)
# Run command and return process info structure
process_info = salt.platform.win.CreateProcessWithLogonW(
username=username,
domain=domain,
password=password,
logonflags=salt.platform.win.LOGON_WITH_PROFILE,
commandline=cmd,
startupinfo=startup_info,
currentdirectory=cwd)
salt.platform.win.kernel32.CloseHandle(dupin)
salt.platform.win.kernel32.CloseHandle(c2pwrite)
salt.platform.win.kernel32.CloseHandle(errwrite)
salt.platform.win.kernel32.CloseHandle(process_info.hThread)
try:
# Run command and return process info structure
process_info = salt.platform.win.CreateProcessWithLogonW(
username=username,
domain=domain,
password=password,
logonflags=salt.platform.win.LOGON_WITH_PROFILE,
commandline=cmd,
startupinfo=startup_info,
currentdirectory=cwd)
salt.platform.win.kernel32.CloseHandle(process_info.hThread)
finally:
salt.platform.win.kernel32.CloseHandle(dupin)
salt.platform.win.kernel32.CloseHandle(c2pwrite)
salt.platform.win.kernel32.CloseHandle(errwrite)
# Initialize ret and set first element
ret = {'pid': process_info.dwProcessId}

View file

@ -559,13 +559,16 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
'''.format(**managed_files)))
ret = self.run_function('state.sls', [state_name])
self.assertSaltTrueReturn(ret)
for typ in state_keys:
self.assertTrue(ret[state_keys[typ]]['result'])
self.assertIn('diff', ret[state_keys[typ]]['changes'])
finally:
os.remove(state_file)
if os.path.exists(state_file):
os.remove(state_file)
for typ in managed_files:
os.remove(managed_files[typ])
if os.path.exists(managed_files[typ]):
os.remove(managed_files[typ])
@skip_if_not_root
@skipIf(IS_WINDOWS, 'Windows does not support "mode" kwarg. Skipping.')
@ -915,10 +918,7 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
tmp_dir = os.path.join(TMP, 'pgdata')
sym_dir = os.path.join(TMP, 'pg_data')
if IS_WINDOWS:
self.run_function('file.mkdir', [tmp_dir, 'Administrators'])
else:
os.mkdir(tmp_dir, 0o700)
os.mkdir(tmp_dir, 0o700)
self.run_function('file.symlink', [tmp_dir, sym_dir])
@ -1254,10 +1254,7 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
null_file = '{0}/null'.format(tmp_dir)
broken_link = '{0}/broken'.format(tmp_dir)
if IS_WINDOWS:
self.run_function('file.mkdir', [tmp_dir, 'Administrators'])
else:
os.mkdir(tmp_dir, 0o700)
os.mkdir(tmp_dir, 0o700)
self.run_function('file.symlink', [null_file, broken_link])
@ -1770,7 +1767,7 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
with salt.utils.files.fopen(path_test, 'rb') as fp_:
serialized_file = salt.utils.stringutils.to_unicode(fp_.read())
# The JSON serializer uses LF even on OSes where os.path.sep is CRLF.
# The JSON serializer uses LF even on OSes where os.sep is CRLF.
expected_file = '\n'.join([
'{',
' "a_list": [',
@ -2615,15 +2612,17 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
user = 'salt'
mode = '0644'
self.run_function('user.add', [user])
ret = self.run_function('user.add', [user])
self.assertTrue(ret, 'Failed to add user. Are you running as sudo?')
ret = self.run_state('file.copy', name=dest, source=source, user=user,
makedirs=True, mode=mode)
makedirs=True, mode=mode)
self.assertSaltTrueReturn(ret)
file_checks = [dest, os.path.join(TMP, 'dir1'), os.path.join(TMP, 'dir1', 'dir2')]
for check in file_checks:
user_check = self.run_function('file.get_user', [check])
mode_check = self.run_function('file.get_mode', [check])
assert user_check == user
assert salt.utils.files.normalize_mode(mode_check) == mode
self.assertEqual(user_check, user)
self.assertEqual(salt.utils.files.normalize_mode(mode_check), mode)
def test_contents_pillar_with_pillar_list(self):
'''

View file

@ -8,7 +8,7 @@ import tempfile
import textwrap
# Import Salt Testing libs
from tests.support.helpers import with_tempfile, dedent
from tests.support.helpers import with_tempfile
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.paths import TMP
from tests.support.unit import TestCase, skipIf
@ -33,6 +33,10 @@ import salt.modules.cmdmod as cmdmod
from salt.exceptions import CommandExecutionError, SaltInvocationError
from salt.utils.jinja import SaltCacheLoader
if salt.utils.platform.is_windows():
import salt.modules.win_file as win_file
import salt.utils.win_dacl as win_dacl
SED_CONTENT = '''test
some
content
@ -338,22 +342,27 @@ class FileCommentLineTestCase(TestCase, LoaderModuleMockMixin):
class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
def setup_loader_modules(self):
return {
if salt.utils.platform.is_windows():
grains = {'kernel': 'Windows'}
else:
grains = {'kernel': 'Linux'}
opts = {'test': False,
'file_roots': {'base': 'tmp'},
'pillar_roots': {'base': 'tmp'},
'cachedir': 'tmp',
'grains': grains}
ret = {
filemod: {
'__salt__': {
'config.manage_mode': MagicMock(),
'cmd.run': cmdmod.run,
'cmd.run_all': cmdmod.run_all
},
'__opts__': {
'test': False,
'file_roots': {'base': 'tmp'},
'pillar_roots': {'base': 'tmp'},
'cachedir': 'tmp',
'grains': {},
},
'__grains__': {'kernel': 'Linux'},
'__opts__': opts,
'__grains__': grains,
'__utils__': {
'files.is_binary': MagicMock(return_value=False),
'files.get_encoding': MagicMock(return_value='utf-8'),
@ -361,6 +370,15 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
},
}
}
if salt.utils.platform.is_windows():
ret.update({
win_dacl: {'__opts__': opts},
win_file: {
'__utils__': {
'dacl.check_perms': win_dacl.check_perms
}}})
return ret
MULTILINE_STRING = textwrap.dedent('''\
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nam rhoncus
@ -409,12 +427,17 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
"We found them. I'm not a witch.",
"We shall say 'Ni' again to you, if you do not appease us."
])
filemod.blockreplace(self.tfile.name,
marker_start='#-- START BLOCK 1',
marker_end='#-- END BLOCK 1',
content=new_multiline_content,
backup=False,
append_newline=None)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(self.tfile.name,
marker_start='#-- START BLOCK 1',
marker_end='#-- END BLOCK 1',
content=new_multiline_content,
backup=False,
append_newline=None)
with salt.utils.files.fopen(self.tfile.name, 'rb') as fp:
filecontent = fp.read()
@ -444,12 +467,17 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
salt.utils.stringutils.to_unicode(fp.read())
)
filemod.blockreplace(self.tfile.name,
marker_start='#-- START BLOCK 2',
marker_end='#-- END BLOCK 2',
content=new_content,
backup=False,
append_if_not_found=True)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(self.tfile.name,
marker_start='#-- START BLOCK 2',
marker_end='#-- END BLOCK 2',
content=new_content,
backup=False,
append_if_not_found=True)
with salt.utils.files.fopen(self.tfile.name, 'rb') as fp:
self.assertIn(salt.utils.stringutils.to_bytes(
@ -475,7 +503,12 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
tfile.write(salt.utils.stringutils.to_bytes(base + os.linesep))
tfile.flush()
filemod.blockreplace(tfile.name, **args)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(tfile.name, **args)
expected = os.linesep.join([base, block])
with salt.utils.files.fopen(tfile.name) as tfile2:
self.assertEqual(
@ -486,7 +519,12 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
tfile.write(salt.utils.stringutils.to_bytes(base))
tfile.flush()
filemod.blockreplace(tfile.name, **args)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(tfile.name, **args)
with salt.utils.files.fopen(tfile.name) as tfile2:
self.assertEqual(
salt.utils.stringutils.to_unicode(tfile2.read()), expected)
@ -495,7 +533,12 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
# A newline should not be added in empty files
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
pass
filemod.blockreplace(tfile.name, **args)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(tfile.name, **args)
with salt.utils.files.fopen(tfile.name) as tfile2:
self.assertEqual(
salt.utils.stringutils.to_unicode(tfile2.read()), block)
@ -521,12 +564,17 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
'{0}#-- END BLOCK 2'.format(new_content)])),
fp.read())
filemod.blockreplace(self.tfile.name,
marker_start='#-- START BLOCK 2',
marker_end='#-- END BLOCK 2',
content=new_content,
backup=False,
prepend_if_not_found=True)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(self.tfile.name,
marker_start='#-- START BLOCK 2',
marker_end='#-- END BLOCK 2',
content=new_content,
backup=False,
prepend_if_not_found=True)
with salt.utils.files.fopen(self.tfile.name, 'rb') as fp:
self.assertTrue(
@ -536,11 +584,16 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
'{0}#-- END BLOCK 2'.format(new_content)]))))
def test_replace_partial_marked_lines(self):
filemod.blockreplace(self.tfile.name,
marker_start='// START BLOCK',
marker_end='// END BLOCK',
content='new content 1',
backup=False)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(self.tfile.name,
marker_start='// START BLOCK',
marker_end='// END BLOCK',
content='new content 1',
backup=False)
with salt.utils.files.fopen(self.tfile.name, 'r') as fp:
filecontent = salt.utils.stringutils.to_unicode(fp.read())
@ -555,12 +608,17 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
fext = '.bak'
bak_file = '{0}{1}'.format(self.tfile.name, fext)
filemod.blockreplace(
self.tfile.name,
marker_start='// START BLOCK',
marker_end='// END BLOCK',
content='new content 2',
backup=fext)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(
self.tfile.name,
marker_start='// START BLOCK',
marker_end='// END BLOCK',
content='new content 2',
backup=fext)
self.assertTrue(os.path.exists(bak_file))
os.unlink(bak_file)
@ -569,28 +627,43 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
fext = '.bak'
bak_file = '{0}{1}'.format(self.tfile.name, fext)
filemod.blockreplace(self.tfile.name,
marker_start='// START BLOCK',
marker_end='// END BLOCK',
content='new content 3',
backup=False)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(self.tfile.name,
marker_start='// START BLOCK',
marker_end='// END BLOCK',
content='new content 3',
backup=False)
self.assertFalse(os.path.exists(bak_file))
def test_no_modifications(self):
filemod.blockreplace(self.tfile.name,
marker_start='#-- START BLOCK 1',
marker_end='#-- END BLOCK 1',
content='new content 4',
backup=False,
append_newline=None)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(self.tfile.name,
marker_start='#-- START BLOCK 1',
marker_end='#-- END BLOCK 1',
content='new content 4',
backup=False,
append_newline=None)
before_ctime = os.stat(self.tfile.name).st_mtime
filemod.blockreplace(self.tfile.name,
marker_start='#-- START BLOCK 1',
marker_end='#-- END BLOCK 1',
content='new content 4',
backup=False,
append_newline=None)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
filemod.blockreplace(self.tfile.name,
marker_start='#-- START BLOCK 1',
marker_end='#-- END BLOCK 1',
content='new content 4',
backup=False,
append_newline=None)
after_ctime = os.stat(self.tfile.name).st_mtime
self.assertEqual(before_ctime, after_ctime)
@ -607,23 +680,28 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
self.assertEqual(before_ctime, after_ctime)
def test_show_changes(self):
ret = filemod.blockreplace(self.tfile.name,
marker_start='// START BLOCK',
marker_end='// END BLOCK',
content='new content 6',
backup=False,
show_changes=True)
if salt.utils.platform.is_windows():
check_perms_patch = win_file.check_perms
else:
check_perms_patch = filemod.check_perms
with patch.object(filemod, 'check_perms', check_perms_patch):
ret = filemod.blockreplace(self.tfile.name,
marker_start='// START BLOCK',
marker_end='// END BLOCK',
content='new content 6',
backup=False,
show_changes=True)
self.assertTrue(ret.startswith('---')) # looks like a diff
self.assertTrue(ret.startswith('---')) # looks like a diff
ret = filemod.blockreplace(self.tfile.name,
marker_start='// START BLOCK',
marker_end='// END BLOCK',
content='new content 7',
backup=False,
show_changes=False)
ret = filemod.blockreplace(self.tfile.name,
marker_start='// START BLOCK',
marker_end='// END BLOCK',
content='new content 7',
backup=False,
show_changes=False)
self.assertIsInstance(ret, bool)
self.assertIsInstance(ret, bool)
def test_unfinished_block_exception(self):
self.assertRaises(
@ -2319,6 +2397,10 @@ class ChattrTests(TestCase, LoaderModuleMockMixin):
actual = filemod._chattr_has_extended_attrs()
assert actual, actual
# We're skipping this on Windows as it tests the check_perms function in
# file.py which is specifically for Linux. The Windows version resides in
# win_file.py
@skipIf(salt.utils.platform.is_windows(), 'Skip on Windows')
def test_check_perms_should_report_no_attr_changes_if_there_are_none(self):
filename = '/path/to/fnord'
attrs = 'aAcCdDeijPsStTu'
@ -2356,6 +2438,10 @@ class ChattrTests(TestCase, LoaderModuleMockMixin):
)
assert actual_ret.get('changes', {}).get('attrs')is None, actual_ret
# We're skipping this on Windows as it tests the check_perms function in
# file.py which is specifically for Linux. The Windows version resides in
# win_file.py
@skipIf(salt.utils.platform.is_windows(), 'Skip on Windows')
def test_check_perms_should_report_attrs_new_and_old_if_they_changed(self):
filename = '/path/to/fnord'
attrs = 'aAcCdDeijPsStTu'