Celery là gì

Trong dự án hiện tại của mình khi tới phần scaling hệ thống thì kiến trúc hiện tại theo hướng microservice gặp phải một vấn đề: mọi service trong hệ thống đều tương tác trực tiếp với database nên xảy ra vấn đề càng nhiều service thì càng nhiều kết nối tới database dẫn đến tình trạng xảy ra deadlock, performance cũng rất chậm do các kết nối tới database từ các service phải chờ nhau giải phóng.

Bạn đang xem: Celery là gì

Sau khi được gợi ý về việc chuyển sang dùng hàng đợi thay vì để những service thao tác trực tiếp với database, mình có dành thời hạn tìm hiểu và khám phá thêm về kiến trúc Queue. Do dự án chạy đa phần bằng python nên tech lead gợi ý sử dụng Celery, một mạng lưới hệ thống quản trị queue thông dụng .Kiến trúc sau khi chuyển sang sử dụng queue trong mạng lưới hệ thống của mình sẽ như sau. Một bài viết khá cụ thể về một dạng phong cách thiết kế queue là message queue mọi người hoàn toàn có thể đọc thêm ở toidicodedao

Bạn đang đọc: Celery là gì

*Về CeleryLà một mạng lưới hệ thống quản trị hàng đợi giải quyết và xử lý task thời hạn thực. Trong mạng lưới hệ thống Celery tất cả chúng ta sẽ sử dụng khái niệm task giống như job ở 1 số ít framework khác như Sidekiq. Input của celery cần liên kết với một loại message broker còn output hoàn toàn có thể liên kết tới một mạng lưới hệ thống backend để tàng trữ tác dụngMọi người hoàn toàn có thể tìm hiểu thêm một bài viết khác về Celery trên viblo ở đây. Ngoài ra Celery cũng có một mạng lưới hệ thống document chi tiết cụ thể và dễ đọc ở trang chủ https://docs.celeryproject.org/en/latest/getting-started/introduction.html .Các bài toán nên sử dụng CeleryChạy background jobsChạy các job lập lịchTính toán phân tánXử lý song songCác chức năng chính Celery cung cấpMonitor: giám sát các job/task được đưa vào queueScheduling: chạy các task lập lịch (giống cronjob)Workflows: tạo một luồng xử lý taskTime & Rate Limits: kiểm soát số lượng task được thực thi trong một khoảng thời gian, thời gian một task được chạy,…Resource Leak Protection: kiểm soát tài nguyên trong quá trình xử lý taskUser Component: cho phép người dùng tự customize các worker.Cơ chế của CeleryCelery hoạt động dựa trên khái niệm task queue. Đây là cơ chế queue dùng để điều phối các job/work giữa các máy khác nhau. Các worker sẽ nhận task, chạy task và trả về kết quả.Input của queue:TaskCác process trên từng worker sẽ theo dõi queue để thực thi các task mới được đẩy vào queueCelery thường dùng một message broker để điều phối task giữa các clients và worker. Để tạo một task mới client sẽ thêm một message vào queue, broker sau đó sẽ chuyển message này tới worker. Celery hỗ trợ 3 loại broker:RabbitMQRedisSQSMột hệ thống sử dụng celery có thể có nhiều workers và brokers, nhờ vậy việc scale theo chiều ngang sẽ rất dễ dàng.Các module chính của Celery

Application

