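# Tags: client net ini yam nod self exce test core
# A secondary wrapper around the Kubernetes (k8s) Python API.
# Install dependencies (pprint ships with the standard library): pip install kubernetes pyyaml urllib3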
import urllib3
from pprint import pprint
from kubernetes import client
from os import path
import yaml
class K8sApi(object):
    """Convenience wrapper around the official Kubernetes Python client.

    Groups the list/read/create/replace/delete calls for deployments, nodes,
    pods, services, ingresses and stateful sets behind one object.  Unless
    stated otherwise, every method returns the client's response object
    unchanged.
    """

    def __init__(self, host="https://192.168.100.111:6443", token="token", verify_ssl=False):
        """Build the API clients used by the other methods.

        Args:
            host: URL of the Kubernetes API server.
            token: Bearer token sent in the ``authorization`` API key.
            verify_ssl: Whether the server certificate is verified.
        """
        # Only silence the insecure-request warning when verification is
        # actually turned off.
        if not verify_ssl:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.configuration = client.Configuration()
        self.configuration.host = host
        self.configuration.api_key['authorization'] = 'Bearer ' + token
        self.configuration.verify_ssl = verify_ssl

        # One ApiClient is shared by all API groups.
        api_client = client.ApiClient(self.configuration)
        self.k8s_apps_v1 = client.AppsV1Api(api_client)
        self.Api_Instance = client.CoreV1Api(api_client)
        # NOTE(review): ExtensionsV1beta1Api is missing from newer releases of
        # the kubernetes package; fall back to NetworkingV1Api, which exposes
        # the same *_ingress method names.  Manifests must match the API
        # version served by the cluster - confirm before relying on this.
        ingress_api_cls = getattr(client, "ExtensionsV1beta1Api", None)
        if ingress_api_cls is None:
            ingress_api_cls = client.NetworkingV1Api
        self.Api_Instance_Extensions = ingress_api_cls(api_client)

    def _load_yaml(self, file):
        """Parse the YAML manifest ``file``, resolved relative to this module."""
        manifest_path = path.join(path.dirname(__file__), file)
        # Binary mode lets the YAML parser detect the encoding itself instead
        # of depending on the platform's default text encoding.
        with open(manifest_path, "rb") as f:
            return yaml.safe_load(f)

    # ------------------------------------------------------------------
    # Deployments
    # ------------------------------------------------------------------
    def list_deployment(self, namespace="default"):
        """Return the deployments of ``namespace``."""
        return self.k8s_apps_v1.list_namespaced_deployment(namespace)

    def read_deployment(self, name="nginx-deployment", namespace="default"):
        """Return the deployment ``name`` from ``namespace``."""
        return self.k8s_apps_v1.read_namespaced_deployment(name, namespace)

    def create_deployment(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a deployment in ``namespace`` from the YAML manifest ``file``."""
        dep = self._load_yaml(file)
        return self.k8s_apps_v1.create_namespaced_deployment(body=dep, namespace=namespace)

    def replace_deployment(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        """Replace deployment ``name`` with the content of the manifest ``file``."""
        dep = self._load_yaml(file)
        return self.k8s_apps_v1.replace_namespaced_deployment(name, namespace, body=dep)

    def delete_deployment(self, name="nginx-deployment", namespace="default"):
        """Delete the deployment ``name`` from ``namespace``."""
        return self.k8s_apps_v1.delete_namespaced_deployment(name, namespace)

    # ------------------------------------------------------------------
    # Nodes
    # ------------------------------------------------------------------
    def list_node(self):
        """Summarise every node as a dict keyed by node name.

        Returns:
            ``{node_name: {"name", "status", "ip", "kubelet_version",
            "os_image"}}``.  ``status`` is the type of the node's last
            condition when that condition's status is ``"True"``, otherwise
            ``"NotReady"`` (also used when the node reports no conditions).
            ``ip`` is the node's first address, or ``None`` when it has none.
        """
        api_response = self.Api_Instance.list_node()
        data = {}
        for node in api_response.items:
            # Either list may be missing or empty; guard instead of raising.
            conditions = node.status.conditions or []
            addresses = node.status.addresses or []

            status = "NotReady"
            if conditions and conditions[-1].status == "True":
                status = conditions[-1].type

            data[node.metadata.name] = {
                "name": node.metadata.name,
                "status": status,
                "ip": addresses[0].address if addresses else None,
                "kubelet_version": node.status.node_info.kubelet_version,
                "os_image": node.status.node_info.os_image,
            }
        return data

    # ------------------------------------------------------------------
    # Pods
    # ------------------------------------------------------------------
    def list_pod(self):
        """Return ``{pod_name: {"ip", "namespace"}}`` for pods in all namespaces.

        NOTE: the key is the bare pod name, so pods sharing a name across
        namespaces overwrite each other; the last one listed wins.
        """
        api_response = self.Api_Instance.list_pod_for_all_namespaces()
        data = {}
        for pod in api_response.items:
            data[pod.metadata.name] = {"ip": pod.status.pod_ip, "namespace": pod.metadata.namespace}
        return data

    def read_pod(self, name="nginx-pod", namespace="default"):
        """Return the pod ``name`` from ``namespace``."""
        return self.Api_Instance.read_namespaced_pod(name, namespace)

    def create_pod(self, file="pod-nginx.yaml", namespace="default"):
        """Create a pod in ``namespace`` from the YAML manifest ``file``."""
        dep = self._load_yaml(file)
        return self.Api_Instance.create_namespaced_pod(namespace, body=dep)

    def replace_pod(self, name="nginx-pod", namespace="default"):
        """Read pod ``name`` and submit that same object back as a replace."""
        # Read from the requested namespace, not only the default one.
        dep = self.read_pod(name, namespace)
        return self.Api_Instance.replace_namespaced_pod(name, namespace, body=dep)

    def delete_pod(self, name="nginx-pod", namespace="default"):
        """Delete the pod ``name`` from ``namespace``."""
        return self.Api_Instance.delete_namespaced_pod(name, namespace)

    # ------------------------------------------------------------------
    # Services
    # ------------------------------------------------------------------
    def list_service(self):
        """Return the services of all namespaces."""
        return self.Api_Instance.list_service_for_all_namespaces()

    def read_service(self, name="", namespace="default"):
        """Return the service ``name`` from ``namespace``."""
        return self.Api_Instance.read_namespaced_service(name, namespace)

    def create_service(self, file="service-nginx.yaml", namespace="default"):
        """Create a service in ``namespace`` from the YAML manifest ``file``."""
        dep = self._load_yaml(file)
        return self.Api_Instance.create_namespaced_service(namespace, body=dep)

    def replace_service(self, name="hequan", namespace="default"):
        """Read service ``name`` and submit that same object back as a replace."""
        # The body must be the Service itself (not a Pod of the same name),
        # read from the requested namespace.
        dep = self.read_service(name, namespace)
        return self.Api_Instance.replace_namespaced_service(name, namespace, body=dep)

    def delete_service(self, name="hequan", namespace="default"):
        """Delete the service ``name`` from ``namespace``."""
        return self.Api_Instance.delete_namespaced_service(name, namespace)

    # ------------------------------------------------------------------
    # Ingresses
    # ------------------------------------------------------------------
    def list_ingress(self):
        """Return the ingresses of all namespaces."""
        return self.Api_Instance_Extensions.list_ingress_for_all_namespaces()

    def read_ingress(self, name="", namespace="default"):
        """Return the ingress ``name`` from ``namespace``."""
        return self.Api_Instance_Extensions.read_namespaced_ingress(name, namespace)

    def create_ingress(self, file="ingress-nginx.yaml", namespace="default"):
        """Create an ingress in ``namespace`` from the YAML manifest ``file``."""
        dep = self._load_yaml(file)
        return self.Api_Instance_Extensions.create_namespaced_ingress(namespace, body=dep)

    def replace_ingress(self, name="", file="ingress-nginx.yaml", namespace="default"):
        """Replace ingress ``name`` with the content of the manifest ``file``."""
        dep = self._load_yaml(file)
        return self.Api_Instance_Extensions.replace_namespaced_ingress(
            name=name, namespace=namespace, body=dep)

    def delete_ingress(self, name="hequan", namespace="default"):
        """Delete the ingress ``name`` from ``namespace``."""
        return self.Api_Instance_Extensions.delete_namespaced_ingress(name, namespace)

    # ------------------------------------------------------------------
    # Stateful sets
    # ------------------------------------------------------------------
    def list_stateful(self):
        """Return the stateful sets of all namespaces."""
        return self.k8s_apps_v1.list_stateful_set_for_all_namespaces()

    def read_stateful(self, name="nginx-deployment", namespace="default"):
        """Return the stateful set ``name`` from ``namespace``."""
        return self.k8s_apps_v1.read_namespaced_stateful_set(name, namespace)

    def create_stateful(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a stateful set in ``namespace`` from the YAML manifest ``file``."""
        dep = self._load_yaml(file)
        return self.k8s_apps_v1.create_namespaced_stateful_set(body=dep, namespace=namespace)

    def replace_stateful(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        """Replace stateful set ``name`` with the content of the manifest ``file``."""
        dep = self._load_yaml(file)
        return self.k8s_apps_v1.replace_namespaced_stateful_set(name, namespace, body=dep)

    def delete_stateful(self, name="nginx-deployment", namespace="default"):
        """Delete the stateful set ``name`` from ``namespace``."""
        return self.k8s_apps_v1.delete_namespaced_stateful_set(name, namespace)
if __name__ == '__main__':
    def test():
        """Smoke test: list the deployments of the default namespace and print them."""
        obj = K8sApi()
        ret = obj.list_deployment()
        pprint(ret)

    test()
# Tags: client net ini yam nod self exce test core
# Original article: https://blog.51cto.com/hequan/2438485