working addJobs

This commit is contained in:
mertalev 2025-04-30 21:54:44 -04:00
parent e773a7b7a1
commit 7bcf9aa3a7
No known key found for this signature in database
GPG key ID: DF6ABC77AAD98C95
2 changed files with 19 additions and 37 deletions
server/src

View file

@ -1,9 +1,8 @@
import { Injectable } from '@nestjs/common';
import { ModuleRef, Reflector } from '@nestjs/core';
import { ClassConstructor } from 'class-transformer';
import { AddJobsJobSpec, makeWorkerUtils, run, Runner, TaskSpec, WorkerUtils } from 'graphile-worker';
import { AddJobsJobSpec, makeWorkerUtils, run, Runner, WorkerUtils } from 'graphile-worker';
import { Kysely } from 'kysely';
import { DateTime, Duration } from 'luxon';
import { InjectKysely } from 'nestjs-kysely';
import pg, { PoolConfig } from 'pg';
import { DB } from 'src/db';
@ -188,7 +187,7 @@ export class JobRepository {
}
queue(item: JobItem): Promise<unknown> {
return this.workerUtils!.addJob(this.getQueueName(item.name), item, this.getJobOptions(item));
return this.queueAll([item]);
}
queueAll(items: JobItem[]): Promise<unknown> {
@ -278,47 +277,27 @@ export class JobRepository {
}
private getJobSpec(item: JobItem): AddJobsJobSpec {
const identifier = (this.handlers[item.name] as JobMapItem).queueName;
switch (item.name) {
case JobName.NOTIFY_ALBUM_UPDATE: {
return {
identifier: item.name,
payload: item.data,
identifier,
payload: item,
jobKey: item.data.id,
runAt: item.data?.delay ? new Date(Date.now() + item.data.delay) : undefined,
};
}
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: {
return { identifier: item.name, payload: item.data, jobKey: QueueName.STORAGE_TEMPLATE_MIGRATION };
return { identifier, payload: item, jobKey: QueueName.STORAGE_TEMPLATE_MIGRATION };
}
case JobName.GENERATE_PERSON_THUMBNAIL: {
return { identifier: item.name, payload: item.data, priority: 1 };
return { identifier, payload: item, priority: 1 };
}
case JobName.QUEUE_FACIAL_RECOGNITION: {
return { identifier: item.name, payload: item.data, jobKey: JobName.QUEUE_FACIAL_RECOGNITION };
return { identifier, payload: item, jobKey: JobName.QUEUE_FACIAL_RECOGNITION };
}
default: {
return { identifier: item.name, payload: item.data };
}
}
}
private getJobOptions(item: JobItem): TaskSpec | undefined {
switch (item.name) {
case JobName.NOTIFY_ALBUM_UPDATE: {
let runAt: Date | undefined;
if (item.data?.delay) {
runAt = DateTime.now().plus(Duration.fromMillis(item.data.delay)).toJSDate();
}
return { jobKey: item.data.id, runAt };
}
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: {
return { jobKey: QueueName.STORAGE_TEMPLATE_MIGRATION };
}
case JobName.GENERATE_PERSON_THUMBNAIL: {
return { priority: 1 };
}
case JobName.QUEUE_FACIAL_RECOGNITION: {
return { jobKey: JobName.QUEUE_FACIAL_RECOGNITION };
return { identifier, payload: item };
}
}
}

View file

@ -163,17 +163,20 @@ export class MetadataService extends BaseService {
async handleQueueMetadataExtraction(job: JobOf<JobName.QUEUE_METADATA_EXTRACTION>): Promise<JobStatus> {
const { force } = job;
let queue: { name: JobName.METADATA_EXTRACTION; data: { id: string } }[] = [];
for await (const asset of this.assetJobRepository.streamForMetadataExtraction(force)) {
queue.push({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id } });
for (let i = 0; i < 10; i++) {
let queue: { name: JobName.METADATA_EXTRACTION; data: { id: string } }[] = [];
for await (const asset of this.assetJobRepository.streamForMetadataExtraction(force)) {
queue.push({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id, source: 'upload' } as any });
if (queue.length >= JOBS_ASSET_PAGINATION_SIZE) {
await this.jobRepository.queueAll(queue);
queue = [];
if (queue.length >= JOBS_ASSET_PAGINATION_SIZE) {
await this.jobRepository.queueAll(queue);
queue = [];
}
}
await this.jobRepository.queueAll(queue);
}
await this.jobRepository.queueAll(queue);
return JobStatus.SUCCESS;
}