# Source code for rdf2vecgpu.helper.functions

import cudf
import cupy
import dask
import dask.dataframe as dd
import dask_cudf
import torch
from torch.utils.dlpack import to_dlpack


def _assign_ids(partition, offset):
    """Attach int32 token ids to one partition of the deduplicated vocab.

    This is the per-partition body used by the multi-GPU vocabulary build.
    The ids for a partition are ``0 .. len(partition) - 1`` shifted by
    `offset`; the caller passes the cumulative size of all preceding
    partitions as `offset`, so the ids of consecutive partitions form
    adjacent, non-overlapping blocks and no cross-partition shuffle is
    required.

    Parameters
    ----------
    partition
        Frame holding a ``word`` column.
    offset
        First id to hand out in this partition; converted with ``int()``.

    Returns
    -------
    cudf.DataFrame
        Two columns: ``word`` (cast to ``string[pyarrow]``) and ``token``
        (``int32`` range shifted by `offset`).
    """
    words = partition["word"].astype("string[pyarrow]")
    tokens = cupy.arange(len(partition), dtype="int32") + int(offset)
    id_frame = cudf.DataFrame({"word": words, "token": tokens})
    return id_frame


def _generate_vocab(
    edge_df: cudf.DataFrame, multi_gpu: bool
) -> tuple[cudf.DataFrame, cudf.DataFrame]:
    """Build a string → token vocabulary and encode the triple table with it.

    The three columns *(subject, predicate, object)* are flattened into a
    single column of strings and de-duplicated. Every distinct string is
    given an integer token, and the three columns of *edge_df* are then
    replaced by those tokens.

    When *multi_gpu* is ``True`` the work is expressed with Dask / dask-cuDF
    operations (hash shuffle, per-partition id assignment, broadcast joins).
    Otherwise, a plain cuDF workflow is used.

    Parameters
    ----------
    edge_df : cudf.DataFrame
        Triple table whose columns are named ``subject``, ``predicate``,
        ``object`` and contain **strings**. In the multi-GPU branch this has
        to be a partitioned Dask collection, because ``npartitions`` is read
        from it and it is passed to ``dd.concat``.
    multi_gpu : bool
        If ``True`` use the Dask-based branch.

    Returns
    -------
    tuple
        ``(edge_df, word2idx)``:

        * ``edge_df`` – the triple table with ``subject``, ``predicate`` and
          ``object`` cast to ``int32`` token ids.
        * ``word2idx`` – lookup table with one row per distinct string and
          the columns ``word`` and ``token``.

        In the multi-GPU branch both objects are Dask collections built from
        the merges / ``dask_cudf.from_delayed``; nothing computes them here.

    Notes
    -----
    * Single-GPU branch: tokens come from :py:meth:`cudf.Series.factorize`
      applied to the already de-duplicated strings. The column order of
      ``word2idx`` is ``["token", "word"]``. The three columns of the
      *caller's* ``edge_df`` are overwritten in place with the mapped tokens
      before the returned frame is derived, and any additional columns are
      kept. Rows with a missing value in one of the three columns after the
      lookup are dropped.
    * Multi-GPU branch: tokens are a per-partition ``arange`` shifted by the
      cumulative size of the preceding partitions (see ``_assign_ids``), so
      they start at 0 and are contiguous. The de-duplicated vocabulary is
      persisted and its partition sizes are computed eagerly inside this
      call. The column order of ``word2idx`` is ``["word", "token"]`` and
      the returned edge table contains only the three token columns.
    """
    if multi_gpu:
        # Build word2idx via hash-partition + per-partition arange.
        #
        # The previous approach was `concat(s, p, o).unique()` followed by
        # `vocabulary_df.categorize(columns=["word"])`, which funnels the full
        # deduplicated vocabulary onto a single worker for `sort_values()`. On
        # Wikidata-scale graphs (~390 M unique tokens) that single-worker sort
        # exceeds 40 GB of intermediate state and either OOMs the GPU or hangs
        # the worker. The replacement is embarrassingly parallel:
        #
        #   1. concat(s, p, o) into a single `word` column (still lazy)
        #   2. shuffle by `hash(word)` so identical strings co-locate on the
        #      same partition
        #   3. drop_duplicates() locally per partition — globally unique by
        #      construction
        #   4. persist so the follow-up `sizes.compute()` and per-partition id
        #      assignment see the *same* shuffle output (without persist, the
        #      shuffle re-rolls and partition assignments drift — we observed
        #      23 M → 15 M row loss between sizes.compute and the eventual
        #      write)
        #   5. compute partition sizes (a tiny cumsum payload) → per-partition
        #      offsets
        #   6. dask.delayed(_assign_ids) per partition: `cupy.arange + offset`
        #      gives globally-unique int32 token ids without any cross-
        #      partition shuffle
        n_hash_partitions = max(1, edge_df.npartitions)
        vocabulary_df = dd.concat(
            [edge_df["subject"], edge_df["predicate"], edge_df["object"]]
        ).to_frame(name="word")
        vocabulary_df = vocabulary_df.shuffle(
            on="word",
            npartitions=n_hash_partitions,
            shuffle_method="tasks",
        )
        vocabulary_df = vocabulary_df.drop_duplicates()
        # Persist so the follow-up `sizes.compute()` and per-partition id
        # assignment see the *same* shuffle output. The subsequent `compute()`
        # on partition sizes drains the persist, so an explicit
        # `distributed.wait` is unnecessary (and would require an active
        # Client, breaking unit tests that use the default synchronous
        # scheduler).
        vocabulary_df = vocabulary_df.persist()

        # Exclusive prefix sum of the partition sizes: offsets[i] is the
        # number of vocabulary rows held by the partitions before partition i.
        sizes = vocabulary_df.map_partitions(len).compute()
        offsets = [0]
        for s in sizes[:-1]:
            offsets.append(offsets[-1] + int(s))

        # Empty frame describing the schema `_assign_ids` produces; handed to
        # `from_delayed` as `meta`.
        word2idx_meta = cudf.DataFrame(
            {
                "word": cudf.Series([], dtype="string[pyarrow]"),
                "token": cudf.Series([], dtype="int32"),
            }
        )
        # One delayed `_assign_ids` call per persisted partition, each with
        # its own starting offset.
        delayed_parts = [
            dask.delayed(_assign_ids)(
                vocabulary_df.get_partition(i), offsets[i]
            )
            for i in range(vocabulary_df.npartitions)
        ]
        word2idx = dask_cudf.from_delayed(delayed_parts, meta=word2idx_meta)

        # Encode the three edge columns by broadcast-joining word2idx into the
        # edge table. The previous code used three plain `.merge()` calls
        # without `broadcast=True`, which default to a hash-shuffle join; with
        # a small word2idx partition count, the 1.38 B-row edge side gets
        # funneled through one worker (we observed 6+ hours at 100 % on a
        # single GPU with no progress). `broadcast=True` replicates the small
        # word2idx to every worker, so each edge partition does a local hash
        # join — no shuffle of the large side, one hash-table build per worker
        # per merge instead of one per partition.
        #
        # word2idx is renamed once per edge column so that the join key
        # matches that column and the token lands in a distinct `*_tok` column.
        w2i_s = word2idx.rename(columns={"word": "subject", "token": "s_tok"})
        w2i_p = word2idx.rename(columns={"word": "predicate", "token": "p_tok"})
        w2i_o = word2idx.rename(columns={"word": "object", "token": "o_tok"})
        edge_df = (
            edge_df
            .merge(w2i_s, on="subject", broadcast=True)
            .merge(w2i_p, on="predicate", broadcast=True)
            .merge(w2i_o, on="object", broadcast=True)
        )
        # Keep only the token columns and give them back the original names.
        edge_df = edge_df[["s_tok", "p_tok", "o_tok"]].rename(
            columns={"s_tok": "subject", "p_tok": "predicate", "o_tok": "object"}
        )
        edge_df = edge_df.astype(
            {"subject": "int32", "predicate": "int32", "object": "int32"}
        )
        return edge_df, word2idx

    else:
        # Distinct strings across all three columns.
        vocabulary = cudf.concat(
            [edge_df["subject"], edge_df["predicate"], edge_df["object"]],
            ignore_index=True,
        ).unique()
        # `vocabulary` is already de-duplicated, so factorize yields one code
        # per entry.
        tokenization, word = vocabulary.factorize()
        word2idx = cudf.concat([cudf.Series(tokenization), cudf.Series(word)], axis=1)
        word2idx.columns = ["token", "word"]
        # Build a word→token lookup map and apply to all three columns at once
        lookup = cudf.Series(
            word2idx["token"].values, index=word2idx["word"]
        )
        # NOTE: this assignment overwrites the columns of the frame passed in
        # by the caller; `edge_df` is only rebound to a new object afterwards.
        for col in ("subject", "predicate", "object"):
            edge_df[col] = edge_df[col].map(lookup)
        # Drop rows left without a token in any of the three columns, then
        # cast the token columns to int32.
        edge_df = edge_df.dropna(subset=["subject", "predicate", "object"]).astype(
            {"subject": "int32", "predicate": "int32", "object": "int32"}
        )
        return edge_df, word2idx


def cudf_to_torch_tensor(df, column_name: str):
    """Return one column of a DataFrame as a contiguous torch tensor.

    The column is exported with its ``to_dlpack()`` method and imported into
    PyTorch through ``torch.utils.dlpack.from_dlpack``; ``.contiguous()`` is
    called on the result before it is returned.

    Parameters
    ----------
    df
        Frame exposing ``columns`` and column access via ``df[column_name]``;
        the selected column must provide ``to_dlpack()`` (as a cuDF Series
        does).
    column_name : str
        Name of the column to convert.

    Returns
    -------
    torch.Tensor
        Tensor holding the values of the requested column.

    Raises
    ------
    ValueError
        If `column_name` is not one of ``df.columns``.
    """
    # Fail early with a clear message instead of a KeyError from the lookup.
    if column_name not in df.columns:
        raise ValueError(f"Column '{column_name}' does not exist in the DataFrame.")
    dlpack_capsule = df[column_name].to_dlpack()
    return torch.utils.dlpack.from_dlpack(dlpack_capsule).contiguous()


def torch_to_cudf(torch_tensor, multi_gpu: bool):
    """Convert a torch tensor into a cuDF object through DLPack.

    Parameters
    ----------
    torch_tensor
        Tensor to convert. ``Tensor.t()`` is applied to it, so it is expected
        to have at most two dimensions.
    multi_gpu : bool
        Must be falsy; the multi-GPU conversion is not implemented.

    Returns
    -------
    The object produced by ``cudf.from_dlpack`` for the tensor's data.

    Raises
    ------
    NotImplementedError
        If `multi_gpu` is truthy.
    """
    if multi_gpu:
        raise NotImplementedError(
            "Conversion from torch Tensor to cuDF DataFrame is not implemented for multi-GPU."
        )
    # Transpose, make contiguous, transpose back: same logical shape, but the
    # underlying memory is laid out column by column before the hand-over.
    column_major_tensor = torch_tensor.t().contiguous().t()
    dlpack_capsule = to_dlpack(column_major_tensor)
    return cudf.from_dlpack(dlpack_capsule)