
Graph Based RecSys

GraphBasedRS(algorithm, graph)

Bases: RecSys

Class for recommender systems which use a graph in order to make predictions

Graph-based recommender systems (GBRS) differ from one another in the algorithm they use.

Examples:

If you split the dataset into a single train set and a single test set (e.g. with the HoldOut technique):

Single split train
from clayrs import recsys as rs
from clayrs import content_analyzer as ca

original_rat = ca.Ratings(ca.CSVFile(ratings_path))

[train], [test] = rs.HoldOutPartitioning().split_all(original_rat)

alg = rs.NXPageRank()  # any gb algorithm

graph = rs.NXBipartiteGraph(original_rat)

# remove the test set interactions from the graph
for user, item in zip(test.user_id_column, test.item_id_column):
    user_node = rs.UserNode(user)
    item_node = rs.ItemNode(item)

    graph.remove_link(user_node, item_node)

gbrs = rs.GraphBasedRS(alg, graph)

rank = gbrs.rank(test, n_recs=10)

If you split the dataset into multiple train and test sets (e.g. with the KFold technique):

Multiple split train
from clayrs import recsys as rs
from clayrs import content_analyzer as ca

original_rat = ca.Ratings(ca.CSVFile(ratings_path))

train_list, test_list = rs.KFoldPartitioning(n_splits=5).split_all(original_rat)

alg = rs.NXPageRank()  # any gb algorithm

# collect the ranking produced for each split
result_list = []

for train_set, test_set in zip(train_list, test_list):

    graph = rs.NXBipartiteGraph(original_rat)

    # remove the test set interactions from the graph
    for user, item in zip(test_set.user_id_column, test_set.item_id_column):
        user_node = rs.UserNode(user)
        item_node = rs.ItemNode(item)

        graph.remove_link(user_node, item_node)

    gbrs = rs.GraphBasedRS(alg, graph)
    rank_to_append = gbrs.rank(test_set)

    result_list.append(rank_to_append)

result_list will contain one recommendation list (a Rank object) for each split
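Both examples remove the test set interactions from the graph before building the recommender. The following is a minimal sketch of that step on a toy graph in plain Python, not ClayRS code: the dict-of-sets graph and the helpers `build_graph` and `remove_test_links` are hypothetical stand-ins for `NXBipartiteGraph` and `remove_link`.

```python
from typing import Dict, Iterable, Set, Tuple

# Hypothetical stand-in for a bipartite graph: user id -> set of linked item ids
Graph = Dict[str, Set[str]]


def build_graph(interactions: Iterable[Tuple[str, str]]) -> Graph:
    """Build a user -> items adjacency mapping from (user, item) pairs."""
    graph: Graph = {}
    for user, item in interactions:
        graph.setdefault(user, set()).add(item)
    return graph


def remove_test_links(graph: Graph, test_pairs: Iterable[Tuple[str, str]]) -> None:
    """Remove, in place, every (user, item) link that belongs to the test set."""
    for user, item in test_pairs:
        # discard() does nothing when the link is already absent
        graph.get(user, set()).discard(item)


all_interactions = [("u1", "i1"), ("u1", "i2"), ("u2", "i1"), ("u2", "i3")]
test_pairs = [("u1", "i2"), ("u2", "i3")]

toy_graph = build_graph(all_interactions)
remove_test_links(toy_graph, test_pairs)
```

After the call, `toy_graph` holds only the training links, so the held-out items are unknown to the algorithm when it ranks them.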

PARAMETER DESCRIPTION
algorithm

The graph-based algorithm that will be used to rank items or to predict scores

TYPE: GraphBasedAlgorithm

graph

A graph which models interactions of users and items

TYPE: FullDiGraph

Source code in clayrs/recsys/recsys.py
def __init__(self,
             algorithm: GraphBasedAlgorithm,
             graph: FullDiGraph):

    self.__graph = graph
    super().__init__(algorithm)

algorithm: GraphBasedAlgorithm property

The graph based algorithm chosen

graph: FullDiGraph property

The graph containing interactions

users: Set[UserNode] property

Set of UserNode objects for each user of the graph

predict(test_set, user_list=None, methodology=TestRatingsMethodology(), num_cpus=1)

Computes score predictions for all users in the test set or, if given, for all users in the user_list parameter. The user_list parameter may contain users identified either by their string id or by their mapped integer

BE CAREFUL: not all algorithms are able to perform score prediction

The methodology parameter controls candidate item selection. By default, TestRatingsMethodology() is used, so for each user only the items in their test set are considered for score prediction

If the algorithm cannot perform score prediction for some users, they are skipped and a warning message is printed showing the number of users for which the algorithm could not produce a score prediction
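Candidate item selection and the skipping of users can be sketched in plain Python. This is a hedged illustration, not the ClayRS implementation: `select_candidates` and `candidates_per_user` are hypothetical helpers, train and test sets are modelled as dicts of sets, and the "all items" branch assumes that every known item not in the user's train set is a candidate.

```python
from typing import Dict, Iterable, List, Set, Tuple


def select_candidates(user: str,
                      train: Dict[str, Set[str]],
                      test: Dict[str, Set[str]],
                      only_test_items: bool = True) -> List[str]:
    """Return the candidate items for `user`, sorted for reproducibility."""
    if only_test_items:
        # analogous to the default behaviour: only items in the user's test set
        candidates = set(test.get(user, set()))
    else:
        # assumption: all known items minus those in the user's train set
        all_items = set().union(*train.values(), *test.values())
        candidates = all_items - train.get(user, set())
    return sorted(candidates)


def candidates_per_user(users: Iterable[str],
                        train: Dict[str, Set[str]],
                        test: Dict[str, Set[str]],
                        only_test_items: bool = True) -> Tuple[Dict[str, List[str]], int]:
    """Select candidates for each user and count the users that had to be skipped."""
    selected: Dict[str, List[str]] = {}
    skipped = 0
    for user in users:
        candidates = select_candidates(user, train, test, only_test_items)
        if candidates:
            selected[user] = candidates
        else:
            skipped += 1  # nothing to score for this user
    return selected, skipped


train = {"u1": {"i1"}, "u2": {"i1", "i2"}}
test = {"u1": {"i2"}, "u2": {"i3"}}

selected, skipped = candidates_per_user(["u1", "u2", "u3"], train, test)
```

Here `u3` has no items in the test set, so it is the one skipped user under the default selection; with `only_test_items=False` it would instead receive every known item as a candidate.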

PARAMETER DESCRIPTION
test_set

Ratings object which represents the ground truth of the split considered

TYPE: Ratings

user_list

List of users for which you want to compute score predictions. If None, score predictions will be computed for all users of the test_set. The list should contain user ids as strings or user ids mapped to their integers

TYPE: List[str] DEFAULT: None

methodology

Methodology object which governs the candidate item selection. Default is TestRatingsMethodology. If None, AllItemsMethodology() will be used

TYPE: Union[Methodology, None] DEFAULT: TestRatingsMethodology()

num_cpus

Number of processors reserved for the method. If set to 0, all available CPUs will be used. Be careful though: multiprocessing in Python has a substantial memory overhead!

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
Prediction

Prediction object containing score prediction lists for all users of the test set or for all users in user_list
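Since user_list accepts either string ids or mapped integers, the ids have to be normalised to strings before the graph algorithm runs. A minimal sketch under stated assumptions: `normalize_users` and the list-backed `int_to_str` map are hypothetical, and the per-element check is a simplification (the library source inspects the dtype of the whole array instead).

```python
from typing import List, Sequence, Set, Union


def normalize_users(user_list: Sequence[Union[str, int]],
                    int_to_str: List[str]) -> Set[str]:
    """Convert users given as string ids or mapped integers into a set of string ids."""
    normalized: Set[str] = set()
    for user in user_list:
        if isinstance(user, int):
            # mapped integer: look up the original string id
            normalized.add(int_to_str[user])
        else:
            # already a string id
            normalized.add(user)
    return normalized


# hypothetical mapping: integer 0 -> "u1", 1 -> "u2", 2 -> "u3"
user_map = ["u1", "u2", "u3"]

from_ints = normalize_users([0, 2], user_map)
from_strings = normalize_users(["u2"], user_map)
```

Returning a set also removes duplicated users, so each user is processed once.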

Source code in clayrs/recsys/recsys.py
def predict(self, test_set: Ratings, user_list: List[str] = None,
            methodology: Union[Methodology, None] = TestRatingsMethodology(),
            num_cpus: int = 1) -> Prediction:
    """
    Method used to calculate score predictions for all users in test set or all users in `user_list` parameter.
    The `user_list` parameter could contain users with their string id or with their mapped integer

    **BE CAREFUL**: not all algorithms are able to perform *score prediction*

    Via the `methodology` parameter you can perform different candidate item selection. By default, the
    `TestRatingsMethodology()` is used: so for each user items in its test set only will be considered for score
    prediction

    If the algorithm couldn't perform score prediction for some users, they will be skipped and a warning message is
    printed showing the number of users for which the alg couldn't produce a score prediction

    Args:
        test_set: Ratings object which represents the ground truth of the split considered
        user_list: List of users for which you want to compute score prediction. If None, the ranking
            will be computed for all users of the `test_set`. The list should contain user id as strings or user ids
            mapped to their integers
        methodology: `Methodology` object which governs the candidate item selection. Default is
            `TestRatingsMethodology`. If None, AllItemsMethodology() will be used
        num_cpus: number of processors that must be reserved for the method. If set to `0`, all cpus available will
            be used. Be careful though: multiprocessing in python has a substantial memory overhead!

    Returns:
        Prediction object containing score prediction lists for all users of the test set or for all users in
            `user_list`
    """

    train_set = self.graph.to_ratings(user_map=test_set.user_map, item_map=test_set.item_map)

    logger.info("Don't worry if it looks stuck at first")
    logger.info("First iterations will stabilize the estimated remaining time")

    # in the graph recsys, each graph algorithm works with strings,
    # so in case we should convert int to strings
    all_users = test_set.unique_user_id_column
    if user_list is not None:
        all_users = np.array(user_list)
        if np.issubdtype(all_users.dtype, int):
            all_users = train_set.user_map.convert_seq_int2str(all_users)

    all_users = set(all_users)

    if methodology is None:
        methodology = AllItemsMethodology()

    methodology.setup(train_set, test_set)

    pred = self.algorithm.predict(self.graph, train_set, test_set, all_users, methodology, num_cpus)
    # we should remove empty uir matrices otherwise vstack won't work due to dimensions mismatch
    pred = [uir_pred for uir_pred in pred if len(uir_pred) != 0]

    # can't vstack when pred is empty
    if len(pred) == 0:
        pred = Prediction.from_uir(np.array([]), user_map=test_set.user_map, item_map=test_set.item_map)
        return pred

    pred = np.vstack(pred)
    pred_users_idx = train_set.user_map.convert_seq_str2int(pred[:, 0])
    pred_items_idx = train_set.item_map.convert_seq_str2int([item_node.value for item_node in pred[:, 1]])
    pred[:, 0] = pred_users_idx
    pred[:, 1] = pred_items_idx
    pred = pred.astype(np.float64)
    pred = Prediction.from_uir(pred, user_map=test_set.user_map, item_map=test_set.item_map)

    self._yaml_report = {'graph': repr(self.graph), 'mode': 'score_prediction', 'methodology': repr(methodology)}

    return pred

rank(test_set, n_recs=10, user_list=None, methodology=TestRatingsMethodology(), num_cpus=1)

Computes a ranking for all users in the test set or, if given, for all users in the user_list parameter. The user_list parameter may contain users identified either by their string id or by their mapped integer

If n_recs is specified, the rank will contain the top-n items for each user. Otherwise (n_recs set to None), the rank will contain all candidate items of each user. By default, the top-10 ranking is computed for each user

The methodology parameter controls candidate item selection. By default, TestRatingsMethodology() is used, so for each user only the items in their test set will be ranked

If the algorithm cannot produce a ranking for some users, they are skipped and a warning message is printed showing the number of users for which the algorithm could not produce a ranking
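The effect of n_recs can be illustrated with a short sketch in plain Python. It is not the ClayRS implementation: `top_n` is a hypothetical helper and the scores are made-up values standing in for whatever the chosen algorithm assigns to each candidate item.

```python
from typing import Dict, List, Optional, Tuple


def top_n(scores: Dict[str, float],
          n_recs: Optional[int] = 10) -> List[Tuple[str, float]]:
    """Sort candidate items by descending score and keep the first n_recs (all if None)."""
    # ties are broken by item id so that the output is deterministic
    ranked = sorted(scores.items(), key=lambda pair: (-pair[1], pair[0]))
    return ranked if n_recs is None else ranked[:n_recs]


# made-up scores for the candidate items of one user
candidate_scores = {"i1": 0.12, "i2": 0.40, "i3": 0.27}

top_two = top_n(candidate_scores, n_recs=2)
full_rank = top_n(candidate_scores, n_recs=None)
```

With `n_recs=2` only the two highest-scored items survive, while `n_recs=None` keeps every candidate in ranked order.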

PARAMETER DESCRIPTION
test_set

Ratings object which represents the ground truth of the split considered

TYPE: Ratings

n_recs

Number of the top items that will be present in the ranking of each user. If None all candidate items will be returned for the user. Default is 10 (top-10 for each user will be computed)

TYPE: int DEFAULT: 10

user_list

List of users for which you want to compute the ranking. If None, the ranking will be computed for all users of the test_set. The list should contain user ids as strings or user ids mapped to their integers

TYPE: List[str] DEFAULT: None

methodology

Methodology object which governs the candidate item selection. Default is TestRatingsMethodology. If None, AllItemsMethodology() will be used

TYPE: Union[Methodology, None] DEFAULT: TestRatingsMethodology()

num_cpus

Number of processors reserved for the method. If set to 0, all available CPUs will be used. Be careful though: multiprocessing in Python has a substantial memory overhead!

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
Rank

Rank object containing recommendation lists for all users of the test set or for all users in user_list

Source code in clayrs/recsys/recsys.py
def rank(self, test_set: Ratings, n_recs: int = 10, user_list: List[str] = None,
         methodology: Union[Methodology, None] = TestRatingsMethodology(),
         num_cpus: int = 1) -> Rank:
    """
    Method used to calculate ranking for all users in test set or all users in `user_list` parameter.
    The `user_list` parameter could contain users with their string id or with their mapped integer

    If the `n_recs` is specified, then the rank will contain the top-n items for the users.
    Otherwise, the rank will contain all unrated items of the particular users.
    By default the ***top-10*** ranking is computed for each user

    Via the `methodology` parameter you can perform different candidate item selection. By default, the
    `TestRatingsMethodology()` is used: so, for each user, items in its test set only will be ranked

    If the algorithm couldn't produce a ranking for some users, they will be skipped and a warning message is
    printed showing the number of users for which the alg couldn't produce a ranking

    Args:
        test_set: Ratings object which represents the ground truth of the split considered
        n_recs: Number of the top items that will be present in the ranking of each user.
            If `None` all candidate items will be returned for the user. Default is 10 (top-10 for each user
            will be computed)
        user_list: List of users for which you want to compute score prediction. If None, the ranking
            will be computed for all users of the `test_set`. The list should contain user id as strings or user ids
            mapped to their integers
        methodology: `Methodology` object which governs the candidate item selection. Default is
            `TestRatingsMethodology`. If None, AllItemsMethodology() will be used
        num_cpus: number of processors that must be reserved for the method. If set to `0`, all cpus available will
            be used. Be careful though: multiprocessing in python has a substantial memory overhead!

    Returns:
        Rank object containing recommendation lists for all users of the test set or for all users in `user_list`
    """

    train_set = self.graph.to_ratings(user_map=test_set.user_map, item_map=test_set.item_map)

    logger.info("Don't worry if it looks stuck at first")
    logger.info("First iterations will stabilize the estimated remaining time")

    # in the graph recsys, each graph algorithm works with strings,
    # so in case we should convert int to strings
    all_users = test_set.unique_user_id_column
    if user_list is not None:
        all_users = np.array(user_list)
        if np.issubdtype(all_users.dtype, int):
            all_users = train_set.user_map.convert_seq_int2str(all_users)

    all_users = set(all_users)

    if methodology is None:
        methodology = AllItemsMethodology()

    methodology.setup(train_set, test_set)

    rank = self.algorithm.rank(self.graph, train_set, test_set, all_users, n_recs, methodology, num_cpus)
    # we should remove empty uir matrices otherwise vstack won't work due to dimensions mismatch
    rank = [uir_rank for uir_rank in rank if len(uir_rank) != 0]

    # can't vstack when rank is empty
    if len(rank) == 0:
        rank = Rank.from_uir(np.array([]), user_map=test_set.user_map, item_map=test_set.item_map)
        return rank

    rank = np.vstack(rank)

    # convert back strings and Nodes object to ints
    rank_users_idx = train_set.user_map.convert_seq_str2int(rank[:, 0])
    rank_items_idx = train_set.item_map.convert_seq_str2int([item_node.value for item_node in rank[:, 1]])
    rank[:, 0] = rank_users_idx
    rank[:, 1] = rank_items_idx
    rank = rank.astype(np.float64)

    rank = Rank.from_uir(rank, user_map=test_set.user_map, item_map=test_set.item_map)

    if len(rank) == 0:
        logger.warning("No items could be ranked for any users! Remember that items to rank must be present "
                       "in the graph.\n"
                       "Try changing methodology!")

    elif len(rank.unique_user_id_column) != len(all_users):
        logger.warning(f"No items could be ranked for users {all_users - set(rank.user_id_column)}\n"
                       f"No nodes to rank for them found in the graph. Try changing methodology! ")

    self._yaml_report = {'graph': repr(self.graph), 'mode': 'rank', 'n_recs': repr(n_recs),
                         'methodology': repr(methodology)}

    return rank