这篇文章主要讲解了“怎么用python+Element实现主机Host操作”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么用python+Element实现主机Host操作”吧!
创新互联建站长期为1000多家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为广元企业提供专业的成都做网站、网站设计,广元网站改版等技术服务。拥有十多年丰富建站经验和众多成功案例,为您定制开发。
<%inherit file="/base.html"/>{{ scope.row.is_monitored ? "移除监控":"加入监控" }} 查看性能 查看状态 
urls.py文件内容
# URL routing table for the application (urls.py).
# String view names ('dev_guide', 'contactus', ...) are resolved against the
# 'home_application.views' prefix given as the first argument to patterns().
from django.conf.urls import patterns

from home_application.temp import views as temp_view
from home_application.job import views as job_view
from home_application.host import views as host_view
from home_application.exam import views as exam_view

urlpatterns = patterns(
    'home_application.views',
    (r'^$', job_view.job),
    (r'^dev-guide/$', 'dev_guide'),
    (r'^contactus/$', 'contactus'),
    (r'^api/test/$', 'test'),
    (r'^temp/$', 'temp'),
    # Host pages and their JSON endpoints.
    (r'^host/$', host_view.host),
    (r'^status/$', host_view.status),
    (r'^host_view/$', host_view.HostView.as_view()),
    (r'^get_all_hosts/$', host_view.get_all_hosts),
    (r'^get_perform_data/$', host_view.get_perform_data),
    # ... (further routes were elided in the original listing)
)
host\views.py文件内容
import json
from django.views.generic import View
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.http import JsonResponse
from common.mymako import render_mako_context
from home_application.models import Host, LoadData
from home_application.utils.job_api import FastJobApi
# Shell script executed on the target host to sample its resource usage.
# It echoes a single line of the form "DATE|MEMORY|DISK|CPU":
#   MEMORY - $3*100/$2 taken from the second line of `free -m`, printed as "NN.NN%"
#            (presumably used/total of the Mem row - confirm against `free` output)
#   DISK   - the fifth column of the `df -h` row whose last column is "/"
#   CPU    - the third-from-last field of the `top -bn1` line containing "load",
#            printed as "NN.NN%"
#            NOTE(review): that line looks like the load-average summary rather
#            than a CPU utilisation percentage - confirm this is the intended metric.
#   DATE   - current time formatted as "%Y-%m-%d %H:%M:%S"
# get_perform_data() splits the job log on "|" and reads fields 1, 2 and 3.
# NOTE: the literal begins with a newline, so "#!/bin/bash" is the second line
# of the text that is sent.
perform_script = """
#!/bin/bash
MEMORY=$(free -m | awk 'NR==2{printf "%.2f%%",$3*100/$2}')
DISK=$(df -h| awk '$NF=="/"{printf "%s",$5}')
CPU=$(top -bn1 | grep load | awk '{printf "%.2f%%",$(NF-2)}')
DATE=$(date "+%Y-%m-%d %H:%M:%S")
echo -e "$DATE|$MEMORY|$DISK|$CPU"
"""
def host(request):
    """Render the ``/home_application/host.html`` Mako template for this request."""
    template_path = '/home_application/host.html'
    return render_mako_context(request, template_path)
def status(request):
    """Render the ``/home_application/status.html`` Mako template for this request."""
    template_path = "/home_application/status.html"
    return render_mako_context(request, template_path)
class CsrfExemptView(View):
    """Class-based view base whose ``dispatch`` is wrapped with ``csrf_exempt``.

    Subclasses inherit the decorated ``dispatch``, so every HTTP method they
    implement is served without the CSRF check.
    """

    @method_decorator(csrf_exempt)
    def dispatch(self, request, *args, **kwargs):
        """Forward to ``View.dispatch`` unchanged; only the decorator differs."""
        parent = super(CsrfExemptView, self)
        return parent.dispatch(request, *args, **kwargs)
