Как получить ссылку на текущую задачу?

Как я могу получить ссылку на задачу, в которой выполняется мой код?

ISomeInterface impl = new SomeImplementation();
Task.Factory.StartNew(() => impl.MethodFromSomeInterface(), new MyState());

...

void MethodFromSomeInterface()
{
    Task currentTask = Task.GetCurrentTask();    // No such method?
    MyState state = (MyState) currentTask.AsyncState();
}

Поскольку я вызываю какой-то метод интерфейса, я не могу просто передать только что созданную задачу в качестве дополнительного параметра.


person Reuven Bass    schedule 14.07.2011    source источник
comment
Можете ли вы передать его в качестве параметра конструктору SomeImplementation? Еще лучше, IMO, передать MyState конструктору и вообще не требовать знаний Task в MethodFromSomeInterface.   -  person Stephen Cleary    schedule 15.07.2011
comment
@ Стивен Клири, похоже, он не может изменить интерфейс.   -  person Filip Ekberg    schedule 15.07.2011
comment
Я не могу изменить ни интерфейс, ни реализацию. Итак, мне нужно связать экземпляр MyState с текущим экземпляром Task.   -  person Reuven Bass    schedule 15.07.2011
comment
Более того, MethodFromSomeInterface может вызываться одновременно в разных задачах.   -  person Reuven Bass    schedule 15.07.2011


Ответы (5)


Поскольку вы не можете изменить ни интерфейс, ни реализацию, вам придется сделать это самостоятельно, например, используя ThreadStaticAttribute:

static class SomeInterfaceTask
{
  [ThreadStatic]
  static Task Current { get; set; }
}

...

ISomeInterface impl = new SomeImplementation();
Task task = null;
task = Task.Factory.StartNew(() =>
{
  SomeInterfaceTask.Current = task;
  impl.MethodFromSomeInterface();
}, new MyState());

...

