解决go mod模式下本地包引用问题

使用go mod模式,进行项目源码管理,可以解决不用在Gopath下的src目录下进行项目开发。

在go mod 下,解决自定义包倒入出错问题:

1
main.go:11:2: package hellogRPC/proto/hello is not in GOROOT (/Users/austsxk/Golang_dev/go/src/hellogRPC/proto/hello)

文件目录结构

解决的方法:

在每个包下,都使用go mod进行管理,对于需要在项目本地导入的包,可以在当前包的go mod文件中:

自定义导入包的路径,如上图所示,将在server包中使用到的hellogRPC/proto/hello包就命名为此,并在require中填入相关内容,初始化版本为v0.0.0,然后使用 replace 关键词 将其取代为相对于当前文件的包的相对路径。

在server包中就可以正常导入 hellogRPC/proto/hello

如下图所示:

使用项目中的包

celery分布式消息队列简单使用

Celery-分布式任务队列

一、Celery简介

1. 什么是任务队列

任务队列是一种用于在线程或计算机之间分配工作的机制。

任务队列的输入是一个称为任务的工作单元,有专门的职程(Worker)进行不断的监视任务队列,进行执行新的任务工作。

Celery 通过消息机制进行通信,通常使用中间件(Broker)作为客户端和职程(Worker)调节。启动一个任务,客户端向消息队列 发送一条消息,然后中间件(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)执行。

Celery 可以有多个职程(Worker)和中间件(Broker),用来提高Celery的高可用性以及横向扩展能力。

Celery 需要消息中间件来进行发送和接收消息。 RabbitMQ 和 Redis 中间件的功能比较齐全,但也支持其它的实验性的解决方案,其 中包括 SQLite 进行本地开发。

Celery 可以在一台机器上运行,也可以在多台机器上运行,甚至可以跨数据中心运行。

2. Celery组件

Celery 扮演生产者和消费者的角色

Celery Beat:任务调度器。Beat 进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列。

Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者,提高运行效率。

Broker:消息代理,队列本身。 也称为消息中间件。 接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。

Producer:任务生产者。 调用 Celery API ,函数或者装饰器,而产生任务并交给任务队列处理的都是任务生产者。

Result Backend : 任务处理完成之后保存状态信息和结果,以供查询。

celery架构图

celery架构图

3. Celery特点

  • 高可用

当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务。

  • 快速

一个单进程的 Celery 每分钟可以处理数以百万的任务,而且延迟仅为亚毫秒(使用 RabbitMQ、 librabbitmq 在优化过后)。

  • 灵活

Celery 的每个部分几乎都可以自定义扩展和单独使用,例如自定义连接池、序列化方式、压缩方式、日志记录方式、任务调度、生产者、消费者、中间件(Broker)等。

4. Celery功能

  • 监控

可以针对整个流程进行监控,内置的工具可以实时说明当前集群的概况。

  • 调度

可以通过调度功能在一段时间内指定任务的执行时间 datetime,也可以根据简单每隔一段时间进行执行重复的任务,支持分钟、小时、星期几,也支持某一天或某一年的Crontab表达式。

  • 工作流

可以通过“canvas”进行组成工作流,其中包含分组、链接、分块等等。

简单和复杂的工作流程可以使用一组“canvas“组成,其中包含分组、链接、分块等。

  • 资源(内存)泄漏保护

–max-tasks-per-child 参数适用于可能会出现资源泄漏(例如:内存泄漏)的任务。

  • 时间和速率的限制

您可以控制每秒/分钟/小时执行任务的次数,或者任务执行的最长时间,也将这些设置为默认值,针对特定的任务或程序进行定制化配置。

  • 自定义组件

开发者可以定制化每一个职程(Worker)以及额外的组件。职程(Worker)是用 “bootsteps” 构建的-一个依赖关系图,可以对职程(Worker)的内部进行细粒度控制。

5. 版本要求

Celery 4.0 运行:

  • Python ❨2.7,3.4,3.5❩

  • PyPy ❨5.4,5.5❩

    这是支持 Python2.7 的最后一个版本,从下一个版本Celery5.x开始,需要Python3.5或更高的版本。

如果您的 Python 运行环境比较老,则需要使用旧版本的Celery:

  • Python 2.6:Celery 3.1 或更早版本。
  • Python 2.5:Celery 3.0 或更早版本。
  • Python 2.4:Celery 2.2 或更早版本。

二、中间件:Brokers

Celery 支持多种消息传输的方式:

​ RabbitMQ

​ Redis

​ Amazon SQS

1. 中间件(Broker)概况

这是不同的中间件比对情况,更多的信息可以在每个中间件的文档中找到。

名称 状态 监控 远程控制
RabbitMQ 稳定
Redis 稳定
Amazon SQS 稳定
Zookeeper 试验阶段

目前试验阶段的中间件(Broker)只是功能性的,没有专门的维护人员。

缺少监控就意味着这个监控已经失效,因此相关的 Flower、Celery events、celerymon 和其他基于此功能的监控工具全部失效。

远程管理控制是指可以通过 celery inspect 和 celery control(以及使用远程控制API的工具)在程序运行时检查和管理职程(Worker)的能力。

2. 使用RabbitMQ

安装与配置

RabbitMQ 是默认的中间件(Broker),只需要配置连接的URL即可,不需要安装额外的的配置以及初始化配置信息

1
broker_url = 'amqp://myuser:mypassword@localhost:5672/myvhost'

安装 RabbitMQ 服务可以通过 RabbitMQ官网 进行 安装RabbitMQ

注意:

如果在安装 RabbitMQ 后,使用 rabbitmqctl 出现 nodedown 错误信息,可以查阅这片文章解决问题:

http://www.somic.org/2009/02/19/on-rabbitmqctl-and-badrpcnodedown/

配置 RabbitMQ

要使用 Celery,需要创一个RabbitMQ账户:

1
2
3
4
$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_user_tags myuser mytag
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

修改myuser、mypassword、myvhost为自己配置的配置信息。

关于更多RabbitMQ配置,请查阅 RabbitMQ手册

3. 使用Redis

配置

Redis 的配置非常的简单,只需要配置 Redis 的 URL :

1
app.conf.broker_url = 'redis://localhost:6379/0'

URL 的格式为:

1
redis://:password@hostname:port/db_number

结果存储

如果您想保存任务执行返回结果保存到Redis,您需要进行以下配置:

1
app.conf.result_backend = 'redis://localhost:7379/0'

三、Celery基本使用

Celery 是一个包含一系列的消息任务队列。您可以不用了解内部的原理直接使用,它的使用时非常简单的。此外 Celery 可以快速与您的产品扩展与集成,以及 Celery 提供了一系列 Celery 可能会用到的工具和技术支持方案。

1. 选择中间件(Broker)

Celery 需要一个中间件来进行接收和发送消息,通常以独立的服务形式出现

RabbitMQ

RabbitMQ 的功能比较齐全、稳定、便于安装。在生产环境来说是首选的。

Redis

Redis 功能比较全,但是如果突然停止运行或断电会造成数据丢失。

2. 安装 Celery

Celery 在 python 的 PyPI 中管理,可以使用 pip 或 easy_install 来进行安装:

1
$ pip install celery

3. 应用

创建第一个 Celery 实例程序,我们把创建 Celery 程序成为 Celery 应用或直接简称为 app,创建的第一个实例程序可能需要包含 Celery 中执行操作的所有入口点,例如创建任务、管理职程(Worker)等,所以必须要导入 Celery 模块。

首先创建 tasks.py:

1
2
3
4
5
6
7
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
return x + y

4. 运行 Celery 职程(Worker)服务

1
$ celery -A tasks worker --loglevel=info

celery worker 的相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
Usage: celery worker [options]

Start worker instance.

Examples::
celery worker --app=proj -l info
celery worker -A proj -l info -Q hipri,lopri
celery worker -A proj --concurrency=4
celery worker -A proj --concurrency=1000 -P eventlet
celery worker --autoscale=10,0

