Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[WIP] Multinode DAG minus FT changes#42173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed
Show file tree
Hide file tree
Changes from1 commit
Commits
Show all changes
86 commits
Select commitHold shift + click to select a range
3044247
ip
Nov 16, 2023
8c5efd8
basic working.
Nov 17, 2023
664b07a
enhancement
Nov 17, 2023
8f6f8d2
working now.
Nov 17, 2023
12b977d
initial commit
stephanie-wangNov 29, 2023
1c935b9
Add special calls for create and put mutable objects
stephanie-wangNov 29, 2023
c2dbf1f
feature flag for shared mem seal, only acquire once per ray.get
stephanie-wangNov 30, 2023
6d4aa94
put-get
stephanie-wangNov 30, 2023
bc4f1e9
rm shared mem seal
stephanie-wangNov 30, 2023
c4a2378
fix num_readers on first version, unit tests pass now
stephanie-wangNov 30, 2023
e40d3c8
mutable object -> channel
stephanie-wangNov 30, 2023
b79b7d1
micro
stephanie-wangNov 30, 2023
5ea0fe3
support different metadata
stephanie-wangDec 1, 2023
cbe257f
better error message
stephanie-wangDec 1, 2023
a68cefd
cleanup
stephanie-wangDec 1, 2023
ea57894
Test for errors, better error handling when too many readers
stephanie-wangDec 2, 2023
5bbf379
remove unneeded
stephanie-wangDec 2, 2023
1e16e09
java build
stephanie-wangDec 2, 2023
14c3a44
Merge remote-tracking branch 'sang/dag-api' into compiled-dag
stephanie-wangDec 2, 2023
580b3ad
rename
stephanie-wangDec 2, 2023
888950a
Merge remote-tracking branch 'upstream/master' into compiled-dag
stephanie-wangDec 2, 2023
bdfbb8a
tmp
stephanie-wangDec 2, 2023
fe11cc3
test metadata change in remote reader
stephanie-wangDec 2, 2023
b6a66f2
Merge branch 'mutable-objects-2' into compiled-dag
stephanie-wangDec 2, 2023
e11b614
build
stephanie-wangDec 4, 2023
b6150a3
scatter-gather DAG works
stephanie-wangDec 5, 2023
5336262
fix
stephanie-wangDec 5, 2023
99a38c2
fix
stephanie-wangDec 5, 2023
e88c40f
fix
stephanie-wangDec 6, 2023
95e871b
compile?
stephanie-wangDec 6, 2023
204bb9b
fix
stephanie-wangDec 6, 2023
4703f34
compile?
stephanie-wangDec 6, 2023
420bd1c
build
stephanie-wangDec 6, 2023
4cabbc5
x
stephanie-wangDec 6, 2023
b44ef8a
fix
stephanie-wangDec 6, 2023
13b1d53
Merge branch 'mutable-objects-2' into compiled-dag
stephanie-wangDec 6, 2023
e54972b
unit test
stephanie-wangDec 6, 2023
881d5ff
copyright
stephanie-wangDec 6, 2023
ef2cfb7
test
stephanie-wangDec 6, 2023
ca22a63
Merge remote-tracking branch 'upstream/master' into mutable-objects-2
stephanie-wangDec 6, 2023
5fbfac5
Merge branch 'mutable-objects-2' into compiled-dag
stephanie-wangDec 6, 2023
9396810
tmp
stephanie-wangDec 6, 2023
dbbb3d6
Only allocate PlasmaObjectHeader if is_mutable=true
stephanie-wangDec 7, 2023
9078776
Only call Read/Write Acquire/Release if is_mutable=true
stephanie-wangDec 7, 2023
2e677c3
x
stephanie-wangDec 7, 2023
f06b543
cpp test
stephanie-wangDec 7, 2023
950bbb4
Merge branch 'mutable-objects-2' into compiled-dag
stephanie-wangDec 7, 2023
494cb53
Revert "tmp"
stephanie-wangDec 7, 2023
05b002f
cleanup
stephanie-wangDec 7, 2023
521c73b
Support no-OutputNode DAGs
stephanie-wangDec 7, 2023
5b58250
Support non-DAG args
stephanie-wangDec 7, 2023
b5beca4
errors
stephanie-wangDec 7, 2023
cc2e795
lint
stephanie-wangDec 7, 2023
c17c367
doc
stephanie-wangDec 7, 2023
4dfa31e
skip tests on windows
stephanie-wangDec 7, 2023
126296f
Merge remote-tracking branch 'upstream/master' into mutable-objects-2
stephanie-wangDec 7, 2023
03f4fbd
larger CI machine
stephanie-wangDec 8, 2023
b1f3f34
Merge branch 'mutable-objects-2' into compiled-dag
stephanie-wangDec 8, 2023
3e7dfa2
Merge branch 'master' into mutable-objects-2
stephanie-wangDec 8, 2023
7dde158
cleanup
stephanie-wangDec 9, 2023
63cc16d
cleanup
stephanie-wangDec 9, 2023
dca1239
perf
stephanie-wangDec 9, 2023
7b8472b
add normal DAG
stephanie-wangDec 9, 2023
d94c485
Merge remote-tracking branch 'upstream/master' into compiled-dag
stephanie-wangDec 9, 2023
740169b
x
stephanie-wangDec 9, 2023
3d5ffca
Merge branch 'mutable-objects-2' into compiled-dag
stephanie-wangDec 11, 2023
8dde781
Merge remote-tracking branch 'upstream/master' into compiled-dag
stephanie-wangDec 12, 2023
905a5bc
merge
stephanie-wangDec 12, 2023
4436b1f
revert
stephanie-wangDec 12, 2023
f105ed5
revert
stephanie-wangDec 12, 2023
00f3f1c
x
stephanie-wangDec 12, 2023
257457d
buffer size bytes
stephanie-wangDec 12, 2023
71c32ae
optional
stephanie-wangDec 13, 2023
2ba93f0
x
stephanie-wangDec 13, 2023
3a4b2f4
Merge remote-tracking branch 'upstream/master' into compiled-dag
stephanie-wangDec 13, 2023
973ba68
Merge remote-tracking branch 'upstream/master' into compiled-dag
stephanie-wangDec 13, 2023
ac5fa55
x
stephanie-wangDec 13, 2023
35a37fd
lint?
stephanie-wangDec 13, 2023
1326331
test
stephanie-wangDec 13, 2023
fadec07
API
stephanie-wangDec 13, 2023
ff19557
x
stephanie-wangDec 13, 2023
449c917
multinode channel
stephanie-wangDec 18, 2023
94cd1f7
move channel manager to raylet
stephanie-wangDec 19, 2023
8407999
it works
stephanie-wangDec 21, 2023
b84c1bd
test multiple remote readers
stephanie-wangDec 21, 2023
76bbdc2
Fix segfault in plasma client, multinode DAG works
stephanie-wangDec 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
PrevPrevious commit
NextNext commit
Test for errors, better error handling when too many readers
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
  • Loading branch information