Các bài toán nên sử dụng CeleryChạy background jobsChạy những job lập lịchTính toán phân tánXử lý tuy nhiên songCác công dụng chính Celery cung cấpMonitor : giám sát những job / task được đưa vào queueScheduling : chạy những task lập lịch ( giống cronjob ) Workflows : tạo một luồng giải quyết và xử lý taskTime và Rate Limits : trấn áp số lượng task được thực thi trong một khoảng chừng thời hạn, thời hạn một task được chạy, … Resource Leak Protection : trấn áp tài nguyên trong quy trình giải quyết và xử lý taskUser Component : được cho phép người dùng tự customize những worker. Cơ chế của CeleryCelery hoạt động giải trí dựa trên khái niệm task queue. Đây là chính sách queue dùng để điều phối những job / work giữa những máy khác nhau. Các worker sẽ nhận task, chạy task và trả về hiệu quả. Input của queue : TaskCác process trên từng worker sẽ theo dõi queue để thực thi những task mới được đẩy vào queueCelery thường dùng một message broker để điều phối task giữa những clients và worker. Để tạo một task mới client sẽ thêm một message vào queue, broker sau đó sẽ chuyển message này tới worker. Celery tương hỗ 3 loại broker : RabbitMQRedisSQSMột mạng lưới hệ thống sử dụng celery hoàn toàn có thể có nhiều workers và brokers, nhờ vậy việc scale theo chiều ngang sẽ rất thuận tiện. Các module chính của CeleryMột instance được khởi tạo từ thư viện Celery được gọi là applicationNhiều Celery application hoàn toàn có thể cùng sống sót trong một processKhởi tạo một celery application :from celery import Celeryapp = Celery ( ) Khi gửi một message tới queue, message đó sẽ chỉ chứa tên của task cần thực thi .Các celery worker sẽ map giữa tên của task với hàm thực thi task đó, việc mapping như vậy được gọi là task registry

app.taskdef add(x, y):return x + y

Tasks

Task trong Celery có hai nhiệm vụ chính:định nghĩa những gì sẽ xảy ra sau khi một task được gọi (gửi đi message)định nghĩa những gì sẽ xảy ra khi một worker nhận được message đóMỗi task có một tên riêng không trùng lặp, tên này sẽ được refer trong message để worker có thể tìm được đúng hàm để thực thi. Nếu không định nghĩa tên cho task thì task đó sẽ được tự đặt tên dựa vào module mà task được định nghĩa và tên function của task.Các message của task sẽ không bị xóa khỏi queue chừng nào message đó chưa được một worker xử lý. Một worker có thể xử lý nhiều message, nếu worker bị crash mà chưa xử lý hết các message đó thì chúng vẫn có thể được gửi lại tới một worker khácCác function của task nên ở trạng thái idempotent: function không gây ra ảnh hưởng gì kể cả khi có bị gọi nhiều lần với cùng một tham số => một task đã thực thi sẽ đảm bảo không bị chạy lại lần nữa.

Tạo task

Để tạo task chúng ta dùng decorator
app.task(name=”create_new_user”)def create_user(username, password):User.objects.create(username=username, password=password)Để task có thể retry chúng ta có thể bound task vào chính instance của nóTask trong Celery có hai trách nhiệm chính : định nghĩa những gì sẽ xảy ra sau khi một task được gọi ( gửi đi message ) định nghĩa những gì sẽ xảy ra khi một worker nhận được message đóMỗi task có một tên riêng không trùng lặp, tên này sẽ được refer trong message để worker hoàn toàn có thể tìm được đúng hàm để thực thi. Nếu không định nghĩa tên cho task thì task đó sẽ được tự đặt tên dựa vào module mà task được định nghĩa và tên function của task. Các message của task sẽ không bị xóa khỏi queue chừng nào message đó chưa được một worker giải quyết và xử lý. Một worker hoàn toàn có thể giải quyết và xử lý nhiều message, nếu worker bị crash mà chưa giải quyết và xử lý hết những message đó thì chúng vẫn hoàn toàn có thể được gửi lại tới một worker khácCác function của task nên ở trạng thái idempotent : function không gây ra ảnh hưởng tác động gì kể cả khi có bị gọi nhiều lần với cùng một tham số => một task đã thực thi sẽ bảo vệ không bị chạy lại lần nữa. Để tạo task tất cả chúng ta dùng decoratorapp.task ( name = ” create_new_user ” ) def create_user ( username, password ) : User. objects.create ( username = username, password = password ) Để task hoàn toàn có thể retry tất cả chúng ta hoàn toàn có thể bound task vào chính instance của nó

task(bind=True)def add(self, x, y):logger.info(self.request.id)Task cũng có thể kế thừa

import celeryclass MyTask(celery.Task):def on_failure(self, exc, task_id, args, kwargs, einfo): print(“{0!r} failed: {1!r}”.format(task_id, exc))
task(base=MyTask)def add(x, y):raise KeyError()Để biết thêm thông tin và trạng thái của task chúng ta có thể sử dụng Task.request

app.task(bind=True)def dump_context(self, x, y):print(“Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}”.format( self.request))Celery quản lý trạng thái của tasks và có thể lưu chúng trong các hệ thống gọi là result backend. Vòng đời mặc định của task trong Celery gồm:

PENDING: task đợi được thực thi.

Xem thêm: Momo Là Gì – Những ý Nghĩa Của Momo

STARTED : task đã khởi chạySUCCESS : task đã chạy thành công xuất sắcFAILURE : task gặp lỗi sau khi khởi chạyRETRY : task đang được chạy lạiREVOKED : task được tịch thu lạiNgoài những trạng thái mặc định trên tất cả chúng ta hoàn toàn có thể tự định nghĩa thêm trạng thái và update trạng thái cho task bằng method update_state

app.task(bind=True)def upload_files(self, filenames):for i, file in enumerate(filenames): if not self.request.called_directly: self.update_state(state=”PROGRESS”, meta={“current”: i, “total”: len(filenames)})

Gọi task

Celery cung cấp các API để gọi task sau khi đã định nghĩa chúng ở trên.Celery cung ứng những API để gọi task sau khi đã định nghĩa chúng ở trên .

3 method chính:

apply_async: gửi task message.delay: gửi task messagecalling: task message sẽ không được gửi đi tới worker mà task sẽ được thực thi luôn bởi process hiện tại.apply_async : gửi task message.delay : gửi task messagecalling : task message sẽ không được gửi đi tới worker mà task sẽ được thực thi luôn bởi process hiện tại .Có một task như sau :

app.taskdef add(x, y):return x + yĐể gọi task này chúng ta sẽ thử dùng 2 method là apply_async và delay

Với delay chúng ta sẽ viết như sau:Với delay tất cả chúng ta sẽ viết như sau :# task.delay ( arg1, arg2, kwarg1 = ” x “, kwarg2 = ” y ” ) add.delay ( 10, 5 ) add.delay ( a = 10, b = 5 ) Dùng apply_async thì phải viết phức tạp hơn một chút ít # task. apply_async ( args =, kwargs = { ” kwarg1 ” : ” x “, ” kwarg2 ” : ” y ” } ) add. apply_async ( queue = ” low_priority “, args = ( 10, 5 ) ) add. apply_async ( queue = ” high_priority “, kwargs = { ” a ” : 10, ” b ” : 5 } ) Về thực chất delay và apply_async là như nhau nhưng delay đã có sẵn những thiết lập mặc định và tất cả chúng ta chỉ hoàn toàn có thể truyền vào những tham số bắt buộc đã định nghĩa trong function của task, còn với apply_async tất cả chúng ta hoàn toàn có thể truyền thêm những tham số khác như queue tất cả chúng ta muốn gửi message vào, …. Best practice là nên sử dụng apply_async để tiện việc config chạy task tùy theo nhu yếu sử dụng .Celery tương hỗ việc gọi task theo dạng chaining, hiệu quả của task này hoàn toàn có thể được truyền vào task tiếp theo

add.apply_async((2, 2), link=add.s(16)) # 20Nhờ vào cơ chế này chúng ta có thể thiết kế callback cho task như sau
app.taskdef error_handler(uuid):result = AsyncResult(uuid)exc = result.get(propagate=False)print(“Task {0} raised exception: {1!r}
{2!r}”.format( uuid, exc, result.traceback))add.apply_async((2, 2), link_error=error_handler.s())Sử dụng Celery

Cài đặt

pip install -U Celery

Sử dụng

Lựa chọn loại message broker phù hợp với dự án. Như đã nói ở trên Celery hỗ trợ 3 loại message broker là RabbitMQ, Redis, SQS. Mình sẽ đi sâu vào phân tích từng loại message broker trong phần sau về Celery.pip install – U CeleryLựa chọn loại message broker tương thích với dự án Bất Động Sản. Như đã nói ở trên Celery tương hỗ 3 loại message broker là RabbitMQ, Redis, SQS. Mình sẽ đi sâu vào nghiên cứu và phân tích từng loại message broker trong phần sau về Celery .Tạo một celery worker với task add

