使用kubernetes API(Python)来控制集群内部对象的CURD

写在前面:依旧是疫情远程实验室中,不知道为什么我的 kubernetes 集群突然集体更换了 ip 地址,真的很诡异 desu… 重新配集群的话我会想死,最后使用了一个基于 Go 语言的二进制一键离线部署安装工具 sealos,基本上只要指定三个节点的地址,不过不知道为啥似乎只能使用 root 用户来进行操作(可能是前置的一些设置没有设置)。

sealos 地址:https://github.com/fanux/sealos

使用 sealos 时三台虚拟机上要开启 ssh 服务,通过以下命令安装并且开放权限(我的系统是 ubuntu20.04)

1
2
3
4
apt install ssh
vi /etc/ssh/sshd_config
# 修改 PermitRootLogin = yes
# restart ssh 服务

以下是新的系统配置

ip 地址 节点类型
192.168.2.113 master
192.168.2.114 node
192.168.2.115 node

最近主要研究实现了怎么使用 kubernetes 的 api 接口来管理 kubernetes 集群,使用的语言是 python,python-kubernetes-client 的官方地址如下:https://github.com/kubernetes-client/python/tree/master/kubernetes

# 引入 kubernetes 包

1
2
3
4
5
# pip install kubernetes
import kubernetes

# 以下使用到了kubernetes官方库的config和client,因此可以这样写
from kubernetes import config, client

# 集群内部控制

我搭建的系统是在 pod 内去修改 kubernetes 的资源,首先在对系统的修改之前,需要访问系统的配置文件,得到系统的相关配置

1
2
config.load_incluster_config()
# incluster表示在节点内

得到系统配置后,我们可以通过建立相关的 api 创建、更新、访问相关资源,以下以项目中用到的 Job 和 Server 为例,在实际创建过程中,可能会遇到 403 错误,该错误可以通过设置 rbac 解决,使用的 yaml 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: test
rules: # 似乎是在这里添加权限
- apiGroups: [""]
resources: ["pods", "services"]
verbs: ["list", "create", "get", "delete"]
- apiGroups: ["batch"]
resources: ["jobs", "jobs/status"]
verbs: ["list", "create", "get", "delete"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: test-bind
subjects:
- kind: ServiceAccount
name: default
namespace: default
roleRef:
kind: ClusterRole
name: test
apiGroup: rbac.authorization.k8s.io

# 创建和管理 Job

job 的 api-group 是 batch,通过以下命令来创建相关的 api,成功创建后就可以管理 job 资源

1
batch_v1 = client.BatchV1Api()

创建 job

删除 job 的逻辑写在创建 job 的时候,spec.ttlSecondsAfterFinished,代表 job 完工 30 秒后删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def create_container():
port_list = [8888, 8889]
container_port_list = list()
container_name = "pi" + str(uuid.uuid1())
for port in port_list:
container_port = int(port)
c_port = client.V1ContainerPort(container_port=container_port)
container_port_list.append(c_port)
container = client.V1Container(
name=container_name,
image="lcy200077/exec:v1.1",
ports=container_port_list
)
return container


def create_job_object(node):
# Configurate Pod template container
container = create_container()
# Create and configure a spec section
job_name = 'pi' + str(uuid.uuid1())
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": "pi"}),
spec=client.V1PodSpec(restart_policy="Never", containers=[container], node_name=node))
# Create the specification of deployment
spec = client.V1JobSpec(
template=template,
ttl_seconds_after_finished=30,
backoff_limit=4)
# 三十秒后删除
# Instantiate the job object
job = client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=client.V1ObjectMeta(name=job_name),
spec=spec)

return job, job_name


def create_job(api_instance, job, job_name):
api_response = api_instance.create_namespaced_job(
body=job,
namespace="default")
print("Job created. status='%s'" % str(api_response.status))

获取 Job 信息

1
2
3
4
5
6
7
8
9
10
11
def get_job_status(api_instance, job_name):
job_completed = False
while not job_completed:
api_response = api_instance.read_namespaced_job_status(
name=job_name,
namespace="default")
if api_response.status.succeeded is not None or \
api_response.status.failed is not None:
job_completed = True
sleep(1)
print("Job status='%s'" % str(api_response.status))

# 创建和管理 service

service 采用 manifest 的方法创建,其中将 service 对应的 job 作为依赖项,当 job 被删除的时候,kubernetes 的垃圾回收机制会自动回收掉依赖 job 的 Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def create_service(api, job_name):
batch_v1 = client.BatchV1Api()
job_response = batch_v1.read_namespaced_job_status(
name=job_name,
namespace="default"
)
uid = job_response.metadata.uid
global num
service_name = "pi" + str(uuid.uuid1())
node_p1 = 30030 + num * 2
node_p2 = 30030 + num * 2 + 1
service_manifest = {
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"labels": {"name": service_name},
"name": service_name,
"resourceversion": "v1",
"ownerReferences": {
"kind": "Job",
"name": job_name,
"apiVersion": "batch/v1",
"uid": uid
}
},
"spec": {
"type": "NodePort",
"ports": [
{"name": "p1",
"port": 80,
"protocol": "TCP",
"targetPort": 8888,
"nodePort": node_p1
},
{"name": "p2",
"port": 81,
"protocol": "TCP",
"targetPort": 8889,
"nodePort": node_p2
}
],
"selector": {"job-name": job_name},
},
}
service = api.create(body=service_manifest, namespace="default")
print("Service " + service_name + "created")
num += 1
return node_p1, node_p2

ownerReference

是依赖项,在 spec 中设置,一般来说 kubernetes 会自动配置,也可以手动配置,一个 ownerReference 需要以下几个参数,缺一不可

1
2
3
4
5
6
"ownerReferences": {
"kind": "Job",
"name": job_name,
"apiVersion": "batch/v1",
"uid": uid #可以通过api get来获取
}

使用kubernetes API(Python)来控制集群内部对象的CURD

http://example.com/2022/04/08/使用kubernetes-API-Python-来控制集群内部对象的CURD/

作者

lcy

发布于

2022-04-08

更新于

2023-01-17

许可协议