@stephanie-wang
stephanie-wang committedDec 2, 2023
commitea57894f405c757d8faa6ed6077aecfc7c9db0cf
10 changes: 6 additions & 4 deletionspython/ray/_private/ray_perf.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -296,9 +296,9 @@ def async_actor_multi():

ray.init()

def put_channel_small(chans,num_readers=1,do_get=False, do_release=False):
def put_channel_small(chans, do_get=False, do_release=False):
for chan in chans:
chan.write(b"0", num_readers=num_readers)
chan.write(b"0")
if do_get:
chan.begin_read()
if do_release:
Expand DownExpand Up@@ -337,14 +337,14 @@ def read(self, chans):
n_cpu = multiprocessing.cpu_count() // 2
print(f"Testing multiple readers/channels, n={n_cpu}")

chans = [ray_channel.Channel(1000)]
chans = [ray_channel.Channel(1000, num_readers=n_cpu)]
readers = [ChannelReader.remote() for _ in range(n_cpu)]
ray.get([reader.ready.remote() for reader in readers])
for reader in readers:
reader.read.remote(chans)
results += timeit(
"local put:n remote get, single channel calls",
lambda: put_channel_small(chans, num_readers=n_cpu),
lambda: put_channel_small(chans),
)
for reader in readers:
ray.kill(reader)
Expand All@@ -369,6 +369,8 @@ def read(self, chans):
for reader in readers:
ray.kill(reader)