def get_all_hosts(request):
    """Upsert every host returned by ``cc_search_host`` into ``Host`` and echo them back.

    Each record under the ``"info"`` key is expected to carry a ``"host"`` dict
    and a ``"biz"`` list; the first ``biz`` entry and the first ``bk_cloud_id``
    entry are the ones stored. Missing pieces are stored as ``None`` instead of
    aborting the whole sync, and records without a ``bk_host_id`` are skipped
    because that value is the primary key of ``Host``.

    :param request: the incoming HTTP request (not otherwise used here)
    :return: ``JsonResponse`` with the raw record list under ``"result"``
    """
    from home_application.utils.cc_by_request import cc_search_host

    # A missing or null "info" would otherwise make the loop raise TypeError.
    res_data = cc_search_host().get("info") or []
    for record in res_data:
        host_info = record.get("host") or {}
        bk_host_id = host_info.get("bk_host_id")
        if bk_host_id is None:
            continue

        # Fall back to an empty dict so an empty/missing list yields None fields
        # rather than IndexError/TypeError.
        biz = (record.get("biz") or [{}])[0]
        cloud = (host_info.get("bk_cloud_id") or [{}])[0]

        host_obj = {
            "ip": host_info.get("bk_host_innerip"),
            "bk_biz_id": biz.get("bk_biz_id"),
            "os_name": host_info.get("bk_os_name"),
            "host_name": host_info.get("bk_host_name"),
            "bk_biz_name": biz.get("bk_biz_name"),
            "cloud_id": cloud.get("bk_inst_id"),
            "cloud_name": cloud.get("bk_inst_name"),
        }
        Host.objects.update_or_create(host_id=bk_host_id, defaults=host_obj)
    return JsonResponse({"result": res_data})
def _parse_perform_log(log_content):
    """Turn one "DATE|MEMORY|DISK|CPU" log line into the response payload.

    The line is stripped first so the trailing newline written by ``echo`` does
    not end up inside the last field. Raises ``IndexError`` when fewer than
    four fields are present.
    """
    fields = log_content.strip().split("|")
    return {
        "result": True,
        "cpu": fields[3],
        "mem": fields[1],
        "disk": fields[2],
    }


@csrf_exempt
def get_perform_data(request):
    """Return cpu/mem/disk figures for one host as JSON.

    The JSON request body supplies ``host_id`` and, for the live path,
    ``bk_biz_id``, ``ip`` and ``cloud_id``. If a ``LoadData`` row exists for
    the host, the last one is returned. Otherwise ``perform_script`` is run on
    the host through ``FastJobApi`` and the first log entry is parsed.

    :param request: HTTP request whose body is a JSON object
    :return: ``JsonResponse`` ``{"result": True, "data": {...}}`` on success,
             ``{"result": False}`` on any failure (including a malformed body)
    """
    import logging

    try:
        # Parsing lives inside the try so a malformed body yields the same
        # {"result": False} answer as every other failure instead of a 500.
        data = json.loads(request.body)
        host_id = data.get("host_id")

        latest = LoadData.objects.filter(host_id=host_id).last()
        if latest:
            stored = {
                "cpu": latest.cpu,
                "mem": latest.mem,
                "disk": latest.disk,
            }
            return JsonResponse({"result": True, "data": stored})

        bk_biz_id = int(data.get("bk_biz_id"))
        ip = data.get("ip")
        cloud_id = data.get("cloud_id")
        job = FastJobApi(bk_biz_id, cloud_id, ip, perform_script)
        job_res = job.execute_script_and_return()
        if job_res:
            live = _parse_perform_log(job_res[0].get("log_content"))
            return JsonResponse({"result": True, "data": live})
        return JsonResponse({"result": False})
    except Exception:
        # View boundary: keep the best-effort JSON contract, but leave a trace.
        logging.getLogger(__name__).exception("get_perform_data failed")
        return JsonResponse({"result": False})
