Whamcloud - gitweb
LU-8130 nrs: convert NRS ORR/TRR to rhashtable 13/40113/24
authorJames Simmons <jsimmons@infradead.org>
Tue, 25 Jul 2023 18:14:03 +0000 (14:14 -0400)
committerOleg Drokin <green@whamcloud.com>
Sat, 19 Aug 2023 05:35:30 +0000 (05:35 +0000)
Move away from the cfs hash implementation to rhashtable
for NRS ORR handling. Since rhashtable is lockless it
should also increase performance. For the NRS TRR handling
we can use Xarray since its based on OST index which are
sequential which will give better performance than an hashtable.
TRR entries are added to the Xarray but not removed until the
Xarray is destroyed.

Test-Parameters: testlist=sanityn env=ONLY=77c+77d,ONLY_REPEAT=100
Change-Id: I5206a7586d4b4c8991b7163fd9253f017e6d3969
Signed-off-by: James Simmons <jsimmons@infradead.org>
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/40113
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Neil Brown <neilb@suse.de>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Etienne AUJAMES <eaujames@ddn.com>
lustre/include/lustre_compat.h
lustre/include/lustre_nrs_orr.h
lustre/ptlrpc/nrs_orr.c

index 3040cc2..34042d5 100644 (file)
 #include <linux/slab.h>
 #include <linux/security.h>
 #include <libcfs/linux/linux-fs.h>
+#ifdef HAVE_XARRAY_SUPPORT
+#include <linux/xarray.h>
+#else
+#include <libcfs/linux/xarray.h>
+#endif
 #include <obd_support.h>
 
 #ifdef HAVE_4ARGS_VFS_SYMLINK
@@ -198,6 +203,21 @@ static inline bool ll_xa_is_value(void *entry)
 #define ll_xa_is_value xa_is_value
 #endif
 
