
[Offload] Make olLaunchKernel test thread safe #149497


Open

RossBrunton wants to merge 3 commits into llvm:main from RossBrunton:launchthreadsafe

Conversation

RossBrunton (Contributor)

This sprinkles a few mutexes around the plugin interface so that the
olLaunchKernel CTS test now passes when run on multiple threads.

Part of this also involved changing the interface for device synchronise
so that it can optionally not free the underlying queue (the unconditional
free introduced a race condition in liboffload).
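Concretely, the interface change in question (taken from the PluginInterface.h hunk in the diff below) adds a RemoveQueue flag that defaults to the old behaviour:

```cpp
// New signatures from the diff below. RemoveQueue defaults to true, so
// existing callers keep the old "synchronize and release the stream" semantics;
// liboffload's olWaitQueue passes false to keep the stream attached for reuse.
Error synchronize(__tgt_async_info *AsyncInfo, bool RemoveQueue = true);
virtual Error synchronizeImpl(__tgt_async_info &AsyncInfo,
                              bool RemoveQueue) = 0;
```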

llvmbot (Member) commented on Jul 18, 2025 (edited)

@llvm/pr-subscribers-offload

@llvm/pr-subscribers-backend-amdgpu

Author: Ross Brunton (RossBrunton)

Changes

This sprinkles a few mutexes around the plugin interface so that the
olLaunchKernel CTS test now passes when run on multiple threads.

Part of this also involved changing the interface for device synchronise
so that it can optionally not free the underlying queue (the unconditional
free introduced a race condition in liboffload).


Full diff: https://github.com/llvm/llvm-project/pull/149497.diff

9 Files Affected:

  • (modified) offload/include/Shared/APITypes.h (+4)
  • (modified) offload/liboffload/src/OffloadImpl.cpp (+1-7)
  • (modified) offload/plugins-nextgen/amdgpu/src/rtl.cpp (+11-3)
  • (modified) offload/plugins-nextgen/common/include/PluginInterface.h (+6-2)
  • (modified) offload/plugins-nextgen/common/src/PluginInterface.cpp (+5-2)
  • (modified) offload/plugins-nextgen/cuda/src/rtl.cpp (+11-4)
  • (modified) offload/plugins-nextgen/host/src/rtl.cpp (+2-1)
  • (modified) offload/unittests/OffloadAPI/common/Fixtures.hpp (+18)
  • (modified) offload/unittests/OffloadAPI/kernel/olLaunchKernel.cpp (+23)
diff --git a/offload/include/Shared/APITypes.h b/offload/include/Shared/APITypes.h
index 978b53d5d69b9..a988edce481e6 100644
--- a/offload/include/Shared/APITypes.h
+++ b/offload/include/Shared/APITypes.h
@@ -21,6 +21,7 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <mutex>
 
 extern "C" {
@@ -75,6 +76,9 @@ struct __tgt_async_info {
   /// should be freed after finalization.
   llvm::SmallVector<void *, 2> AssociatedAllocations;
+  /// Mutex to guard access to AssociatedAllocations
+  std::mutex AllocationsMutex;
+
   /// The kernel launch environment used to issue a kernel. Stored here to
   /// ensure it is a valid location while the transfer to the device is
   /// happening.
diff --git a/offload/liboffload/src/OffloadImpl.cpp b/offload/liboffload/src/OffloadImpl.cpp
index ffc9016bca0a3..d0dced8be7a61 100644
--- a/offload/liboffload/src/OffloadImpl.cpp
+++ b/offload/liboffload/src/OffloadImpl.cpp
@@ -487,16 +487,10 @@ Error olWaitQueue_impl(ol_queue_handle_t Queue) {
   // Host plugin doesn't have a queue set so it's not safe to call synchronize
   // on it, but we have nothing to synchronize in that situation anyway.
   if (Queue->AsyncInfo->Queue) {
-    if (auto Err = Queue->Device->Device->synchronize(Queue->AsyncInfo))
+    if (auto Err = Queue->Device->Device->synchronize(Queue->AsyncInfo, false))
       return Err;
   }
-  // Recreate the stream resource so the queue can be reused
-  // TODO: Would be easier for the synchronization to (optionally) not release
-  // it to begin with.
-  if (auto Res = Queue->Device->Device->initAsyncInfo(&Queue->AsyncInfo))
-    return Res;
-
   return Error::success();
 }
diff --git a/offload/plugins-nextgen/amdgpu/src/rtl.cpp b/offload/plugins-nextgen/amdgpu/src/rtl.cpp
index b2fd950c9d500..509f6c03e21fe 100644
--- a/offload/plugins-nextgen/amdgpu/src/rtl.cpp
+++ b/offload/plugins-nextgen/amdgpu/src/rtl.cpp
@@ -2227,6 +2227,7 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
   /// Get the stream of the asynchronous info structure or get a new one.
   Error getStream(AsyncInfoWrapperTy &AsyncInfoWrapper,
                   AMDGPUStreamTy *&Stream) {
+    std::lock_guard<std::mutex> StreamLock{StreamMutex};
     // Get the stream (if any) from the async info.
     Stream = AsyncInfoWrapper.getQueueAs<AMDGPUStreamTy *>();
     if (!Stream) {
@@ -2291,7 +2292,8 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
   }
 
   /// Synchronize current thread with the pending operations on the async info.
-  Error synchronizeImpl(__tgt_async_info &AsyncInfo) override {
+  Error synchronizeImpl(__tgt_async_info &AsyncInfo,
+                        bool RemoveQueue) override {
     AMDGPUStreamTy *Stream =
         reinterpret_cast<AMDGPUStreamTy *>(AsyncInfo.Queue);
     assert(Stream && "Invalid stream");
@@ -2302,8 +2304,11 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
     // Once the stream is synchronized, return it to stream pool and reset
     // AsyncInfo. This is to make sure the synchronization only works for its
     // own tasks.
-    AsyncInfo.Queue = nullptr;
-    return AMDGPUStreamManager.returnResource(Stream);
+    if (RemoveQueue) {
+      AsyncInfo.Queue = nullptr;
+      return AMDGPUStreamManager.returnResource(Stream);
+    }
+    return Plugin::success();
   }
 
   /// Query for the completion of the pending operations on the async info.
@@ -3013,6 +3018,9 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
   /// True is the system is configured with XNACK-Enabled.
   /// False otherwise.
   bool IsXnackEnabled = false;
+
+  /// Mutex to guard getting/setting the stream
+  std::mutex StreamMutex;
 };
 
 Error AMDGPUDeviceImageTy::loadExecutable(const AMDGPUDeviceTy &Device) {
diff --git a/offload/plugins-nextgen/common/include/PluginInterface.h b/offload/plugins-nextgen/common/include/PluginInterface.h
index 162b149ab483e..5fd34a9236f83 100644
--- a/offload/plugins-nextgen/common/include/PluginInterface.h
+++ b/offload/plugins-nextgen/common/include/PluginInterface.h
@@ -104,6 +104,7 @@ struct AsyncInfoWrapperTy {
   /// Register \p Ptr as an associated allocation that is freed after
   /// finalization.
   void freeAllocationAfterSynchronization(void *Ptr) {
+    std::lock_guard<std::mutex> AllocationGuard{AsyncInfoPtr->AllocationsMutex};
     AsyncInfoPtr->AssociatedAllocations.push_back(Ptr);
   }
@@ -772,8 +773,9 @@ struct GenericDeviceTy : public DeviceAllocatorTy {
 
   /// Synchronize the current thread with the pending operations on the
   /// __tgt_async_info structure.
-  Error synchronize(__tgt_async_info *AsyncInfo);
-  virtual Error synchronizeImpl(__tgt_async_info &AsyncInfo) = 0;
+  Error synchronize(__tgt_async_info *AsyncInfo, bool RemoveQueue = true);
+  virtual Error synchronizeImpl(__tgt_async_info &AsyncInfo,
+                                bool RemoveQueue) = 0;
 
   /// Invokes any global constructors on the device if present and is required
   /// by the target.
@@ -1501,6 +1503,8 @@ template <typename ResourceRef> class GenericDeviceResourceManagerTy {
   /// Deinitialize the resource pool and delete all resources. This function
   /// must be called before the destructor.
   virtual Error deinit() {
+    const std::lock_guard<std::mutex> Lock(Mutex);
+
     if (NextAvailable)
       DP("Missing %d resources to be returned\n", NextAvailable);
diff --git a/offload/plugins-nextgen/common/src/PluginInterface.cpp b/offload/plugins-nextgen/common/src/PluginInterface.cpp
index 81b9d423e13d8..4844f88229fb2 100644
--- a/offload/plugins-nextgen/common/src/PluginInterface.cpp
+++ b/offload/plugins-nextgen/common/src/PluginInterface.cpp
@@ -1329,12 +1329,15 @@ Error PinnedAllocationMapTy::unlockUnmappedHostBuffer(void *HstPtr) {
   return eraseEntry(*Entry);
 }
-Error GenericDeviceTy::synchronize(__tgt_async_info *AsyncInfo) {
+Error GenericDeviceTy::synchronize(__tgt_async_info *AsyncInfo,
+                                   bool RemoveQueue) {
+  std::lock_guard<std::mutex> AllocationGuard{AsyncInfo->AllocationsMutex};
+
   if (!AsyncInfo || !AsyncInfo->Queue)
     return Plugin::error(ErrorCode::INVALID_ARGUMENT,
                          "invalid async info queue");
-  if (auto Err = synchronizeImpl(*AsyncInfo))
+  if (auto Err = synchronizeImpl(*AsyncInfo, RemoveQueue))
     return Err;
 
   for (auto *Ptr : AsyncInfo->AssociatedAllocations)
diff --git a/offload/plugins-nextgen/cuda/src/rtl.cpp b/offload/plugins-nextgen/cuda/src/rtl.cpp
index b787376eb1770..f637379c5b29d 100644
--- a/offload/plugins-nextgen/cuda/src/rtl.cpp
+++ b/offload/plugins-nextgen/cuda/src/rtl.cpp
@@ -522,6 +522,7 @@ struct CUDADeviceTy : public GenericDeviceTy {
 
   /// Get the stream of the asynchronous info structure or get a new one.
   Error getStream(AsyncInfoWrapperTy &AsyncInfoWrapper, CUstream &Stream) {
+    std::lock_guard<std::mutex> StreamLock{StreamMutex};
     // Get the stream (if any) from the async info.
     Stream = AsyncInfoWrapper.getQueueAs<CUstream>();
     if (!Stream) {
@@ -642,7 +643,8 @@ struct CUDADeviceTy : public GenericDeviceTy {
   }
 
   /// Synchronize current thread with the pending operations on the async info.
-  Error synchronizeImpl(__tgt_async_info &AsyncInfo) override {
+  Error synchronizeImpl(__tgt_async_info &AsyncInfo,
+                        bool RemoveQueue) override {
     CUstream Stream = reinterpret_cast<CUstream>(AsyncInfo.Queue);
     CUresult Res;
     Res = cuStreamSynchronize(Stream);
@@ -650,9 +652,11 @@ struct CUDADeviceTy : public GenericDeviceTy {
     // Once the stream is synchronized, return it to stream pool and reset
     // AsyncInfo. This is to make sure the synchronization only works for its
     // own tasks.
-    AsyncInfo.Queue = nullptr;
-    if (auto Err = CUDAStreamManager.returnResource(Stream))
-      return Err;
+    if (RemoveQueue) {
+      AsyncInfo.Queue = nullptr;
+      if (auto Err = CUDAStreamManager.returnResource(Stream))
+        return Err;
+    }
 
     return Plugin::check(Res, "error in cuStreamSynchronize: %s");
   }
@@ -1281,6 +1285,9 @@ struct CUDADeviceTy : public GenericDeviceTy {
   /// The maximum number of warps that can be resident on all the SMs
   /// simultaneously.
   uint32_t HardwareParallelism = 0;
+
+  /// Mutex to guard getting/setting the stream
+  std::mutex StreamMutex;
 };
 
 Error CUDAKernelTy::launchImpl(GenericDeviceTy &GenericDevice,
diff --git a/offload/plugins-nextgen/host/src/rtl.cpp b/offload/plugins-nextgen/host/src/rtl.cpp
index d950572265b4c..725a37c280248 100644
--- a/offload/plugins-nextgen/host/src/rtl.cpp
+++ b/offload/plugins-nextgen/host/src/rtl.cpp
@@ -297,7 +297,8 @@ struct GenELF64DeviceTy : public GenericDeviceTy {
 
   /// All functions are already synchronous. No need to do anything on this
   /// synchronization function.
-  Error synchronizeImpl(__tgt_async_info &AsyncInfo) override {
+  Error synchronizeImpl(__tgt_async_info &AsyncInfo,
+                        bool RemoveQueue) override {
     return Plugin::success();
   }
diff --git a/offload/unittests/OffloadAPI/common/Fixtures.hpp b/offload/unittests/OffloadAPI/common/Fixtures.hpp
index 546921164f691..4fe57bd80d704 100644
--- a/offload/unittests/OffloadAPI/common/Fixtures.hpp
+++ b/offload/unittests/OffloadAPI/common/Fixtures.hpp
@@ -9,6 +9,7 @@
 #include <OffloadAPI.h>
 #include <OffloadPrint.hpp>
 #include <gtest/gtest.h>
+#include <thread>
 
 #include "Environment.hpp"
@@ -57,6 +58,23 @@ inline std::string SanitizeString(const std::string &Str) {
   return NewStr;
 }
+template <typename Fn> inline void threadify(Fn body) {
+  std::vector<std::thread> Threads;
+  for (size_t I = 0; I < 20; I++) {
+    Threads.emplace_back(
+        [&body](size_t I) {
+          std::string ScopeMsg{"Thread #"};
+          ScopeMsg.append(std::to_string(I));
+          SCOPED_TRACE(ScopeMsg);
+          body(I);
+        },
+        I);
+  }
+  for (auto &T : Threads) {
+    T.join();
+  }
+}
+
 struct OffloadTest : ::testing::Test {
   ol_device_handle_t Host = TestEnvironment::getHostDevice();
 };
diff --git a/offload/unittests/OffloadAPI/kernel/olLaunchKernel.cpp b/offload/unittests/OffloadAPI/kernel/olLaunchKernel.cpp
index e7e608f2a64d4..3e128d1e84645 100644
--- a/offload/unittests/OffloadAPI/kernel/olLaunchKernel.cpp
+++ b/offload/unittests/OffloadAPI/kernel/olLaunchKernel.cpp
@@ -104,6 +104,29 @@ TEST_P(olLaunchKernelFooTest, Success) {
   ASSERT_SUCCESS(olMemFree(Mem));
 }
+TEST_P(olLaunchKernelFooTest, SuccessThreaded) {
+  threadify([&](size_t) {
+    void *Mem;
+    ASSERT_SUCCESS(olMemAlloc(Device, OL_ALLOC_TYPE_MANAGED,
+                              LaunchArgs.GroupSize.x * sizeof(uint32_t), &Mem));
+    struct {
+      void *Mem;
+    } Args{Mem};
+
+    ASSERT_SUCCESS(olLaunchKernel(Queue, Device, Kernel, &Args, sizeof(Args),
+                                  &LaunchArgs, nullptr));
+
+    ASSERT_SUCCESS(olWaitQueue(Queue));
+
+    uint32_t *Data = (uint32_t *)Mem;
+    for (uint32_t i = 0; i < 64; i++) {
+      ASSERT_EQ(Data[i], i);
+    }
+
+    ASSERT_SUCCESS(olMemFree(Mem));
+  });
+}
+
 TEST_P(olLaunchKernelNoArgsTest, Success) {
   ASSERT_SUCCESS(
       olLaunchKernel(Queue, Device, Kernel, nullptr, 0, &LaunchArgs, nullptr));

offload/unittests/OffloadAPI/kernel/olLaunchKernel.cpp:

@@ -104,6 +104,29 @@ TEST_P(olLaunchKernelFooTest, Success) {
   ASSERT_SUCCESS(olMemFree(Mem));
 }
 
+TEST_P(olLaunchKernelFooTest, SuccessThreaded) {
RossBrunton (Contributor, Author):
I'd love to be able to add an OFFLOAD_TEST_THREADED_P macro so that you'd get threaded and non-threaded tests "for free" without copy-pasting the test body. But I can't think of a good way of actually implementing that with gtest; anyone have any ideas?
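One possible shape for such a macro (purely a sketch, not part of this patch, and untested): emit the test body as a function template that both a plain and a threadified TEST_P call into, reusing the threadify helper added in Fixtures.hpp.

```cpp
// Hypothetical sketch only: OFFLOAD_TEST_THREADED_P is not defined anywhere in
// this PR. The body is a function template so it can take the concrete test
// class generated by each TEST_P instantiation (which derives from FIXTURE).
#define OFFLOAD_TEST_THREADED_P(FIXTURE, NAME)                                 \
  template <typename FixtureT>                                                 \
  static void NAME##TestBody(FixtureT &Fixture, size_t ThreadId);              \
  TEST_P(FIXTURE, NAME) { NAME##TestBody(*this, 0); }                          \
  TEST_P(FIXTURE, NAME##Threaded) {                                            \
    threadify([&](size_t I) { NAME##TestBody(*this, I); });                    \
  }                                                                            \
  template <typename FixtureT>                                                 \
  static void NAME##TestBody(FixtureT &Fixture, size_t ThreadId)

// Usage (the body accesses fixture members through the Fixture parameter):
//   OFFLOAD_TEST_THREADED_P(olLaunchKernelFooTest, Launches) {
//     ASSERT_SUCCESS(olWaitQueue(Fixture.Queue));
//   }
```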

RossBrunton requested review from callumfare and jhuber6 and removed the request for callumfare (July 18, 2025 11:32)
RossBrunton marked this pull request as draft (July 18, 2025 12:39)
RossBrunton marked this pull request as ready for review (July 18, 2025 13:10)
offload/plugins-nextgen/amdgpu/src/rtl.cpp:

@@ -2227,6 +2227,7 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
   /// Get the stream of the asynchronous info structure or get a new one.
   Error getStream(AsyncInfoWrapperTy &AsyncInfoWrapper,
                   AMDGPUStreamTy *&Stream) {
+    std::lock_guard<std::mutex> StreamLock{StreamMutex};
Contributor:
Do we only need this when we create a new one?

RossBrunton (Contributor, Author):
Multiple threads can call getStream, see that the stream doesn't exist, and each create a new one. This can result in multiple streams being created in error.
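For illustration, this is a classic check-then-create race; a standalone sketch (hypothetical names, not the plugin's actual types) of the pattern and the fix:

```cpp
#include <mutex>

// Hypothetical stand-ins for the plugin's async info and stream pool.
struct AsyncInfo {
  void *Queue = nullptr;
};
void *acquireStreamFromPool() { return new int(0); } // placeholder resource

std::mutex StreamMutex;

void *getStream(AsyncInfo &AI) {
  // Without this lock, two threads can both observe Queue == nullptr and both
  // acquire a stream; one of the streams is then lost or overwritten.
  std::lock_guard<std::mutex> Lock{StreamMutex};
  if (!AI.Queue)
    AI.Queue = acquireStreamFromPool(); // only one thread creates the stream
  return AI.Queue;
}
```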

offload/plugins-nextgen/amdgpu/src/rtl.cpp:

@@ -2302,8 +2304,11 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
     // Once the stream is synchronized, return it to stream pool and reset
     // AsyncInfo. This is to make sure the synchronization only works for its
     // own tasks.
-    AsyncInfo.Queue = nullptr;
-    return AMDGPUStreamManager.returnResource(Stream);
+    if (RemoveQueue) {
Contributor:
Why do we now need a conditional for this? It's supposed to consume it.

RossBrunton (Contributor, Author):
Liboffload contains this:

Error olWaitQueue_impl(ol_queue_handle_t Queue) {
  // Host plugin doesn't have a queue set so it's not safe to call synchronize
  // on it, but we have nothing to synchronize in that situation anyway.
  if (Queue->AsyncInfo->Queue) {
    if (auto Err = Queue->Device->Device->synchronize(Queue->AsyncInfo, false))
      return Err;
  }

  // Recreate the stream resource so the queue can be reused
  // TODO: Would be easier for the synchronization to (optionally) not release
  // it to begin with.
  if (auto Res = Queue->Device->Device->initAsyncInfo(&Queue->AsyncInfo))
    return Res;

  return Error::success();
}

This has to be done atomically so that, for example, we don't try to synchronise an absent queue. I could add a mutex to ol_queue_impl_t, but I figured it'd be better to just implement what the comment says. Specifically, we avoid dropping the AsyncInfo just to immediately recreate it right after.
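With the RemoveQueue flag in place, the post-patch olWaitQueue_impl (as in the OffloadImpl.cpp hunk above) reduces to a single synchronize call, with no window where the queue is absent:

```cpp
Error olWaitQueue_impl(ol_queue_handle_t Queue) {
  // Host plugin doesn't have a queue set so it's not safe to call synchronize
  // on it, but we have nothing to synchronize in that situation anyway.
  if (Queue->AsyncInfo->Queue) {
    // RemoveQueue = false: the stream stays attached to the queue for reuse.
    if (auto Err = Queue->Device->Device->synchronize(Queue->AsyncInfo, false))
      return Err;
  }

  return Error::success();
}
```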

Contributor:
I thought the whole point of the resource managers we used was to make acquiring / releasing resources cheap. @kevinsala was the one to implement this originally, so I'll see if he knows the proper approach here.
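For context, a minimal sketch of the kind of pooled acquire/release such a resource manager provides (a generic illustration, not the actual GenericDeviceResourceManagerTy API):

```cpp
#include <mutex>
#include <vector>

// Acquiring a resource is just popping a free-list entry under a mutex, which
// is why returning a stream and re-acquiring one is expected to be cheap.
template <typename ResourceTy> class ResourcePool {
public:
  ResourceTy get() {
    std::lock_guard<std::mutex> Lock(Mutex);
    if (Free.empty())
      return ResourceTy{}; // create a fresh resource only when the pool is dry
    ResourceTy R = Free.back();
    Free.pop_back();
    return R;
  }

  void put(ResourceTy R) {
    std::lock_guard<std::mutex> Lock(Mutex);
    Free.push_back(R); // returned resources are recycled, not destroyed
  }

private:
  std::mutex Mutex;
  std::vector<ResourceTy> Free;
};
```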

Reviewers

@jhuber6 left review comments

@callumfare: awaiting requested review

3 participants: @RossBrunton, @llvmbot, @jhuber6