void MethodFromSomeInterface()
{
  Task currentTask = SomeInterfaceTask.Current;
  MyState state = (MyState) currentTask.AsyncState();
}
person Stephen Cleary    schedule 14.07.2011
comment
Это действительно потокобезопасно? На самом деле, есть ли что-нибудь кроме использования лямбда-параметра в потокобезопасности StartNew? Мне кажется, что переменная task может выйти из области видимости еще до того, как лямбда запустится. - person wilx; 29.11.2011
comment
ThreadStatic отличается от TaskStatic, если бы он существовал. Это может быть безопасно с планировщиком по умолчанию, но нет гарантии, что поток не будет повторно использоваться для нескольких задач, и в этом случае у него нет защиты, которую вы можете ожидать. Это может быть наихудший вид ошибки — выглядит хорошо, работает хорошо (обычно), а затем где-то внизу возникает трудно воспроизводимая ошибка. - person danwyand; 06.06.2013
comment
В этом коде Current всегда устанавливается (в том же потоке) непосредственно перед вызовом MethodFromSomeInterface. Не имеет значения, используется ли поток повторно для другой задачи. - person Stephen Cleary; 06.06.2013
comment
@StephenCleary, это неверно - нет блокировки вокруг устанавливаемой и считываемой переменной задачи. Используя эту модель, вы можете задать задачу в переменной Current; затем перед вызовом метода в том же потоке запускается другая задача, и та же ячейка памяти перезаписывается. Тогда первая задача прочитает неправильное значение. Если конкретный планировщик TPL не является вытесняющим, это действительно сработает; но это не безопасное предположение в данном контексте. - person danwyand; 25.06.2013
comment
Я вижу, что ты сейчас говоришь. Но пул потоков никогда этого не сделает; любая (полностью синхронная) работа, поставленная в очередь в пул потоков, после ее запуска всегда будет выполняться до завершения. Реализовать приостановку текущей задачи и вместо этого запустить другую задачу в том же потоке было бы невероятно сложно и привело бы к хуже производительности и пропускной способности, поэтому нет оснований полагать, что что пул потоков когда-либо сделает это. - person Stephen Cleary; 25.06.2013
comment
@StephenCleary Как вы сказали, после запуска задачи она всегда будет выполняться до завершения в указанном потоке. Но в модели асинхронного/ожидания задача, кажется, выдаст текущий поток, когда встретится с операцией блока (ключевое слово await), и продолжится на SynchronizationContext. Таким образом, когда задача уступает текущему потоку, а следующая задача подбирает поток и устанавливает SomeInterfaceTask.Current, тогда предыдущая задача возобновляется в своем начальном потоке (или другом потоке), SomeInterfaceTask.Current будет изменен. - person JasonMing; 17.08.2014
comment
Существует состояние гонки между установкой объекта задачи в task = Task.Factory.StartNew... и SomeInterfaceTask.Current = task;... - person thab; 01.10.2015
comment
@thab Можете ли вы предоставить образец состояния гонки :-)? - person Legends; 30.08.2016
comment
@danwyand Можете ли вы предоставить образец состояния гонки? - person Legends; 30.08.2016
comment
Представьте, что у вас есть класс со статической переменной и метод экземпляра, который выводит эту переменную. Очевидно, что Bad.STATIC = "message"; new Bad().printMessage(); проблематичен при работе с несколькими потоками. Это делает то же самое — атрибут [ThreadStatic] не помогает, так как несколько задач могут выполняться в одном и том же потоке. На самом деле это не состояние гонки - просто небезопасное/неправильное использование статики. - person danwyand; 30.08.2016
comment
Я понимаю, что вы имеете в виду, но в этом случае я не получаю условия гонки: вот скрипта. Скопируйте и выполните локально. - person Legends; 30.08.2016
comment
На самом деле, я считаю, что состояние гонки возникает между назначением переменной задачи в task = Task.Factory.StartNew( и копированием переменной задачи в SomeInterfaceTask.Current = task;. Нет ничего, чтобы гарантировать порядок, который я вижу... Если бы вы начали его в два этапа, он бы работал (например, var task = new Task(() => { SomeInterfaceTask.Current = task; }); task.Start(Task.Factory.Scheduler);) - person thab; 30.08.2016

Вот «хакерский» класс, который можно использовать для этого.
Просто используйте свойство CurrentTask, чтобы получить текущую запущенную задачу.
Я настоятельно не рекомендую использовать его где-либо рядом с рабочим кодом!

public static class TaskGetter
{
    private static string _propertyName;
    private static Type _taskType;
    private static PropertyInfo _property;
    private static Func<Task> _getter;

    static TaskGetter()
    {
        _taskType = typeof(Task);
        _propertyName = "InternalCurrent";
        SetupGetter();
    }

    public static void SetPropertyName(string newName)
    {
        _propertyName = newName;
        SetupGetter();
    }

    public static Task CurrentTask
    {
        get
        {
            return _getter();
        }
    }

    private static void SetupGetter()
    {
        _getter = () => null;
        _property = _taskType.GetProperties(BindingFlags.Static | BindingFlags.NonPublic).Where(p => p.Name == _propertyName).FirstOrDefault();
        if (_property != null)
        {
            _getter = () =>
            {
                var val = _property.GetValue(null);
                return val == null ? null : (Task)val;
            };
        }
    }
}
person hofi    schedule 27.10.2015
comment
Это хороший вид взлома, и он рано сломается (разрешение свойства), так что это не так уж плохо. AsyncState (и случайные хранилища данных, не имеющие никакого отношения к пространствам имен в целом) никому не следует использовать, но если бы это было так, я бы с удовольствием использовал это в продакшене. Пальцы вверх. - person Seth; 29.06.2018

Если вы можете использовать .NET 4.6 или более позднюю версию, .NET Standard или .NET Core, они решили эту проблему с помощью AsyncLocal. https://docs.microsoft.com/en-gb/dotnet/api/system.threading.asynclocal-1?view=netframework-4.7.1

Если нет, вам нужно настроить хранилище данных до его использования и получить к нему доступ через замыкание, а не через поток или задачу. ConcurrentDictionary поможет скрыть любые ошибки, которые вы допускаете при этом.

Когда код ожидает, текущая задача освобождает поток, то есть потоки не связаны с задачами, по крайней мере, в модели программирования.

Демо:

// I feel like demo code about threading needs to guarantee
// it actually has some in the first place :)
// The second number is IOCompletionPorts which would be relevant
// if we were using IO (strangely enough).
var threads = Environment.ProcessorCount * 4;
ThreadPool.SetMaxThreads(threads, threads);
ThreadPool.SetMinThreads(threads, threads);

var rand = new Random(DateTime.Now.Millisecond);

var tasks = Enumerable.Range(0, 50)
    .Select(_ =>
    {
        // State store tied to task by being created in the same closure.
        var taskState = new ConcurrentDictionary<string, object>();
        // There is absolutely no need for this to be a thread-safe
        // data structure in this instance but given the copy-pasta,
        // I thought I'd save people some trouble.

        return Task.Run(async () =>
        {
            taskState["ThreadId"] = Thread.CurrentThread.ManagedThreadId;
            await Task.Delay(rand.Next() % 100);
            return Thread.CurrentThread.ManagedThreadId == (int)taskState["ThreadId"];
        });
    })
    .ToArray();

Task.WaitAll(tasks);
Console.WriteLine("Tasks that stayed on the same thread: " + tasks.Count(t => t.Result));
Console.WriteLine("Tasks that didn't stay on the same thread: " + tasks.Count(t => !t.Result));
person Seth    schedule 29.06.2018

В следующем примере показано, как этого можно добиться, решая проблему с помощью ответа, предоставленного @stephen-cleary. Это немного запутанно, но, по сути, ключ находится в классе TaskContext, который использует CallContext.LogicalSetData, CallContext.LogicalGetData и CallContext. .FreeNamedDataSlot, которые полезны для создания собственных контекстов задач. Остальная часть пуха должна ответить на вопрос ОП:

class Program
{
    static void Main(string[] args)
    {
        var t1 = Task.Factory.StartNewWithContext(async () => { await DoSomething(); });
        var t2 = Task.Factory.StartNewWithContext(async () => { await DoSomething(); });

        Task.WaitAll(t1, t2);
    }

    private static async Task DoSomething()
    {
        var id1 = TaskContext.Current.Task.Id;
        Console.WriteLine(id1);
        await Task.Delay(1000);

        var id2 = TaskContext.Current.Task.Id;
        Console.WriteLine(id2);
        Console.WriteLine(id1 == id2);
    }
}

public static class TaskFactoryExtensions
{
    public static Task StartNewWithContext(this TaskFactory factory, Action action)
    {
        Task task = null;

        task = new Task(() =>
        {
            Debug.Assert(TaskContext.Current == null);
            TaskContext.Current = new TaskContext(task);
            try
            {
                action();
            }
            finally
            {
                TaskContext.Current = null;
            }
        });

        task.Start();

        return task;
    }

    public static Task StartNewWithContext(this TaskFactory factory, Func<Task> action)
    {
        Task<Task> task = null;

        task = new Task<Task>(async () =>
        {
            Debug.Assert(TaskContext.Current == null);
            TaskContext.Current = new TaskContext(task);
            try
            {
                await action();
            }
            finally
            {
                TaskContext.Current = null;
            }
        });

        task.Start();

        return task.Unwrap();
    }
}

public sealed class TaskContext
{
    // Use your own unique key for better performance
    private static readonly string contextKey = Guid.NewGuid().ToString();

    public TaskContext(Task task)
    {
        this.Task = task;
    }

    public Task Task { get; private set; }

    public static TaskContext Current
    {
        get { return (TaskContext)CallContext.LogicalGetData(contextKey); }
        internal set
        {
            if (value == null)
            {
                CallContext.FreeNamedDataSlot(contextKey);
            }
            else
            {
                CallContext.LogicalSetData(contextKey, value);
            }
        }
    }
}
person Ananke    schedule 08.09.2015

Если бы вы могли изменить интерфейс (что не было для меня ограничением, когда у меня была похожая проблема), мне казалось, что Lazy<Task> можно было бы использовать для решения этой проблемы. Так что я попробовал это.

Это работает, по крайней мере, для того, что я хочу иметь в виду под «текущей задачей». Но это тонкий код, потому что AsyncMethodThatYouWantToRun должен делать Task.Yield().

Если вы не уступите, произойдет сбой с System.AggregateException: 'One or more errors occurred. (ValueFactory attempted to access the Value property of this instance.)'

Lazy<Task> eventuallyATask = null; // silly errors about uninitialized variables :-/
eventuallyATask = new Lazy<Task>(
    () => AsyncMethodThatYouWantToRun(eventuallyATask));

Task t = eventuallyATask.Value; // actually start the task!

async Task AsyncMethodThatYouWantToRun(Lazy<Task> lazyThisTask)
{
    await Task.Yield(); // or else, the 'task' object won't finish being created!

    Task thisTask = lazyThisTask.Value;
    Console.WriteLine("you win! Your task got a reference to itself");
}

t.Wait();

В качестве альтернативы вместо тонкости Task.Yield мы могли бы просто пройти весь путь до конца и использовать TaskCompletionSource<Task> для ее решения. (исключая любые потенциальные ошибки/взаимоблокировки, поскольку наша задача безопасно освобождает поток, пока он не узнает себя!)

    var eventuallyTheTask = new TaskCompletionSource<Task>();
    Task t = AsyncMethodThatYouWantToRun(eventuallyTheTask.Task); // start the task!
    eventuallyTheTask.SetResult(t); //unblock the task and give it self-knowledge

    async Task AsyncMethodThatYouWantToRun(Task<Task> thisTaskAsync)
    {
        Task thisTask = await thisTaskAsync; // gets this task :)
        Console.WriteLine("you win! Your task got a reference to itself (== 't')");
    }

    t.Wait();
person Tim Lovell-Smith    schedule 06.12.2019