+/* Linux kernel version v5.0 commit fd9dc93e36231fb6d520e0edd467058fad4fd12d
+ * ("XArray: Change xa_insert to return -EBUSY")
+ * instead of -EEXIST
+ */
+static inline int __must_check ll_xa_insert(struct xarray *xa,
+                                           unsigned long index,
+                                           void *entry, gfp_t gpf)
+{
+       int rc = xa_insert(xa, index, entry, gpf);
+
+       if (rc == -EEXIST)
+               rc = -EBUSY;
+       return rc;
+}
+
 #ifndef HAVE_TRUNCATE_INODE_PAGES_FINAL
 static inline void truncate_inode_pages_final(struct address_space *map)
 {
index 4aff35f..c45484f 100644 (file)
 #ifndef _LUSTRE_NRS_ORR_H
 #define _LUSTRE_NRS_ORR_H
 
+#ifdef HAVE_XARRAY_SUPPORT
+#include <linux/xarray.h>
+#else
+#include <libcfs/linux/xarray.h>
+#endif
+
 /**
  * ORR policy operations
  */
@@ -104,8 +110,9 @@ struct nrs_orr_key {
  */
 struct nrs_orr_data {
        struct ptlrpc_nrs_resource      od_res;
-       struct binheap         *od_binheap;
-       struct cfs_hash                *od_obj_hash;
+       struct binheap                 *od_binheap;
+       struct rhashtable               od_obj_hash;
+       struct xarray                   od_trr_objs;
        struct kmem_cache              *od_cache;
        /**
         * Used when a new scheduling round commences, in order to synchronize
@@ -147,7 +154,7 @@ struct nrs_orr_data {
  */
 struct nrs_orr_object {
        struct ptlrpc_nrs_resource      oo_res;
-       struct hlist_node               oo_hnode;
+       struct rhash_head               oo_rhead;
        /**
         * The round number against which requests are being scheduled for this
         * object or OST
@@ -159,11 +166,15 @@ struct nrs_orr_object {
         */
        __u64                           oo_sequence;
        /**
-        * The key of the object or OST for which this structure instance is
-        * scheduling RPCs
+        * The key of the object or OST for which this structure
+        * instance is scheduling RPCs
+        */
+       struct nrs_orr_key      oo_key;
+       /**
+        * The reference count of this object. Once it hits zero free
+        * it from the hash table.
         */
-       struct nrs_orr_key              oo_key;
-       long                            oo_ref;
+       refcount_t                      oo_ref;
        /**
         * Round Robin quantum; the maximum number of RPCs that are allowed to
         * be scheduled for the object or OST in a single batch of each round.
@@ -173,6 +184,10 @@ struct nrs_orr_object {
         * # of pending requests for this object or OST, on all existing rounds
         */
        __u16                           oo_active;
+       /**
+        * Needed to free this structure from the RCU.
+        */
+       struct rcu_head                 oo_rcu_head;
 };
 
 /**
index e2d1a3c..42e1d62 100644 (file)
  * @{
  */
 #define DEBUG_SUBSYSTEM S_RPC
+#include <linux/delay.h>
+
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_net.h>
 #include <lustre_req_layout.h>
+#include <lustre_compat.h>
 #include "ptlrpc_internal.h"
 
 /**
@@ -129,12 +132,11 @@ static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
                            struct ptlrpc_nrs_request *nrq, __u32 opc,
                            char *name, struct nrs_orr_key *key)
 {
-       struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
-                                                  rq_nrq);
-       struct ost_body        *body;
-       __u32                   ost_idx;
-       bool                    is_orr = strncmp(name, NRS_POL_NAME_ORR,
-                                                NRS_POL_NAME_MAX) == 0;
+       struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
+                                                 rq_nrq);
+       struct ost_body *body;
+       u32 ost_idx;
+       int rc;
 
        LASSERT(req != NULL);
 
@@ -143,11 +145,10 @@ static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
         * moving a request from the regular to the high-priority NRS
         * head (via ldlm_lock_reorder_req()), but the request key has
         * been adequately filled when nrs_orr_res_get() was called through
-        * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR/TRR
+        * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR
         * policy, so there is nothing to do.
         */
-       if ((is_orr && nrq->nr_u.orr.or_orr_set) ||
-           (!is_orr && nrq->nr_u.orr.or_trr_set)) {
+       if (nrq->nr_u.orr.or_orr_set) {
                *key = nrq->nr_u.orr.or_key;
                return 0;
        }
@@ -156,32 +157,23 @@ static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
        if (req->rq_export == NULL)
                return -ENOTCONN;
 
-       if (nrq->nr_u.orr.or_orr_set || nrq->nr_u.orr.or_trr_set)
-               memset(&nrq->nr_u.orr.or_key, 0, sizeof(nrq->nr_u.orr.or_key));
-
        ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
 
-       if (is_orr) {
-               int     rc;
-               /**
-                * The request pill for OST_READ and OST_WRITE requests is
-                * initialized in the ost_io service's
-                * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
-                * so no need to redo it here.
-                */
-               body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-               if (body == NULL)
-                       RETURN(-EFAULT);
+       /**
+        * The request pill for OST_READ and OST_WRITE requests is
+        * initialized in the ost_io service's
+        * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
+        * so no need to redo it here.
+        */
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       if (!body)
+               RETURN(-EFAULT);
 
-               rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
-               if (rc < 0)
-                       return rc;
+       rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
+       if (rc < 0)
+               return rc;
 
-               nrq->nr_u.orr.or_orr_set = 1;
-       } else {
-               key->ok_idx = ost_idx;
-               nrq->nr_u.orr.or_trr_set = 1;
-       }
+       nrq->nr_u.orr.or_orr_set = 1;
 
        return 0;
 }
@@ -358,11 +350,10 @@ out:
 
 /**
  * Generates a character string that can be used in order to register uniquely
- * named libcfs_hash and slab objects for ORR/TRR policy instances. The
- * character string is unique per policy instance, as it includes the policy's
- * name, the CPT number, and a {reg|hp} token, and there is one policy instance
- * per NRS head on each CPT, and the policy is only compatible with the ost_io
- * service.
+ * named slab objects for ORR/TRR policy instances. The character string is
+ * unique per policy instance, as it includes the policy's name, the CPT number,
+ * and a {reg|hp} token, and there is one policy instance per NRS head on each
+ * CPT, and the policy is only compatible with the ost_io service.
  *
  * \param[in] policy the policy instance
  * \param[out] name  the character array that will hold the generated name
@@ -378,130 +369,45 @@ static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
 /**
  * ORR/TRR hash operations
  */
-#define NRS_ORR_BITS           24
-#define NRS_ORR_BKT_BITS       12
-#define NRS_ORR_HASH_FLAGS     (CFS_HASH_SPIN_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
-
-#define NRS_TRR_BITS           4
-#define NRS_TRR_BKT_BITS       2
-#define NRS_TRR_HASH_FLAGS     CFS_HASH_SPIN_BKTLOCK
-
-static unsigned
-nrs_orr_hop_hash(struct cfs_hash *hs, const void *key, unsigned mask)
-{
-       return cfs_hash_djb2_hash(key, sizeof(struct nrs_orr_key), mask);
-}
-
-static void *nrs_orr_hop_key(struct hlist_node *hnode)
-{
-       struct nrs_orr_object *orro = hlist_entry(hnode,
-                                                     struct nrs_orr_object,
-                                                     oo_hnode);
-       return &orro->oo_key;
-}
-
-static int nrs_orr_hop_keycmp(const void *key, struct hlist_node *hnode)
-{
-       struct nrs_orr_object *orro = hlist_entry(hnode,
-                                                     struct nrs_orr_object,
-                                                     oo_hnode);
-
-       return lu_fid_eq(&orro->oo_key.ok_fid,
-                        &((struct nrs_orr_key *)key)->ok_fid);
-}
-
-static void *nrs_orr_hop_object(struct hlist_node *hnode)
-{
-       return hlist_entry(hnode, struct nrs_orr_object, oo_hnode);
-}
-
-static void nrs_orr_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-       struct nrs_orr_object *orro = hlist_entry(hnode,
-                                                     struct nrs_orr_object,
-                                                     oo_hnode);
-       orro->oo_ref++;
-}
-
-/**
- * Removes an nrs_orr_object the hash and frees its memory, if the object has
- * no active users.
- */
-static void nrs_orr_hop_put_free(struct cfs_hash *hs, struct hlist_node *hnode)
+static u32 nrs_orr_hashfn(const void *data, u32 len, u32 seed)
 {
-       struct nrs_orr_object *orro = hlist_entry(hnode,
-                                                     struct nrs_orr_object,
-                                                     oo_hnode);
-       struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
-                                                  struct nrs_orr_data, od_res);
-       struct cfs_hash_bd     bd;
-
-       cfs_hash_bd_get_and_lock(hs, &orro->oo_key, &bd, 1);
-
-       if (--orro->oo_ref > 1) {
-               cfs_hash_bd_unlock(hs, &bd, 1);
-
-               return;
-       }
-       LASSERT(orro->oo_ref == 1);
-
-       cfs_hash_bd_del_locked(hs, &bd, hnode);
-       cfs_hash_bd_unlock(hs, &bd, 1);
-
-       OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
-}
+       const struct nrs_orr_key *key = data;
 
-static void nrs_orr_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-       struct nrs_orr_object *orro = hlist_entry(hnode,
-                                                     struct nrs_orr_object,
-                                                     oo_hnode);
-       orro->oo_ref--;
+       seed = cfs_hash_32(seed ^ key->ok_fid.f_oid, 32);
+       seed ^= cfs_hash_64(key->ok_fid.f_seq, 32);
+       return seed;
 }
 