Options:
-A APP, --app=APP app instance to use (e.g. module.attr_name)
-b BROKER, --broker=BROKER url to broker. default is 'amqp://guest@localhost//'
--loader=LOADER name of custom loader class to use.
--config=CONFIG Name of the configuration module
--workdir=WORKING_DIRECTORY Optional directory to change to after detaching.
-C, --no-color
-q, --quiet
-c CONCURRENCY, --concurrency=CONCURRENCY
Number of child processes processing the queue. The
default is the number of CPUs available on your
system.
-P POOL_CLS, --pool=POOL_CLS
Pool implementation: prefork (default), eventlet,
gevent, solo or threads.
--purge, --discard Purges all waiting tasks before the daemon is started.
**WARNING**: This is unrecoverable, and the tasks will
be deleted from the messaging server.
-l LOGLEVEL, --loglevel=LOGLEVEL
Logging level, choose between DEBUG, INFO, WARNING,
ERROR, CRITICAL, or FATAL.
-n HOSTNAME, --hostname=HOSTNAME
Set custom hostname, e.g. 'w1.%h'. Expands: %h
(hostname), %n (name) and %d, (domain).
-B, --beat Also run the celery beat periodic task scheduler.
Please note that there must only be one instance of
this service.
-s SCHEDULE_FILENAME, --schedule=SCHEDULE_FILENAME
Path to the schedule database if running with the -B
option. Defaults to celerybeat-schedule. The extension
".db" may be appended to the filename. Apply
optimization profile. Supported: default, fair
--scheduler=SCHEDULER_CLS
Scheduler class to use. Default is
celery.beat.PersistentScheduler
-S STATE_DB, --statedb=STATE_DB
Path to the state database. The extension '.db' may be
appended to the filename. Default: None
-E, --events Send events that can be captured by monitors like
celery events, celerymon, and others.
--time-limit=TASK_TIME_LIMIT
Enables a hard time limit (in seconds int/float) for
tasks.
--soft-time-limit=TASK_SOFT_TIME_LIMIT
Enables a soft time limit (in seconds int/float) for
tasks.
--maxtasksperchild=MAX_TASKS_PER_CHILD
Maximum number of tasks a pool worker can execute
before it's terminated and replaced by a new worker.
-Q QUEUES, --queues=QUEUES
List of queues to enable for this worker, separated by
comma. By default all configured queues are enabled.
Example: -Q video,image
-X EXCLUDE_QUEUES, --exclude-queues=EXCLUDE_QUEUES
-I INCLUDE, --include=INCLUDE
Comma separated list of additional modules to import.
Example: -I foo.tasks,bar.tasks
--autoscale=AUTOSCALE
Enable autoscaling by providing max_concurrency,
min_concurrency. Example:: --autoscale=10,3 (always
keep 3 processes, but grow to 10 if necessary)
--autoreload Enable autoreloading.
--no-execv Don't do execv after multiprocessing child fork.
--without-gossip Do not subscribe to other workers events.
--without-mingle Do not synchronize with other workers at startup.
--without-heartbeat Do not send event heartbeats.
--heartbeat-interval=HEARTBEAT_INTERVAL
Interval in seconds at which to send worker heartbeat
-O OPTIMIZATION
-D, --detach
-f LOGFILE, --logfile=LOGFILE
Path to log file. If no logfile is specified, stderr
is used.
--pidfile=PIDFILE Optional file used to store the process pid. The
program will not start if this file already exists and
the pid is still alive.
--uid=UID User id, or user name of the user to run as after
detaching.
--gid=GID Group id, or group name of the main group to change to
after detaching.
--umask=UMASK Effective umask (in octal) of the process after
detaching. Inherits the umask of the parent process
by default.
--executable=EXECUTABLE
Executable to use for the detached process.
--version show program's version number and exit
-h, --help show this help message and exit

5. 调用异步任务

需要调用我们创建的实例任务,可以通过 delay() 进行调用。

delay()apply_async() 的快捷方法,可以更好的控制任务的执行

1
2
>>> from tasks import add
>>> add.delay(4, 4)

如果该任务已经有职程(Worker)开始处理,可以通过控制台输出的日志进行查看执行情况。

调用任务会返回一个 AsyncResult 的实例,用于检测任务的状态,等待任务完成获取返回值(如果任务执行失败,会抛出异常)。默认这个功能是不开启的,如果开启则需要配置 Celery 的结果后端,下一小节会详细说明。

6. 保存结果

如果您需要跟踪任务的状态,Celery 需要在某处存储任务的状态信息。Celery 内置了一些后端结果:SQLAlchemy/Django ORM、MemcachedRedis、 RPC (RabbitMQ/AMQP)以及自定义的后端结果存储中间件。

可以使用Redis作为Celery结果后端,使用RabbitMQ作为中间件(Broker)可以使用以下配置(这种组合比较流行):

1
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

现在已经配置结果后端,重新调用执行任务。会得到调用任务后返回的一个 AsyncResult 实例:

1
>>> result = add.delay(4, 4)

ready() 可以检测是否已经处理完毕:

1
2
>>> result.ready()
True

整个任务执行过程为异步的,如果一直等待任务完成,会将异步调用转换为同步调用:

1
2
>>> result.get(timeout=1)
8

如果任务出现异常,get() 会再次引发异常,可以通过 propagate 参数进行覆盖:

1
>>> result.get(propagate=False)

如果任务出现异常,可以通过以下命令进行回溯:

1
>>> result.traceback

7. Celery + Redis 的探究

尝试研究,使用 redis 作为 celery 的 broker 时,celery 的交互操作同 redis 中数据记录的关联关系。

[1] 继续使用上面的tasks.py,直接发起一个任务。

1
2
>>> from tasks import add
>>> add.delay(4, 4)

执行后可看到 redis 上生成了两个 key

redis1

celery:表示当前正在队列中的 task,等待被 worker 所接收
_kombu.binding.celery:这个不用管(celery 使用 kombu 维护消息队列,这个是 kombu 生成的对逻辑影响不大)

然后启动 worker

1
celery -A tasks worker --loglevel=info

执行后可看到 celery 这个 key 消失了,同时新增了 2 个 key

celery 消失说明任务已经被刚启动的 worker 接收了,worker 会自己去执行这个 task,当前没有等待被接收的任务
_kombu.binding.celery.pidbox:这个也不用管(也是 kombu 维护的)
_kombu.binding.celeryev:这个也不用管(也是 kombu 维护的,用来记下当前连接的 worker)

**[2] ** 下面我们试一下延时任务,使用apply_async()调用任务,并启动worker

1
add.apply_async((1, 2), countdown=60)

在 60 秒内查看 redis,可以看到没有出现 celery 这个 key,但多出了另外两个 key

redis2

unacked:可以理解为这个是被 worker 接收了但是还没开始执行的 task 列表(因为60秒后才会开始执行)。
unacked_index:用户标记上面 unacked 的任务的 id,理论上应该与 unacked 一一对应的。

60 秒后再次查看 redis,可以看到又回到了无任务的状态,这表示被 worker 领取的任务确实在 60 秒后执行了。

[3] 这里在尝试一种异常的情况,worker 领取任务后还没到 60 秒,突然遇到问题退出了。

还是使用apply_async()调用任务,并启动worker,等大约 10 秒后,ctrl+c 中断 worker。
可以看到 redis 中有 celery 这个 key,其中有一条等待领取的任务

再次启动 worker,可以发现任务被再次正常领取和执行。

由此可以推测出 celery 和 redis 之间交互的基本原理:

  1. 当发起一个 task 时,会向 redis 的 celery key 中插入一条记录。
  2. 如果这时有正在待命的空闲 worker,这个 task 会立即被 worker 领取。
  3. 如果这时没有空闲的 worker,这个 task 的记录会保留在 celery key 中。
  4. task 被 worker 领取后,如果没有到设定的执行时间,这时会将这个 task 的记录从 key celery 中移除,并添加相关信息到 unackedunacked_index 中。
  5. worker 根据 task 设定的期望执行时间执行任务,如果接到的不是延时任务或者已经超过了期望时间,则立刻执行。
  6. worker 开始执行任务时,通知 redis。(如果设置了 CELERY_ACKS_LATE = True 那么会在任务执行结束时再通知)
  7. redis 接到通知后,将 unackedunacked_index 中相关记录移除。
  8. 如果在接到通知前,worker 中断了,这时 redis 中的 unacked 和 unacked_index 记录会重新回到 celery key 中。(这个回写的操作是由 worker 在 “临死” 前自己完成的,所以在关闭 worker 时为防止任务丢失,请务必使用正确的方法停止它,如: celery multi stop w1 -A proj1)
  9. celery key 中的 task 可以再次重复上述 2 以下的流程。
  10. celery 只是利用 redis 的 list 类型,当作个简单的 Queue,并没有使用消息订阅等功能。

友情提醒

1、启动 celery worker 时可以加上 -B 参数使得 schedule 定时任务生效,但要注意如果为同一个项目启动多个 worker 时,只需要其中一个启动命令中加上 -B,否则 schedule 会被多次执行。
2、上面的 1 同时也说明了 schedule task 的执行是由 celery 发起的。也就是说,如果在 django 中使用了 CELERYBEAT_SCHEDULE,那么只要 celery worker -B 启动了,即使 django web 服务没有启动,定时任务也一样会被发起。(推荐使用专门的 celery beat 方法)
3、使用 flower 时,在上述的 “worker 领取任务后突然遇到问题退出了然后又重新启动执行” 这种情况下可能会出现显示不正常的问题。

8. Celery配置

Celery 像家用电器一样,不需要任何配置,开箱即用。它有一个输入和输出,输入端必须连接中间件(Broker),输出端可以连接到结果后端。大多数情况下,使用默认的配置就可以满足,也可以按需配置。查看配置选项可以更加的熟悉 Celery 的配置信息,可以参考 配置和默认配置

可以直接在程序中进行配置,也可以通过配置模块进行专门配置。例如,通过 task_serializer 选项可以指定序列化的方式:

1
app.conf.task_serializer = 'json'

如果需要配置多个选项,可以通过 upate 进行配置:

1
2
3
4
5
6
7
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)

针对大型的项目,建议使用专用配置模块,进行针对 Celery 配置。不建议使用硬编码,建议将所有的配置项集中化配置。集中化配置可以像系统管理员一样,当系统发生故障时可针对其进行微调。

可以通过 app.config_from_object() 进行加载配置模块:

