# Source code for rdf2vecgpu.reader.kg_file_reader

from pathlib import Path
from typing import Union
import cudf
from loguru import logger
from rdflib.util import guess_format
from rdflib import Graph as rdfGraph
from tqdm.auto import tqdm
import pandas as pd

# Dask is imported defensively: HAS_DASK records whether the import succeeded
# so that code needing Dask can check the flag instead of re-importing.
try:
    import dask
    import dask.dataframe as dd

    HAS_DASK = True
except ImportError:  # pragma: no cover - env dependent
    # Bind placeholders so the names still exist when Dask is not installed.
    dask = None  # type: ignore
    dd = None  # type: ignore
    HAS_DASK = False

# Alias for "a cudf DataFrame or a Dask DataFrame". The Dask type is written
# as a string because ``dd`` is None when the import above failed, so
# ``dd.DataFrame`` could not be evaluated here.
DataFrameLike = Union[cudf.DataFrame, "dd.DataFrame"]


[docs] class KGFileReader: def __init__( self, file_path: str, multi_gpu: bool, col_map: dict[str, str] | None = None, read_kwargs: dict | None = None, walk_weighted: bool = False, ): self.file_path = Path(file_path) self.multi_gpu = multi_gpu self.walk_weighted = walk_weighted self.file_ending = self.file_path.suffix self.col_map = col_map or { "subject": "subject", "predicate": "predicate", "object": "object", } self.read_kwargs = read_kwargs or {} if self.multi_gpu: self._ensure_dask_cudf() dask.config.set({"dataframe.backend": "cudf"}) def _ensure_dask_cudf(self) -> None: """ Checks if dask cudf can be imported """ if not HAS_DASK: raise ImportError( "multi_gpu=True requires Dask and dask.dataframe to be installed.\n" "Install the multi-GPU extras, e.g.: `pip install gpu-rdf2vec[multi-gpu]`" ) def _change_column_names(self, df: DataFrameLike) -> DataFrameLike: if self.multi_gpu: renamed_df = df.rename(columns=self.col_map) else: renamed_df = df.rename(mapper=self.col_map, axis=1) cols = ["subject", "predicate", "object"] if self.walk_weighted: if "weights" not in renamed_df.columns: raise ValueError( "walk_weighted=True but no 'weights' column found in data. " "Provide a 'weights' column or add it to col_map." ) cols.append("weights") renamed_df = renamed_df[cols] return renamed_df
[docs] def read(self) -> DataFrameLike: # Check sequence for file ending if self.file_ending == ".parquet": df = self._parquet_reader() elif self.file_ending in [".nt", ".nq"]: df = self._nt_reader() elif self.file_ending == ".csv": df = self._csv_reader() elif self.file_ending == ".orc": df = self._orc_reader() else: # Check if file format is parseable by rdflib rdf_format = guess_format(str(self.file_path)) if rdf_format is not None: df = self._rdf_lib_reader() else: logger.error( f"Parsing of file format {self.file_ending} is currently not supported." ) raise NotImplementedError( f"Parsing of file format {self.file_ending} is currently not supported." ) renamed_df = self._change_column_names(df) return renamed_df
def _parquet_reader(self) -> DataFrameLike: columns = [ self.col_map["subject"], self.col_map["predicate"], self.col_map["object"], ] if self.walk_weighted: columns.append("weights") if self.multi_gpu: kg_data = dd.read_parquet( self.file_path, columns=columns, **self.read_kwargs, ) else: kg_data = cudf.read_parquet( self.file_path, columns=columns, **self.read_kwargs, ) return kg_data def _csv_reader(self) -> DataFrameLike: if self.multi_gpu: kg_data = dd.read_csv( self.file_path, **self.read_kwargs, ) else: kg_data = cudf.read_csv( self.file_path, **self.read_kwargs, ) return kg_data def _nt_reader(self) -> DataFrameLike: if self.multi_gpu: df = dd.read_text(self.file_path).to_dataframe() df.columns = ["raw"] else: df = cudf.read_text(self.file_path, delimiter="\n").to_frame() df.columns = ["raw"] nt_pattern = r"^<([^>]+)>\s+<([^>]+)>\s+(.*)\s+\.\s*$" extracted = df["raw"].str.extract(nt_pattern) extracted.columns = ["subject", "predicate", "object"] extracted["object"] = extracted["object"].str.replace(r"^<|>$", "", regex=True) kg_data = extracted[["subject", "predicate", "object"]] return kg_data def _orc_reader(self) -> DataFrameLike: if self.multi_gpu: kg_data = dd.read_orc( self.file_path, **self.read_kwargs, ) else: kg_data = cudf.read_orc( self.file_path, **self.read_kwargs, ) return kg_data def _rdf_lib_reader(self) -> DataFrameLike: kg = rdfGraph() kg.parse(self.file_path) kg.close() edge_list = [triple for triple in tqdm(kg, desc="Parsing RDF with rdflib")] pd_edge_df = pd.DataFrame(edge_list, columns=["subject", "predicate", "object"]) if self.multi_gpu: import dask_cudf cudf_df = cudf.DataFrame.from_pandas(pd_edge_df) edge_df = dask_cudf.from_cudf(cudf_df, npartitions=1) else: edge_df = cudf.DataFrame.from_pandas(pd_edge_df) return edge_df