class HostView(CsrfExemptView):
    """List stored hosts (GET) and flip a host's monitoring flag (POST)."""

    def get(self, request, *args, **kwargs):
        """Return at most 30 hosts as JSON, optionally filtered.

        Query parameters:
          ``search_biz_id`` - keep only hosts with this ``bk_biz_id``
          ``query_str``     - comma-separated ips; keep only hosts whose ip is listed

        :return: ``{"result": True, "data": [...]}`` or ``{"result": False}``
                 when the query fails
        """
        import logging

        search_biz_id = request.GET.get("search_biz_id")
        query_str = request.GET.get("query_str")
        try:
            host_query = Host.objects.all()
            if search_biz_id:
                host_query = host_query.filter(bk_biz_id=search_biz_id)
            if query_str:
                host_query = host_query.filter(ip__in=query_str.split(","))
            # Querysets are lazy: the database is only hit when the slice is
            # iterated, so the evaluation has to sit inside the try. Slicing
            # alone caps the result at 30 rows; no separate count() is needed.
            res_data = [i.to_dict() for i in host_query[:30]]
        except Exception:
            logging.getLogger(__name__).exception("HostView.get failed")
            return JsonResponse({"result": False})
        return JsonResponse({"result": True, "data": res_data})

    def post(self, request, *args, **kwargs):
        """Toggle ``is_monitored`` for the host named in the JSON body.

        The body carries ``host_id`` and ``is_monitored``; the stored flag is
        set to the negation of the ``is_monitored`` value that was sent.

        :return: ``{"result": True}`` on success, ``{"result": False}`` on any error
        """
        import logging

        try:
            data = json.loads(request.body)
            host_id = data.get("host_id")
            is_monitored = data.get("is_monitored")
            Host.objects.filter(host_id=host_id).update(is_monitored=not is_monitored)
            return JsonResponse({"result": True})
        except Exception:
            logging.getLogger(__name__).exception("HostView.post failed")
            return JsonResponse({"result": False})


# models.py file contents
from django.db import models
from home_application.utils.parse_time import parse_datetime_to_timestr
class Host(models.Model):
    """A host record keyed by ``host_id``, with its business and cloud-area info."""

    # Host ID (primary key)
    host_id = models.IntegerField(u"主机ID", primary_key=True, unique=True)
    # IP address
    ip = models.CharField(u"IP地址", max_length=32, blank=True, null=True)
    # Business ID
    bk_biz_id = models.CharField(u"业务ID", max_length=16, blank=True, null=True)
    # Business name
    bk_biz_name = models.CharField(u"业务名称", max_length=512, blank=True, null=True)
    # Operating system name
    os_name = models.CharField(u"系统名", max_length=128, blank=True, null=True)
    # Host name
    host_name = models.CharField(u"主机名", max_length=128, blank=True, null=True)
    # Cloud area ID
    cloud_id = models.IntegerField(u"云区域ID", blank=True, null=True)
    # Cloud area name
    cloud_name = models.CharField(u"云区域名称", max_length=32, blank=True, null=True)
    # Whether the host is currently monitored
    is_monitored = models.BooleanField(u"是否已监控", default=False)

    def to_dict(self):
        """Return the host as a plain dict; the ``*_use`` entries are "-" placeholders."""
        payload = {"host_id": self.host_id, "ip": self.ip, "pk": self.pk}
        copied_fields = (
            "bk_biz_id",
            "bk_biz_name",
            "os_name",
            "host_name",
            "cloud_name",
            "cloud_id",
            "is_monitored",
        )
        for field_name in copied_fields:
            payload[field_name] = getattr(self, field_name)
        for metric in ("mem_use", "cpu_use", "disk_use"):
            payload[metric] = "-"
        return payload

    def get_load_data(self):
        """Return this host's ``LoadData`` rows as dicts, ordered by ``create_time``."""
        rows = []
        samples = LoadData.objects.filter(host_id=self.pk).order_by("create_time")
        for sample in samples:
            rows.append(sample.to_dict())
        return rows
class LoadData(models.Model):
    """One stored cpu/mem/disk sample for a host, stamped at creation time."""

    # Host ID the sample belongs to
    host_id = models.IntegerField(u"主机ID", default=0)
    # CPU usage
    cpu = models.IntegerField(u"CPU使用率", default=0)
    # Memory usage
    mem = models.IntegerField(u"内存使用率", default=0)
    # Disk usage
    disk = models.IntegerField(u"硬盘使用率", default=0)
    # Creation time, set automatically when the row is first saved
    create_time = models.DateTimeField(u"创建时间", auto_now_add=True)

    def to_dict(self):
        """Return the sample as a plain dict with ``create_time`` rendered as a string."""
        payload = {}
        for field_name in ("host_id", "cpu", "mem", "disk"):
            payload[field_name] = getattr(self, field_name)
        payload["create_time"] = parse_datetime_to_timestr(self.create_time)
        return payload


# Part of the FastJobApi code
import base64
import time
from conf.default import APP_ID, APP_TOKEN
from blueking.component.shortcuts import get_client_by_user, get_client_by_request
# Kept only so that existing references to this module-level name keep working;
# status polling now uses a local counter instead of this shared global.
count = 0