1
app.config_from_object('celery_config')

其中 celery_config 为配置模块的名称,这个是可以自定义修改的。

在上面的实例中,需要在同级目录下创建一个名为 celery_config.py 的文件,添加以下内容:

1
2
3
4
5
6
7
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

可以通过以下命令来进行验证配置模块是否配置正确:

1
$ python -m celeryconfig

Celery 也可以设置任务执行的专用队列,这只是配置模块中一小部分,详细配置如下:

1
2
3
task_routes = {
'tasks.add': 'low-priority',
}

Celery 也可以针对任务进行限速,以下为每分钟内允许执行的10个任务的配置:

1
2
3
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}

如果使用的是 RabbitMQ 或 Redis 的话,可以在运行时进行设置任务的速率:

1
2
3
$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully

9. 故障处理

职程(Worker)无法正常启动:权限错误

  • 如果使用系统是 Debian、Ubuntu 或其他基于 Debian 的发行版:

    Debian 最近把 /dev/shm/重名 /run/shm

    使用软连接可以解决该问题:

    1
    $ ln -s /run/shm /dev/shm
  • 其他:

    如果设置了 --pidfile --logfile--statedb 其中的一个参数,必须要保证职程(Worker)对指向的文件/目录可读可写。

任务总处于 PENDING (待处理)状态

所有任务的状态默认都是 PENDING (待处理)状态,Celery 在下发任务时不会更换任务状态

  1. 确认任务没有启用 ignore_result

    如果启用,会强制跳过任务更新状态。

  2. 确保 task_ignore_result 未启用。

  3. 确保没有旧的职程(Worker)正在运行。

    启动多个职程(Worker)比较容易,在每次运行新的职程(Worker)之前需要确保之前的职程是否关闭。

    未配置结果后端的职程(Worker)是否正在运行,可能会消费当前的任务消息。

    –pidfile 参数设置为绝对路径,确保该情况不会出现。

  4. 确认客户是否配置正确。

    可能由于某种场景,客户端与职程(Worker)的后端不配置不同,导致无法获取结果,所以需要确保配置是否正确:

    1
    2
    >>> result = task.delay(…)
    >>> print(result.backend)

四、Celery 进阶使用

1. 任务调用

  • 执行方式

标准的执行选项以及三种方法:

  • apply_async(args[, kwargs[, …]])。发送任务消息
  • delay(*args, **kwargs)。发送任务消息的快捷方式,但不支持执行选项。
  • calling (__call__)。任务将不会由 Worker 执行,而是在当前进程中执行。

示例:

1
2
3
4
5
6
T.delay(arg, kwarg=value)
T.apply_async((arg,), {'kwarg': value})
T.apply_async(countdown=10) # 从现在起10秒内执行
T.apply_async(eta=now + timedelta(seconds=10)) # 从现在起10秒内执行
T.apply_async(countdown=60, expires=120) # 从现在起一分钟内执行,但2分钟后过期
T.apply_async(expires=now + timedelta(days=2)) # 使用设置的有效期限为2天
  • 任务回调

Celery支持将任务链接在一起,以便一个任务紧随另一个任务。回调任务将与父任务的结果一起用作部分参数:

1
add.apply_async((2, 2), link=add.s(16))

在这里,第一个任务的结果(4)将被发送到一个新任务,该新任务将之前的结果加16,形成表达式 (2+2)+16

如果task引发异常(errback),还可以使用错误回调,与常规回调的行为不同的是,它将传递父任务的ID,而不是结果。这是因为不一定总是可以序列化引发的异常,因此错误回调需要启用结果后端,并且任务必须检索任务的结果。

错误回调函数示例:

1
2
3
4
5
@app.task
def error_handler(task_id):
result = AsyncResult(task_id)
exc = result.get(propagate=False)
print('Task {0} raised exception: {1!r}\n{2!r}'.format(task_id, exc, result.traceback))

可以使用link_error执行选项将其添加到任务中:

1
add.apply_async((2, 2), link_error=error_handler.s())

此外,linklink_error选项都可以表示为列表:

1
add.apply_async((2, 2), link=[add.s(16), other_task.s()])

然后将依次调用回调/错误返回,并且将使用父任务的返回值作为部分参数来调用所有回调。

  • on_message 捕获任务状态

Celery可以通过设置on_message回调支持捕获所有状态更改。

例如,对于长时间运行的任务以发送任务进度,您可以执行以下操作:

1
2
3
4
5
6
7
8
9
10
11
12
@app.task(bind=True)
def hello(self, a, b):
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 50})
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 90})
time.sleep(1)
return 'hello world: %i' % (a+b)


def on_raw_message(body):
print(body)
1
2
r = hello.apply_async(args=(1, 2))
print(r.get(on_message=on_raw_message, propagate=False))

将生成如下输出:

1
2
3
4
{'status': 'PROGRESS', 'result': {'progress': 50}, 'traceback': None, 'children': [], 'date_done': None, 'task_id': '55fc6e9e-eba8-4229-9058-b06d10172752'}
{'status': 'PROGRESS', 'result': {'progress': 90}, 'traceback': None, 'children': [], 'date_done': None, 'task_id': '55fc6e9e-eba8-4229-9058-b06d10172752'}
{'status': 'SUCCESS', 'result': 'hello world: 3', 'traceback': None, 'children': [], 'date_done': '2020-11-09T08:29:18.843557', 'task_id': '55fc6e9e-eba8-4229-9058-b06d10172752'}
hello world: 3
  • 定时执行

通过 ETA(estimated time of arrival),可以设置特定的日期和时间执行任务。countdown是一种以秒为单位设置ETA的快捷方式。

1
2
3
4
5
6
7
8
>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get() # 至少3秒后才能拿到结果
20

>>> from datetime import datetime, timedelta

>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)
  • 任务过期时间

expires参数定义了一个可选的过期时间,既可以使用任务发布之后的多少秒,也可以使用一个特定的datetime

1
2
3
4
>>> add.apply_async((10, 10), expires=60)  # 60秒之后过期

>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs, expires=datetime.now() + timedelta(days=1) # 一天以后过期

当 Worker 收到过期的任务时,会将任务标记为 REVOKEDTaskRevokedError

  • 任务重试

当连接失败时,Celery会自动重试发送消息,并且可以配置重试行为(例如重试频率或最大重试次数)或一起禁用。

要禁用重试,可以将retry执行选项设置为False

1
add.apply_async((2, 2), retry=False)

重试策略(retry_policy)包含以下键:

max_retries:放弃之前的最大重试次数,值None意味着它将永远重试,默认为重试3次。

interval_start: 定义两次重试之间要等待的秒数,默认值为0(第一次重试将立即执行)。

interval_step:每次连续重试时,此数字将被添加到重试延迟中(浮点数或整数),默认值为0.2。

interval_max:重试之间等待的最大秒数(浮点数或整数),默认值为0.2。

1
2
3
4
5
6
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})

重试的最长时间为0.4秒。默认情况下将其设置为较短,因为如果代理连接断开,则连接失败可能导致重试堆效应–例如,许多Web服务器进程正在等待重试,从而阻止了其他传入请求。

2. Canvas:设计工作流程

  • 签名

signature() 可以包装单个任务调用的参数,关键字参数和执行选项,以便可以将其传递给函数,甚至进行序列化并通过网络发送。

1
2
3
>>> from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)
tasks.add(2, 2)

或者您可以使用任务的signature方法创建一个:

1
2
3
4
5
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)

>>> add.s(2, 2)
tasks.add(2, 2)

它支持调用API delayapply_async等等,包括被直接调用(__call__

1
2
3
4
5
6
>>> add(2, 2)
4
>>> add.s(2, 2)()
4
>>> add.s(2, 2).delay()
>>> add.s(2, 2).apply_async()
  • group

    组,也是一种签名,其中包含应并行应用的任务列表。

您可以轻松创建一组任务以并行执行:

1
2
3
4
>>> from celery import group
>>> res = group(add.s(i, i) for i in range(10))()
>>> res.get(timeout=1)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
  • chain

链,使我们可以将签名链接在一起,以便一个被另一个调用,本质上形成了回调链。调用链将调用当前进程中的任务,并返回链中最后一个任务的结果:

简单链:

1
2
3
4
5
6
>>> from celery import chain

>>> # 2 + 2 + 4 + 8
>>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
>>> res.get()
16

也可以使用管道来编写:

1
2
>>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
16

不变的签名

签名可以是部分签名,因此可以将参数添加到现有签名中,但是您可能并不总是希望这样,例如,如果您不希望链中上一个任务的结果传递到下一个任务中,在这种情况下,您可以将签名标记为不可变的,这样就不能更改参数:

1
>>> add.signature((2, 2), immutable=True)

还有一个.si()捷径,这是创建签名的首选方式:

1
>>> add.si(2, 2)

这样就可以创建一系列独立的任务:

1
2
3
4
5
6
7
>>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))()
>>> res.get()
16
>>> res.parent.get()
8
>>> res.parent.parent.get()
4
  • chord

chord就像是带有回调的组,由标题组和主体组成,其中主体是应在标题中的所有任务完成后执行的任务:

1
2
3
4
>>> from celery import chord
>>> res = chord((add.s(i, i) for i in range(10)), xsum.s())()
>>> res.get()
90

