Switch to TPL dataflow for subfolder scan

This commit is contained in:
Gary Wilber 2020-10-01 16:24:35 -07:00
parent c2276b17cb
commit 8f2fbf7a99
4 changed files with 106 additions and 204 deletions

View file

@ -758,7 +758,6 @@ namespace Emby.Server.Implementations
BaseItem.FileSystem = _fileSystemManager; BaseItem.FileSystem = _fileSystemManager;
BaseItem.UserDataManager = Resolve<IUserDataManager>(); BaseItem.UserDataManager = Resolve<IUserDataManager>();
BaseItem.ChannelManager = Resolve<IChannelManager>(); BaseItem.ChannelManager = Resolve<IChannelManager>();
TaskMethods.ConfigurationManager = ServerConfigurationManager;
Video.LiveTvManager = Resolve<ILiveTvManager>(); Video.LiveTvManager = Resolve<ILiveTvManager>();
Folder.UserViewManager = Resolve<IUserViewManager>(); Folder.UserViewManager = Resolve<IUserViewManager>();
UserView.TVSeriesManager = Resolve<ITVSeriesManager>(); UserView.TVSeriesManager = Resolve<ITVSeriesManager>();

View file

@ -8,6 +8,7 @@ using System.Linq;
using System.Text.Json.Serialization; using System.Text.Json.Serialization;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Jellyfin.Data.Entities; using Jellyfin.Data.Entities;
using Jellyfin.Data.Enums; using Jellyfin.Data.Enums;
using MediaBrowser.Common.Progress; using MediaBrowser.Common.Progress;
@ -35,45 +36,16 @@ namespace MediaBrowser.Controller.Entities
/// </summary> /// </summary>
public class Folder : BaseItem public class Folder : BaseItem
{ {
/// <summary> private static Lazy<SemaphoreSlim> _metadataRefreshThrottler = new Lazy<SemaphoreSlim>(() => {
/// Contains constants used when reporting scan progress. var concurrency = ConfigurationManager.Configuration.LibraryMetadataRefreshConcurrency;
/// </summary>
private static class ProgressHelpers
{
/// <summary>
/// Reported after the folders immediate children are retrieved.
/// </summary>
public const int RetrievedChildren = 5;
/// <summary> if (concurrency <= 0)
/// Reported after add, updating, or deleting child items from the LibraryManager.
/// </summary>
public const int UpdatedChildItems = 10;
/// <summary>
/// Reported once subfolders are scanned.
/// When scanning subfolders, the progress will be between [UpdatedItems, ScannedSubfolders].
/// </summary>
public const int ScannedSubfolders = 50;
/// <summary>
/// Reported once metadata is refreshed.
/// When refreshing metadata, the progress will be between [ScannedSubfolders, MetadataRefreshed].
/// </summary>
public const int RefreshedMetadata = 100;
/// <summary>
/// Gets the current progress given the previous step, next step, and progress in between.
/// </summary>
/// <param name="previousProgressStep">The previous progress step.</param>
/// <param name="nextProgressStep">The next progress step.</param>
/// <param name="currentProgress">The current progress step.</param>
/// <returns>The progress.</returns>
public static double GetProgress(int previousProgressStep, int nextProgressStep, double currentProgress)
{ {
return previousProgressStep + ((nextProgressStep - previousProgressStep) * (currentProgress / 100)); concurrency = Environment.ProcessorCount;
} }
}
return new SemaphoreSlim(concurrency);
});
public static IUserViewManager UserViewManager { get; set; } public static IUserViewManager UserViewManager { get; set; }
@ -508,19 +480,17 @@ namespace MediaBrowser.Controller.Entities
private Task RefreshMetadataRecursive(IList<BaseItem> children, MetadataRefreshOptions refreshOptions, bool recursive, IProgress<double> progress, CancellationToken cancellationToken) private Task RefreshMetadataRecursive(IList<BaseItem> children, MetadataRefreshOptions refreshOptions, bool recursive, IProgress<double> progress, CancellationToken cancellationToken)
{ {
var progressableTasks = children return RunTasks(
.Select<BaseItem, Func<IProgress<double>, Task>>(child => (baseItem, innerProgress) => RefreshChildMetadata(baseItem, refreshOptions, recursive && baseItem.IsFolder, innerProgress, cancellationToken),
innerProgress => RefreshChildMetadata(child, refreshOptions, recursive && child.IsFolder, innerProgress, cancellationToken)) children,
.ToList(); progress,
cancellationToken);
return RunTasks(progressableTasks, progress, cancellationToken);
} }
private async Task RefreshAllMetadataForContainer(IMetadataContainer container, MetadataRefreshOptions refreshOptions, IProgress<double> progress, CancellationToken cancellationToken) private async Task RefreshAllMetadataForContainer(IMetadataContainer container, MetadataRefreshOptions refreshOptions, IProgress<double> progress, CancellationToken cancellationToken)
{ {
// limit the amount of concurrent metadata refreshes // limit the amount of concurrent metadata refreshes
await TaskMethods.RunThrottled( await RunMetadataRefresh(
TaskMethods.SharedThrottleId.RefreshMetadata,
async () => async () =>
{ {
var series = container as Series; var series = container as Series;
@ -547,8 +517,7 @@ namespace MediaBrowser.Controller.Entities
if (refreshOptions.RefreshItem(child)) if (refreshOptions.RefreshItem(child))
{ {
// limit the amount of concurrent metadata refreshes // limit the amount of concurrent metadata refreshes
await TaskMethods.RunThrottled( await RunMetadataRefresh(
TaskMethods.SharedThrottleId.RefreshMetadata,
async () => await child.RefreshMetadata(refreshOptions, cancellationToken).ConfigureAwait(false), async () => await child.RefreshMetadata(refreshOptions, cancellationToken).ConfigureAwait(false),
cancellationToken).ConfigureAwait(false); cancellationToken).ConfigureAwait(false);
} }
@ -570,38 +539,33 @@ namespace MediaBrowser.Controller.Entities
/// <returns>Task.</returns> /// <returns>Task.</returns>
private Task ValidateSubFolders(IList<Folder> children, IDirectoryService directoryService, IProgress<double> progress, CancellationToken cancellationToken) private Task ValidateSubFolders(IList<Folder> children, IDirectoryService directoryService, IProgress<double> progress, CancellationToken cancellationToken)
{ {
var progressableTasks = children return RunTasks(
.Select<Folder, Func<IProgress<double>, Task>>(child => (folder, innerProgress) => folder.ValidateChildrenInternal(innerProgress, cancellationToken, true, false, null, directoryService),
innerProgress => child.ValidateChildrenInternal(innerProgress, cancellationToken, true, false, null, directoryService)) children,
.ToList(); progress,
cancellationToken);
return RunTasks(progressableTasks, progress, cancellationToken);
} }
/// <summary> /// <summary>
/// Runs a set of tasks concurrently with progress. /// Runs an action block on a list of children.
/// </summary> /// </summary>
/// <param name="tasks">A list of tasks.</param> /// <param name="task">The task to run for each child.</param>
/// <param name="children">The list of children.</param>
/// <param name="progress">The progress.</param> /// <param name="progress">The progress.</param>
/// <param name="cancellationToken">The cancellation token.</param> /// <param name="cancellationToken">The cancellation token.</param>
/// <returns>Task.</returns> /// <returns>Task.</returns>
private async Task RunTasks(IList<Func<IProgress<double>, Task>> tasks, IProgress<double> progress, CancellationToken cancellationToken) private async Task RunTasks<T>(Func<T, IProgress<double>, Task> task, IList<T> children, IProgress<double> progress, CancellationToken cancellationToken)
{ {
var childrenCount = tasks.Count; var childrenCount = children.Count;
var childrenProgress = new double[childrenCount]; var childrenProgress = new double[childrenCount];
var actions = new Func<Task>[childrenCount];
void UpdateProgress() void UpdateProgress()
{ {
progress.Report(childrenProgress.Average()); progress.Report(childrenProgress.Average());
} }
for (var i = 0; i < childrenCount; i++) var actionBlock = new ActionBlock<int>(
{ async i =>
var childIndex = i;
var child = tasks[childIndex];
actions[childIndex] = async () =>
{ {
var innerProgress = new ActionableProgress<double>(); var innerProgress = new ActionableProgress<double>();
@ -609,22 +573,33 @@ namespace MediaBrowser.Controller.Entities
{ {
// round the percent and only update progress if it changed to prevent excessive UpdateProgress calls // round the percent and only update progress if it changed to prevent excessive UpdateProgress calls
var innerPercentRounded = Math.Round(innerPercent); var innerPercentRounded = Math.Round(innerPercent);
if (childrenProgress[childIndex] != innerPercentRounded) if (childrenProgress[i] != innerPercentRounded)
{ {
childrenProgress[childIndex] = innerPercentRounded; childrenProgress[i] = innerPercentRounded;
UpdateProgress(); UpdateProgress();
} }
}); });
await tasks[childIndex](innerProgress).ConfigureAwait(false); await task(children[i], innerProgress).ConfigureAwait(false);
childrenProgress[childIndex] = 100; childrenProgress[i] = 100;
UpdateProgress(); UpdateProgress();
}; },
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = ConfigurationManager.Configuration.LibraryScanFanoutConcurrency,
CancellationToken = cancellationToken,
});
for (var i = 0; i < childrenCount; i++)
{
actionBlock.Post(i);
} }
await TaskMethods.WhenAllThrottled(TaskMethods.SharedThrottleId.ScanFanout, actions, cancellationToken).ConfigureAwait(false); actionBlock.Complete();
await actionBlock.Completion.ConfigureAwait(false);
} }
/// <summary> /// <summary>
@ -1272,6 +1247,26 @@ namespace MediaBrowser.Controller.Entities
return true; return true;
} }
/// <summary>
/// Runs multiple metadata refreshes concurrently.
/// </summary>
/// <param name="action">The action to run.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
private static async Task RunMetadataRefresh(Func<Task> action, CancellationToken cancellationToken)
{
await _metadataRefreshThrottler.Value.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await action().ConfigureAwait(false);
}
finally
{
_metadataRefreshThrottler.Value.Release();
}
}
public List<BaseItem> GetChildren(User user, bool includeLinkedChildren) public List<BaseItem> GetChildren(User user, bool includeLinkedChildren)
{ {
if (user == null) if (user == null)
@ -1819,5 +1814,45 @@ namespace MediaBrowser.Controller.Entities
} }
} }
} }
/// <summary>
/// Contains constants used when reporting scan progress.
/// </summary>
private static class ProgressHelpers
{
/// <summary>
/// Reported after the folders immediate children are retrieved.
/// </summary>
public const int RetrievedChildren = 5;
/// <summary>
/// Reported after add, updating, or deleting child items from the LibraryManager.
/// </summary>
public const int UpdatedChildItems = 10;
/// <summary>
/// Reported once subfolders are scanned.
/// When scanning subfolders, the progress will be between [UpdatedItems, ScannedSubfolders].
/// </summary>
public const int ScannedSubfolders = 50;
/// <summary>
/// Reported once metadata is refreshed.
/// When refreshing metadata, the progress will be between [ScannedSubfolders, MetadataRefreshed].
/// </summary>
public const int RefreshedMetadata = 100;
/// <summary>
/// Gets the current progress given the previous step, next step, and progress in between.
/// </summary>
/// <param name="previousProgressStep">The previous progress step.</param>
/// <param name="nextProgressStep">The next progress step.</param>
/// <param name="currentProgress">The current progress step.</param>
/// <returns>The progress.</returns>
public static double GetProgress(int previousProgressStep, int nextProgressStep, double currentProgress)
{
return previousProgressStep + ((nextProgressStep - previousProgressStep) * (currentProgress / 100));
}
}
} }
} }