ray.shutdown()

############################
# End of channel perf tests.
############################
Expand Down
29 changes: 18 additions & 11 deletionspython/ray/experimental/channel.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -52,7 +52,7 @@ class Channel:
ray.wait.
"""

def __init__(self, buffer_size: Optional[int] = None):
def __init__(self, buffer_size: Optional[int] = None, num_readers: int = 1):
"""
Create a channel that can be read and written by co-located Ray processes.

Expand All@@ -71,19 +71,20 @@ def __init__(self, buffer_size: Optional[int] = None):
else:
self._base_ref = _create_channel_ref(buffer_size)

self.worker = ray._private.worker.global_worker
self.worker.check_connected()
self._num_readers = num_readers
self._worker = ray._private.worker.global_worker
self._worker.check_connected()

@staticmethod
def _from_base_ref(base_ref: "ray.ObjectRef") -> "Channel":
chan = Channel()
def _from_base_ref(base_ref: "ray.ObjectRef", num_readers: int) -> "Channel":
chan = Channel(num_readers=num_readers)
chan._base_ref = base_ref
return chan

def __reduce__(self):
return self._from_base_ref, (self._base_ref,)
return self._from_base_ref, (self._base_ref, self._num_readers)

def write(self, value: Any, num_readers: int):
def write(self, value: Any, num_readers:Optional[int] = None):
"""
Write a value to the channel.

Expand All@@ -96,11 +97,13 @@ def write(self, value: Any, num_readers: int):
num_readers: The number of readers that must read and release the value
before we can write again.
"""
if num_readers is None:
num_readers = self._num_readers
if num_readers <= 0:
raise ValueError("``num_readers`` must be a positive integer.")

try:
serialized_value = self.worker.get_serialization_context().serialize(value)
serialized_value = self._worker.get_serialization_context().serialize(value)
except TypeError as e:
sio = io.StringIO()
ray.util.inspect_serializability(value, print_file=sio)
Expand All@@ -111,7 +114,7 @@ def write(self, value: Any, num_readers: int):
)
raise TypeError(msg) from e