上面的示例创建了10个任务,这些任务全部并行启动,当所有任务完成时,返回值组合到一个列表中并发送给该xsum任务。

将组与另一个任务链接在一起将自动将其升级为chord

1
2
3
4
>>> c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())
>>> res = c3()
>>> res.get()
90
  • mapstarmap

map并且starmap是内置任务,它们为序列中的每个元素调用提供调用的任务。

它们与group的不同之处在于:

  • 仅发送一条任务消息。
  • 该操作是顺序的。

例如使用map

1
2
3
4
>>> from proj.tasks import add

>>> ~xsum.map([range(10), range(100)])
[45, 4950]

与执行以下任务相同:

1
2
3
@app.task
def temp():
return [xsum(range(10)), xsum(range(100))]

使用starmap

1
2
>>> ~add.starmap(zip(range(10), range(10)))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

与执行以下任务相同:

1
2
3
@app.task
def temp():
return [add(i, i) for i in range(10)]
  • chunks

    chunks 可让您将可重复的工作分成多个部分,如果有100万个对象,则可以创建10个任务,每个任务有10万个对象。

有些人可能会担心将任务分块会导致并行度降低,但是对于繁忙的集群而言,情况很少如此,实际上,因为避免了消息传递的开销,这可能会大大提高性能。

要创建块签名,可以使用 app.Task.chunks()

1
>>> add.chunks(zip(range(100), range(100)), 10)

group发送消息的行为一样,将在当前进程中调用时进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
>>> from proj.tasks import add

