pythonssh执⾏shell命令的⽰例# -*- coding: utf-8 -*-
import paramiko
import threading
def run(host_ip, username, password, command):
ssh = paramiko.SSHClient()
try:
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print('===================exec on [%s]=====================' % host_ip)
stdin, stdout, stderr = _command(command, timeout=300)
out = adlines()
for o in out:
print (o.strip('\n'))
except Exception as ex:
print('error, host is [%s], msg is [%s]' % (host_ip, ex.message))
finally:
ssh.close()
if __name__ == '__main__':
# 将需要批量执⾏命令的host ip地址填到这⾥
# eg: host_ip_list = ['IP1', 'IP2']
host_ip_list = ['147.116.20.19']
for _host_ip in host_ip_list:
# ⽤户名,密码,执⾏的命令填到这⾥
run(_host_ip, 'tzgame', 'tzgame@1234', 'df -h')
run(_host_ip, 'tzgame', 'tzgame@1234', 'ping -c 5 220.181.38.148')
pycrypto,由于 paramiko 模块内部依赖pycrypto,所以先下载安装pycrypto
pip3 install pycrypto
pip3 install paramiko
(1)基于⽤户名和密码的连接
import paramiko
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts⽂件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
# 执⾏命令
stdin, stdout, stderr = _command('ls')
# 获取命令结果
result = ad()
# 关闭连接
ssh.close()
(2)基于公钥秘钥连接
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts⽂件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh命令行
# 执⾏命令
stdin, stdout, stderr = _command('df')
# 获取命令结果
result = ad()
# 关闭连接
ssh.close()
SFTPClient:
  ⽤于连接远程服务器并进⾏上传下载功能。