View file

@ -1,133 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Controller.Configuration;
namespace MediaBrowser.Controller.Library
{
/// <summary>
/// Helper methods for running tasks concurrently.
/// </summary>
public static class TaskMethods
{
private static readonly int _processorCount = Environment.ProcessorCount;
private static readonly ConcurrentDictionary<SharedThrottleId, SemaphoreSlim> _sharedThrottlers = new ConcurrentDictionary<SharedThrottleId, SemaphoreSlim>();
/// <summary>
/// Throttle id for sharing a concurrency limit.
/// </summary>
public enum SharedThrottleId
{
/// <summary>
/// Library scan fan out
/// </summary>
ScanFanout,
/// <summary>
/// Refresh metadata
/// </summary>
RefreshMetadata,
}
/// <summary>
/// Gets or sets the configuration manager.
/// </summary>
public static IServerConfigurationManager ConfigurationManager { get; set; }
/// <summary>
/// Similiar to Task.WhenAll but only allows running a certain amount of tasks at the same time.
/// </summary>
/// <param name="throttleId">The throttle id. Multiple calls to this method with the same throttle id will share a concurrency limit.</param>
/// <param name="actions">List of actions to run.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public static async Task WhenAllThrottled(SharedThrottleId throttleId, IEnumerable<Func<Task>> actions, CancellationToken cancellationToken)
{
var taskThrottler = throttleId == SharedThrottleId.ScanFanout ?
new SemaphoreSlim(GetConcurrencyLimit(throttleId)) :
_sharedThrottlers.GetOrAdd(throttleId, id => new SemaphoreSlim(GetConcurrencyLimit(id)));
try
{
var tasks = new List<Task>();
foreach (var action in actions)
{
await taskThrottler.WaitAsync(cancellationToken).ConfigureAwait(false);
tasks.Add(Task.Run(async () =>
{
try
{
await action().ConfigureAwait(false);
}
finally
{
taskThrottler.Release();
}
}));
}
await Task.WhenAll(tasks).ConfigureAwait(false);
}
finally
{
if (throttleId == SharedThrottleId.ScanFanout)
{
taskThrottler.Dispose();
}
}
}
/// <summary>
/// Runs a task within a given throttler.
/// </summary>
/// <param name="throttleId">The throttle id. Multiple calls to this method with the same throttle id will share a concurrency limit.</param>
/// <param name="action">The action to run.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public static async Task RunThrottled(SharedThrottleId throttleId, Func<Task> action, CancellationToken cancellationToken)
{
if (throttleId == SharedThrottleId.ScanFanout)
{
// just await the task instead
throw new InvalidOperationException("Invalid throttle id");
}
var taskThrottler = _sharedThrottlers.GetOrAdd(throttleId, id => new SemaphoreSlim(GetConcurrencyLimit(id)));
await taskThrottler.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await action().ConfigureAwait(false);
}
finally
{
taskThrottler.Release();
}
}
/// <summary>
/// Get the concurrency limit for the given throttle id.
/// </summary>
/// <param name="throttleId">The throttle id.</param>
/// <returns>The concurrency limit.</returns>
private static int GetConcurrencyLimit(SharedThrottleId throttleId)
{
var concurrency = throttleId == SharedThrottleId.RefreshMetadata ?
ConfigurationManager.Configuration.LibraryMetadataRefreshConcurrency :
ConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
if (concurrency <= 0)
{
concurrency = _processorCount;
}
return concurrency;
}
}
}

View file

@ -16,7 +16,8 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="3.1.8" /> <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="3.1.8" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="3.1.8" /> <PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="3.1.8" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/> <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>