diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 1eb0a83f75..9dff30976e 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -1,9 +1,8 @@ import { Injectable } from '@nestjs/common'; import { ModuleRef, Reflector } from '@nestjs/core'; import { ClassConstructor } from 'class-transformer'; -import { AddJobsJobSpec, makeWorkerUtils, run, Runner, TaskSpec, WorkerUtils } from 'graphile-worker'; +import { AddJobsJobSpec, makeWorkerUtils, run, Runner, WorkerUtils } from 'graphile-worker'; import { Kysely } from 'kysely'; -import { DateTime, Duration } from 'luxon'; import { InjectKysely } from 'nestjs-kysely'; import pg, { PoolConfig } from 'pg'; import { DB } from 'src/db'; @@ -188,7 +187,7 @@ export class JobRepository { } queue(item: JobItem): Promise<unknown> { - return this.workerUtils!.addJob(this.getQueueName(item.name), item, this.getJobOptions(item)); + return this.queueAll([item]); } queueAll(items: JobItem[]): Promise<unknown> { @@ -278,47 +277,27 @@ export class JobRepository { } private getJobSpec(item: JobItem): AddJobsJobSpec { + const identifier = (this.handlers[item.name] as JobMapItem).queueName; switch (item.name) { case JobName.NOTIFY_ALBUM_UPDATE: { return { - identifier: item.name, - payload: item.data, + identifier, + payload: item, jobKey: item.data.id, runAt: item.data?.delay ? new Date(Date.now() + item.data.delay) : undefined, }; } case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: { - return { identifier: item.name, payload: item.data, jobKey: QueueName.STORAGE_TEMPLATE_MIGRATION }; + return { identifier, payload: item, jobKey: QueueName.STORAGE_TEMPLATE_MIGRATION }; } case JobName.GENERATE_PERSON_THUMBNAIL: { - return { identifier: item.name, payload: item.data, priority: 1 }; + return { identifier, payload: item, priority: 1 }; } case JobName.QUEUE_FACIAL_RECOGNITION: { - return { identifier: item.name, payload: item.data, jobKey: JobName.QUEUE_FACIAL_RECOGNITION }; + return { identifier, payload: item, jobKey: JobName.QUEUE_FACIAL_RECOGNITION }; } default: { - return { identifier: item.name, payload: item.data }; - } - } - } - - private getJobOptions(item: JobItem): TaskSpec | undefined { - switch (item.name) { - case JobName.NOTIFY_ALBUM_UPDATE: { - let runAt: Date | undefined; - if (item.data?.delay) { - runAt = DateTime.now().plus(Duration.fromMillis(item.data.delay)).toJSDate(); - } - return { jobKey: item.data.id, runAt }; - } - case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: { - return { jobKey: QueueName.STORAGE_TEMPLATE_MIGRATION }; - } - case JobName.GENERATE_PERSON_THUMBNAIL: { - return { priority: 1 }; - } - case JobName.QUEUE_FACIAL_RECOGNITION: { - return { jobKey: JobName.QUEUE_FACIAL_RECOGNITION }; + return { identifier, payload: item }; } } } diff --git a/server/src/services/metadata.service.ts b/server/src/services/metadata.service.ts index 6c2d69a1e7..aaff7c4850 100644 --- a/server/src/services/metadata.service.ts +++ b/server/src/services/metadata.service.ts @@ -163,17 +163,20 @@ export class MetadataService extends BaseService { async handleQueueMetadataExtraction(job: JobOf<JobName.QUEUE_METADATA_EXTRACTION>): Promise<JobStatus> { const { force } = job; - let queue: { name: JobName.METADATA_EXTRACTION; data: { id: string } }[] = []; - for await (const asset of this.assetJobRepository.streamForMetadataExtraction(force)) { - queue.push({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id } }); + for (let i = 0; i < 10; i++) { + let queue: { name: JobName.METADATA_EXTRACTION; data: { id: string } }[] = []; + for await (const asset of this.assetJobRepository.streamForMetadataExtraction(force)) { + queue.push({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id, source: 'upload' } as any }); - if (queue.length >= JOBS_ASSET_PAGINATION_SIZE) { - await this.jobRepository.queueAll(queue); - queue = []; + if (queue.length >= JOBS_ASSET_PAGINATION_SIZE) { + await this.jobRepository.queueAll(queue); + queue = []; + } } + + await this.jobRepository.queueAll(queue); } - await this.jobRepository.queueAll(queue); return JobStatus.SUCCESS; }