>

내 주요 문제는 작업이 여전히 대기, 시작 또는 취소되었는지 알아야한다는 사실에 달려 있습니다.

결과는 redis로 표시된 24 시간 후에 삭제되므로 셀러리와 redis로이 작업을 수행 할 수 없습니다.

몇 가지 아이디어가 있지만 가장 확실한 아이디어는 데이터베이스를 추적하고 사용자가 실행중인 작업에 필요한 중요 정보를 수동으로 추가하는 것입니다.

작업을 시작하기 전에 실행할 수있는 방법이 있으며 작업을 만들거나 올바르게 철회 할 때 데이터베이스를 수동으로 사용할 수도 있습니까? 모든 작업의 ​​새 행을 만들지 않고 모든 사용자의 마지막 작업에만 관심이 있기 때문에 모든 사용자의 행을 업데이트합니다.

  • 답변 # 1

    여러 가지 접근 방식을 결합해야 할 것입니다. 백엔드에서 결과가 만료되면 (합리적 임) 작업 상태를 장기간 보관하기 위해 데이터베이스와 같은 다른 스토리지를 사용해야합니다. 시작을 위해 task_track_started 를 활성화 할 수 있습니다  작업이 STARTED 를보고하도록  작업자가 실행을 시작할 때의 상태). 그런 다음 준비 상태가 아닌 작업의 상태 업데이트 ( SUCCESS )에 대한 결과 백엔드를 정기적으로 확인하십시오. FAILURE  그리고 REVOKED ). 이들이 최종 상태 인 경우 forget() 를 사용하여 백엔드에서 결과를 제거하십시오.  방법.

    유일한 문제는 취소 된 작업에 있습니다. 사용 가능한 작업자가없는 경우 작업을 취소해도 아무런 효과가 없습니다 (따라서 취소를 호출 할 때 항상 응답을 기다려야합니다). 작업자가 바빠서 작업이 메시지 대기열에 남아 있으면 작업자는 해당 작업을 대기열에서 가져올 때 폐기해야하지만 작업자의 상태에만 저장됩니다. 일단 가져 가면 작업이 중단되고 결과에 REVOKED 가 포함됩니다.  결국 상태. 취소 된 작업은 작업자 상태에서만 유지되므로 --statedb 를 사용해야합니다.  작업자가 충돌하는 경우 상태를 유지하는 매개 변수입니다. 그렇지 않으면 이미 취소 된 작업이 동일하거나 다른 작업자에 의해 행복하게 처리됩니다.

    가장 좋은 옵션은 revoke 명령을 호출하는 것입니다. 작업자로부터 응답을 받으면 데이터베이스의 작업 내부 상태를 FLAGGED_REVOKED 와 같은 것으로 설정하십시오. . 상태 업데이트 루프에서 취소 된 작업의 상태가 PENDING 가 아닌 경우에만 업데이트합니다 .

    APScheduler를 스케줄러로 사용하고 Celery를 실행 계층으로 사용하는 간단한 작업 예약 앱이 있습니다. 작업, 작업 실행 및 일정에 대한 정보는 MongoDB에 보관됩니다. 작업을 취소하는 데 사용하는 코드는 다음과 같습니다.

    database = scheduler._jobstores['default'].collection.database
    collection = database['runs']
    run = collection.find_one({'job_id': job_id, '_id': run_id})
    if run.get('task_state') in ('PENDING', 'RECEIVED', 'STARTED', 'RETRY'):
        reply = celery.control.revoke(run['task_id'], terminate=terminate, reply=True)
        if reply:
            collection.update_one({'_id': run['_id']},
                                  {'$set': {'task_state': 'FLAGGED_REVOKED'}})
        else:
            raise Exception('Failed to revoke the task (no reply received)')
    else:
        raise Exception('Job execution cannot be canceled')
    
    

    이것은 내 상태 업데이트 코드입니다 (수초마다 실행되는 내부 APScheduler 작업으로 유지됨) :

    database = scheduler._jobstores['default'].collection.database
    collection = database['runs']
    runs = collection.find({
        'task_id': {'$exists': True},
        'task_state': {'$nin': ['SUCCESS', 'FAILURE', 'REVOKED']}
    })
    for run in runs:
        result = AsyncResult(run['task_id'],
                             backend=celery.backend, app=celery)
        if run['task_state'] == 'FLAGGED_REVOKED' and result.state == 'PENDING':
            update = {'task_state': 'FLAGGED_REVOKED'}
        else:
            update = {'task_state': result.state}
        if result.state == 'FAILURE':
            update['exception'] = str(result.result)
            update['traceback'] = result.traceback
        elif result.state == 'SUCCESS':
            update['result'] = result.result
        if result.date_done:
            date_done = dateparser.parse(result.date_done) \
                if isinstance(result.date_done, str) else result.date_done
            update['finish_time'] = date_done
        try:
            collection.update_one({'_id': run['_id']}, {'$set': update})
        except Exception as e:
            print('Failed to update task status: %s', str(e))
        else:
            if result.state in ['SUCCESS', 'FAILURE', 'REVOKED']:
                result.forget()
    
    

  • 이전 php - '비 객체'의 속성을 얻으려고합니다API의 json_decode
  • 다음 맵 박스 내비게이션 컨트롤 확대 및 축소