Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/uct/cuda/base/cuda_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ uct_cuda_base_query_devices_common(
uct_tl_device_resource_t **tl_devices_p, unsigned *num_tl_devices_p)
{
ucs_sys_device_t sys_device = UCS_SYS_DEVICE_ID_UNKNOWN;
ucs_sys_bus_id_t bus_id;
CUdevice cuda_device;

if (cuCtxGetDevice(&cuda_device) == CUDA_SUCCESS) {
uct_cuda_base_get_sys_dev(cuda_device, &sys_device);
uct_cuda_base_get_sys_dev(cuda_device, &sys_device, &bus_id);
}

return uct_single_device_resource(md, UCT_CUDA_DEV_NAME, dev_type,
Expand Down
3 changes: 2 additions & 1 deletion src/uct/cuda/base/cuda_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ uct_cuda_base_query_devices(

ucs_status_t
uct_cuda_base_get_sys_dev(CUdevice cuda_device,
ucs_sys_device_t *sys_dev_p);
ucs_sys_device_t *sys_dev_p,
ucs_sys_bus_id_t *bus_id);

#endif
51 changes: 33 additions & 18 deletions src/uct/cuda/base/cuda_md.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <ucs/sys/module.h>
#include <ucs/sys/string.h>
#include <ucs/sys/topo/base/topo.h>
#include <ucs/memory/memtype_cache.h>
#include <ucs/type/spinlock.h>
#include <ucs/profile/profile.h>
Expand All @@ -21,16 +22,14 @@
#include <cuda_runtime.h>
#include <cuda.h>

#define UCT_CUDA_DEV_NAME_MAX_LEN 64
#define UCT_CUDA_MAX_DEVICES 32

ucs_spinlock_t uct_cuda_base_lock;
uct_cuda_base_sys_dev_map_t uct_cuda_sys_dev_bus_id_map;


ucs_status_t uct_cuda_base_get_sys_dev(CUdevice cuda_device,
ucs_sys_device_t *sys_dev_p)
ucs_sys_device_t *sys_dev_p,
ucs_sys_bus_id_t *bus_id)
{
ucs_sys_bus_id_t bus_id;
CUresult cu_err;
int attrib;

Expand All @@ -40,28 +39,28 @@ ucs_status_t uct_cuda_base_get_sys_dev(CUdevice cuda_device,
if (cu_err != CUDA_SUCCESS) {
return UCS_ERR_IO_ERROR;
}
bus_id.domain = (uint16_t)attrib;
bus_id->domain = (uint16_t)attrib;

/* PCI bus id */
cu_err = cuDeviceGetAttribute(&attrib, CU_DEVICE_ATTRIBUTE_PCI_BUS_ID,
cuda_device);
if (cu_err != CUDA_SUCCESS) {
return UCS_ERR_IO_ERROR;
}
bus_id.bus = (uint8_t)attrib;
bus_id->bus = (uint8_t)attrib;

/* PCI slot id */
cu_err = cuDeviceGetAttribute(&attrib, CU_DEVICE_ATTRIBUTE_PCI_DEVICE_ID,
cuda_device);
if (cu_err != CUDA_SUCCESS) {
return UCS_ERR_IO_ERROR;
}
bus_id.slot = (uint8_t)attrib;
bus_id->slot = (uint8_t)attrib;

/* Function - always 0 */
bus_id.function = 0;
bus_id->function = 0;

return ucs_topo_find_device_by_bus_id(&bus_id, sys_dev_p);
return ucs_topo_find_device_by_bus_id(bus_id, sys_dev_p);
}

static size_t
Expand Down Expand Up @@ -121,6 +120,7 @@ uct_cuda_base_query_attributes(uct_cuda_copy_md_t *md, const void *address,
ucs_status_t status;
size_t total_bytes;
CUresult cu_err;
ucs_sys_bus_id_t bus_id;

attr_type[0] = CU_POINTER_ATTRIBUTE_MEMORY_TYPE;
attr_data[0] = &cuda_mem_mype;
Expand All @@ -136,7 +136,8 @@ uct_cuda_base_query_attributes(uct_cuda_copy_md_t *md, const void *address,
return UCS_ERR_INVALID_ADDR;
}

status = uct_cuda_base_get_sys_dev(cuda_device, &mem_info->sys_dev);
status = uct_cuda_base_get_sys_dev(cuda_device, &mem_info->sys_dev,
&bus_id);
if (status != UCS_OK) {
return status;
}
Expand Down Expand Up @@ -280,28 +281,42 @@ uct_cuda_base_query_md_resources(uct_component_t *component,
ucs_status_t status;
char device_name[10];
int num_gpus;
ucs_sys_bus_id_t bus_id;

cudaErr = cudaGetDeviceCount(&num_gpus);
if ((cudaErr != cudaSuccess) || (num_gpus == 0)) {
return uct_md_query_empty_md_resource(resources_p, num_resources_p);
}

for (cuda_device = 0; cuda_device < num_gpus; ++cuda_device) {
status = uct_cuda_base_get_sys_dev(cuda_device, &sys_dev);
if (status == UCS_OK) {
ucs_snprintf_safe(device_name, sizeof(device_name), "GPU%d",
cuda_device);
status = ucs_topo_sys_device_set_name(sys_dev, device_name);
ucs_assert_always(status == UCS_OK);
ucs_spin_lock(&uct_cuda_base_lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it needed to protect uct_cuda_sys_dev_bus_id_map? If yes, when can it be accessed concurrently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this logic is called as part of md_query functionality and because we access the global structure uct_cuda_sys_dev_bus_id_map, I used a spin_lock around the access. Is this unnecessary? Is it guaranteed that query_md_resources will be called by only one thread at any given time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we support multi-threading for UCP only, and all UCP API calls (including progress) are protected by a global lock.
Imo, there is no need for a lock here then.


if (uct_cuda_sys_dev_bus_id_map.count == 0) {
uct_cuda_sys_dev_bus_id_map.count = (uint8_t)num_gpus;

for (cuda_device = 0; cuda_device < num_gpus; ++cuda_device) {
status = uct_cuda_base_get_sys_dev(cuda_device, &sys_dev, &bus_id);
if (status == UCS_OK) {
ucs_snprintf_safe(device_name, sizeof(device_name), "GPU%d",
cuda_device);
status = ucs_topo_sys_device_set_name(sys_dev, device_name);
ucs_assert_always(status == UCS_OK);

/* populate sys_dev -> bus_id map */
uct_cuda_sys_dev_bus_id_map.sys_dev[cuda_device] = sys_dev;
uct_cuda_sys_dev_bus_id_map.bus_id[cuda_device] = bus_id.bus;
}
}
}

ucs_spin_unlock(&uct_cuda_base_lock);

return uct_md_query_single_md_resource(component, resources_p,
num_resources_p);
}

/* Library constructor: initializes the global spinlock used by the CUDA base
 * layer and marks the global sys_dev -> bus_id map as unpopulated. A count of
 * zero means the map has not been filled yet; it is populated later when MD
 * resources are queried. */
UCS_STATIC_INIT {
ucs_spinlock_init(&uct_cuda_base_lock, 0);
uct_cuda_sys_dev_bus_id_map.count = 0;
}

UCS_STATIC_CLEANUP {
Expand Down
15 changes: 15 additions & 0 deletions src/uct/cuda/base/cuda_md.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@

#include <uct/base/uct_md.h>

#define UCT_CUDA_DEV_NAME_MAX_LEN 64
#define UCT_CUDA_MAX_DEVICES 16


/* Per-process map from CUDA device ordinal to the topology system-device id
 * and PCI bus id of each visible GPU. It is also serialized as the cuda_ipc
 * iface address so a peer process can translate received sys_dev values into
 * its own local numbering. */
typedef struct uct_cuda_base_sys_dev_map {
pid_t pid; /* pid of the process that owns/published this map */
uint8_t count; /* number of valid entries in the arrays below */
ucs_sys_device_t sys_dev[UCT_CUDA_MAX_DEVICES]; /* topo sys device per GPU ordinal */
uint8_t bus_id[UCT_CUDA_MAX_DEVICES]; /* PCI bus id per GPU ordinal */
} uct_cuda_base_sys_dev_map_t;


extern uct_cuda_base_sys_dev_map_t uct_cuda_sys_dev_bus_id_map;


ucs_status_t uct_cuda_base_detect_memory_type(uct_md_h md, const void *address,
size_t length,
ucs_memory_type_t *mem_type_p);
Expand Down
34 changes: 31 additions & 3 deletions src/uct/cuda/cuda_ipc/cuda_ipc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,41 @@

/* Endpoint constructor: caches the peer's sys_dev/bus_id map (carried in the
 * iface address), keyed by the peer pid, so remote sys_dev values can later
 * be translated into local ones; then records the remote pid and starts
 * keepalive tracking. The scraped diff had old and new declaration lines
 * interleaved; this is the reconstructed post-change version. */
static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_ep_t, const uct_ep_params_t *params)
{
    uct_cuda_ipc_iface_t *iface         = ucs_derived_of(params->iface,
                                                         uct_cuda_ipc_iface_t);
    uct_cuda_base_sys_dev_map_t *remote = (uct_cuda_base_sys_dev_map_t*)
                                          params->iface_addr;
    uct_cuda_base_sys_dev_map_t *hash;
    khiter_t khiter;
    int khret;
    int i;

    ucs_recursive_spin_lock(&iface->rem_iface_addr_lock);

    khiter = kh_put(cuda_ipc_rem_iface_addr, &iface->rem_iface_addr_hash,
                    remote->pid, &khret);
    if ((khret == UCS_KH_PUT_BUCKET_EMPTY) ||
        (khret == UCS_KH_PUT_BUCKET_CLEAR)) {
        /* first endpoint to this peer: copy its map into the hash */
        hash        = &kh_val(&iface->rem_iface_addr_hash, khiter);
        hash->count = remote->count;
        hash->pid   = remote->pid;

        for (i = 0; i < remote->count; i++) {
            hash->sys_dev[i] = remote->sys_dev[i];
            hash->bus_id[i]  = remote->bus_id[i];
            ucs_trace("peer pid %ld sys_dev %u bus_id %u",
                      (long)hash->pid, (unsigned)hash->sys_dev[i],
                      (unsigned)hash->bus_id[i]);
        }
    } else if (khret != UCS_KH_PUT_KEY_PRESENT) {
        /* an existing entry is fine; any other kh_put status is a failure */
        ucs_error("unable to use cuda_ipc remote_iface_addr hash");
    }
    ucs_recursive_spin_unlock(&iface->rem_iface_addr_lock);

    UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(params);
    UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super);

    self->remote_pid = remote->pid;

    return uct_ep_keepalive_init(&self->keepalive, self->remote_pid);
}
Expand Down
26 changes: 22 additions & 4 deletions src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cuda_ipc_ep.h"

#include <uct/cuda/base/cuda_iface.h>
#include <uct/cuda/base/cuda_md.h>
#include <ucs/type/class.h>
#include <ucs/sys/string.h>
#include <ucs/debug/assert.h>
Expand Down Expand Up @@ -74,18 +75,29 @@ ucs_status_t uct_cuda_ipc_iface_get_device_address(uct_iface_t *tl_iface,
/* Builds this iface's address: the process pid plus a copy of the global
 * sys_dev -> bus_id map, so peers can translate our sys_dev values into
 * their local numbering. The scraped diff left the removed one-line body
 * ("*(pid_t*)iface_addr = getpid();") interleaved with the new code; this
 * is the reconstructed post-change version. Always returns UCS_OK. */
static ucs_status_t uct_cuda_ipc_iface_get_address(uct_iface_h tl_iface,
                                                   uct_iface_addr_t *iface_addr)
{
    uct_cuda_base_sys_dev_map_t *local = (uct_cuda_base_sys_dev_map_t*)iface_addr;
    int i;

    local->pid   = getpid();
    local->count = uct_cuda_sys_dev_bus_id_map.count;

    for (i = 0; i < local->count; i++) {
        local->sys_dev[i] = uct_cuda_sys_dev_bus_id_map.sys_dev[i];
        local->bus_id[i]  = uct_cuda_sys_dev_bus_id_map.bus_id[i];
    }

    return UCS_OK;
}

static int uct_cuda_ipc_iface_is_reachable(const uct_iface_h tl_iface,
const uct_device_addr_t *dev_addr,
const uct_iface_addr_t *iface_addr)
{
uct_cuda_ipc_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_ipc_iface_t);
uct_cuda_ipc_iface_t *iface = ucs_derived_of(tl_iface, uct_cuda_ipc_iface_t);

return ((uct_cuda_ipc_iface_node_guid(&iface->super) ==
*((const uint64_t *)dev_addr)) && ((getpid() != *(pid_t *)iface_addr)));
*((const uint64_t *)dev_addr)) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will extra checks be added later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No further checks here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the reason to update the hash in this function? How is it going to be used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brminich updated the PR description as well. Lmk if it makes sense.

UCT perf_estimate API takes two system devices (local, remote) to provide bandwidth/latency estimates. In the case of cuda_ipc, local GPU's sys_dev and remote GPU's sys_dev. But remote GPU's sys_dev may not match the same bus_id on the local process because of the order in which sys_devices are populated. For this reason, we need a way to interpret remote sys_dev and translate to local sys_dev and use in cuda_ipc perf estimate to check if there are nvlinks between the devices or not. Iface address exchange phase seemed like the most convenient place to add this logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean uct_iface_estimate_perf API, right? So, we implicitly assume that iface_is_reachable for the corresponding iface (created on needed device id) has to be called before this perf estimate routine? If yes, wouldn't it be better to cache device id in ep creation routine rather than during reachability check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brminich addressed this

((getpid() != ((uct_cuda_base_sys_dev_map_t*)iface_addr)->pid)));
}

static double uct_cuda_ipc_iface_get_bw()
Expand Down Expand Up @@ -198,7 +210,7 @@ static ucs_status_t uct_cuda_ipc_iface_query(uct_iface_h tl_iface,

uct_base_iface_query(&iface->super, iface_attr);

iface_attr->iface_addr_len = sizeof(pid_t);
iface_attr->iface_addr_len = sizeof(uct_cuda_base_sys_dev_map_t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we define the length according to the real number of gpus? (to avoid unnecessary address increase)

Copy link
Contributor Author

@Akshay-Venkatesh Akshay-Venkatesh Feb 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brminich Kept it this way to handle the case when different processes within the same node use different number of devices in CUDA_VISIBLE_DEVICES. For example, if process 0 uses CUDA_VISIBLE_DEVICES=0 and process 1 uses CUDA_VISIBLE_DEVICES=1,2 then the instances of cuda_ipc iface for the two processes will have different iface_addr_len. I wasn't sure if iface address lengths have to be consistent across processes during iface_addr exchange.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it is ok to have different address lengths, but it will not work with unified mode (which is disabled by default)
@yosefe, wdyt?

iface_attr->device_addr_len = sizeof(uint64_t);
iface_attr->ep_addr_len = 0;
iface_attr->max_conn_priv = 0;
Expand Down Expand Up @@ -530,6 +542,9 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_iface_t, uct_md_h md, uct_worker_h worke
self->cuda_context = 0;
ucs_queue_head_init(&self->outstanding_d2d_event_q);

ucs_recursive_spinlock_init(&self->rem_iface_addr_lock, 0);
kh_init_inplace(cuda_ipc_rem_iface_addr, &self->rem_iface_addr_hash);

return UCS_OK;
}

Expand Down Expand Up @@ -560,6 +575,9 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cuda_ipc_iface_t)
if (self->eventfd != -1) {
close(self->eventfd);
}

kh_destroy_inplace(cuda_ipc_rem_iface_addr, &self->rem_iface_addr_hash);
ucs_recursive_spinlock_destroy(&self->rem_iface_addr_lock);
}

ucs_status_t
Expand Down
26 changes: 25 additions & 1 deletion src/uct/cuda/cuda_ipc/cuda_ipc_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
#define UCT_CUDA_IPC_IFACE_H

#include <uct/base/uct_iface.h>
#include <uct/cuda/base/cuda_md.h>
#include <ucs/arch/cpu.h>
#include <cuda_runtime.h>
#include <cuda.h>
#include <ucs/datastruct/khash.h>

#include "cuda_ipc_md.h"
#include "cuda_ipc_ep.h"
Expand All @@ -19,6 +21,26 @@

#define UCT_CUDA_IPC_MAX_PEERS 16


/* Key-equality callback for the cuda_ipc remote iface-address hash table:
 * two keys match iff the pids are identical. */
static UCS_F_ALWAYS_INLINE int
uct_cuda_ipc_iface_addr_hash_equal(pid_t lhs, pid_t rhs)
{
    return lhs == rhs;
}


/* Hash callback for the cuda_ipc remote iface-address hash table: hashes the
 * pid shifted left by 8 bits via khash's integer hash function. */
static UCS_F_ALWAYS_INLINE khint32_t
uct_cuda_ipc_iface_addr_hash_func(pid_t pid)
{
    khint32_t key = (khint32_t)(pid << 8);

    return kh_int_hash_func(key);
}


KHASH_INIT(cuda_ipc_rem_iface_addr, pid_t,
uct_cuda_base_sys_dev_map_t, 1, uct_cuda_ipc_iface_addr_hash_func,
uct_cuda_ipc_iface_addr_hash_equal);


typedef struct uct_cuda_ipc_iface {
uct_base_iface_t super;
ucs_mpool_t event_desc; /* cuda event desc */
Expand All @@ -30,6 +52,9 @@ typedef struct uct_cuda_ipc_iface {
/* per-peer stream */
unsigned long stream_refcount[UCT_CUDA_IPC_MAX_PEERS];
/* per stream outstanding ops */
ucs_recursive_spinlock_t rem_iface_addr_lock; /* protects remote sys_dev map */
khash_t(cuda_ipc_rem_iface_addr) rem_iface_addr_hash;

struct {
unsigned max_poll; /* query attempts w.o success */
unsigned max_streams; /* # concurrent streams for || progress*/
Expand Down Expand Up @@ -61,6 +86,5 @@ typedef struct uct_cuda_ipc_event_desc {
pid_t pid;
} uct_cuda_ipc_event_desc_t;


ucs_status_t uct_cuda_ipc_iface_init_streams(uct_cuda_ipc_iface_t *iface);
#endif