-static int nrs_trr_hop_keycmp(const void *key, struct hlist_node *hnode)
+static int nrs_orr_cmpfn(struct rhashtable_compare_arg *arg, const void *obj)
 {
-       struct nrs_orr_object *orro = hlist_entry(hnode,
-                                                     struct nrs_orr_object,
-                                                     oo_hnode);
+       const struct nrs_orr_object *orro = obj;
+       const struct nrs_orr_key *key = arg->key;
 
-       return orro->oo_key.ok_idx == ((struct nrs_orr_key *)key)->ok_idx;
+       return lu_fid_eq(&orro->oo_key.ok_fid, &key->ok_fid) ? 0 : -ESRCH;
 }
 
-static void nrs_trr_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
+static void nrs_orr_hash_exit(void *vobj, void *data)
 {
-       struct nrs_orr_object *orro = hlist_entry(hnode,
-                                                     struct nrs_orr_object,
-                                                     oo_hnode);
-       struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
-                                                  struct nrs_orr_data, od_res);
+       struct nrs_orr_object *orro = vobj;
+       struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent,
+                                                struct nrs_orr_data, od_res);
 
-       LASSERTF(orro->oo_ref == 0,
-                "Busy NRS TRR policy object for OST with index %u, with %ld "
-                "refs\n", orro->oo_key.ok_idx, orro->oo_ref);
+       /* We shouldn't reach here but just in case. nrs_xxx_res_put
+        * should of have freed orro.
+        */
+       LASSERTF(refcount_read(&orro->oo_ref) == 0,
+                "Busy NRS ORR policy object for OST with index %u, with %d refs\n",
+                orro->oo_key.ok_idx, refcount_read(&orro->oo_ref));
 
        OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
 }
 
-static struct cfs_hash_ops nrs_orr_hash_ops = {
-       .hs_hash        = nrs_orr_hop_hash,
-       .hs_key         = nrs_orr_hop_key,
-       .hs_keycmp      = nrs_orr_hop_keycmp,
-       .hs_object      = nrs_orr_hop_object,
-       .hs_get         = nrs_orr_hop_get,
-       .hs_put         = nrs_orr_hop_put_free,
-       .hs_put_locked  = nrs_orr_hop_put,
-};
-
-static struct cfs_hash_ops nrs_trr_hash_ops = {
-       .hs_hash        = nrs_orr_hop_hash,
-       .hs_key         = nrs_orr_hop_key,
-       .hs_keycmp      = nrs_trr_hop_keycmp,
-       .hs_object      = nrs_orr_hop_object,
-       .hs_get         = nrs_orr_hop_get,
-       .hs_put         = nrs_orr_hop_put,
-       .hs_put_locked  = nrs_orr_hop_put,
-       .hs_exit        = nrs_trr_hop_exit,
+static const struct rhashtable_params nrs_orr_hash_params = {
+       .key_len        = sizeof(struct lu_fid),
+       .key_offset     = offsetof(struct nrs_orr_object, oo_key),
+       .head_offset    = offsetof(struct nrs_orr_object, oo_rhead),
+       .hashfn         = nrs_orr_hashfn,
+       .obj_cmpfn      = nrs_orr_cmpfn,
 };
 
 #define NRS_ORR_QUANTUM_DFLT   256
@@ -616,11 +522,6 @@ static int nrs_orr_init(struct ptlrpc_nrs_policy *policy)
 static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg)
 {
        struct nrs_orr_data    *orrd;
-       struct cfs_hash_ops            *ops;
-       unsigned                cur_bits;
-       unsigned                max_bits;
-       unsigned                bkt_bits;
-       unsigned                flags;
        int                     rc = 0;
        ENTRY;
 
@@ -632,9 +533,9 @@ static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg)
         * Binary heap instance for sorted incoming requests.
         */
        orrd->od_binheap = binheap_create(&nrs_orr_heap_ops,
-                                             CBH_FLAG_ATOMIC_GROW, 4096, NULL,
-                                             nrs_pol2cptab(policy),
-                                             nrs_pol2cptid(policy));
+                                         CBH_FLAG_ATOMIC_GROW, 4096, NULL,
+                                         nrs_pol2cptab(policy),
+                                         nrs_pol2cptid(policy));
        if (orrd->od_binheap == NULL)
                GOTO(out_orrd, rc = -ENOMEM);
 
@@ -649,34 +550,21 @@ static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg)
        if (orrd->od_cache == NULL)
                GOTO(out_binheap, rc = -ENOMEM);
 
+       /**
+        * Use a hash for finding objects by struct nrs_orr_key.
+        * For TRR we use Xarray instead since items are resolved
+        * using the OST indices, and they will stay relatively
+        * stable during an OSS node's lifetime.
+        */
        if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
                    NRS_POL_NAME_MAX) == 0) {
-               ops = &nrs_orr_hash_ops;
-               cur_bits = NRS_ORR_BITS;
-               max_bits = NRS_ORR_BITS;
-               bkt_bits = NRS_ORR_BKT_BITS;
-               flags = NRS_ORR_HASH_FLAGS;
+               rc = rhashtable_init(&orrd->od_obj_hash, &nrs_orr_hash_params);
+               if (rc < 0)
+                       GOTO(out_cache, rc);
        } else {
-               ops = &nrs_trr_hash_ops;
-               cur_bits = NRS_TRR_BITS;
-               max_bits = NRS_TRR_BITS;
-               bkt_bits = NRS_TRR_BKT_BITS;
-               flags = NRS_TRR_HASH_FLAGS;
+               xa_init(&orrd->od_trr_objs);
        }
 
-       /**
-        * Hash for finding objects by struct nrs_orr_key.
-        * XXX: For TRR, it might be better to avoid using libcfs_hash?
-        * All that needs to be resolved are OST indices, and they
-        * will stay relatively stable during an OSS node's lifetime.
-        */
-       orrd->od_obj_hash = cfs_hash_create(orrd->od_objname, cur_bits,
-                                           max_bits, bkt_bits, 0,
-                                           CFS_HASH_MIN_THETA,
-                                           CFS_HASH_MAX_THETA, ops, flags);
-       if (orrd->od_obj_hash == NULL)
-               GOTO(out_cache, rc = -ENOMEM);
-
        /* XXX: Fields accessed unlocked */
        orrd->od_quantum = NRS_ORR_QUANTUM_DFLT;
        orrd->od_supp = NOS_DFLT;
@@ -717,12 +605,24 @@ static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy)
 
        LASSERT(orrd != NULL);
        LASSERT(orrd->od_binheap != NULL);
-       LASSERT(orrd->od_obj_hash != NULL);
        LASSERT(orrd->od_cache != NULL);
        LASSERT(binheap_is_empty(orrd->od_binheap));
 
        binheap_destroy(orrd->od_binheap);
-       cfs_hash_putref(orrd->od_obj_hash);
+       if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_TRR,
+                   NRS_POL_NAME_MAX) == 0) {
+               struct nrs_orr_object *orro;
+               unsigned long i;
+
+               xa_for_each(&orrd->od_trr_objs, i, orro) {
+                       xa_erase(&orrd->od_trr_objs, i);
+                       OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
+               }
+               xa_destroy(&orrd->od_trr_objs);
+       } else {
+               rhashtable_free_and_destroy(&orrd->od_obj_hash,
+                                           nrs_orr_hash_exit, NULL);
+       }
        kmem_cache_destroy(orrd->od_cache);
 
        OBD_FREE_PTR(orrd);
@@ -825,12 +725,11 @@ static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
                           const struct ptlrpc_nrs_resource *parent,
                           struct ptlrpc_nrs_resource **resp, bool moving_req)
 {
-       struct nrs_orr_data            *orrd;
-       struct nrs_orr_object          *orro;
-       struct nrs_orr_object          *tmp;
-       struct nrs_orr_key              key = { { { 0 } } };
-       __u32                           opc;
-       int                             rc = 0;
+       struct nrs_orr_data *orrd;
+       struct nrs_orr_object *orro, *orro2;
+       struct nrs_orr_key key = { { { 0 } } };
+       u32 opc;
+       int rc = 0;
 
        /**
         * struct nrs_orr_data is requested.
@@ -864,9 +763,18 @@ static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
        if (rc < 0)
                RETURN(rc);
 
-       orro = cfs_hash_lookup(orrd->od_obj_hash, &key);
-       if (orro != NULL)
-               goto out;
+       /* Handle the ORR case which involves looking up the orro in the
+        * hashtable. If not found then insert it. Unlike TRR the orro can
+        * be deleted in parallel during the life cycle of the object.
+        */
+       rcu_read_lock();
+       orro = rhashtable_lookup_fast(&orrd->od_obj_hash, &key,
+                                     nrs_orr_hash_params);
+       if (orro && refcount_inc_not_zero(&orro->oo_ref)) {
+               rcu_read_unlock();
+               goto found_orro;
+       }
+       rcu_read_unlock();
 
        OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
                                   nrs_pol2cptab(policy), nrs_pol2cptid(policy),
@@ -875,15 +783,35 @@ static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
                RETURN(-ENOMEM);
 
        orro->oo_key = key;
-       orro->oo_ref = 1;
+       refcount_set(&orro->oo_ref, 1);
+try_again:
+       rcu_read_lock();
+       orro2 = rhashtable_lookup_get_insert_fast(&orrd->od_obj_hash,
+                                                 &orro->oo_rhead,
+                                                 nrs_orr_hash_params);
+       /* A returned non-error orro2 means it already exist */
+       if (!IS_ERR_OR_NULL(orro2)) {
+               OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
+               orro = orro2;
+       }
+
+       /* insertion failed */
+       if (IS_ERR(orro2)) {
+               rc = PTR_ERR(orro2);
 
-       tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key,
-                                     &orro->oo_hnode);
-       if (tmp != orro) {
+               /* hash table could be resizing. */
+               if (rc == -ENOMEM || rc == -EBUSY) {
+                       rcu_read_unlock();
+                       mdelay(20);
+                       goto try_again;
+               }
                OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
-               orro = tmp;
+               rcu_read_unlock();
+
+               RETURN(rc);
        }
-out:
+       rcu_read_unlock();
+found_orro:
        /**
         * For debugging purposes
         */
@@ -894,6 +822,16 @@ out:
        return 1;
 }
 