self.worker.core_worker.experimental_mutable_object_put_serialized(
self._worker.core_worker.experimental_mutable_object_put_serialized(
serialized_value,
self._base_ref,
num_readers,
Expand All@@ -122,10 +125,14 @@ def begin_read(self) -> Any:
Read the latest value from the channel. This call will block until a
value is available to read.

Subsequent calls to begin_read() will return the same value, until
end_read() is called. Then, the client must begin_read() again to get
the next value.

Returns:
Any: The deserialized value.
"""
values, _ = self.worker.get_objects(
values, _ = self._worker.get_objects(
[self._base_ref], _is_experimental_mutable_object=True
)
return values[0]
Expand All@@ -137,6 +144,6 @@ def end_read(self):
If begin_read is not called first, then this call will block until a
value is written, then drop the value.
"""
self.worker.core_worker.experimental_mutable_object_read_release(
self._worker.core_worker.experimental_mutable_object_read_release(
[self._base_ref]
)
49 changes: 49 additions & 0 deletionspython/ray/tests/test_accelerated_dag.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -21,9 +21,57 @@ def test_put_local_get(ray_start_regular):
val = i.to_bytes(8, "little")
chan.write(val, num_readers=1)
assert chan.begin_read() == val

# Begin read multiple times will return the same value.
assert chan.begin_read() == val

chan.end_read()


def test_errors(ray_start_regular):
@ray.remote
class Actor:
def make_chan(self, do_write=True):
self.chan = ray_channel.Channel(1000)
if do_write:
self.chan.write(b"hello", num_readers=1)
return self.chan

a = Actor.remote()
# Only original creator can write.
chan = ray.get(a.make_chan.remote(do_write=False))
with pytest.raises(ray.exceptions.RaySystemError):
chan.write(b"hi")

# Only original creator can write.
chan = ray.get(a.make_chan.remote(do_write=True))
assert chan.begin_read() == b"hello"
with pytest.raises(ray.exceptions.RaySystemError):
chan.write(b"hi")

# Multiple consecutive reads from the same process are fine.
chan = ray.get(a.make_chan.remote(do_write=True))
assert chan.begin_read() == b"hello"
assert chan.begin_read() == b"hello"
chan.end_read()

@ray.remote
class Reader:
def __init__(self):
pass

def read(self, chan):
return chan.begin_read()

# Multiple reads from n different processes, where n > num_readers, errors.
chan = ray.get(a.make_chan.remote(do_write=True))
readers = [Reader.remote(), Reader.remote()]
# At least 1 reader
with pytest.raises(ray.exceptions.RayTaskError) as exc_info:
ray.get([reader.read.remote(chan) for reader in readers])
assert "ray.exceptions.RaySystemError" in str(exc_info.value)


def test_put_different_meta(ray_start_regular):
chan = ray_channel.Channel(1000)

Expand All@@ -42,6 +90,7 @@ def _test(val):
_test(1000)
_test(np.random.rand(10))

# Cannot put a serialized value larger than the allocated buffer.
with pytest.raises(ValueError):
_test(np.random.rand(100))

Expand Down
44 changes: 26 additions & 18 deletionssrc/ray/object_manager/common.cc
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -56,6 +56,7 @@ void PlasmaObjectHeader::WriteAcquire(int64_t write_version,
<< ". Are you sure this is the only writer?";

version = write_version;
is_sealed = false;
data_size = write_data_size;
metadata_size = write_metadata_size;
num_readers = write_num_readers;
Expand All@@ -76,6 +77,7 @@ void PlasmaObjectHeader::WriteRelease(int64_t write_version) {
<< version << ". Are you sure this is the only writer?";

version = write_version;
is_sealed = true;
RAY_CHECK(num_readers != 0) << num_readers;
num_read_acquires_remaining = num_readers;
num_read_releases_remaining = num_readers;
Expand All@@ -87,30 +89,36 @@ void PlasmaObjectHeader::WriteRelease(int64_t write_version) {
RAY_CHECK(pthread_cond_broadcast(&cond) == 0);
}

int64_t PlasmaObjectHeader::ReadAcquire(int64_tread_version) {
RAY_LOG(DEBUG) << "ReadAcquire waiting version " <<read_version;
bool PlasmaObjectHeader::ReadAcquire(int64_tversion_to_read, int64_t *version_read) {
RAY_LOG(DEBUG) << "ReadAcquire waiting version " <<version_to_read;
RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0);
RAY_LOG(DEBUG) << "ReadAcquire " <<read_version;
RAY_LOG(DEBUG) << "ReadAcquire " <<version_to_read;
PrintPlasmaObjectHeader(this);

while (version < read_version || num_read_acquires_remaining == 0) {
// Wait for the requested version (or a more recent one) to be sealed.
while (version < version_to_read || !is_sealed) {
RAY_CHECK(pthread_cond_wait(&cond, &wr_mut) == 0);
}

if (version > read_version) {
RAY_LOG(WARNING) << "Version " << version << " already exceeds version to read "
<< read_version << ". May have missed earlier reads.";
}

if (num_readers != -1) {
num_read_acquires_remaining--;
RAY_CHECK(num_read_acquires_remaining >= 0)
<< "readers acquired exceeds max readers " << num_readers;
// This object can only be read a constant number of times. Tell the caller
// which version was read.
read_version = version;
bool success = false;
if (num_readers == -1) {
// Object is a normal immutable object. Read succeeds.
*version_read = 0;
success = true;
} else {
read_version = 0;
*version_read = version;
if (version == version_to_read && num_read_acquires_remaining > 0) {
// This object is at the right version and still has reads remaining. Read
// succeeds.
num_read_acquires_remaining--;
success = true;
} else if (version > version_to_read) {
RAY_LOG(WARNING) << "Version " << version << " already exceeds version to read "
<< version_to_read;
} else {
RAY_LOG(WARNING) << "Version " << version << " already has " << num_readers
<< "readers";
}
}

RAY_LOG(DEBUG) << "ReadAcquire done";
Expand All@@ -119,7 +127,7 @@ int64_t PlasmaObjectHeader::ReadAcquire(int64_t read_version) {
RAY_CHECK(pthread_mutex_unlock(&wr_mut) == 0);
// Signal to other readers that they may read.
RAY_CHECK(pthread_cond_signal(&cond) == 0);
returnread_version;
returnsuccess;
}

void PlasmaObjectHeader::ReadRelease(int64_t read_version) {
Expand Down
39 changes: 25 additions & 14 deletionssrc/ray/object_manager/common.h
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -57,6 +57,12 @@ struct PlasmaObjectHeader {
// the first write and then should never be modified. For mutable objects,
// each new write must increment the version before releasing to readers.
int64_t version = 0;
// Indicates whether the current version has been written. is_sealed=false
// means that there is a writer who has WriteAcquire'd but not yet
// WriteRelease'd the current version. is_sealed=true means that `version`
// has been WriteRelease'd. A reader may read the actual object value if
// is_sealed=true and num_read_acquires_remaining != 0.
bool is_sealed = false;
// The total number of reads allowed before the writer can write again. This
// value should be set by the writer before releasing to readers.
// For immutable objects, this is set to -1 and infinite reads are allowed.
Expand All@@ -66,6 +72,10 @@ struct PlasmaObjectHeader {
// objects, readers must ensure this is > 0 and decrement before they read.
// Once this value reaches 0, no more readers are allowed until the writer
// writes a new version.
// NOTE(swang): Technically we do not need this because
// num_read_releases_remaining protects against too many readers. However,
// this allows us to throw an error as soon as the n+1-th reader begins,
// instead of waiting to error until the n+1-th reader is done reading.
int64_t num_read_acquires_remaining = 0;
// The number of readers who must release the current version before a new
// version can be written. For mutable objects, readers must decrement this
Expand All@@ -79,13 +89,15 @@ struct PlasmaObjectHeader {
uint64_t data_size = 0;
uint64_t metadata_size = 0;

/// Setup synchronization primitives.
void Init();

/// Destroy synchronization primitives.
void Destroy();

/// Blocks until all readers for the previous write have ReadRelease'd the value.
///Caller must ensure there is one writer at a time. Caller must pass
///consecutiveversions on each new write, starting with write_version=1.
/// Blocks until all readers for the previous write have ReadRelease'd the
///value. Protects against concurrent writers. Caller must pass consecutive
/// versions on each new write, starting with write_version=1.
///
/// \param write_version The new version for write.
/// \param data_size The new data size of the object.
Expand All@@ -96,23 +108,22 @@ struct PlasmaObjectHeader {
uint64_t metadata_size,
int64_t num_readers);

// Call after completing a write to signal that readers may read.
// num_readers should be set before calling this.
/// Call after completing a write to signal that readers may read.
/// num_readers should be set before calling this.
///
/// \param write_version The new version for write. This must match the
/// version previously passed to WriteAcquire.
void WriteRelease(int64_t write_version);

// Blocks until the given versionor a more recent versionis ready to read.
//If num_readershave already readthis version, then this call will hang.
// Blocks until the given version is ready to read. Returns false if the
//maximum number of readershave already readthe requested version.
//
// \param read_version The minimum version to wait for.
// \return The version that was read. This should be passed to ReadRelease
// when the reader is done. Returns 0 if the object is a normal immutable
// object, meaning no ReadRelease is needed.
///
/// \param read_version Read at least this version.
int64_t ReadAcquire(int64_t read_version);
// \param[in] read_version The version to read.
// \param[out] version_read For normal immutable objects, this will be set to
// 0. Otherwise, the current version.
// \return success Whether the correct version was read and there were still
// reads remaining.
bool ReadAcquire(int64_t version_to_read, int64_t *version_read);

// Finishes the read. If all reads are done, signals to the writer. This is
// not necessary to call for objects that have num_readers=-1.
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp