mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
hook in the request to publisher
This commit is contained in:
parent
f846b983d8
commit
2548fd36de
1 changed files with 54 additions and 6 deletions
|
@ -36,6 +36,7 @@ class Master(object):
|
|||
reqserv.start()
|
||||
local.start()
|
||||
while True:
|
||||
# Add something to keep the jobs dir clean
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
|
@ -59,12 +60,11 @@ class Publisher(threading.Thread):
|
|||
print binder
|
||||
self.socket.bind(binder)
|
||||
|
||||
def command(self, cmd):
|
||||
def publish(self, package):
|
||||
'''
|
||||
Publish out a command to the minions, takes a cmd structure, one of
|
||||
those encrypted json jobs
|
||||
Publish out a command to the minions, takes a cmd structure
|
||||
'''
|
||||
self.socket.send(cmd)
|
||||
self.socket.send(package)
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
|
@ -84,13 +84,28 @@ class ReqServer(threading.Thread):
|
|||
self.master_key = salt.crypt.MasterKeys(self.opts)
|
||||
self.num_threads = self.opts['worker_threads']
|
||||
self.context = zmq.Context(1)
|
||||
|
||||
# Prepare the zeromq sockets
|
||||
self.c_uri = 'tcp://' + self.opts['interface'] + ':'\
|
||||
+ self.opts['ret_port']
|
||||
self.clients = self.context.socket(zmq.XREP)
|
||||
|
||||
self.workers = self.context.socket(zmq.XREQ)
|
||||
self.w_uri = 'inproc://wokers'
|
||||
# Start the publisher
|
||||
self.publisher = Publisher(opts)
|
||||
self.publisher.start()
|
||||
# Prepare the aes key
|
||||
self.key = self.__prep_key()
|
||||
self.crypticle = salt.crypt.Crypticle(self.key)
|
||||
|
||||
def __prep_key(self):
|
||||
'''
|
||||
A key needs to be placed in the filesystem with permissions 0400 so
|
||||
clients are required to run as root.
|
||||
'''
|
||||
keyfile = os.path.join(self.opts['cachedir'], 'root_key')
|
||||
key = salt.crypt.Crypticle.generate_key_string()
|
||||
open(keyfile, 'w+').write(key)
|
||||
return key
|
||||
|
||||
def __worker(self):
|
||||
'''
|
||||
|
@ -119,6 +134,21 @@ class ReqServer(threading.Thread):
|
|||
|
||||
zmq.device(zmq.QUEUE, self.clients, self.workers)
|
||||
|
||||
def _prep_jid(self, load):
|
||||
'''
|
||||
Parses the job return directory and generates a job id and sets up the
|
||||
job id directory
|
||||
'''
|
||||
jid_root = os.path.join(self.opts['cachedir'], 'jobs')
|
||||
jid = str(time.time())
|
||||
jid_dir = os.path.join(jid_root, jid)
|
||||
if not os.path.isdir(jid_dir):
|
||||
os.makedirs(jid)
|
||||
pickle.dump(load, open(os.path.join(jid_dir, 'load.p')))
|
||||
else:
|
||||
return self._gen_jid(load)
|
||||
return jid
|
||||
|
||||
def _handle_payload(self, payload):
|
||||
'''
|
||||
The _handle_payload method is the key method used to figure out what
|
||||
|
@ -176,6 +206,24 @@ class ReqServer(threading.Thread):
|
|||
ret['aes'] = key.public_encrypt(self.opts['aes'], 4)
|
||||
return ret
|
||||
|
||||
def publish(self, load):
|
||||
'''
|
||||
This method sends out publications to the minions
|
||||
'''
|
||||
jid = self._gen_jid()
|
||||
if not load['key'] == self.key:
|
||||
return False
|
||||
payload = {'enc': 'aes'}
|
||||
load = {
|
||||
'fun': load['fun'],
|
||||
'arg': load['arg'],
|
||||
'tgt': load['tgt'],
|
||||
'jid': self._prep_jid(load),
|
||||
}
|
||||
payload['load'] = self.crypticle.dumps(load)
|
||||
self.publisher.publish(salt.payload.package(payload))
|
||||
return True
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
Start up the ReqServer
|
||||
|
|
Loading…
Add table
Reference in a new issue