+static void nrs_orr_object_free(struct rcu_head *head)
+{
+       struct nrs_orr_object *orro = container_of(head, struct nrs_orr_object,
+                                                  oo_rcu_head);
+       struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent,
+                                                struct nrs_orr_data, od_res);
+
+       OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
+}
+
 /**
  * Called when releasing references to the resource hierachy obtained for a
  * request for scheduling using ORR/TRR policy instances
@@ -914,15 +852,134 @@ static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy,
                return;
 
        orro = container_of(res, struct nrs_orr_object, oo_res);
-       orrd = container_of(res->res_parent, struct nrs_orr_data, od_res);
+       if (!refcount_dec_and_test(&orro->oo_ref))
+               return;
+
+       orrd = container_of(orro->oo_res.res_parent, struct nrs_orr_data, od_res);
+       rhashtable_remove_fast(&orrd->od_obj_hash, &orro->oo_rhead,
+                              nrs_orr_hash_params);
+       call_rcu(&orro->oo_rcu_head, nrs_orr_object_free);
+}
+
+/**
+ * Obtains resources for TRR policy instances. The top-level resource lives
+ * inside \e nrs_orr_data and the second-level resource inside
+ * \e nrs_orr_object instances.
+ *
+ * @policy     the policy for which resources are being taken for
+ *             request @nrq
+ * @nrq                the request for which resources are being taken
+ * @parent     parent resource, embedded in nrs_orr_data for the
+ *             TRR policies
+ * @resp       used to return resource references
+ * @moving_req signifies limited caller context; used to perform
+ *             memory allocations in an atomic context in this
+ *             policy
+ *
+ * RETURN      0 we are returning a top-level, parent resource, one that is
+ *               embedded in an nrs_orr_data object
+ *             1 we are returning a bottom-level resource, one that is embedded
+ *               in an nrs_orr_object object
+ *
+ * \see nrs_resource_get_safe()
+ */
+static int nrs_trr_res_get(struct ptlrpc_nrs_policy *policy,
+                          struct ptlrpc_nrs_request *nrq,
+                          const struct ptlrpc_nrs_resource *parent,
+                          struct ptlrpc_nrs_resource **resp, bool moving_req)
+{
+       struct nrs_orr_key key = { { { 0 } } };
+       struct nrs_orr_object *orro;
+       struct nrs_orr_data *orrd;
+       int rc = 0;
+       u32 opc;
+
+       /**
+        * struct nrs_orr_data is requested.
+        */
+       if (!parent) {
+               *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
+               return 0;
+       }
+
+       orrd = container_of(parent, struct nrs_orr_data, od_res);
+
+       /**
+        * If the request type is not supported, fail the enqueuing; the RPC
+        * will be handled by the fallback NRS policy.
+        */
+       if (!nrs_orr_req_supported(orrd, nrq, &opc))
+               return -1;
+
+       /**
+        * This is an attempt to fill in the request key fields while
+        * moving a request from the regular to the high-priority NRS
+        * head (via ldlm_lock_reorder_req()), but the request key has
+        * been adequately filled when nrs_trr_res_get() was called through
+        * ptlrpc_nrs_req_initialize() for the regular NRS head's TRR
+        * policy, so there is nothing to do.
+        */
+       if (!nrq->nr_u.orr.or_trr_set) {
+               struct ptlrpc_request *req;
+
+               /* Bounce unconnected requests to the default policy. */
+               req = container_of(nrq, struct ptlrpc_request, rq_nrq);
+               if (!req->rq_export)
+                       return -ENOTCONN;
+
+               key.ok_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
+               nrq->nr_u.orr.or_trr_set = 1;
+       } else {
+               key = nrq->nr_u.orr.or_key;
+       }
+
+       /**
+        * Set the offset range the request covers
+        */
+       rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
+       if (rc < 0)
+               RETURN(rc);
+
+       /* For TRR we just attempt to find the orro via the ok_idx.
+        * If not found we insert it into the Xarray.
+        */
+try_again:
+       orro = xa_load(&orrd->od_trr_objs, key.ok_idx);
+       if (orro)
+               goto found_orro;
+
+       OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
+                                  nrs_pol2cptab(policy), nrs_pol2cptid(policy),
+                                  moving_req ? GFP_ATOMIC : GFP_NOFS);
+       if (!orro)
+               RETURN(-ENOMEM);
+
+       orro->oo_key = key;
+       rc = ll_xa_insert(&orrd->od_trr_objs, key.ok_idx, orro,
+                      moving_req ? GFP_ATOMIC : GFP_NOFS);
+       if (rc < 0) {
+               OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
+               if (rc == -EBUSY)
+                       goto try_again;
+
+               RETURN(rc);
+       }
+
+found_orro:
+       /**
+        * For debugging purposes
+        */
+       nrq->nr_u.orr.or_key = orro->oo_key;
+
+       *resp = &orro->oo_res;
 