from celery Import Celeryapp = Celery(“name of module”, broker=”url_of_broker”)
app.taskdef add(x, y):return x + yChạy worker

USD celery – A tasks worker — loglevel = infoGọi task

Lưu kết quả

Celery có thể lưu lại trạng thái của tasks nếu chúng ta cần theo dõi tasks sau này. Với các hệ thống thực hiện task theo phương thức state machine thì việc hệ thống cần nắm được luồng trạng thái của task là vô cùng quan trọng.

Xem thêm:

Celery hoàn toàn có thể lưu lại trạng thái của tasks nếu tất cả chúng ta cần theo dõi tasks sau này. Với những mạng lưới hệ thống thực thi task theo phương pháp state machine thì việc mạng lưới hệ thống cần nắm được luồng trạng thái của task là vô cùng quan trọng. Xem thêm : Niacinamide Là Gì – Công Dụng Và Tác Dụng Trong Làm đẹpCác mạng lưới hệ thống celery dùng để lưu trạng thái task :SQLAlchemyMemcachedRedisSQLAlchemyMemcachedRedisĐể sử dụng chính sách lưu tác dụng trong Celery tất cả chúng ta khai báo celery worker có tham số backend. Ở đây mình sử dụng redis cho cả việc lưu tác dụng task lẫn làm message brokerapp = Celery ( ” tasks “, backend = ” redis : / / localhost “, broker = ” redis : / / localhost : 6379 / 0 ” )

Cấu hình Celery

Cấu hình mặc định cơ bản của celery:Cấu hình mặc định cơ bản của celery :# # Broker settings. broker_url = ” redis : / / localhost : 6379 / 0 ” # List of modules to import when the Celery worker starts.imports = ( ” myapp.tasks “, ) # # Using the database to store task state and results. result_backend = ” db + sqlite : / / / results.db ” task_annotations = { ” tasks.add ” : { ” rate_limit ” : ” 10 / s ” } } Best practice : tạo một file config riêng cho celery celeryconfig.pybroker_url = ” redis : / / localhost : 6379 / 0 : / / ” result_backend = ” rpc : / / ” task_serializer = ” json ” result_serializer = ” json ” accept_content = timezone = ” Europe / Oslo ” enable_utc = Truetask_routes = { ” tasks.add ” : ” low-priority “, } # routing một task tới queue mong muốnNgoài cách tạo file config trên ra tất cả chúng ta cũng hoàn toàn có thể config trực tiếp bằng application của Celery app.confapp.conf.update ( enable_utc = True, timezone = ” Europe / London “, ) Tổng kếtCelery không cần phải config nhiều mà chỉ cần import từ module sử dụng trực tiếp như saufrom celery Import Celeryapp = Celery ( ” name of module “, broker = ” url_of_broker ” ) Worker và client của Celery hoàn toàn có thể tự retryMột process của Celery hoàn toàn có thể giải quyết và xử lý hàng triệu task trong một phút với độ trễ chỉ vài miligiây

Celery hỗ trợ:

Message brokers:RabbitMQRedisSQSXử lý concurrencymultiprocessingmultithreadsingle threadeventlet, geventLưu trữ kết quả trên các hệ thống:AmqpRedisMemcachedSQLAlchemyAmazon S3File systemSerializationjsonyamlMessage brokers : RabbitMQRedisSQSXử lý concurrencymultiprocessingmultithreadsingle threadeventlet, geventLưu trữ tác dụng trên những mạng lưới hệ thống : AmqpRedisMemcachedSQLAlchemyAmazon S3File systemSerializationjsonyamlỞ phần sau bài viết mình sẽ đi sâu hơn về worker trong Celery và hai loại message broker mà Celery tương hỗ : SQS – Redis, đồng thời dựng một ứng dụng cơ bản sử dụng mạng lưới hệ thống này .
Chuyên mục: Chuyên mục : Hỏi Đáp

Source: https://iseo1.com
Category: Coin

Trả lời

Email của bạn sẽ không được hiển thị công khai.