(1)基于⽤户名密码上传下载
import paramiko
transport = paramiko.Transport(('hostname',22))
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传⾄服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
<('remove_path', 'local_path')
transport.close()
(2)基于公钥秘钥上传下载
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
transport = paramiko.Transport(('hostname', 22))
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传⾄服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
<('remove_path', 'local_path')
transport.close()
下⾯是多线程执⾏版本
#!/usr/bin/python
#coding:utf-8
import threading
import subprocess
import os
import sys
sshport = 13131
log_path = 'update_log'
output = {}
def execute(s, ip, cmd, log_path_today):
with s:
cmd = '''ssh -p%s root@%s -n "%s" ''' % (sshport, ip, cmd)
ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)    output[ip] = adlines()
if __name__ == "__main__":
if len(sys.argv) != 3:
print "Usage: %s config.ini cmd" % sys.argv[0]
if not os.path.isfile(sys.argv[1]):
print "Usage: %s is not file!" % sys.argv[1]
cmd = sys.argv[2]
f = open(sys.argv[1],'r')
list = f.readlines()
f.close()
today = day()
log_path_today = '%s/%s' % (log_path,today)
if not os.path.isdir(log_path_today):
os.makedirs(log_path_today)
threading_num = 100
if threading_num > len(list):
threading_num = len(list)
s = threading.Semaphore(threading_num)
for line in list:
ip = line.strip()
t = threading.Thread(target=execute,args=(s, ip,cmd,log_path_today))
t.setDaemon(True)
t.start()
main_thread = threading.currentThread()
for t umerate():
if t is main_thread:
continue
t.join()
for ip,result in output.items():
print "%s: " % ip
for line in result:
print "  %s" % line.strip()
print "Done!"
以上脚本读取两个参数,第⼀个为存放IP的⽂本,第⼆个为shell命令执⾏效果如下
# -*- coding:utf-8 -*-
import requests
ptions import RequestException
import os, time
import re
from lxml import etree
import threading
lock = threading.Lock()
def get_html(url):
response = (url, timeout=10)
# print(response.status_code)
try:
if response.status_code == 200:
# )
else:
return None
except RequestException:
print("请求失败")
# return None
def parse_html(html_text):
html = etree.HTML(html_text)
if len(html) > 0:
img_src = html.xpath("//img[@class='photothumb lazy']/@data-original") # 元素提取⽅法
# print(img_src)
return img_src
else:
print("解析页⾯元素失败")
def get_image_pages(url):
html_text = get_html(url) # 获取搜索url响应内容
# print(html_text)
if html_text is not None:
html = etree.HTML(html_text) # ⽣成XPath解析对象
last_page = html.xpath("//div[@class='pages']//a[last()]/@href") # 提取最后⼀页所在href链接
print(last_page)
if last_page:
max_page = repile(r'(\d+)', re.S).search(last_page[0]).group() # 使⽤正则表达式提取链接中的页码数字      print(max_page)
print(type(max_page))
return int(max_page) # 将字符串页码转为整数并返回
else:
print("暂⽆数据")
return None
else:
print("查询结果失败")
def get_all_image_url(page_number):
base_url = 'imgbin/free-png/naruto/'
image_urls = []
x = 1 # 定义⼀个标识,⽤于给每个图⽚url编号,从1递增
for i in range(1, page_number):
url = base_url + str(i) # 根据页码遍历请求url
try:
html = get_html(url) # 解析每个页⾯的内容
if html:
data = parse_html(html) # 提取页⾯中的图⽚url
# print(data)
# time.sleep(3)
if data:
for j in data:
image_urls.append({
'name': x,
'value': j
})
x += 1 # 每提取⼀个图⽚url,标识x增加1
except RequestException as f:
print("遇到错误:", f)
continue
# print(image_urls)
return image_urls
def get_image_content(url):
try:
r = (url, timeout=15)
if r.status_code == 200:
t
return None
except RequestException:
return None
def main(url, image_name):
semaphore.acquire() # 加锁,限制线程数
print('当前⼦线程: {}'.format(threading.current_thread().name))
save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
try:
file_path = '{0}/{1}.jpg'.format(save_path, image_name)
if not ists(file_path): # 判断是否存在⽂件,不存在则爬取
with open(file_path, 'wb') as f:
f.write(get_image_content(url))
f.close()
print('第{}个⽂件保存成功'.format(image_name))
else:
print("第{}个⽂件已存在".format(image_name))
except FileNotFoundError as f:
print("第{}个⽂件下载时遇到错误,url为:{}:".format(image_name, url))
print("报错:", f)
raise
except TypeError as e:
print("第{}个⽂件下载时遇到错误,url为:{}:".format(image_name, url))
print("报错:", e)
class MyThread(threading.Thread):
"""继承Thread类重写run⽅法创建新进程"""
def __init__(self, func, args):
"""
:param func: run⽅法中要调⽤的函数名
:
param args: func函数所需的参数
"""
threading.Thread.__init__(self)
self.func = func
self.args = args
def run(self):
print('当前⼦线程: {}'.format(threading.current_thread().name))
self.func(self.args[0], self.args[1])
# 调⽤func函数
# 因为这⾥的func函数其实是上述的main()函数,它需要2个参数;args传⼊的是个参数元组,拆解开来传⼊
if __name__ == '__main__':
start = time.time()
print('这是主线程:{}'.format(threading.current_thread().name))
urls = get_all_image_url(5) # 获取所有图⽚url列表
thread_list = [] # 定义⼀个列表,向⾥⾯追加线程
semaphore = threading.BoundedSemaphore(5) # 或使⽤Semaphore⽅法
for t in urls:
# print(i)
m = MyThread(main, (t["value"], t["name"])) # 调⽤MyThread类,得到⼀个实例
thread_list.append(m)
for m in thread_list:
m.start() # 调⽤start()⽅法,开始执⾏
for m in thread_list:
m.join() # ⼦线程调⽤join()⽅法,使主线程等待⼦线程运⾏完毕之后才退出
end = time.time()
print(end-start)
# get_image_pages(imgbin/free-png/Naruto)
以上就是python ssh 执⾏shell命令的⽰例的详细内容,更多关于python ssh 执⾏shell命令的资料请关注其它相关⽂章!