-       cfs_hash_put(orrd->od_obj_hash, &orro->oo_hnode);
+       return 1;
 }
 
 /**
  * Called when polling an ORR/TRR policy instance for a request so that it can
  * be served. Returns the request that is at the root of the binary heap, as
- * that is the lowest priority one (i.e. libcfs_heap is an implementation of a
+ * that is the lowest priority one (i.e. binheap is an implementation of a
  * min-heap)
  *
  * \param[in] policy the policy instance being polled
@@ -1133,29 +1190,36 @@ static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy,
 }
 
 /**
- * Called right after the request \a nrq finishes being handled by ORR policy
+ * Called right after the request @nrq finishes being handled by ORR policy
  * instance \a policy.
  *
- * \param[in] policy the policy that handled the request
- * \param[in] nrq    the request that was handled
+ * @policy     the policy that handled the request
+ * @nrq                the request that was handled
  */
 static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy,
                             struct ptlrpc_nrs_request *nrq)
 {
        /** NB: resource control, credits etc can be added here */
-       if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
-                   NRS_POL_NAME_MAX) == 0)
-               CDEBUG(D_RPCTRACE,
-                      "NRS: finished handling %s request for object with FID "
-                      DFID", from OST with index %u, with round %llu\n",
-                      NRS_POL_NAME_ORR, PFID(&nrq->nr_u.orr.or_key.ok_fid),
-                      nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
-       else
-               CDEBUG(D_RPCTRACE,
-                      "NRS: finished handling %s request from OST with index %u,"
-                      " with round %llu\n",
-                      NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
-                      nrq->nr_u.orr.or_round);
+       CDEBUG(D_RPCTRACE,
+              "NRS: finished handling ORR request for object with FID "DFID", from OST with index %u, with round %llu\n",
+              PFID(&nrq->nr_u.orr.or_key.ok_fid), nrq->nr_u.orr.or_key.ok_idx,
+              nrq->nr_u.orr.or_round);
+}
+
+/**
+ * Called right after the request @nrq finishes being handled by TRR policy
+ * instance @policy.
+ *
+ * @policy     the policy that handled the request
+ * @nrq                the request that was handled
+ */
+static void nrs_trr_req_stop(struct ptlrpc_nrs_policy *policy,
+                            struct ptlrpc_nrs_request *nrq)
+{
+       /** NB: resource control, credits etc can be added here */
+       CDEBUG(D_RPCTRACE,
+              "NRS: finished handling TRR request from OST with index %u, with round %llu\n",
+              nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
 }
 
 /**
@@ -1949,12 +2013,11 @@ static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
        .op_policy_start        = nrs_orr_start,
        .op_policy_stop         = nrs_orr_stop,
        .op_policy_ctl          = nrs_orr_ctl,
-       .op_res_get             = nrs_orr_res_get,
-       .op_res_put             = nrs_orr_res_put,
+       .op_res_get             = nrs_trr_res_get,
        .op_req_get             = nrs_orr_req_get,
        .op_req_enqueue         = nrs_orr_req_add,
        .op_req_dequeue         = nrs_orr_req_del,
-       .op_req_stop            = nrs_orr_req_stop,
+       .op_req_stop            = nrs_trr_req_stop,
        .op_lprocfs_init        = nrs_trr_lprocfs_init,
 };