From 42bf5f78ba23ed5a7a908c921a5d8ded7544a315 Mon Sep 17 00:00:00 2001 From: James Simmons Date: Tue, 25 Jul 2023 14:14:03 -0400 Subject: [PATCH] LU-8130 nrs: convert NRS ORR/TRR to rhashtable Move away from the cfs hash implementation to rhashtable for NRS ORR handling. Since rhashtable is lockless it should also increase performance. For the NRS TRR handling we can use Xarray since its based on OST index which are sequential which will give better performance than an hashtable. TRR entries are added to the Xarray but not removed until the Xarray is destroyed. Test-Parameters: testlist=sanityn env=ONLY=77c+77d,ONLY_REPEAT=100 Change-Id: I5206a7586d4b4c8991b7163fd9253f017e6d3969 Signed-off-by: James Simmons Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/40113 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Neil Brown Reviewed-by: Oleg Drokin Reviewed-by: Etienne AUJAMES --- lustre/include/lustre_compat.h | 20 ++ lustre/include/lustre_nrs_orr.h | 29 ++- lustre/ptlrpc/nrs_orr.c | 495 ++++++++++++++++++++++------------------ 3 files changed, 321 insertions(+), 223 deletions(-) diff --git a/lustre/include/lustre_compat.h b/lustre/include/lustre_compat.h index 3040cc2..34042d5 100644 --- a/lustre/include/lustre_compat.h +++ b/lustre/include/lustre_compat.h @@ -45,6 +45,11 @@ #include #include #include +#ifdef HAVE_XARRAY_SUPPORT +#include +#else +#include +#endif #include #ifdef HAVE_4ARGS_VFS_SYMLINK @@ -198,6 +203,21 @@ static inline bool ll_xa_is_value(void *entry) #define ll_xa_is_value xa_is_value #endif +/* Linux kernel version v5.0 commit fd9dc93e36231fb6d520e0edd467058fad4fd12d + * ("XArray: Change xa_insert to return -EBUSY") + * instead of -EEXIST + */ +static inline int __must_check ll_xa_insert(struct xarray *xa, + unsigned long index, + void *entry, gfp_t gpf) +{ + int rc = xa_insert(xa, index, entry, gpf); + + if (rc == -EEXIST) + rc = -EBUSY; + return rc; +} + #ifndef HAVE_TRUNCATE_INODE_PAGES_FINAL static inline void truncate_inode_pages_final(struct address_space *map) { diff --git a/lustre/include/lustre_nrs_orr.h b/lustre/include/lustre_nrs_orr.h index 4aff35f..c45484f 100644 --- a/lustre/include/lustre_nrs_orr.h +++ b/lustre/include/lustre_nrs_orr.h @@ -33,6 +33,12 @@ #ifndef _LUSTRE_NRS_ORR_H #define _LUSTRE_NRS_ORR_H +#ifdef HAVE_XARRAY_SUPPORT +#include +#else +#include +#endif + /** * ORR policy operations */ @@ -104,8 +110,9 @@ struct nrs_orr_key { */ struct nrs_orr_data { struct ptlrpc_nrs_resource od_res; - struct binheap *od_binheap; - struct cfs_hash *od_obj_hash; + struct binheap *od_binheap; + struct rhashtable od_obj_hash; + struct xarray od_trr_objs; struct kmem_cache *od_cache; /** * Used when a new scheduling round commences, in order to synchronize @@ -147,7 +154,7 @@ struct nrs_orr_data { */ struct nrs_orr_object { struct ptlrpc_nrs_resource oo_res; - struct hlist_node oo_hnode; + struct rhash_head oo_rhead; /** * The round number against which requests are being scheduled for this * object or OST @@ -159,11 +166,15 @@ struct nrs_orr_object { */ __u64 oo_sequence; /** - * The key of the object or OST for which this structure instance is - * scheduling RPCs + * The key of the object or OST for which this structure + * instance is scheduling RPCs + */ + struct nrs_orr_key oo_key; + /** + * The reference count of this object. Once it hits zero free + * it from the hash table. */ - struct nrs_orr_key oo_key; - long oo_ref; + refcount_t oo_ref; /** * Round Robin quantum; the maximum number of RPCs that are allowed to * be scheduled for the object or OST in a single batch of each round. @@ -173,6 +184,10 @@ struct nrs_orr_object { * # of pending requests for this object or OST, on all existing rounds */ __u16 oo_active; + /** + * Needed to free this structure from the RCU. + */ + struct rcu_head oo_rcu_head; }; /** diff --git a/lustre/ptlrpc/nrs_orr.c b/lustre/ptlrpc/nrs_orr.c index e2d1a3c..42e1d62 100644 --- a/lustre/ptlrpc/nrs_orr.c +++ b/lustre/ptlrpc/nrs_orr.c @@ -41,10 +41,13 @@ * @{ */ #define DEBUG_SUBSYSTEM S_RPC +#include + #include #include #include #include +#include #include "ptlrpc_internal.h" /** @@ -129,12 +132,11 @@ static int nrs_orr_key_fill(struct nrs_orr_data *orrd, struct ptlrpc_nrs_request *nrq, __u32 opc, char *name, struct nrs_orr_key *key) { - struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request, - rq_nrq); - struct ost_body *body; - __u32 ost_idx; - bool is_orr = strncmp(name, NRS_POL_NAME_ORR, - NRS_POL_NAME_MAX) == 0; + struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request, + rq_nrq); + struct ost_body *body; + u32 ost_idx; + int rc; LASSERT(req != NULL); @@ -143,11 +145,10 @@ static int nrs_orr_key_fill(struct nrs_orr_data *orrd, * moving a request from the regular to the high-priority NRS * head (via ldlm_lock_reorder_req()), but the request key has * been adequately filled when nrs_orr_res_get() was called through - * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR/TRR + * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR * policy, so there is nothing to do. */ - if ((is_orr && nrq->nr_u.orr.or_orr_set) || - (!is_orr && nrq->nr_u.orr.or_trr_set)) { + if (nrq->nr_u.orr.or_orr_set) { *key = nrq->nr_u.orr.or_key; return 0; } @@ -156,32 +157,23 @@ static int nrs_orr_key_fill(struct nrs_orr_data *orrd, if (req->rq_export == NULL) return -ENOTCONN; - if (nrq->nr_u.orr.or_orr_set || nrq->nr_u.orr.or_trr_set) - memset(&nrq->nr_u.orr.or_key, 0, sizeof(nrq->nr_u.orr.or_key)); - ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index; - if (is_orr) { - int rc; - /** - * The request pill for OST_READ and OST_WRITE requests is - * initialized in the ost_io service's - * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(), - * so no need to redo it here. - */ - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - RETURN(-EFAULT); + /** + * The request pill for OST_READ and OST_WRITE requests is + * initialized in the ost_io service's + * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(), + * so no need to redo it here. + */ + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (!body) + RETURN(-EFAULT); - rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx); - if (rc < 0) - return rc; + rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx); + if (rc < 0) + return rc; - nrq->nr_u.orr.or_orr_set = 1; - } else { - key->ok_idx = ost_idx; - nrq->nr_u.orr.or_trr_set = 1; - } + nrq->nr_u.orr.or_orr_set = 1; return 0; } @@ -358,11 +350,10 @@ out: /** * Generates a character string that can be used in order to register uniquely - * named libcfs_hash and slab objects for ORR/TRR policy instances. The - * character string is unique per policy instance, as it includes the policy's - * name, the CPT number, and a {reg|hp} token, and there is one policy instance - * per NRS head on each CPT, and the policy is only compatible with the ost_io - * service. + * named slab objects for ORR/TRR policy instances. The character string is + * unique per policy instance, as it includes the policy's name, the CPT number, + * and a {reg|hp} token, and there is one policy instance per NRS head on each + * CPT, and the policy is only compatible with the ost_io service. * * \param[in] policy the policy instance * \param[out] name the character array that will hold the generated name @@ -378,130 +369,45 @@ static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name) /** * ORR/TRR hash operations */ -#define NRS_ORR_BITS 24 -#define NRS_ORR_BKT_BITS 12 -#define NRS_ORR_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | CFS_HASH_ASSERT_EMPTY) - -#define NRS_TRR_BITS 4 -#define NRS_TRR_BKT_BITS 2 -#define NRS_TRR_HASH_FLAGS CFS_HASH_SPIN_BKTLOCK - -static unsigned -nrs_orr_hop_hash(struct cfs_hash *hs, const void *key, unsigned mask) -{ - return cfs_hash_djb2_hash(key, sizeof(struct nrs_orr_key), mask); -} - -static void *nrs_orr_hop_key(struct hlist_node *hnode) -{ - struct nrs_orr_object *orro = hlist_entry(hnode, - struct nrs_orr_object, - oo_hnode); - return &orro->oo_key; -} - -static int nrs_orr_hop_keycmp(const void *key, struct hlist_node *hnode) -{ - struct nrs_orr_object *orro = hlist_entry(hnode, - struct nrs_orr_object, - oo_hnode); - - return lu_fid_eq(&orro->oo_key.ok_fid, - &((struct nrs_orr_key *)key)->ok_fid); -} - -static void *nrs_orr_hop_object(struct hlist_node *hnode) -{ - return hlist_entry(hnode, struct nrs_orr_object, oo_hnode); -} - -static void nrs_orr_hop_get(struct cfs_hash *hs, struct hlist_node *hnode) -{ - struct nrs_orr_object *orro = hlist_entry(hnode, - struct nrs_orr_object, - oo_hnode); - orro->oo_ref++; -} - -/** - * Removes an nrs_orr_object the hash and frees its memory, if the object has - * no active users. - */ -static void nrs_orr_hop_put_free(struct cfs_hash *hs, struct hlist_node *hnode) +static u32 nrs_orr_hashfn(const void *data, u32 len, u32 seed) { - struct nrs_orr_object *orro = hlist_entry(hnode, - struct nrs_orr_object, - oo_hnode); - struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent, - struct nrs_orr_data, od_res); - struct cfs_hash_bd bd; - - cfs_hash_bd_get_and_lock(hs, &orro->oo_key, &bd, 1); - - if (--orro->oo_ref > 1) { - cfs_hash_bd_unlock(hs, &bd, 1); - - return; - } - LASSERT(orro->oo_ref == 1); - - cfs_hash_bd_del_locked(hs, &bd, hnode); - cfs_hash_bd_unlock(hs, &bd, 1); - - OBD_SLAB_FREE_PTR(orro, orrd->od_cache); -} + const struct nrs_orr_key *key = data; -static void nrs_orr_hop_put(struct cfs_hash *hs, struct hlist_node *hnode) -{ - struct nrs_orr_object *orro = hlist_entry(hnode, - struct nrs_orr_object, - oo_hnode); - orro->oo_ref--; + seed = cfs_hash_32(seed ^ key->ok_fid.f_oid, 32); + seed ^= cfs_hash_64(key->ok_fid.f_seq, 32); + return seed; } -static int nrs_trr_hop_keycmp(const void *key, struct hlist_node *hnode) +static int nrs_orr_cmpfn(struct rhashtable_compare_arg *arg, const void *obj) { - struct nrs_orr_object *orro = hlist_entry(hnode, - struct nrs_orr_object, - oo_hnode); + const struct nrs_orr_object *orro = obj; + const struct nrs_orr_key *key = arg->key; - return orro->oo_key.ok_idx == ((struct nrs_orr_key *)key)->ok_idx; + return lu_fid_eq(&orro->oo_key.ok_fid, &key->ok_fid) ? 0 : -ESRCH; } -static void nrs_trr_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode) +static void nrs_orr_hash_exit(void *vobj, void *data) { - struct nrs_orr_object *orro = hlist_entry(hnode, - struct nrs_orr_object, - oo_hnode); - struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent, - struct nrs_orr_data, od_res); + struct nrs_orr_object *orro = vobj; + struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent, + struct nrs_orr_data, od_res); - LASSERTF(orro->oo_ref == 0, - "Busy NRS TRR policy object for OST with index %u, with %ld " - "refs\n", orro->oo_key.ok_idx, orro->oo_ref); + /* We shouldn't reach here but just in case. nrs_xxx_res_put + * should of have freed orro. + */ + LASSERTF(refcount_read(&orro->oo_ref) == 0, + "Busy NRS ORR policy object for OST with index %u, with %d refs\n", + orro->oo_key.ok_idx, refcount_read(&orro->oo_ref)); OBD_SLAB_FREE_PTR(orro, orrd->od_cache); } -static struct cfs_hash_ops nrs_orr_hash_ops = { - .hs_hash = nrs_orr_hop_hash, - .hs_key = nrs_orr_hop_key, - .hs_keycmp = nrs_orr_hop_keycmp, - .hs_object = nrs_orr_hop_object, - .hs_get = nrs_orr_hop_get, - .hs_put = nrs_orr_hop_put_free, - .hs_put_locked = nrs_orr_hop_put, -}; - -static struct cfs_hash_ops nrs_trr_hash_ops = { - .hs_hash = nrs_orr_hop_hash, - .hs_key = nrs_orr_hop_key, - .hs_keycmp = nrs_trr_hop_keycmp, - .hs_object = nrs_orr_hop_object, - .hs_get = nrs_orr_hop_get, - .hs_put = nrs_orr_hop_put, - .hs_put_locked = nrs_orr_hop_put, - .hs_exit = nrs_trr_hop_exit, +static const struct rhashtable_params nrs_orr_hash_params = { + .key_len = sizeof(struct lu_fid), + .key_offset = offsetof(struct nrs_orr_object, oo_key), + .head_offset = offsetof(struct nrs_orr_object, oo_rhead), + .hashfn = nrs_orr_hashfn, + .obj_cmpfn = nrs_orr_cmpfn, }; #define NRS_ORR_QUANTUM_DFLT 256 @@ -616,11 +522,6 @@ static int nrs_orr_init(struct ptlrpc_nrs_policy *policy) static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg) { struct nrs_orr_data *orrd; - struct cfs_hash_ops *ops; - unsigned cur_bits; - unsigned max_bits; - unsigned bkt_bits; - unsigned flags; int rc = 0; ENTRY; @@ -632,9 +533,9 @@ static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg) * Binary heap instance for sorted incoming requests. */ orrd->od_binheap = binheap_create(&nrs_orr_heap_ops, - CBH_FLAG_ATOMIC_GROW, 4096, NULL, - nrs_pol2cptab(policy), - nrs_pol2cptid(policy)); + CBH_FLAG_ATOMIC_GROW, 4096, NULL, + nrs_pol2cptab(policy), + nrs_pol2cptid(policy)); if (orrd->od_binheap == NULL) GOTO(out_orrd, rc = -ENOMEM); @@ -649,34 +550,21 @@ static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg) if (orrd->od_cache == NULL) GOTO(out_binheap, rc = -ENOMEM); + /** + * Use a hash for finding objects by struct nrs_orr_key. + * For TRR we use Xarray instead since items are resolved + * using the OST indices, and they will stay relatively + * stable during an OSS node's lifetime. + */ if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR, NRS_POL_NAME_MAX) == 0) { - ops = &nrs_orr_hash_ops; - cur_bits = NRS_ORR_BITS; - max_bits = NRS_ORR_BITS; - bkt_bits = NRS_ORR_BKT_BITS; - flags = NRS_ORR_HASH_FLAGS; + rc = rhashtable_init(&orrd->od_obj_hash, &nrs_orr_hash_params); + if (rc < 0) + GOTO(out_cache, rc); } else { - ops = &nrs_trr_hash_ops; - cur_bits = NRS_TRR_BITS; - max_bits = NRS_TRR_BITS; - bkt_bits = NRS_TRR_BKT_BITS; - flags = NRS_TRR_HASH_FLAGS; + xa_init(&orrd->od_trr_objs); } - /** - * Hash for finding objects by struct nrs_orr_key. - * XXX: For TRR, it might be better to avoid using libcfs_hash? - * All that needs to be resolved are OST indices, and they - * will stay relatively stable during an OSS node's lifetime. - */ - orrd->od_obj_hash = cfs_hash_create(orrd->od_objname, cur_bits, - max_bits, bkt_bits, 0, - CFS_HASH_MIN_THETA, - CFS_HASH_MAX_THETA, ops, flags); - if (orrd->od_obj_hash == NULL) - GOTO(out_cache, rc = -ENOMEM); - /* XXX: Fields accessed unlocked */ orrd->od_quantum = NRS_ORR_QUANTUM_DFLT; orrd->od_supp = NOS_DFLT; @@ -717,12 +605,24 @@ static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy) LASSERT(orrd != NULL); LASSERT(orrd->od_binheap != NULL); - LASSERT(orrd->od_obj_hash != NULL); LASSERT(orrd->od_cache != NULL); LASSERT(binheap_is_empty(orrd->od_binheap)); binheap_destroy(orrd->od_binheap); - cfs_hash_putref(orrd->od_obj_hash); + if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_TRR, + NRS_POL_NAME_MAX) == 0) { + struct nrs_orr_object *orro; + unsigned long i; + + xa_for_each(&orrd->od_trr_objs, i, orro) { + xa_erase(&orrd->od_trr_objs, i); + OBD_SLAB_FREE_PTR(orro, orrd->od_cache); + } + xa_destroy(&orrd->od_trr_objs); + } else { + rhashtable_free_and_destroy(&orrd->od_obj_hash, + nrs_orr_hash_exit, NULL); + } kmem_cache_destroy(orrd->od_cache); OBD_FREE_PTR(orrd); @@ -825,12 +725,11 @@ static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy, const struct ptlrpc_nrs_resource *parent, struct ptlrpc_nrs_resource **resp, bool moving_req) { - struct nrs_orr_data *orrd; - struct nrs_orr_object *orro; - struct nrs_orr_object *tmp; - struct nrs_orr_key key = { { { 0 } } }; - __u32 opc; - int rc = 0; + struct nrs_orr_data *orrd; + struct nrs_orr_object *orro, *orro2; + struct nrs_orr_key key = { { { 0 } } }; + u32 opc; + int rc = 0; /** * struct nrs_orr_data is requested. @@ -864,9 +763,18 @@ static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy, if (rc < 0) RETURN(rc); - orro = cfs_hash_lookup(orrd->od_obj_hash, &key); - if (orro != NULL) - goto out; + /* Handle the ORR case which involves looking up the orro in the + * hashtable. If not found then insert it. Unlike TRR the orro can + * be deleted in parallel during the life cycle of the object. + */ + rcu_read_lock(); + orro = rhashtable_lookup_fast(&orrd->od_obj_hash, &key, + nrs_orr_hash_params); + if (orro && refcount_inc_not_zero(&orro->oo_ref)) { + rcu_read_unlock(); + goto found_orro; + } + rcu_read_unlock(); OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache, nrs_pol2cptab(policy), nrs_pol2cptid(policy), @@ -875,15 +783,35 @@ static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy, RETURN(-ENOMEM); orro->oo_key = key; - orro->oo_ref = 1; + refcount_set(&orro->oo_ref, 1); +try_again: + rcu_read_lock(); + orro2 = rhashtable_lookup_get_insert_fast(&orrd->od_obj_hash, + &orro->oo_rhead, + nrs_orr_hash_params); + /* A returned non-error orro2 means it already exist */ + if (!IS_ERR_OR_NULL(orro2)) { + OBD_SLAB_FREE_PTR(orro, orrd->od_cache); + orro = orro2; + } + + /* insertion failed */ + if (IS_ERR(orro2)) { + rc = PTR_ERR(orro2); - tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key, - &orro->oo_hnode); - if (tmp != orro) { + /* hash table could be resizing. */ + if (rc == -ENOMEM || rc == -EBUSY) { + rcu_read_unlock(); + mdelay(20); + goto try_again; + } OBD_SLAB_FREE_PTR(orro, orrd->od_cache); - orro = tmp; + rcu_read_unlock(); + + RETURN(rc); } -out: + rcu_read_unlock(); +found_orro: /** * For debugging purposes */ @@ -894,6 +822,16 @@ out: return 1; } +static void nrs_orr_object_free(struct rcu_head *head) +{ + struct nrs_orr_object *orro = container_of(head, struct nrs_orr_object, + oo_rcu_head); + struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent, + struct nrs_orr_data, od_res); + + OBD_SLAB_FREE_PTR(orro, orrd->od_cache); +} + /** * Called when releasing references to the resource hierachy obtained for a * request for scheduling using ORR/TRR policy instances @@ -914,15 +852,134 @@ static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy, return; orro = container_of(res, struct nrs_orr_object, oo_res); - orrd = container_of(res->res_parent, struct nrs_orr_data, od_res); + if (!refcount_dec_and_test(&orro->oo_ref)) + return; + + orrd = container_of(orro->oo_res.res_parent, struct nrs_orr_data, od_res); + rhashtable_remove_fast(&orrd->od_obj_hash, &orro->oo_rhead, + nrs_orr_hash_params); + call_rcu(&orro->oo_rcu_head, nrs_orr_object_free); +} + +/** + * Obtains resources for TRR policy instances. The top-level resource lives + * inside \e nrs_orr_data and the second-level resource inside + * \e nrs_orr_object instances. + * + * @policy the policy for which resources are being taken for + * request @nrq + * @nrq the request for which resources are being taken + * @parent parent resource, embedded in nrs_orr_data for the + * TRR policies + * @resp used to return resource references + * @moving_req signifies limited caller context; used to perform + * memory allocations in an atomic context in this + * policy + * + * RETURN 0 we are returning a top-level, parent resource, one that is + * embedded in an nrs_orr_data object + * 1 we are returning a bottom-level resource, one that is embedded + * in an nrs_orr_object object + * + * \see nrs_resource_get_safe() + */ +static int nrs_trr_res_get(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq, + const struct ptlrpc_nrs_resource *parent, + struct ptlrpc_nrs_resource **resp, bool moving_req) +{ + struct nrs_orr_key key = { { { 0 } } }; + struct nrs_orr_object *orro; + struct nrs_orr_data *orrd; + int rc = 0; + u32 opc; + + /** + * struct nrs_orr_data is requested. + */ + if (!parent) { + *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res; + return 0; + } + + orrd = container_of(parent, struct nrs_orr_data, od_res); + + /** + * If the request type is not supported, fail the enqueuing; the RPC + * will be handled by the fallback NRS policy. + */ + if (!nrs_orr_req_supported(orrd, nrq, &opc)) + return -1; + + /** + * This is an attempt to fill in the request key fields while + * moving a request from the regular to the high-priority NRS + * head (via ldlm_lock_reorder_req()), but the request key has + * been adequately filled when nrs_trr_res_get() was called through + * ptlrpc_nrs_req_initialize() for the regular NRS head's TRR + * policy, so there is nothing to do. + */ + if (!nrq->nr_u.orr.or_trr_set) { + struct ptlrpc_request *req; + + /* Bounce unconnected requests to the default policy. */ + req = container_of(nrq, struct ptlrpc_request, rq_nrq); + if (!req->rq_export) + return -ENOTCONN; + + key.ok_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index; + nrq->nr_u.orr.or_trr_set = 1; + } else { + key = nrq->nr_u.orr.or_key; + } + + /** + * Set the offset range the request covers + */ + rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req); + if (rc < 0) + RETURN(rc); + + /* For TRR we just attempt to find the orro via the ok_idx. + * If not found we insert it into the Xarray. + */ +try_again: + orro = xa_load(&orrd->od_trr_objs, key.ok_idx); + if (orro) + goto found_orro; + + OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache, + nrs_pol2cptab(policy), nrs_pol2cptid(policy), + moving_req ? GFP_ATOMIC : GFP_NOFS); + if (!orro) + RETURN(-ENOMEM); + + orro->oo_key = key; + rc = ll_xa_insert(&orrd->od_trr_objs, key.ok_idx, orro, + moving_req ? GFP_ATOMIC : GFP_NOFS); + if (rc < 0) { + OBD_SLAB_FREE_PTR(orro, orrd->od_cache); + if (rc == -EBUSY) + goto try_again; + + RETURN(rc); + } + +found_orro: + /** + * For debugging purposes + */ + nrq->nr_u.orr.or_key = orro->oo_key; + + *resp = &orro->oo_res; - cfs_hash_put(orrd->od_obj_hash, &orro->oo_hnode); + return 1; } /** * Called when polling an ORR/TRR policy instance for a request so that it can * be served. Returns the request that is at the root of the binary heap, as - * that is the lowest priority one (i.e. libcfs_heap is an implementation of a + * that is the lowest priority one (i.e. binheap is an implementation of a * min-heap) * * \param[in] policy the policy instance being polled @@ -1133,29 +1190,36 @@ static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy, } /** - * Called right after the request \a nrq finishes being handled by ORR policy + * Called right after the request @nrq finishes being handled by ORR policy * instance \a policy. * - * \param[in] policy the policy that handled the request - * \param[in] nrq the request that was handled + * @policy the policy that handled the request + * @nrq the request that was handled */ static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy, struct ptlrpc_nrs_request *nrq) { /** NB: resource control, credits etc can be added here */ - if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR, - NRS_POL_NAME_MAX) == 0) - CDEBUG(D_RPCTRACE, - "NRS: finished handling %s request for object with FID " - DFID", from OST with index %u, with round %llu\n", - NRS_POL_NAME_ORR, PFID(&nrq->nr_u.orr.or_key.ok_fid), - nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round); - else - CDEBUG(D_RPCTRACE, - "NRS: finished handling %s request from OST with index %u," - " with round %llu\n", - NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx, - nrq->nr_u.orr.or_round); + CDEBUG(D_RPCTRACE, + "NRS: finished handling ORR request for object with FID "DFID", from OST with index %u, with round %llu\n", + PFID(&nrq->nr_u.orr.or_key.ok_fid), nrq->nr_u.orr.or_key.ok_idx, + nrq->nr_u.orr.or_round); +} + +/** + * Called right after the request @nrq finishes being handled by TRR policy + * instance @policy. + * + * @policy the policy that handled the request + * @nrq the request that was handled + */ +static void nrs_trr_req_stop(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq) +{ + /** NB: resource control, credits etc can be added here */ + CDEBUG(D_RPCTRACE, + "NRS: finished handling TRR request from OST with index %u, with round %llu\n", + nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round); } /** @@ -1949,12 +2013,11 @@ static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = { .op_policy_start = nrs_orr_start, .op_policy_stop = nrs_orr_stop, .op_policy_ctl = nrs_orr_ctl, - .op_res_get = nrs_orr_res_get, - .op_res_put = nrs_orr_res_put, + .op_res_get = nrs_trr_res_get, .op_req_get = nrs_orr_req_get, .op_req_enqueue = nrs_orr_req_add, .op_req_dequeue = nrs_orr_req_del, - .op_req_stop = nrs_orr_req_stop, + .op_req_stop = nrs_trr_req_stop, .op_lprocfs_init = nrs_trr_lprocfs_init, }; -- 1.8.3.1