class FastJobApi(object):
    """Run a script on a set of hosts through the job API client and fetch its logs."""

    def __init__(self, bk_biz_id, bk_cloud_id, ip_list, script_content):
        """
        :param bk_biz_id: business id sent as ``bk_biz_id`` with every call
        :param bk_cloud_id: cloud area id attached to every target ip
        :param ip_list: comma-separated string of target ips
        :param script_content: plain-text script; base64-encoded before sending
        """
        self.username = "admin"
        self.client = get_client_by_user(self.username)
        self.biz_id = bk_biz_id
        self.script_content = script_content
        # Strip whitespace and drop empty entries so "a, b," does not produce
        # targets like " b" or "".
        self.ip_list = [
            {"bk_cloud_id": bk_cloud_id, "ip": ip.strip()}
            for ip in ip_list.split(",")
            if ip.strip()
        ]

    @staticmethod
    def _b64encode(text):
        """Base64-encode ``text`` and return the result as a native ``str``.

        Text is UTF-8 encoded first, so the call works whether ``text`` is a
        byte string or a unicode string.
        """
        if not isinstance(text, bytes):
            text = text.encode("utf-8")
        encoded = base64.b64encode(text)
        if isinstance(encoded, str):
            return encoded
        return encoded.decode("ascii")

    def _fast_execute_script(self, execute_account="root", param_content='', script_timeout=1000):
        """
        Submit the script for fast execution.

        :param execute_account: account the script runs as
        :param param_content: script arguments (plain text, base64-encoded here)
        :param script_timeout: script execution timeout
        :return: the ``job_instance_id`` on success, ``False`` when the call
                 reports failure
        """
        kwargs = {
            "bk_app_code": APP_ID,
            "bk_app_secret": APP_TOKEN,
            "bk_biz_id": self.biz_id,
            "bk_username": self.username,
            "script_content": self._b64encode(self.script_content),
            "ip_list": self.ip_list,
            "script_type": 1,
            "account": execute_account,
            "script_param": self._b64encode(param_content),
            "script_timeout": script_timeout
        }
        result = self.client.job.fast_execute_script(kwargs)
        if result["result"]:
            return (result.get("data") or {}).get("job_instance_id")
        return False

    def _get_job_instance_status(self, task_id, max_retries=5, interval=2):
        """
        Poll the job until it reports ``is_finished`` or the retries run out.

        The status is queried at most ``max_retries + 1`` times, sleeping
        ``interval`` seconds between queries. The attempt counter is local, so
        concurrent callers cannot disturb each other's retry budget.

        :param task_id: the ``job_instance_id`` of the submitted script
        :param max_retries: number of re-queries after the first one
        :param interval: seconds to sleep between queries
        :return: ``True`` if the job finished in time, otherwise ``False``
        """
        for attempt in range(max_retries + 1):
            resp = self.client.job.get_job_instance_status(
                bk_username=self.username,
                bk_biz_id=self.biz_id,
                job_instance_id=task_id
            )
            # Treat a missing/null "data" as "not finished yet".
            if (resp.get('data') or {}).get('is_finished'):
                return True
            if attempt < max_retries:
                time.sleep(interval)
        return False

    def _get_job_instance_log(self, task_id):
        """
        Fetch the per-ip logs of a job once it has finished.

        :param task_id: the ``job_instance_id`` of the submitted script
        :return: the ``ip_logs`` list of the first step result, or ``None`` if
                 the job did not finish within the polling budget
        """
        if not self._get_job_instance_status(task_id):
            return None
        resp = self.client.job.get_job_instance_log(
            job_instance_id=task_id,
            bk_biz_id=self.biz_id,
            bk_username=self.username
        )
        return resp['data'][0]['step_results'][0]['ip_logs']

    def execute_script_and_return(self):
        """
        Run the script and return its execution result.

        :return: the ``ip_logs`` list, or ``None`` when the script could not be
                 submitted or did not finish in time
        """
        job_instance_id = self._fast_execute_script()
        if not job_instance_id:
            return None
        return self._get_job_instance_log(job_instance_id)


# Rendered result (article section heading)





感谢各位的阅读,以上就是“怎么用python+Element实现主机Host操作”的内容了,经过本文的学习后,相信大家对怎么用python+Element实现主机Host操作这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!