>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
[20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
[40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
[60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
[80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
[100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
[120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
[140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
[160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
[180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

调用时.apply_async将创建一个专用任务,以便将单个任务应用在工作程序中:

1
>>> add.chunks(zip(range(100), range(100)), 10).apply_async()

您还可以将块转换为组:

1
>>> group = add.chunks(zip(range(100), range(100)), 10).group()

3. 定期任务

*celery beat *是一个调度程序,它定期启动任务,然后由群集中的可用工作程序节点执行任务。默认情况下,条目是从beat_schedule设置中获取的,但是也可以使用自定义存储,例如将条目存储在SQL数据库中。必须确保一次只有一个调度程序运行,否则最终将导致重复的任务。

要定期调用任务,您必须在beat schedule列表中添加一个条目。

示例:每30秒运行task.add任务。

1
2
3
4
5
6
7
8
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'

crontab 调度器

如果要对执行任务的时间(例如,一天中的特定时间或一周中的某天)进行更多控制,则可以使用crontab计划类型:

1
2
3
4
5
6
7
8
9
10
from celery.schedules import crontab

app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}

Crontab表达式的语法非常灵活,一些例子:

含义
crontab() 每分钟执行一次。
crontab(minute=0, hour=0) 每天午夜执行。
crontab(minute=0, hour='*/3') 每三个小时执行一次:午夜,凌晨3点,6am,9am,中午,3pm,6pm,9pm。
crontab(minute=0, hour='0,3,6,9,12,15,18,21') 同上。
crontab(minute='*/15') 每15分钟执行一次。
crontab(day_of_week='sunday') 在星期日的每一分钟执行。
crontab(minute='*', hour='*', day_of_week='sun') 同上。
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') 每十分钟执行一次,但仅在周四或周五的凌晨3-4点,下午5-6点以及晚上10-11点之间执行。
crontab(minute=0, hour='*/2,*/3') 每隔一小时执行一次,每一小时被三整除。这意味着:每小时除外:1 am、5am、7am、11am、1pm、5pm、7pm、11pm
crontab(minute=0, hour='*/5') 执行小时可被5整除。这意味着它在下午3点而不是下午5点被触发(因为3pm等于24小时时钟值“ 15”,可被5整除)。
crontab(minute=0, hour='*/3,8-17') 每小时执行一次可被3整除的时间,在办公时间内(上午8点至下午5点)每小时执行一次。
crontab(0, 0, day_of_month='2') 在每个月的第二天执行。
crontab(0, 0, day_of_month='2-30/2') 在每个偶数天执行一次。
crontab(0, 0, day_of_month='1-7,15-21') 在每月的第一和第三周执行。
crontab(0, 0, day_of_month='11', month_of_year='5') 每年5月11日执行。
crontab(0, 0, month_of_year='*/3') 在每个季度的第一个月每天执行一次。

solar 调度器

如果您有应根据日出,日落,黎明或黄昏执行的任务,则可以使用 solar计划类型:

1
2
3
4
5
6
7
8
9
10
from celery.schedules import solar

app.conf.beat_schedule = {
# Executes at sunset in Melbourne
'add-at-melbourne-sunset': {
'task': 'tasks.add',
'schedule': solar('sunset', -37.81753, 144.96715),
'args': (16, 16),
},
}

参数很简单: solar(event, latitude, longitude)

纬度和经度使用正确的符号:

Sign Argument 含义
+ latitude
- latitude
+ longitude
- longitude 西方

可能的事件类型:

事件 含义
dawn_astronomical 在天空不再完全黑暗的那一刻执行。这是当太阳低于地平线18度时。
dawn_nautical 当有足够的阳光照亮地平线并区分一些物体时执行。正式地,当太阳低于地平线12度时。
dawn_civil 当有足够的光线可辨别物体时执行,以便可以开始户外活动;正式地,当太阳在地平线以下6度时。
sunrise 当早晨早晨太阳的上边缘出现在东部地平线上方时执行。
solar_noon 当当天太阳最高到地平线以上时执行。
sunset 当傍晚太阳的后缘在西方地平线上消失时执行。
dusk_civil 当物体仍然可以区分并且可见一些恒星和行星时,在民航末尾执行。正式地,当太阳低于地平线6度时。
dusk_nautical 当太阳低于地平线12度时执行。物体不再可分辨,并且肉眼不再看到地平线。
dusk_astronomical 在天空完全变暗的那一刻执行;正式地,当太阳低于地平线18度时。

要启动celery定期调度任务:

1
$ celery -A proj beat

您还可以通过启用workers -B选项将beat嵌入到worker中,如果您永远不会运行一个以上的worker节点,这很方便,但是它不常用,因此不建议用于生产环境:

1
$ celery -A proj worker -B

Beat需要将任务的最后运行时间存储在本地数据库文件(默认情况下命名为celerybeat-schedule)中,因此需要访问才能在当前目录中进行写操作,或者可以为此文件指定一个自定义位置:

1
$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

4. 路由任务

自动路由

配置 task_routes 参数,celery将自动创建尚未在其中定义的命名队列 。这样可以轻松执行简单的路由任务。

1
task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}

启用上述配置后,import_feed 任务将被路由到 feeds 队列,其他任务将被路由到默认队列(默认是 celery )。

此外,还可以使用全局模式匹配甚至正则表达式来匹配名称空间中的所有任务:

1
app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}

启用路由配置后,就可以在启动 Worker 时使用 -Q 选项指定该 Worker 要处理的队列:

1
$ celery -A proj worker -Q feeds

可以指定任意数量的队列,因此也可以使该服务器处理默认队列:

1
$ celery -A proj worker -Q feeds,celery  # 逗号后面加空格会报错

手动路由配置

假设您有两个服务器x和y分别处理常规任务,一个服务器z仅仅处理与feed相关的任务,则可以使用以下配置:

1
2
3
4
5
6
7
8
9
10
from kombu import Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'

要将任务路由到feed_tasks队列,可以在task_routes设置中添加一个条目 :

1
2
3
4
5
6
task_routes = {
'tasks.import_feed': {
'queue': 'feed_tasks',
'routing_key': 'feed.import',
},
}

您还可以在调用任务时使用 routing_key 参数覆盖此参数 :

1
2
3
4
5
from tasks import import_feed

import_feed.apply_async(args=['http://cnn.com/rss'],
queue='feed_tasks',
routing_key='feed.import')

任务优先级

  • RabbitMQ

可以通过设置x-max-priority参数将队列配置为支持优先级 :

1
2
3
4
5
6
from kombu import Exchange, Queue

app.conf.task_queues = [
Queue('tasks', Exchange('tasks'), routing_key='tasks',
queue_arguments={'x-max-priority': 10}),
]

可以使用以下 task_queue_max_priority 设置来设置所有队列的默认值 :

1
app.conf.task_queue_max_priority = 10

还可以使用以下task_default_priority设置来指定所有任务的默认优先级 :

1
app.conf.task_default_priority = 5
  • Redis

尽管芹菜Redis运输确实尊重优先领域,但Redis本身没有优先概念。在尝试使用Redis实施优先级之前,请阅读此说明,因为您可能会遇到一些意外的行为。

要根据优先级开始计划任务,您需要配置queue_order_strategy传输选项。

1
2
3
app.conf.broker_transport_options = {
'queue_order_strategy': 'priority',
}

通过为每个队列创建n个列表来实现优先级支持。这意味着即使有10(0-9)个优先级,默认情况下也会将其合并为4个级别以节省资源。这意味着名为celery的队列实际上将分为4个队列:

1
['celery0', 'celery3', 'celery6', 'celery9']

如果需要更多优先级,可以设置priority_steps传输选项:

1
2
3
4
app.conf.broker_transport_options = {
'priority_steps': list(range(10)),
'queue_order_strategy': 'priority',
}

也就是说,请注意,这永远不会像在服务器级别实现的优先级那样好,并且充其量可能是最好的。但这对于您的应用程序可能仍然足够好。

5. 监控和管理

Worker 管理命令

  • status:列出此集群中的活动节点。
1
$ celery -A proj status
  • result :显示任务的结果。
1
$ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
  • purge :从所有已配置的任务队列中清除消息。-Q选项指定要清除的队列,-X选项排除清除队列:
1
2
3
$ celery -A proj purge  # 全部清除
$ celery -A proj purge -Q celery,foo,bar
$ celery -A proj purge -X celery
  • inspect active :列出活动任务(当前正在执行的所有任务)。
1
$ celery -A proj inspect active
  • inspect scheduled :列出计划的ETA任务(设置了eta或countdown参数的任务 )。
1
$ celery -A proj inspect scheduled
  • inspect reserved :列出保留任务(这将列出工作者已经预取的所有任务,并且当前正在等待执行)。
1
$ celery -A proj inspect reserved
  • inspect revoked :列出已撤销任务的历史记录
1
$ celery -A proj inspect revoked
  • inspect registered :列出所有已注册任务
1
$ celery -A proj inspect registered
  • inspect stats :显示 worker 统计信息
1
celery -A proj inspect stats
  • inspect query_task :按ID显示有关任务的信息

  • $ celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8
    
    1
    2
    3
    4
    5

    - `control enable_events` :启用事件

    ```shell
    $ celery -A proj control enable_events
  • control disable_events :禁用事件

  • migrate :将任务从一个代理迁移到另一个代理

1
$ celery -A proj migrate redis://localhost amqp://localhost

Flower :celery实时监控工具

功能

  • 使用Celery Events进行实时监控
    • 任务进度和历史
    • 能够显示任务详细信息(参数,开始时间,运行时等)
    • 图形和统计
  • 遥控
    • 查看工作人员状态和统计数据
    • 关闭并重新启动工作程序实例
    • 控制工作池大小和自动缩放设置
    • 查看和修改工作人员实例从中使用的队列
    • 查看当前正在运行的任务
    • 查看计划的任务(ETA /倒计时)
    • 查看保留和撤销的任务
    • 应用时间和速率限制
    • 配置查看器
    • 撤销或终止任务
  • HTTP API
    • 列出工人
    • 关掉一个工人
    • 重新启动工人池
    • 增加工人的游泳池
    • 缩工池
    • 自动缩放工人池
    • 从队列开始消耗
    • 停止从队列消费
    • 列出任务
    • 列出(看到)任务类型
    • 获取任务信息
    • 执行任务
    • 按名称执行任务
    • 获取任务结果
    • 更改任务的软限制和硬限制
    • 任务的更改速率限制
    • 撤销任务
  • OpenID验证

用法:

您可以使用pip安装Flower:

1
$ pip install flower

运行flower命令将启动您可以访问的Web服务器:

1
$ celery -A proj flower

默认端口为5555,但是您可以使用 --port 参数更改此端口:

1
$ celery -A proj flower --port=5555

Broker URL也可以通过 --broker 参数传递 :

1
2
3
$ celery flower --broker=amqp://guest:guest@localhost:5672//
or
$ celery flower --broker=redis://guest:guest@localhost:6379/0

6. 最佳实践

  • 尽量不要使用数据库作为 AMQP Broker

随着 worker 的不断增多可能给数据库 IO 和连接造成很大压力。更具体来说不要把 Celery 的 task 数据和应用数据放到同一个数据库中。

  • 使用多个队列

对于不同的 task ,尽量使用不同的队列来处理。在 celery_config.py 中定义

1
2
3
4
task_queues=(
Queue('default', routing_key='default'),
Queue('other', routing_key='other'),
)

在 task 上定义

1
2
3
@app.task(queue='other')
def parse_something():
pass
  • 定义具有优先级的 workers

假如有一个 taskA 去处理一个队列 A 中的信息,一个 taskB 去处理队列 B 中的数据,然后起了 x 个 worker 去处理队列 A ,其他的 worker 去处理队列 B。而这时也可能会出现队列 B 中一些 task 急需处理,而此时堆积在队列 B 中的 tasks 很多,需要耗费很长时间来处理队列 B 中的 task。此时就需要定义优先队列来处理紧急的 task。

celery 中可以在定义 Queue 时,指定 routing_key

1
2
3
4
task_queues=(
Queue('other', routing_key='other_high'),
Queue('other', routing_key='other_low'),
)

然后定义

1
2
3
4
5
6
7
8
9
10
task_routes={
'path.to.task' : {
'queue': 'other',
'routing_key': 'other_high'
},
'path.to.task' : {
'queue': 'other',
'routing_key': 'other_low'
},
}

在启动 worker 时指定 routing_key

1
2
celery worker -E -l INFO -n workerA -Q other_high
celery worker -E -l INFO -n workerB -Q other_low
  • 使用 celery 的错误处理机制

一般情况下可能因为网络问题,或者第三方服务暂时性错误而导致 task 执行出错。这时可以使用 celery task 的重试机制。

1
2
3
4
5
6
7
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A():
try:
print("doing stuff here...")
except SomeNetworkException as e:
print("maybe do some clenup here....")
self.retry(e)

一般添加 default_retry_delay 重试等待时间和 max_retries 重试次数来限定,防止任务无限重试。

  • 使用 Flower

Flower 为监控 celery tasks 和 workers 提供了一系列的便利。他使用 Web 界面提供 worker 当前状态, task 执行进度,各个 worker 详细信息,甚至可以在网页上动态更行执行速率。

  • 只有在真正需要时才去追踪 celery 的 result

任务的状态存储任务在退出时成功或者失败的信息,这些信息有些时候很重要,尤其是在后期分析数据时,但是大部分情况下更加关心 task 执行过程中真正想要保存的数据,而不是任务的状态。

所以,可以使用 task_ignore_result = True 来忽略任务结果。

  • 不要将 Database/ORM 对象传入 tasks

不应该讲 Database objects 比如一个 User Model 传入在后台执行的任务,因为这些 object 可能包含过期的数据。相反应该传入一个 user id ,让 task 在执行过程中向数据库请求全新的 User Object。

  • 尽量简化 tasks

task 应该简洁 (concise):

  • 将主要 task 逻辑包含在对象方法或者方法中
  • 确保方法抛出明确的异常 (identified exceptions)
  • 只有在切当的时机再实现重试机制

假设需要实现一个发送邮件的 task:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import requests
from myproject.tasks import app # app is your celery application
from myproject.exceptions import InvalidUserInput
from utils.mail import api_send_mail

@app.task(bind=True, max_retries=3)
def send_mail(self, recipients, sender_email, subject, body):
"""Send a plaintext email with argument subject, sender and body to a list of recipients."""
try:
data = api_send_mail(recipients, sender_email, subject, body)
except InvalidUserInput:
# No need to retry as the user provided an invalid input
raise
except Exception as exc:
# Any other exception. Log the exception to sentry and retry in 10s.
sentrycli.captureException()
self.retry(countdown=10, exc=exc)
return data

通常任务真实的实现只有一层,而剩余的其他部分都是错误处理。而通常这么处理会更加容易维护。

  • 设置 task 超时

设置一个全局的任务超时时间

1
task_soft_time_limit = 600   # 600 seconds

超时之后会抛出 SoftTimeLimitExceeded 异常

1
2
3
4
5
6
7
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def mytask():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()

同样,定义任务时也能够指定超时时间,如果任务 block 尽快让其失败,尽量配置 task 的超时时间。不让长时间 block task 的进程。

1
2
3
4
5
6
7
@app.task(
bind=True,
max_retries=3,
soft_time_limit=5 # time limit is in seconds.
)
def send_mail(self, recipients, sender_email, subject, body):
...
  • 将 task 重复部分抽象出来

使用 task 的基类来复用部分 task 逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from myproject.tasks import app
class BaseTask(app.Task):
"""Abstract base class for all tasks in my app."""
abstract = True
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""Log the exceptions to sentry at retry."""
sentrycli.captureException(exc)
super(BaseTask, self).on_retry(exc, task_id, args, kwargs, einfo)
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Log the exceptions to sentry."""
sentrycli.captureException(exc)
super(BaseTask, self).on_failure(exc, task_id, args, kwargs, einfo)

@app.task(bind=True, max_retries=3, soft_time_limit=5, base=BaseTask)
def send_mail(self, recipients, sender_email, subject, body):
"""Send a plaintext email with argument subject, sender and body to a list of recipients."""
try:
data = api_send_mail(recipients, sender_email, subject, body)
except InvalidUserInput:
raise
except Exception as exc:
self.retry(countdown=backoff(self.request.retries), exc=exc)
return data
  • 将大型 task 作为类

一般情况下将使用方法作为 task 就已经足够,如果遇到大型 task ,可以将其写成类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class handle_event(BaseTask):   # BaseTask inherits from app.Task
def validate_input(self, event):
...
def get_or_create_model(self, event):
...
def stream_event(self, event):
...
def run(self, event):
if not self.validate_intput(event):
raise InvalidInput(event)
try:
model = self.get_or_create_model(event)
self.call_hooks(event)
self.persist_model(event)
except Exception as exc:
self.retry(countdown=backoff(self.request.retries), exc=exc)
else:
self.stream_event(event)
  • 单元测试

直接调用 worker task 中的方法,不要使用 task.delay() 。 或者使用 Eager Mode,使用 task_always_eager 设置来启用,当启用该选项之后,task 会立即被调用。而 这两种方式都只能测试 task worker 中的内容,官方并不建议这么做。

  • 对于执行时间长短不一的任务建议开启 -Ofair

celery 中默认都会有 prefork pool 会异步将尽量多的任务发送给 worker 执行,这也意味着 worker 会预加载一些任务。这对于通常的任务会有性能提升,但这也容易导致因为某一个长任务处理时间长,而导致其他任务处于长时间等待状态。

对于执行时间长短不一的任务可以开启 -Ofair

1
celery -A proj worker -l info -Ofair
  • 设置 worker 的数量

Celery 默认会开启和 CPU core 一样数量的 worker,如果想要不想开启多个 worker ,可以通过启动时指定 --concurrency 选项

1
--concurrency=1
  • 在 Celery 中使用多线程

上面提到使用 --concurrency=1 或者 -c 1 来设置 worker 的数量,Celery 同样支持 Eventlet 协程方式,如果你的 worker 有大量的 IO 操作,网络请求,那么此时使用 Eventlet 协程来提高 worker 的执行效率。确保在使用 Eventlet 之前对 Eventlet 非常了解,否则不要轻易使用

1
celery -A proj worker -P eventlet -c 10

7. 并发

启用Eventlet

可以使用 worker -P 选项启用Eventlet。

1
$ celery -A proj worker -P eventlet -c 1000

Linux常用设备与文件描述符

1. /dev/null

​ 在类Unix系统中,/dev/null,或称空设备,是一个特殊的设备文件,它丢弃一切写入其中的数据(但报告写入操作成功),读取它则会立即得到一个EOF。在程序员行话,尤其是Unix行话中,/dev/null 被称为位桶(bit bucket)或者黑洞(black hole)。空设备通常被用于丢弃不需要的输出流,或作为用于输入流的空文件。这些操作通常由重定向完成。

2. /dev/zero

​ 在类UNIX 操作系统中, /dev/zero 是一个特殊的文件,当你读它的时候,它会提供无限的空字符(NULL, ASCII NUL, 0x00)。其中的一个典型用法是用它提供的字符流来覆盖信息,另一个常见用法是产生一个特定大小的空白文件。BSD就是通过mmap把/dev/zero映射到虚地址空间实现共享内存的。可以使用mmap将/dev/zero映射到一个虚拟的内存空间,这个操作的效果等同于使用一段匿名的内存。

3. 文件描述符0

  • 标准输入文件(stdin):stdin的文件描述符为0,Unix程序默认从stdin读取数据。

4. 文件描述符1

  • 标准输出文件(stdout):stdout 的文件描述符为1,Unix程序默认向stdout输出数据。

5. 文件描述符2

  • 标准错误文件(stderr):stderr的文件描述符为2,Unix程序会向stderr流中写入错误信息。

6. &

​ 后台运行,常见格式如下: command &

7. &&

​ &&运算符用于两个命令之间的连接,格式如下:

1
command1  && command2

注意:

  • 命令之间使用 && 连接,实现逻辑与的功能。

  • 只有在 && 左边的命令返回真(命令返回值 $? == 0),&& 右边的命令才会被执行。

  • 只要有一个命令返回假(命令返回值 $? == 1),后面的命令就不会被执行。

8. 重定向

命令 说明
command > file 将输出重定向到 file。
command < file 将输入重定向到 file。
command >> file 将输出以追加的方式重定向到 file。
n > file 将文件描述符为 n 的文件重定向到 file。
n >> file 将文件描述符为 n 的文件以追加的方式重定向到 file。
n >& m 将输出文件 m 和 n 合并。
n <& m 将输入文件 m 和 n 合并。
<< tag 将开始标记 tag 和结束标记 tag 之间的内容作为输入。

常见的高级用法如下:

  • 将标准错误文件和标准输出文件合并,并输入到空洞中,起到禁止输出的效果

    1
    command > /dev/null 2>&1
  • 将标准输入和标准输入都重定向

    1
    command < file1 > file2
  • /dev/zero常用来创建空洞文件

    1
    dd if=/dev/zero bs=4M count=10 of=afile

计算机系统知识(一)

  • 计算机系统基础知识

    • 计算机系统硬件的基本组成

      计算机硬件由运算器、控制器、存储器、输入设备、输出设备组成;

      运算器、控制器和存储器合起来为CPU;

      输入设备与输出设备为外设;

      存储器为记忆设备,分为外部存储器和内部存储器。

    • 中央处理单元

      ​ 负责获取程序的指令、并对指令进行编码、执行指令。

      • CPU功能

        • 程序控制。CPU通过控制指令的执行来控制程序的执行顺序;

        • 操作控制。CPU产生每条操作指令的操作信号,并将操作信号送往对应的部件,控制对应的部件按指令的功能进行操作;

        • 时间控制。CPU对各种操作的时间进行控制,指令执行过程中操作信号的产生、持续时间、已经出现的时间顺序进行严格的控制。

        • 数据处理。CPU对数据进行数据运算和逻辑运算,对数据加工处理,以便于处理结果被使用。

      • CPU组成

      CPU组成

      CPU主要是由运算器、控制器、寄存器组、内部总线等组成;

      • 运算器

      ​ 运算器由算数逻辑单元、累加寄存器、数据缓冲寄存器、状态条件寄存器组成。进行数据加工处理,用于计算机的算数计算和逻辑计算。

      ​ 主要功能有两点:

      1. 执行所有的算数运算。进行加、减、乘、除以及附加运算。

      2. 执行所有的逻辑运算和进行逻辑测试。或、与、非、零值测试与数值比较。

        • 算数逻辑单元(ALU)

          ​ 负责数据处理,算数运算和逻辑运算。

        • 累加寄存器(AC)

        • 数据缓冲寄存器(DR)

        • 状态条件寄存器

      • 控制器

      ​ 控制整个CPU工作,决定了运行过程的自动化。

      ​ 控制器包括指令控制逻辑、时序控制逻辑、总线控制逻辑、中断控制逻辑。

      ​ 指令控制逻辑:

      ​ 完成获取指令、分析指令、执行指令等操作。即取指令、指令译码、按指令执行码执行、形成下一条执行码地址。

      ​ 时序控制逻辑:

      ​ 为每一条指令按时间顺序提供应有的控制信号。

      ​ 总线控制逻辑:

      ​ 为多个功能部件服务的信息通路的控制电路。

      ​ 中断控制逻辑:

      ​ 控制各种中断请求,并按系统的优先级进行排队,然后逐一交给CPU执行。

      • 寄存器组

      ​ 分类:

      ​ 专用寄存器:运算器和控制器中的寄存器为专用寄存器,功能不能更改。

      ​ 通用寄存器:功能可以由人员控制。

      • 多核CPU
    • 数据表示
      • 权值表示法(R进制转十进制)

      ​ 以小数点为准位,小数点左侧高位的标示位从低位向高加一,由侧由0减一;

      ​ 例如:

      1
      2
      3
      数  值:  7 6 1 . 1 1
      标示位: 2 1 0 -1 -2
      (761.11)8 = 7*8^2 + 6*8^1 + 1*8^0 + 1*8^-1 + 1*8^-2
      • 进制转化(十进制转R进制 除R取余法)

        除R取余,逆向输出。(辗转相除法)

        • 二进制与八进制互转

          8进制一个数由3位2进制表示;

          从低位向高位,每3位为一组。

          8进制转2进制,将每一位用二进制表示即可。

        • 二进制与十六进制互转

        ​ 16进制一个数由4位2进制表示;

        ​ 从低位到高位,每4位为一组。

        ​ 16进制转2进制,将每一位用二进制表示即可。

      • 数据运算

        • 原码

          一个数据8位,对于有符号位,最高位为符号位。0表示正数,1表示负数;

          原码就是将数表示为二进制数;

        • 反码

          正数的反码,等于它的原码;

          负数的反码,在原码的基础上,符号位不变,其他位取反;

        • 补码

          正数的补码,等于它的原码;

          负数的补码,在反码的基础上 加1;

        • 移码

      ​ 在补码的基础上,符号位取反;

      ​ 总结:

      正数 三码合一;

      ​ 计算机计算都是按 补码 进行计算。

      • 数据表示

        • 定点数

          小数点位置不变的数。

          定点整数:小数点位置在最低有效位置之后(纯整数)

          定点小数:小数点位置在最高有效位置之前(纯小数)

        • 浮点数

          小数点位置不固定的数,能表示更大范围。

          • 表示方法:

            ​ N = 2 ^ E * F

            ​ E: 阶码

            ​ F: 尾数

          • 组成部分

            ​ 阶符 阶码 数符 尾数

          • 格式化

            ​ 将尾数的绝对值限定在[0.5, 1]之间。

            ​ 工业标准IEEE 754: (-1)^S 2^E (b0b1b2…bp)

          IEEE 754浮点数对照

          ​ 当S为0时表示正数,1表示负数;

          ​ E为指数,即阶码,用移码表示;

          ​ (B0b1b2…bp) 为尾数,长度为P,用原码表示。

          ​ 阶码计算:指数 + 浮点数指定的偏移量(例如单精度偏移量为127)

          ​ 例子:将176.0625转化为单精度浮点数

          ​ 176.0625将10进制转为2进制:

          ​ 整数部分: 176除2取余为: 10110000

          ​ 小数部分: 0.0625乘2取整为: 0001

          ​ 转化后为: (10110000.0001)2 小数点不占位

          ​ 格式化处理: 1.01100000001 * 2^7(b0可以为1)

          ​ 将b0的尾部拓展单精度23位: 01100000001000000000000

          ​ 阶符号为0,正7,阶码要求偏移量,127 + 7 = 134,即10000110

          ​ 最后表示为:

          ​ 0 10000110 01100000001000000000000

          • 浮点数计算

            第一步: 浮点数进行加减运算时,先进行对阶,低位向高位对,尾数右移;

            第二步: 尾数求差和;

            第三步:结果进行格式化并溢出判断;如果尾数溢出,则需要调整阶码;

            第四步:阶码溢出判断。如果阶码溢出,则运算结果溢出,如果阶码下溢,则结果位0,否则结果正确。

          • 浮点数乘除法

            乘法: 其结果阶码等于阶码相加,尾数相乘;

            除法: 其结果阶码等于被除数减除数的阶码,尾数相除;

    • 校验码
      • 作用

        检测传输的数据是否出错

        码距:是指在一个编码系统中,任意两个合法编码之间至少有多少个不同的二进制位;

      • 分类

        • 奇偶校验码

        • 海明码

          通过在数据位的特定位置插入K个校验位,通过扩大码距进行校错和纠错。

          编码过程:

          A. 设海明码为n位,则K的取值应满足下面条件:

          ​ 2^k - 1 >= n + k

          ​ 求出k的最小值即可.

          B. 编码规则

          ​ 则添加的海明码位为k个,分别为P1,P2,P3…Pk, 原始数据为: D0, D1,…,D(n-1),设生成的海明码为H1,H2….H(n+k).

          ​ 将海明码位k个与数据n分别对应到生成的海明码H中,其中H和P的对应关系为:

          ​ Pi = H(2^(i - 1))

          ​ 即: P1 = H1

          ​ P2 = H2

          ​ P3 = H4

          ​ …

          C. 校验规则

          ​ 海明码中的任何一位都是由若干校验位进行校验;

          ​ 校验位由自身校验(海明码位P);

          ​ 被校验的海明码位(数据位D)的下标等于所有参与校验该位的校验位的下标之和.

          D. 对应关系表

          海明码校验关系表

          如果再用奇校验,则将各位取反即可;

          • 错误检测

            G1 = P1 ⊕ D0 ⊕ D1 ⊕ D3 ⊕ D4 ⊕ D6

            G2 = P2 ⊕ D0 ⊕ D2 ⊕ D3 ⊕ D5 ⊕ D6

            G3 = P3 ⊕ D1 ⊕ D1 ⊕ D3 ⊕ D7

            G4 = P4 ⊕ D4 ⊕ D5 ⊕ D6 ⊕ D7

            如果采用偶校验,则G1G2G3G4全部为0,则校验成功,如果不是全部为0,则存在校验失败,找到对应为1的进行数据修正.(如果奇校验,则相反1)

          • 数据位出错查判

            例如,校验后的G1G2G3G4 = 0101

            说明G1和G4出现问题,但是G0和G3没有问题,可以排除D0, D1, D3, D4, D6,D7,这个可以在G2G3中的到,则出问题的就是D5,即H(10)

          • 根据数据求海明码

            • 求出P1, P2, P3, Pk的值根据公式

            • 将数据填充到H1至H(k+n)中

        • 循环冗余校验码(CRC)

        ​ 利用生成多项式,为k个数据生成r个校验位来进行编码.

        ​ 组成:

        CRC 校验码由信息码生成,校验码越长,校验能力越强.CRC编码采用模2运算.

Mac Application快捷键神器

  • 引言

​ app应用越来越多,要提升软件的使用效率,快捷键自然是占据重要位置,那么,每个应用的快捷键如此之多,如果去记忆每一个应用的快捷键,那将是一个繁琐枯燥的工作,并且常用的也就那么几个,不常用的偶尔使用,又去查询快捷键又比较费时,那么该如何解决这样的问题呢?

  • 快捷键之神 - CheatSheet

​ 告别快捷键的记忆与查询,按住command5秒,快捷键一览无余。

  • 哪里下载?

    下载地址如下:

    1
    https://www.mediaatelier.com/CheatSheet/
  • 权限问题解决?

    • 下载完成后,双击解压zip包,并将应用移动到 应用程序目录下。

    • 双击应用,如果弹出非认证打不开,则在 系统偏好设置—>安全性与隐私—>通用 中允许打开CheatSheet;

    • 允许App控制电脑权限打开;还是在安全性与隐私中,点击 辅助功能,将CheatSheet应用对勾打开即可。

      权限配置

  • 软件怎么使用?

    ​ 在权限配置完成之后,随便点击一个应用,然后按住 command键5秒左右,就会弹出这个应用相关的快捷键操作手册。

  • 操作示例在哪里?

    • 以Goland为例,打开Goland应用程序;

    • 长按 command 5s左右,就会弹出如下快捷键手册,松开按键即可恢复到应用页面。

      操作键手册

    • 可以在页面中上下滑动进行查找;

leetcode-15 三数之和

  • 题目描述

    ​ 给你一个包含 n 个整数的数组 nums,判断 nums 中是否存在三个元素 a,b,c ,使得 a + b + c = 0 ?请你找出所有满足条件且不重复的三元组。

    ​ 注意:答案中不可以包含重复的三元组。

    ​ 示例:

    给定数组 nums = [-1, 0, 1, 2, -1, -4],

    满足要求的三元组集合为:

    1
    2
    3
    4
    [
    [-1, 0, 1],
    [-1, -1, 2]
    ]
  • 解题说明

    • 将数组进行排序,从左向右依次从小到大进行排序;

    • 判断数组长度是否满足输入,如果长度小于等于2,则返回空数组;

    • 定位找值,先固定一个数值,然后定义两个指针,进行左右移动进行目标值确定。如果三个数和等于0,则将三个数加入到结果数组中,在进行判重,如果左指针的值等于左指针加一的值,即相邻数相等,则左指针继续加一,判重条件为左指针小于右指针,退出循环。退出循环后,左右指针各加一,继续从中间向两边查找。如果计算结果大于0,则右指针左移,否则左指针右移。

指针移动

  • 代码实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package main

import (
"fmt"
"sort"
)

/*
@Time : 2020/8/29 19:59
@Author : austsxk
@Email : austsxk@163.com
@File : day1_threeNumberSum.go
@Software: GoLand
*/

func threeSum(nums []int) [][]int {
// 将数组进行从小到大的排序
sort.Ints(nums)
if len(nums) <= 2 {
return [][]int{}
}
var result [][]int
var left, right = 0, 0
for i := 0; i < len(nums); i ++ {
// 如果最左侧的值大于0,则后面的值加起来肯定超过0,直接退出循环
if nums[i] > 0 {
break
}
// 如果连续数值相同,则直接进行下一个
if i > 0 && nums[i] == nums[i-1] {
continue
}

left = i + 1
right = len(nums) - 1
for left < right {
data := nums[i] + nums[left] + nums[right]
// 如果正好等于0,则左右都移动,向中间逼近,并进行判重
if data == 0 {
result = append(result, []int{nums[i], nums[left], nums[right]})
// 此处去重复,如果从左侧连着一样,直接去除掉
for left < right {
if nums[left] == nums[left+1] {
left ++
} else {
break
}
}
// 判断去重复后,直接左右向中间移动
left ++
right --
}
// 如果小于0,则说明左侧值偏小,则左指针右移动
if data < 0 {
left ++
}
// 如果大于0,说明右侧值偏大,则指针左移动
if data > 0 {
right --
}
}
}
return result
}

func main() {
var testList []int = []int{-1,0,1,2,-1,-4}
d := threeSum(testList)
fmt.Println(d)
}

picGo配合github图床、七牛云图床的使用

    1. Token的生成

      ​ 进入个人中心,依次点击,settings—> developer setting —> personal aceess tokens 。点击generate token;然后写一个描述,全部都点中;最后生成token。

      生成token

    1. picGo中Github图床的设置

    密钥管理

    1. 图片上传使用即可。
  • picGo配置七牛云图床【已经申请好七牛云存储空间/免费10G】

      1. 下载对应版本的picGo应用
    1
    下载链接: https://github.com/Molunerfinn/PicGo/releases
      1. AccessKey和SecretKey:可以在七牛云控制台,秘钥管理页面找到你的配置。登陆七牛云->个人中心->密钥管理

    密钥管理

      1. 存储空间名:填写前面创建的空间名称

    存储空间信息

      1. 访问地址:对应自己设置的加速域名或者七牛云的免费域名(30天)
      1. 存储区域:七牛云的存储区域,根据你空间所在的区域,填对应的代码。(不能写汉字,只能用代码表示)
    1
    2
    3
    4
    5
    华东 z0
    华北 z1
    华南 z2
    北美 na0
    东南亚 as0
      1. 进行picGo七牛云图床的参数设置,并进行保存。

      1. 上传图片。将图片拖拽进图片区,或者将图片复制到剪切板上,直接点击剪切板图片即可。

leetcode-8 字符串转换整数(atoi)

  • 题目描述

    字符串转换整数 (atoi)

    请你来实现一个 atoi 函数,使其能将字符串转换成整数。

    ​ 首先,该函数会根据需要丢弃无用的开头空格字符,直到寻找到第一个非空格的字符为止。接下来的转化规则如下:

    • 如果第一个非空字符为正或者负号时,则将该符号与之后面尽可能多的连续数字字符组合起来,形成一个有符号整数。
    • 假如第一个非空字符是数字,则直接将其与之后连续的数字字符组合起来,形成一个整数。
    • 该字符串在有效的整数部分之后也可能会存在多余的字符,那么这些字符可以被忽略,它们对函数不应该造成影响。

    注意:假如该字符串中的第一个非空格字符不是一个有效整数字符、字符串为空或字符串仅包含空白字符时,则你的函数不需要进行转换,即无法进行有效转换。

    在任何情况下,若函数不能进行有效的转换时,请返回 0 。

    提示:

    • 本题中的空白字符只包括空格字符 ' '
    • 假设我们的环境只能存储 32 位大小的有符号整数,那么其数值范围为 [−231, 231 − 1]。如果数值超过这个范围,请返回 INT_MAX (231 − 1) 或 INT_MIN (−231) 。

示例 1:

1
2
输入: "42"
输出: 42

示例 2:

1
2
3
4
输入: "   -42"
输出: -42
解释: 第一个非空白字符为 '-', 它是一个负号。
我们尽可能将负号与后面所有连续出现的数字组合起来,最后得到 -42 。

示例 3:

1
2
3
输入: "4193 with words"
输出: 4193
解释: 转换截止于数字 '3' ,因为它的下一个字符不为数字。

示例 4:

1
2
3
4
输入: "words and 987"
输出: 0
解释: 第一个非空字符是 'w', 但它不是数字或正、负号。
因此无法执行有效的转换。

示例 5:

1
2
3
4
输入: "-91283472332"
输出: -2147483648
解释: 数字 "-91283472332" 超过 32 位有符号整数范围。
因此返回 INT_MIN (−231) 。
  • 解题思路

    • 先去除字符串两边的空格
    1
    str = strings.TrimSpace(str)
    • 遍历字符串
    1
    for index, value := str {}
    • 使用flag标记保存数据的正符号,并使用索引判断负号位置的合理性
    • 对最终的结果进行校验返回
    1
    2
    math.MaxInt32
    math.MinInt32
  • 题解代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"fmt"
"math"
"strings"
)
func myAtoi2(str string) int {
//1. 去掉收尾空格
str = strings.TrimSpace(str)
number := 0
flag := 1

for index, value := range str {
// 如若是在0 - 9之间,则进行逐步计算
if value >= '0' && value <= '9' {
number = number*10 + int(value-'0')
fmt.Println(number,flag)
} else if value == '-' && index == 0 {
// 如果是符号,但是如果不是第一位,则直接退出,如果是则记录符号
flag = -1
} else if value == '+' && index == 0 {
flag = 1
} else {
// 如果都不在,则之间返回0
break
}
// 超阈值校验
if number > math.MaxInt32 {
if flag == 1 {
return math.MaxInt32
} else {
return math.MinInt32
}
}
}
return number * flag
}

leetcode-7 字符串Z字(V字)输出

  • 题目描述

    Z字变换

    将一个给定字符串根据给定的行数,以从上往下、从左到右进行 Z 字形排列。

    比如输入字符串为 "LEETCODEISHIRING" 行数为 3 时,排列如下:

    1
    2
    3
    L   C   I   R
    E T O E S I I G
    E D H N

    之后,你的输出需要从左往右逐行读取,产生出一个新的字符串,比如:"LCIRETOESIIGEDHN"

    示例 1:

    1
    2
    输入: s = "LEETCODEISHIRING", numRows = 3
    输出: "LCIRETOESIIGEDHN"

    示例 2:

    输入: s = “LEETCODEISHIRING”, numRows = 4
    输出: “LDREOEIIECIHNTSG”
    解释:

    1
    2
    3
    4
    L     D     R
    E O E I I
    E C I H N
    T S G
  • 说明

    Z字转化其实就是交替输出字符,如下图所示:

    Z字转化

  • 解答

    • Python
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    # -*- coding:utf-8 -*-
    # @Time : 2020-08-29 13:29
    # @Author : 宋晓奎
    # @Email : austsxk@vip.qq.com
    # @File : item_07Zconvert.py
    # @Software : PyCharm


    class Resolution(object):
    def convert(self, s: str, row: int) -> str:
    """
    将字符串转化为z字
    :param s: 字符串
    :param row: 行数
    :return:
    """
    if row <= 1:
    return s
    begin, flg = 0, -1
    array = ['' for _ in range(len(s))]
    for c in s:
    array[begin] += c
    if begin == 0 or begin == row - 1:
    flg *= -1
    begin += flg
    return "".join(array)
  • Golang
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    package main
    import "strings"
    /*
    @Time : 2020/8/28 13:48
    @Author : austsxk
    @Email : austsxk@163.com
    @File : day29_Zupdate.go
    @Software: GoLand
    */

    6. Z 字形变换
    func convert(s string, numRows int) string {
    if len(s) <= 1 {
    return s
    }
    // 初始化控制的索引和控制的标志
    begin, flag := 0, -1
    // 将每一行的字符进行拼接,然后最后将数组拼接
    array := make([]string, len(s))
    for _, value := range []rune(s) {
    array[begin] += string(value)
    if begin == 0 || begin == numRows -1 {
    // 如果是第一行和最后以后,在拼接完字符后,应该立马进行转化flag,反向操作
    flag *= -1
    }
    // 通过flag的改变控制下次拼接字符的索引,即行数
    begin += flag
    }
    // 将数据进行拼接后返回
    data := strings.Join(array, "")
    return data
    }

leetcode-3 最长子字符串

  • 题目描述

    给定一个字符串,请你找出其中不含有重复字符的 最长子串的长度及其开始位置和结束为止。

    示例 1:
    输入: “abcabcbb”
    输出: 3
    解释: 因为无重复字符的最长子串是 “abc”,所以其长度为 3。

  • 解答
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package main

/*
@Time : 2020/8/26 18:29
@Author : austsxk
@Email : austsxk@163.com
@File : lengthOfLongestSubString.go
@Software: GoLand
*/

import ("fmt")

// 位图法
func lengthOfLongestSubStringBitMap(s string) (int, int, int) {
if len(s) == 0 {
return 0, 0, 0
}
var BitMap [128]bool
// 初始化目标值,左指针,右指针,开始位置与结束位置
var target, left, right, start, end = 0, 0, 0, 0, 0
for left < len(s) {
// 遍历字符,只要不存在,则right一直加,否则,左指针移动
if BitMap[s[right]] {
// 如果右指针存在值
// 左指针置为False
BitMap[s[left]] = false
left ++
} else {
// 如果存在相同元素,则左指针右移,继续进行判断是否存在右指针元素重复
BitMap[s[right]] = true
right ++
}
// 计算最大长度并进行更新
if target < right - left {
target = right - left
}
// 出口
if right >= len(s) || left + target >= len(s) {
end = right
start = end - target
break
}

}
return target, start, end
}

// hash法,字典方法,同上
func lengthOfLongestSubStringHash(s string) (int, int, int) {
if len(s) == 0 {
return 0, 0, 0
}
var HashMap map[string]bool = map[string]bool{}
var target, left, right, start, end = 0, 0, 0, 0, 0
for left < len(s) {
// 遍历字符,只要不存在,则right一直加,否则,左指针移动
if HashMap[string(s[right])] {
// 如果右指针存在值
// 左指针置为False
HashMap[string(s[left])] = false
left ++
} else {
HashMap[string(s[right])] = true
right ++
}
if target < right - left {
target = right - left
}
// 出口
if right >= len(s) || left + target >= len(s) {
end = right
start = end - target
break
}
}
return target, start, end
}

// 滑动窗口法
func lengthOfLongestStringSplitWindows(s string) (int, int, int) {
if len(s) == 0 {
return 0, 0, 0
}
var windows [128]int
var target, left, right = 0, 0, 0
for left < len(s) {
if windows[s[right] - 'a'] == 0 && right + 1 < len(s) {
windows[s[right] - 'a'] ++
right ++
} else {
windows[s[left] - 'a']--
left ++
}
if target < right - left {
target = right - left
}
}

return target, right - target, right
}

func main() {
length, start, end := lengthOfLongestSubStringBitMap("pwwkew")
length1, start1, end1 := lengthOfLongestStringSplitWindows("pwwkew")
fmt.Println(length, start, end)
fmt.Println(length1, start1, end1)
}
本站总访问量 本站总访客数 本文总阅读量
载入天数...载入时分秒...