使用工作队列进行细粒度并行处理

在本例中,您将运行一个 Kubernetes Job,该 Job 将多个并行任务作为工作进程运行,每个任务都作为独立的 Pod 运行。

在本例中,每个 Pod 创建后,都会从任务队列中获取一个工作单元,对其进行处理,并重复此过程,直到队列结束。

以下是本例中步骤的概述

  1. 启动一个存储服务来保存工作队列。在本例中,您将使用 Redis 来存储工作项。在之前的示例中,您使用了 RabbitMQ。在本例中,您将使用 Redis 和一个自定义工作队列客户端库;这是因为 AMQP 没有提供一个好的方法让客户端检测到有限长度的工作队列何时为空。在实践中,您将设置一个像 Redis 这样的存储,并将其用于许多作业的工作队列和其他用途。
  2. 创建一个队列,并用消息填充它。每条消息代表一项要完成的任务。在本例中,消息是一个整数,我们将对其进行长时间的计算。
  3. 启动一个处理队列中任务的 Job。该 Job 启动多个 Pod。每个 Pod 从消息队列中获取一个任务,对其进行处理,并重复此过程,直到队列结束。

开始之前

您需要有一个 Kubernetes 集群,并且 kubectl 命令行工具必须配置为与您的集群通信。建议您在至少有两个节点的集群上运行本教程,这些节点不充当控制平面主机。如果您还没有集群,可以使用minikube创建一个,或者您可以使用以下 Kubernetes 游乐场之一

您将需要一个容器镜像仓库,您可以在其中上传镜像以在您的集群中运行。本例使用Docker Hub,但您可以将其改编为其他容器镜像仓库。

本任务示例还假设您在本地安装了 Docker。您使用 Docker 构建容器镜像。

熟悉Job的基本非并行用法。

启动 Redis

在本例中,为了简单起见,您将启动单个 Redis 实例。有关可扩展且冗余地部署 Redis 的示例,请参见Redis 示例

您也可以直接下载以下文件

要启动单个 Redis 实例,您需要创建 redis pod 和 redis 服务

kubectl apply -f https://k8s.io/examples/application/job/redis/redis-pod.yaml
kubectl apply -f https://k8s.io/examples/application/job/redis/redis-service.yaml

用任务填充队列

现在让我们用一些“任务”填充队列。在本例中,任务是要打印的字符串。

启动一个临时交互式 Pod 来运行 Redis CLI。

kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt

现在按回车键,启动 Redis CLI,并创建一个包含一些工作项的列表。

redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"

因此,键为job2的列表将是工作队列。

注意:如果您没有正确设置 Kube DNS,您可能需要将上述代码块的第一步更改为redis-cli -h $REDIS_SERVICE_HOST

创建容器镜像

现在您已准备好创建一个镜像来处理该队列中的工作。

您将使用一个带有 Redis 客户端的 Python 工作程序来从消息队列中读取消息。

提供了一个简单的 Redis 工作队列客户端库,名为rediswq.py (下载).

Job 的每个 Pod 中的“工作程序”程序使用工作队列客户端库来获取工作。以下是该程序

#!/usr/bin/env python

import time
import rediswq

host="redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " +  q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
  item = q.lease(lease_secs=10, block=True, timeout=2) 
  if item is not None:
    itemstr = item.decode("utf-8")
    print("Working on " + itemstr)
    time.sleep(10) # Put your actual work here instead of sleep.
    q.complete(item)
  else:
    print("Waiting for work")
print("Queue empty, exiting")

您也可以下载worker.pyrediswq.pyDockerfile文件,然后构建容器镜像。以下是如何使用 Docker 进行镜像构建的示例

docker build -t job-wq-2 .

推送镜像

对于Docker Hub,使用您的用户名标记您的应用程序镜像,并使用以下命令将其推送到 Hub。将<username>替换为您的 Hub 用户名。

docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2

您需要推送到公共仓库或配置您的集群以能够访问您的私有仓库

定义一个 Job

以下是您将创建的 Job 的清单

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

在本例中,每个 Pod 处理队列中的多个项目,然后在没有更多项目时退出。由于工作程序本身会检测到工作队列何时为空,而 Job 控制器不知道工作队列,因此它依赖于工作程序来发出它们已完成工作的信号。工作程序通过成功退出来发出队列为空的信号。因此,只要任何工作程序成功退出,控制器就会知道工作已完成,并且 Pod 很快就会退出。因此,您需要将 Job 的完成计数保留为未设置状态。作业控制器将等待其他 Pod 也完成。

运行 Job

因此,现在运行 Job

# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml

现在等待一会儿,然后检查 Job

kubectl describe jobs/job-wq-2
Name:             job-wq-2
Namespace:        default
Selector:         controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-2
Annotations:      <none>
Parallelism:      2
Completions:      <unset>
Start Time:       Mon, 11 Jan 2022 17:07:59 +0000
Pods Statuses:    1 Running / 0 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                job-name=job-wq-2
  Containers:
   c:
    Image:              container-registry.example/exampleproject/job-wq-2
    Port:
    Environment:        <none>
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen    LastSeen    Count    From            SubobjectPath    Type        Reason            Message
  ---------    --------    -----    ----            -------------    --------    ------            -------
  33s          33s         1        {job-controller }                Normal      SuccessfulCreate  Created pod: job-wq-2-lglf8

您可以等待 Job 成功,并设置超时

# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-2
kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon

如您所见,此 Job 的一个 Pod 处理了多个工作单元。

替代方案

如果运行队列服务或修改容器以使用工作队列不方便,您可能需要考虑其他作业模式

如果您有持续的后台处理工作要运行,那么请考虑使用 ReplicaSet 运行您的后台工作程序,并考虑运行一个后台处理库,例如https://github.com/resque/resque

上次修改时间:2024 年 3 月 16 日太平洋标准时间下午 2:39:修复并行处理工作队列任务的文档。 (bed970676c)