From: Nikitas Angelinas Date: Wed, 9 Jan 2013 02:40:21 +0000 (+0000) Subject: LU-398 ptlrpc: Add the NRS ORR and TRR policies X-Git-Tag: 2.3.65~71 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=c85f006d40cc5c9504ea873fc815ce2eaa1ee062 LU-398 ptlrpc: Add the NRS ORR and TRR policies The ORR (Object-based Round Robin) policy schedules brw RPCs in per-backend-filesystem-object groupings; RPCs in each group are sorted according to their logical file or physical disk offsets. The TRR (Target-based Round Robin) policy performs the same function as ORR, but instead schedules brw RPCs in per-OST groupings. Both of these policies aim to provide increased read throughput in certain use cases, primarily by minimizing costly disk seek operations (by ordering OST_READ, and perhaps also OST_WRITE, RPCs); they may also improve performance through better resource utilization, by taking advantage of the locality of reference characteristics of the I/O load. Signed-off-by: Nikitas Angelinas Co-authored-by: Liang Zhen Change-Id: I1f5a367f2f4a1cf296a3b38f3e395ab28a10668e Oracle-bug-id: b=13634 Xyratex-bug-id: MRP-73 Reviewed-on: http://review.whamcloud.com/4938 Tested-by: Hudson Reviewed-by: Andreas Dilger Tested-by: Maloo Reviewed-by: Lai Siyao --- diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index c1bfedb..3ec5a24 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -787,6 +787,18 @@ enum ptlrpc_nrs_ctl { }; /** + * ORR policy operations + */ +enum nrs_ctl_orr { + NRS_CTL_ORR_RD_QUANTUM = PTLRPC_NRS_CTL_1ST_POL_SPEC, + NRS_CTL_ORR_WR_QUANTUM, + NRS_CTL_ORR_RD_OFF_TYPE, + NRS_CTL_ORR_WR_OFF_TYPE, + NRS_CTL_ORR_RD_SUPP_REQ, + NRS_CTL_ORR_WR_SUPP_REQ, +}; + +/** * NRS policy operations. * * These determine the behaviour of a policy, and are called in response to @@ -1505,6 +1517,184 @@ enum nrs_ctl_crr { /** @} CRR-N */ /** + * \name ORR/TRR + * + * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies + * @{ + */ + +/** + * Lower and upper byte offsets of a brw RPC + */ +struct nrs_orr_req_range { + __u64 or_start; + __u64 or_end; +}; + +/** + * RPC types supported by the ORR/TRR policies + */ +enum nrs_orr_supp { + NOS_OST_READ = (1 << 0), + NOS_OST_WRITE = (1 << 1), + NOS_OST_RW = (NOS_OST_READ | NOS_OST_WRITE), + /** + * Default value for policies. + */ + NOS_DFLT = NOS_OST_READ +}; + +/** + * As unique keys for grouping RPCs together, we use the object's OST FID for + * the ORR policy, and the OST index for the TRR policy. + * + * XXX: We waste some space for TRR policy instances by using a union, but it + * allows us to consolidate some of the code between ORR and TRR, and these + * policies will probably eventually merge into one anyway. + */ +struct nrs_orr_key { + union { + /** object FID for ORR */ + struct lu_fid ok_fid; + /** OST index for TRR */ + __u32 ok_idx; + }; +}; + +/** + * The largest base string for unique hash/slab object names is + * "nrs_orr_reg_", so 13 characters. We add 3 to this to be used for the CPT + * id number, so this _should_ be more than enough for the maximum number of + * CPTs on any system. If it does happen that this statement is incorrect, + * nrs_orr_genobjname() will inevitably yield a non-unique name and cause + * cfs_mem_cache_create() to complain (on Linux), so the erroneous situation + * will hopefully not go unnoticed.
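 + * + * As an illustration (hypothetical values): nrs_orr_genobjname() would name an + * ORR instance on the regular NRS head of CPT 2 "nrs_orr_reg_2", and a TRR + * instance on the high-priority head of CPT 0 "nrs_trr_hp_0"; the 3 extra + * bytes accommodate CPT id numbers of up to 3 digits.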
+ */ +#define NRS_ORR_OBJ_NAME_MAX (sizeof("nrs_orr_reg_") + 3) + +/** + * private data structure for ORR and TRR NRS + */ +struct nrs_orr_data { + struct ptlrpc_nrs_resource od_res; + cfs_binheap_t *od_binheap; + cfs_hash_t *od_obj_hash; + cfs_mem_cache_t *od_cache; + /** + * Used when a new scheduling round commences, in order to synchronize + * all object or OST batches with the new round number. + */ + __u64 od_round; + /** + * Determines the relevant ordering amongst request batches within a + * scheduling round. + */ + __u64 od_sequence; + /** + * RPC types that are currently supported. + */ + enum nrs_orr_supp od_supp; + /** + * Round Robin quantum; the maximum number of RPCs that each request + * batch for each object or OST can have in a scheduling round. + */ + __u16 od_quantum; + /** + * Whether to use physical disk offsets or logical file offsets. + */ + bool od_physical; + /** + * XXX: We need to provide a persistently allocated string to hold + * unique object names for this policy, since in the Linux kernel + * versions currently supported by Lustre, kmem_cache_create() just sets + * a pointer to the name string provided. kstrdup() is used in the + * version of kmem_cache_create() in current Linux mainline, so we may + * be able to remove this in the future. + */ + char od_objname[NRS_ORR_OBJ_NAME_MAX]; +}; + +/** + * Represents a backend-fs object or OST in the ORR and TRR policies + * respectively + */ +struct nrs_orr_object { + struct ptlrpc_nrs_resource oo_res; + cfs_hlist_node_t oo_hnode; + /** + * The round number against which requests are being scheduled for this + * object or OST + */ + __u64 oo_round; + /** + * The sequence number used for requests scheduled for this object or + * OST during the current round number. + */ + __u64 oo_sequence; + /** + * The key of the object or OST for which this structure instance is + * scheduling RPCs + */ + struct nrs_orr_key oo_key; + cfs_atomic_t oo_ref; + /** + * Round Robin quantum; the maximum number of RPCs that are allowed to + * be scheduled for the object or OST in a single batch of each round. + */ + __u16 oo_quantum; + /** + * # of pending requests for this object or OST, on all existing rounds + */ + __u16 oo_active; +}; + +/** + * ORR/TRR NRS request definition + */ +struct nrs_orr_req { + /** + * The offset range this request covers + */ + struct nrs_orr_req_range or_range; + /** + * Round number for this request; shared with all other requests in the + * same batch. + */ + __u64 or_round; + /** + * Sequence number for this request; shared with all other requests in + * the same batch. + */ + __u64 or_sequence; + /** + * For debugging purposes. + */ + struct nrs_orr_key or_key; + /** + * An ORR policy instance has filled in request information while + * enqueueing the request on the service partition's regular NRS head. + */ + unsigned int or_orr_set:1; + /** + * A TRR policy instance has filled in request information while + * enqueueing the request on the service partition's regular NRS head. + */ + unsigned int or_trr_set:1; + /** + * Request offset ranges have been filled in with logical offset + * values. + */ + unsigned int or_logical_set:1; + /** + * Request offset ranges have been filled in with physical offset + * values.
+ */ + unsigned int or_physical_set:1; +}; + +/** @} ORR/TRR */ + +/** * NRS request * * Instances of this object exist embedded within ptlrpc_request; the main @@ -1543,6 +1733,8 @@ struct ptlrpc_nrs_request { * CRR-N request defintion */ struct nrs_crrn_req crr; + /** ORR and TRR share the same request definition */ + struct nrs_orr_req orr; } nr_u; /** * Externally-registering policies may want to use this to allocate diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 06396a0..9c9ae28 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -824,7 +824,7 @@ do { \ #define OBD_SLAB_ALLOC_PTR_GFP(ptr, slab, flags) \ OBD_SLAB_ALLOC_GFP(ptr, slab, sizeof *(ptr), flags) -#define OBD_SLAB_CPT_ALLOC_PTR_GFP(ptr, slab, ctab, cpt, flags) \ +#define OBD_SLAB_CPT_ALLOC_PTR_GFP(ptr, slab, cptab, cpt, flags) \ OBD_SLAB_CPT_ALLOC_GFP(ptr, slab, cptab, cpt, sizeof *(ptr), flags) #define OBD_SLAB_FREE_PTR(ptr, slab) \ diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index 9cf66f2..ef70c05 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -14,7 +14,7 @@ ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o -ptlrpc_objs += sec_null.o sec_plain.o nrs.o nrs_fifo.o nrs_crr.o +ptlrpc_objs += sec_null.o sec_plain.o nrs.o nrs_fifo.o nrs_crr.o nrs_orr.o target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o diff --git a/lustre/ptlrpc/autoMakefile.am b/lustre/ptlrpc/autoMakefile.am index d1f2a83..d27f72b 100644 --- a/lustre/ptlrpc/autoMakefile.am +++ b/lustre/ptlrpc/autoMakefile.am @@ -95,6 +95,7 @@ ptlrpc_SOURCES = \ nrs.c \ nrs_fifo.c \ nrs_crr.c \ + nrs_orr.c \ wiretest.c \ sec.c \ sec_bulk.c \ diff --git a/lustre/ptlrpc/nrs.c b/lustre/ptlrpc/nrs.c index f4700f8..5343ab3 100644 --- a/lustre/ptlrpc/nrs.c +++ b/lustre/ptlrpc/nrs.c @@ -1744,6 +1744,9 @@ extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo; #if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__) /* ptlrpc/nrs_crr.c */ extern struct ptlrpc_nrs_pol_conf nrs_conf_crrn; +/* ptlrpc/nrs_orr.c */ +extern struct ptlrpc_nrs_pol_conf nrs_conf_orr; +extern struct ptlrpc_nrs_pol_conf nrs_conf_trr; #endif /** @@ -1769,6 +1772,14 @@ int ptlrpc_nrs_init(void) rc = ptlrpc_nrs_policy_register(&nrs_conf_crrn); if (rc != 0) GOTO(fail, rc); + + rc = ptlrpc_nrs_policy_register(&nrs_conf_orr); + if (rc != 0) + GOTO(fail, rc); + + rc = ptlrpc_nrs_policy_register(&nrs_conf_trr); + if (rc != 0) + GOTO(fail, rc); #endif RETURN(rc); diff --git a/lustre/ptlrpc/nrs_orr.c b/lustre/ptlrpc/nrs_orr.c new file mode 100644 index 0000000..c660916 --- /dev/null +++ b/lustre/ptlrpc/nrs_orr.c @@ -0,0 +1,2034 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. A copy is + * included in the COPYING file that accompanied this code. 
+ + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * GPL HEADER END + */ +/* + * Copyright (c) 2011 Intel Corporation + * + * Copyright 2012 Xyratex Technology Limited + */ +/* + * lustre/ptlrpc/nrs_orr.c + * + * Network Request Scheduler (NRS) ORR and TRR policies + * + * Request scheduling in a Round-Robin manner over backend-fs objects and OSTs + * respectively + * + * Author: Liang Zhen + * Author: Nikitas Angelinas + */ +#ifdef HAVE_SERVER_SUPPORT + +/** + * \addtogroup nrs + * @{ + */ +#define DEBUG_SUBSYSTEM S_RPC +#include +#include +#include +#include +#include +#include "ptlrpc_internal.h" + +/** + * \name ORR/TRR policy + * + * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies + * + * ORR performs batched Round Robin scheduling of brw RPCs, based on the FID of + * the backend-fs object that the brw RPC pertains to; the TRR policy performs + * batched Round Robin scheduling of brw RPCs, based on the OST index that the + * RPC pertains to. Both policies also order RPCs in each batch in ascending + * offset order, which is lprocfs-tunable between logical file offsets, and + * physical disk offsets, as reported by fiemap. + * + * The TRR policy reuses much of the functionality of ORR. These two scheduling + * algorithms could alternatively be implemented under a single NRS policy that + * uses an lprocfs tunable in order to switch between the two types of + * scheduling behaviour. The two algorithms have been implemented as separate + * policies for reasons of clarity to the user, and to avoid issues that would + * otherwise arise at the point of switching between behaviours in the case of + * having a single policy, such as resource cleanup for nrs_orr_object + * instances. It is possible that this may need to be re-examined in the future, + * along with potentially coalescing other policies that perform batched request + * scheduling in a Round-Robin manner, all into one policy. + * + * @{ + */ + +#define NRS_POL_NAME_ORR "orr" +#define NRS_POL_NAME_TRR "trr" + +/** + * Checks if the RPC type of \a nrq is currently handled by an ORR/TRR policy + * + * \param[in] orrd the ORR/TRR policy scheduler instance + * \param[in] nrq the request + * \param[out] opcode the opcode is saved here, just in order to avoid calling + * lustre_msg_get_opc() again later + * + * \retval true request type is supported by the policy instance + * \retval false request type is not supported by the policy instance + */ +static bool nrs_orr_req_supported(struct nrs_orr_data *orrd, + struct ptlrpc_nrs_request *nrq, __u32 *opcode) +{ + struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request, + rq_nrq); + __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); + bool rc = false; + + /** + * XXX: nrs_orr_data::od_supp accessed unlocked. + */ + switch (opc) { + case OST_READ: + rc = orrd->od_supp & NOS_OST_READ; + break; + case OST_WRITE: + rc = orrd->od_supp & NOS_OST_WRITE; + break; + } + + if (rc) + *opcode = opc; + + return rc; +} + +/** + * Returns the ORR/TRR key fields for the request \a nrq in \a key. + * + * \param[in] orrd the ORR/TRR policy scheduler instance + * \param[in] nrq the request + * \param[in] opc the request's opcode + * \param[in] name the policy name + * \param[out] key fields of the key are returned here.
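 + * + * For example (hypothetical values): an ORR instance would key a brw RPC by + * the FID derived from the RPC body's ost_id via ostid_to_fid(), e.g. + * [0x100000000:0x2:0x0], while a TRR instance would key the same RPC simply by + * the server's OST index, e.g. 0.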
+ * + * \retval 0 key filled successfully + * \retval < 0 error + */ +static int nrs_orr_key_fill(struct nrs_orr_data *orrd, + struct ptlrpc_nrs_request *nrq, __u32 opc, + char *name, struct nrs_orr_key *key) +{ + struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request, + rq_nrq); + struct ost_body *body; + __u32 ost_idx; + bool is_orr = strncmp(name, NRS_POL_NAME_ORR, + NRS_POL_NAME_MAX) == 0; + + LASSERT(req != NULL); + + /** + * This is an attempt to fill in the request key fields while + * moving a request from the regular to the high-priority NRS + * head (via ldlm_lock_reorder_req()), but the request key has + * been adequately filled when nrs_orr_res_get() was called through + * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR/TRR + * policy, so there is nothing to do. + */ + if ((is_orr && nrq->nr_u.orr.or_orr_set) || + (!is_orr && nrq->nr_u.orr.or_trr_set)) { + *key = nrq->nr_u.orr.or_key; + return 0; + } + + if (nrq->nr_u.orr.or_orr_set || nrq->nr_u.orr.or_trr_set) + memset(&nrq->nr_u.orr.or_key, 0, sizeof(nrq->nr_u.orr.or_key)); + + ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index; + + if (is_orr) { + int rc; + /** + * The request pill for OST_READ and OST_WRITE requests is + * initialized in the ost_io service's + * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(), + * so no need to redo it here. + */ + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + RETURN(-EFAULT); + + rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx); + if (rc < 0) + return rc; + + nrq->nr_u.orr.or_orr_set = 1; + } else { + key->ok_idx = ost_idx; + nrq->nr_u.orr.or_trr_set = 1; + } + + return 0; +} + +/** + * Populates the range values in \a range with logical offsets obtained via + * \a nb. + * + * \param[in] nb niobuf_remote struct array for this request + * \param[in] niocount count of niobuf_remote structs for this request + * \param[out] range the offset range is returned here + */ +static void nrs_orr_range_fill_logical(struct niobuf_remote *nb, int niocount, + struct nrs_orr_req_range *range) +{ + /* Should we do this at page boundaries ? */ + range->or_start = nb[0].offset & CFS_PAGE_MASK; + range->or_end = (nb[niocount - 1].offset + + nb[niocount - 1].len - 1) | ~CFS_PAGE_MASK; +} + +/** + * We obtain information just for a single extent, as the request can only be in + * a single place in the binary heap anyway. + */ +#define ORR_NUM_EXTENTS 1 + +/** + * Converts the logical file offset range in \a range, to a physical disk offset + * range in \a range, for a request. Uses obd_get_info() in order to carry out a + * fiemap call and obtain backend-fs extent information. The returned range is + * in physical block numbers. 
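 + * + * For example (hypothetical values): a logical range of [4096, 8191] that + * falls entirely within an extent with fe_logical = 0 and fe_physical = + * 1048576 yields the physical range [1052672, 1056767].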
+ * + * \param[in] nrq the request + * \param[in] oa obdo struct for this request + * \param[in,out] range the offset range in bytes; logical range in, physical + * range out + * + * \retval 0 physical offsets obtained successfully + * \retval < 0 error + */ +static int nrs_orr_range_fill_physical(struct ptlrpc_nrs_request *nrq, + struct obdo *oa, + struct nrs_orr_req_range *range) +{ + struct ptlrpc_request *req = container_of(nrq, + struct ptlrpc_request, + rq_nrq); + char fiemap_buf[offsetof(struct ll_user_fiemap, + fm_extents[ORR_NUM_EXTENTS])]; + struct ll_user_fiemap *fiemap = (struct ll_user_fiemap *)fiemap_buf; + struct ll_fiemap_info_key key; + loff_t start; + loff_t end; + int rc; + + key = (typeof(key)) { + .name = KEY_FIEMAP, + .oa = *oa, + .fiemap = { + .fm_start = range->or_start, + .fm_length = range->or_end - range->or_start, + .fm_extent_count = ORR_NUM_EXTENTS + } + }; + + rc = obd_get_info(req->rq_svc_thread->t_env, req->rq_export, + sizeof(key), &key, NULL, fiemap, NULL); + if (rc < 0) + GOTO(out, rc); + + if (fiemap->fm_mapped_extents == 0 || + fiemap->fm_mapped_extents > ORR_NUM_EXTENTS) + GOTO(out, rc = -EFAULT); + + /** + * Calculate the physical offset ranges for the request from the extent + * information and the logical request offsets. + */ + start = fiemap->fm_extents[0].fe_physical + range->or_start - + fiemap->fm_extents[0].fe_logical; + end = start + range->or_end - range->or_start; + + range->or_start = start; + range->or_end = end; + + nrq->nr_u.orr.or_physical_set = 1; +out: + return rc; +} + +/** + * Sets the offset range the request covers; either in logical file + * offsets or in physical disk offsets. + * + * \param[in] nrq the request + * \param[in] orrd the ORR/TRR policy scheduler instance + * \param[in] opc the request's opcode + * \param[in] moving_req is the request in the process of moving onto the + * high-priority NRS head? + * + * \retval 0 range filled successfully + * \retval != 0 error + */ +static int nrs_orr_range_fill(struct ptlrpc_nrs_request *nrq, + struct nrs_orr_data *orrd, __u32 opc, + bool moving_req) +{ + struct ptlrpc_request *req = container_of(nrq, + struct ptlrpc_request, + rq_nrq); + struct obd_ioobj *ioo; + struct niobuf_remote *nb; + struct ost_body *body; + struct nrs_orr_req_range range; + int niocount; + int rc = 0; + + /** + * If we are scheduling using physical disk offsets, but we have filled + * the offset information in the request previously + * (i.e. ldlm_lock_reorder_req() is moving the request to the + * high-priority NRS head), there is no need to do anything, and we can + * exit. Moreover, we would be unable to perform + * the obd_get_info() call required in nrs_orr_range_fill_physical(), + * because ldlm_lock_reorder_req() calls into here while holding a + * spinlock, and retrieving fiemap information via obd_get_info() is a + * potentially sleeping operation. + */ + if (orrd->od_physical && nrq->nr_u.orr.or_physical_set) + return 0; + + ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ); + if (ioo == NULL) + GOTO(out, rc = -EFAULT); + + niocount = ioo->ioo_bufcnt; + + nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE); + if (nb == NULL) + GOTO(out, rc = -EFAULT); + + /** + * Use logical information from niobuf_remote structures. + */ + nrs_orr_range_fill_logical(nb, niocount, &range); + + /** + * Obtain physical offsets if selected, and this is an OST_READ + * RPC. 
We do not enter this block if moving_req is set which indicates + * that the request is being moved to the high-priority NRS head by + * ldlm_lock_reorder_req(), as that function calls in here while holding + * a spinlock, and nrs_orr_range_fill_physical() can sleep, so we just use + * logical file offsets for the range values for such requests. + */ + if (orrd->od_physical && opc == OST_READ && !moving_req) { + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EFAULT); + + /** + * Translate to physical block offsets from backend filesystem + * extents. + * Ignore return values; if obtaining the physical offsets + * fails, use the logical offsets. + */ + nrs_orr_range_fill_physical(nrq, &body->oa, &range); + } + + nrq->nr_u.orr.or_range = range; +out: + return rc; +} + +/** + * Generates a character string that can be used in order to register uniquely + * named libcfs_hash and slab objects for ORR/TRR policy instances. The + * character string is unique per policy instance, as it includes the policy's + * name, the CPT number, and a {reg|hp} token, and there is one policy instance + * per NRS head on each CPT, and the policy is only compatible with the ost_io + * service. + * + * \param[in] policy the policy instance + * \param[out] name the character array that will hold the generated name + */ +static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name) +{ + snprintf(name, NRS_ORR_OBJ_NAME_MAX, "%s%s%s%d", + "nrs_", policy->pol_desc->pd_name, + policy->pol_nrs->nrs_queue_type == PTLRPC_NRS_QUEUE_REG ? + "_reg_" : "_hp_", nrs_pol2cptid(policy)); +} + +/** + * ORR/TRR hash operations + */ +#define NRS_ORR_BITS 24 +#define NRS_ORR_BKT_BITS 12 +#define NRS_ORR_HASH_FLAGS (CFS_HASH_RW_BKTLOCK | CFS_HASH_ASSERT_EMPTY) + +#define NRS_TRR_BITS 4 +#define NRS_TRR_BKT_BITS 2 +#define NRS_TRR_HASH_FLAGS CFS_HASH_RW_BKTLOCK + +static unsigned nrs_orr_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask) +{ + return cfs_hash_djb2_hash(key, sizeof(struct nrs_orr_key), mask); +} + +static void *nrs_orr_hop_key(cfs_hlist_node_t *hnode) +{ + struct nrs_orr_object *orro = cfs_hlist_entry(hnode, + struct nrs_orr_object, + oo_hnode); + return &orro->oo_key; +} + +static int nrs_orr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode) +{ + struct nrs_orr_object *orro = cfs_hlist_entry(hnode, + struct nrs_orr_object, + oo_hnode); + + return lu_fid_eq(&orro->oo_key.ok_fid, + &((struct nrs_orr_key *)key)->ok_fid); +} + +static void *nrs_orr_hop_object(cfs_hlist_node_t *hnode) +{ + return cfs_hlist_entry(hnode, struct nrs_orr_object, oo_hnode); +} + +static void nrs_orr_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct nrs_orr_object *orro = cfs_hlist_entry(hnode, + struct nrs_orr_object, + oo_hnode); + cfs_atomic_inc(&orro->oo_ref); +} + +/** + * Removes an nrs_orr_object from the hash and frees its memory, if the object + * has no active users. + */ +static void nrs_orr_hop_put_free(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct nrs_orr_object *orro = cfs_hlist_entry(hnode, + struct nrs_orr_object, + oo_hnode); + struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent, + struct nrs_orr_data, od_res); + cfs_hash_bd_t bds[2]; + + if (cfs_atomic_dec_return(&orro->oo_ref) > 1) + return; + + cfs_hash_lock(hs, 0); + cfs_hash_dual_bd_get_and_lock(hs, &orro->oo_key, bds, 1); + + /** + * Another thread may have won the race and taken a reference on the + * nrs_orr_object.
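 + * In that case the object must be left in the hash, and only the reference + * dropped above is relinquished.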
+ */ + if (cfs_atomic_read(&orro->oo_ref) > 1) + goto lost_race; + + if (bds[1].bd_bucket == NULL) + cfs_hash_bd_del_locked(hs, &bds[0], hnode); + else + hnode = cfs_hash_dual_bd_finddel_locked(hs, bds, &orro->oo_key, + hnode); + LASSERT(hnode != NULL); + + OBD_SLAB_FREE_PTR(orro, orrd->od_cache); + +lost_race: + + cfs_hash_dual_bd_unlock(hs, bds, 1); + cfs_hash_unlock(hs, 0); +} + +static void nrs_orr_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct nrs_orr_object *orro = cfs_hlist_entry(hnode, + struct nrs_orr_object, + oo_hnode); + cfs_atomic_dec(&orro->oo_ref); +} + +static int nrs_trr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode) +{ + struct nrs_orr_object *orro = cfs_hlist_entry(hnode, + struct nrs_orr_object, + oo_hnode); + + return orro->oo_key.ok_idx == ((struct nrs_orr_key *)key)->ok_idx; +} + +static void nrs_trr_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct nrs_orr_object *orro = cfs_hlist_entry(hnode, + struct nrs_orr_object, + oo_hnode); + struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent, + struct nrs_orr_data, od_res); + + LASSERTF(cfs_atomic_read(&orro->oo_ref) == 0, + "Busy NRS TRR policy object for OST with index %u, with %d " + "refs\n", orro->oo_key.ok_idx, cfs_atomic_read(&orro->oo_ref)); + + OBD_SLAB_FREE_PTR(orro, orrd->od_cache); +} + +static cfs_hash_ops_t nrs_orr_hash_ops = { + .hs_hash = nrs_orr_hop_hash, + .hs_key = nrs_orr_hop_key, + .hs_keycmp = nrs_orr_hop_keycmp, + .hs_object = nrs_orr_hop_object, + .hs_get = nrs_orr_hop_get, + .hs_put = nrs_orr_hop_put_free, + .hs_put_locked = nrs_orr_hop_put, +}; + +static cfs_hash_ops_t nrs_trr_hash_ops = { + .hs_hash = nrs_orr_hop_hash, + .hs_key = nrs_orr_hop_key, + .hs_keycmp = nrs_trr_hop_keycmp, + .hs_object = nrs_orr_hop_object, + .hs_get = nrs_orr_hop_get, + .hs_put = nrs_orr_hop_put, + .hs_put_locked = nrs_orr_hop_put, + .hs_exit = nrs_trr_hop_exit, +}; + +#define NRS_ORR_QUANTUM_DFLT 256 + +/** + * Binary heap predicate. + * + * Uses + * ptlrpc_nrs_request::nr_u::orr::or_round, + * ptlrpc_nrs_request::nr_u::orr::or_sequence, and + * ptlrpc_nrs_request::nr_u::orr::or_range to compare two binheap nodes and + * produce a binary predicate that indicates their relative priority, so that + * the binary heap can perform the necessary sorting operations. + * + * \param[in] e1 the first binheap node to compare + * \param[in] e2 the second binheap node to compare + * + * \retval 0 e1 > e2 + * \retval 1 e1 < e2 + */ +static int orr_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2) +{ + struct ptlrpc_nrs_request *nrq1; + struct ptlrpc_nrs_request *nrq2; + + nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node); + nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node); + + /** + * Requests have been scheduled against a different scheduling round. + */ + if (nrq1->nr_u.orr.or_round < nrq2->nr_u.orr.or_round) + return 1; + else if (nrq1->nr_u.orr.or_round > nrq2->nr_u.orr.or_round) + return 0; + + /** + * Requests have been scheduled against the same scheduling round, but + * belong to a different batch, i.e. they pertain to a different + * backend-fs object (for ORR policy instances) or OST (for TRR policy + * instances). 
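 + * For example (hypothetical values): a request with or_round == 4 sorts before + * one with or_round == 5 regardless of their offsets, and within the same + * round a batch with or_sequence == 10 sorts before a batch with + * or_sequence == 11.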
+ */ + if (nrq1->nr_u.orr.or_sequence < nrq2->nr_u.orr.or_sequence) + return 1; + else if (nrq1->nr_u.orr.or_sequence > nrq2->nr_u.orr.or_sequence) + return 0; + + /** + * If round numbers and sequence numbers are equal, the two requests + * have been scheduled on the same round, and belong to the same batch, + * which means they pertain to the same backend-fs object (if this is an + * ORR policy instance), or to the same OST (if this is a TRR policy + * instance), so these requests should be sorted by ascending offset + * order. + */ + if (nrq1->nr_u.orr.or_range.or_start < + nrq2->nr_u.orr.or_range.or_start) { + return 1; + } else if (nrq1->nr_u.orr.or_range.or_start > + nrq2->nr_u.orr.or_range.or_start) { + return 0; + } else { + /** + * Requests start from the same offset; dispatch the shorter one + * first; perhaps slightly more chances of hitting caches like + * this. + */ + return nrq1->nr_u.orr.or_range.or_end < + nrq2->nr_u.orr.or_range.or_end; + } +} + +/** + * ORR binary heap operations + */ +static cfs_binheap_ops_t nrs_orr_heap_ops = { + .hop_enter = NULL, + .hop_exit = NULL, + .hop_compare = orr_req_compare, +}; + +/** + * Prints a warning message if an ORR/TRR policy is started on a service with + * more than one CPT. + * + * \param[in] policy the policy instance + * + * \retval 0 success + */ +static int nrs_orr_init(struct ptlrpc_nrs_policy *policy) +{ + if (policy->pol_nrs->nrs_svcpt->scp_service->srv_ncpts > 1) { + bool is_orr = strncmp(policy->pol_desc->pd_name, + NRS_POL_NAME_ORR, NRS_POL_NAME_MAX) == 0; + + CWARN("A%s %s NRS policy has been registered on a PTLRPC " + "service which has more than one service partition. " + "Please be advised that this policy may perform better " + "on services with only one partition.\n", + is_orr ? "n" : "", policy->pol_desc->pd_name); + } + return 0; +} + +/** + * Called when an ORR policy instance is started. + * + * \param[in] policy the policy + * + * \retval -ENOMEM OOM error + * \retval 0 success + */ +static int nrs_orr_start(struct ptlrpc_nrs_policy *policy) +{ + struct nrs_orr_data *orrd; + cfs_hash_ops_t *ops; + unsigned cur_bits; + unsigned max_bits; + unsigned bkt_bits; + unsigned flags; + int rc = 0; + ENTRY; + + OBD_CPT_ALLOC_PTR(orrd, nrs_pol2cptab(policy), nrs_pol2cptid(policy)); + if (orrd == NULL) + RETURN(-ENOMEM); + + /* + * Binary heap instance for sorted incoming requests. + */ + orrd->od_binheap = cfs_binheap_create(&nrs_orr_heap_ops, + CBH_FLAG_ATOMIC_GROW, 4096, NULL, + nrs_pol2cptab(policy), + nrs_pol2cptid(policy)); + if (orrd->od_binheap == NULL) + GOTO(failed, rc = -ENOMEM); + + nrs_orr_genobjname(policy, orrd->od_objname); + + /** + * Slab cache for NRS ORR/TRR objects. + */ + orrd->od_cache = cfs_mem_cache_create(orrd->od_objname, + sizeof(struct nrs_orr_object), + 0, 0); + if (orrd->od_cache == NULL) + GOTO(failed, rc = -ENOMEM); + + if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR, + NRS_POL_NAME_MAX) == 0) { + ops = &nrs_orr_hash_ops; + cur_bits = NRS_ORR_BITS; + max_bits = NRS_ORR_BITS; + bkt_bits = NRS_ORR_BKT_BITS; + flags = NRS_ORR_HASH_FLAGS; + } else { + ops = &nrs_trr_hash_ops; + cur_bits = NRS_TRR_BITS; + max_bits = NRS_TRR_BITS; + bkt_bits = NRS_TRR_BKT_BITS; + flags = NRS_TRR_HASH_FLAGS; + } + + /** + * Hash for finding objects by struct nrs_orr_key. + * XXX: For TRR, it might be better to avoid using libcfs_hash? + * All that needs to be resolved are OST indices, and they + * will stay relatively stable during an OSS node's lifetime.
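 + * (For instance, NRS_TRR_BITS == 4 creates the TRR hash with only 2^4 = 16 + * entries, reflecting the small, stable set of OST indices, whereas + * NRS_ORR_BITS == 24 sizes the ORR hash for a potentially very large object + * population.)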
+ */ + orrd->od_obj_hash = cfs_hash_create(orrd->od_objname, cur_bits, + max_bits, bkt_bits, 0, + CFS_HASH_MIN_THETA, + CFS_HASH_MAX_THETA, ops, flags); + if (orrd->od_obj_hash == NULL) + GOTO(failed, rc = -ENOMEM); + + /* XXX: Fields accessed unlocked */ + orrd->od_quantum = NRS_ORR_QUANTUM_DFLT; + orrd->od_supp = NOS_DFLT; + orrd->od_physical = true; + /** + * Set to 1 so that the test inside nrs_orr_req_add() can evaluate to + * true. + */ + orrd->od_sequence = 1; + + policy->pol_private = orrd; + + RETURN(rc); + +failed: + if (orrd->od_cache) { + int rc2 = cfs_mem_cache_destroy(orrd->od_cache); + + LASSERTF(rc2 == 0, "Could not destroy od_cache slab\n"); + } + if (orrd->od_binheap != NULL) + cfs_binheap_destroy(orrd->od_binheap); + + OBD_FREE_PTR(orrd); + + RETURN(rc); +} + +/** + * Called when an ORR/TRR policy instance is stopped. + * + * Called when the policy has been instructed to transition to the + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more + * pending requests to serve. + * + * \param[in] policy the policy + */ +static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy) +{ + struct nrs_orr_data *orrd = policy->pol_private; + ENTRY; + + LASSERT(orrd != NULL); + LASSERT(orrd->od_binheap != NULL); + LASSERT(orrd->od_obj_hash != NULL); + LASSERT(orrd->od_cache != NULL); + LASSERT(cfs_binheap_is_empty(orrd->od_binheap)); + + cfs_binheap_destroy(orrd->od_binheap); + cfs_hash_putref(orrd->od_obj_hash); + cfs_mem_cache_destroy(orrd->od_cache); + + OBD_FREE_PTR(orrd); +} + +/** + * Performs a policy-specific ctl function on ORR/TRR policy instances; similar + * to ioctl. + * + * \param[in] policy the policy instance + * \param[in] opc the opcode + * \param[in,out] arg used for passing parameters and information + * + * \pre spin_is_locked(&policy->pol_nrs->nrs_lock) + * \post spin_is_locked(&policy->pol_nrs->nrs_lock) + * + * \retval 0 operation carried out successfully + * \retval -ve error + */ +int nrs_orr_ctl(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc, + void *arg) +{ + LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock)); + + switch (opc) { + default: + RETURN(-EINVAL); + + case NRS_CTL_ORR_RD_QUANTUM: { + struct nrs_orr_data *orrd = policy->pol_private; + + *(__u16 *)arg = orrd->od_quantum; + } + break; + + case NRS_CTL_ORR_WR_QUANTUM: { + struct nrs_orr_data *orrd = policy->pol_private; + + orrd->od_quantum = *(__u16 *)arg; + LASSERT(orrd->od_quantum != 0); + } + break; + + case NRS_CTL_ORR_RD_OFF_TYPE: { + struct nrs_orr_data *orrd = policy->pol_private; + + *(bool *)arg = orrd->od_physical; + } + break; + + case NRS_CTL_ORR_WR_OFF_TYPE: { + struct nrs_orr_data *orrd = policy->pol_private; + + orrd->od_physical = *(bool *)arg; + } + break; + + case NRS_CTL_ORR_RD_SUPP_REQ: { + struct nrs_orr_data *orrd = policy->pol_private; + + *(enum nrs_orr_supp *)arg = orrd->od_supp; + } + break; + + case NRS_CTL_ORR_WR_SUPP_REQ: { + struct nrs_orr_data *orrd = policy->pol_private; + + orrd->od_supp = *(enum nrs_orr_supp *)arg; + LASSERT((orrd->od_supp & NOS_OST_RW) != 0); + } + break; + } + RETURN(0); +} + +/** + * Obtains resources for ORR/TRR policy instances. The top-level resource lives + * inside \e nrs_orr_data and the second-level resource inside + * \e nrs_orr_object instances.
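 + * + * For example (hypothetical): an OST_READ RPC handled by an ORR instance + * resolves first to the policy-wide nrs_orr_data resource, and on a second + * level to the nrs_orr_object keyed on the target object's FID; this two-level + * hierarchy is what allows requests to be batched per object or OST.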
+ * + * \param[in] policy the policy for which resources are being taken for + * request \a nrq + * \param[in] nrq the request for which resources are being taken + * \param[in] parent parent resource, embedded in nrs_orr_data for the + * ORR/TRR policies + * \param[out] resp used to return resource references + * \param[in] moving_req signifies limited caller context; used to perform + * memory allocations in an atomic context in this + * policy + * + * \retval 0 we are returning a top-level, parent resource, one that is + * embedded in an nrs_orr_data object + * \retval 1 we are returning a bottom-level resource, one that is embedded + * in an nrs_orr_object object + * + * \see nrs_resource_get_safe() + */ +int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq, + const struct ptlrpc_nrs_resource *parent, + struct ptlrpc_nrs_resource **resp, bool moving_req) +{ + struct nrs_orr_data *orrd; + struct nrs_orr_object *orro; + struct nrs_orr_object *tmp; + struct nrs_orr_key key = { { { 0 } } }; + __u32 opc; + int rc = 0; + + /** + * struct nrs_orr_data is requested. + */ + if (parent == NULL) { + *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res; + return 0; + } + + orrd = container_of(parent, struct nrs_orr_data, od_res); + + /** + * If the request type is not supported, fail the enqueuing; the RPC + * will be handled by the fallback NRS policy. + */ + if (!nrs_orr_req_supported(orrd, nrq, &opc)) + return -1; + + /** + * Fill in the key for the request; OST FID for ORR policy instances, + * and OST index for TRR policy instances. + */ + rc = nrs_orr_key_fill(orrd, nrq, opc, policy->pol_desc->pd_name, &key); + if (rc < 0) + RETURN(rc); + + /** + * Set the offset range the request covers + */ + rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req); + if (rc < 0) + RETURN(rc); + + orro = cfs_hash_lookup(orrd->od_obj_hash, &key); + if (orro != NULL) + goto out; + + OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache, + nrs_pol2cptab(policy), nrs_pol2cptid(policy), + moving_req ? CFS_ALLOC_ATOMIC : + CFS_ALLOC_IO); + if (orro == NULL) + RETURN(-ENOMEM); + + orro->oo_key = key; + cfs_atomic_set(&orro->oo_ref, 1); + + tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key, + &orro->oo_hnode); + if (tmp != orro) { + OBD_SLAB_FREE_PTR(orro, orrd->od_cache); + orro = tmp; + } +out: + /** + * For debugging purposes + */ + nrq->nr_u.orr.or_key = orro->oo_key; + + *resp = &orro->oo_res; + + return 1; +} + +/** + * Called when releasing references to the resource hierarchy obtained for a + * request for scheduling using ORR/TRR policy instances + * + * \param[in] policy the policy the resource belongs to + * \param[in] res the resource to be released + */ +static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy, + const struct ptlrpc_nrs_resource *res) +{ + struct nrs_orr_data *orrd; + struct nrs_orr_object *orro; + + /** + * Do nothing for freeing parent, nrs_orr_data resources. + */ + if (res->res_parent == NULL) + return; + + orro = container_of(res, struct nrs_orr_object, oo_res); + orrd = container_of(res->res_parent, struct nrs_orr_data, od_res); + + cfs_hash_put(orrd->od_obj_hash, &orro->oo_hnode); +} + +/** + * Called when polling an ORR/TRR policy instance for a request so that it can + * be served. Returns the request that is at the root of the binary heap, as + * that is the lowest priority one (i.e.
libcfs_heap is an implementation of a + * min-heap) + * + * \param[in] policy the policy instance being polled + * \param[in] peek when set, signifies that we just want to examine the + * request, and not handle it, so the request is not removed + * from the policy. + * \param[in] force force the policy to return a request; unused in this policy + * + * \retval the request to be handled + * \retval NULL no request available + * + * \see ptlrpc_nrs_req_get_nolock() + * \see nrs_request_get() + */ +static +struct ptlrpc_nrs_request *nrs_orr_req_get(struct ptlrpc_nrs_policy *policy, + bool peek, bool force) +{ + struct nrs_orr_data *orrd = policy->pol_private; + cfs_binheap_node_t *node = cfs_binheap_root(orrd->od_binheap); + struct ptlrpc_nrs_request *nrq; + + nrq = unlikely(node == NULL) ? NULL : + container_of(node, struct ptlrpc_nrs_request, nr_node); + + if (likely(!peek && nrq != NULL)) { + struct nrs_orr_object *orro; + + orro = container_of(nrs_request_resource(nrq), + struct nrs_orr_object, oo_res); + + LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round); + + cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node); + orro->oo_active--; + + if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR, + NRS_POL_NAME_MAX) == 0) + CDEBUG(D_RPCTRACE, + "NRS: starting to handle %s request for object " + "with FID "DFID", from OST with index %u, with " + "round "LPU64"\n", NRS_POL_NAME_ORR, + PFID(&orro->oo_key.ok_fid), + nrq->nr_u.orr.or_key.ok_idx, + nrq->nr_u.orr.or_round); + else + CDEBUG(D_RPCTRACE, + "NRS: starting to handle %s request from OST " + "with index %u, with round "LPU64"\n", + NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx, + nrq->nr_u.orr.or_round); + + /** Peek at the next request to be served */ + node = cfs_binheap_root(orrd->od_binheap); + + /** No more requests */ + if (unlikely(node == NULL)) { + orrd->od_round++; + } else { + struct ptlrpc_nrs_request *next; + + next = container_of(node, struct ptlrpc_nrs_request, + nr_node); + + if (orrd->od_round < next->nr_u.orr.or_round) + orrd->od_round = next->nr_u.orr.or_round; + } + } + + return nrq; +} + +/** + * Sort-adds request \a nrq to an ORR/TRR \a policy instance's set of queued + * requests in the policy's binary heap. + * + * A scheduling round is a stream of requests that have been sorted in batches + * according to the backend-fs object (for ORR policy instances) or OST (for TRR + * policy instances) that they pertain to (as identified by its IDIF FID or OST + * index respectively); there can be only one batch for each object or OST in + * each round. The batches are of maximum size nrs_orr_data::od_quantum. When a + * new request arrives for scheduling for an object or OST that has exhausted + * its quantum in its current round, the request will be scheduled on the next + * scheduling round. Requests are allowed to be scheduled against a round until + * all requests for the round are serviced, so an object or OST might miss a + * round if requests are not scheduled for it for a long enough period of time. + * Objects or OSTs that miss a round will simply have their next request + * scheduled on the round that requests are being dispatched for at the time + * the request arrives. + * + * Requests are tagged with the round number and a sequence number; the sequence + * number indicates the relative ordering amongst the batches of requests in a + * round, and is identical for all requests in a batch, as is the round number.
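 + * For example (hypothetical values): with od_quantum == 2 and requests + * arriving for objects A and B in the order A1, A2, A3, B1, round N would + * contain the batch {A1, A2} with some sequence number S, and the batch {B1} + * with sequence S + 1; A3 would be scheduled on round N + 1, as object A's + * quantum for round N has been exhausted.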
+ * The round and sequence numbers are used by orr_req_compare() together with + * nrs_orr_data::od_binheap in order to maintain an ordered set of rounds, with + * each round consisting of an ordered set of batches of requests, and each + * batch consisting of an ordered set of requests according to their logical + * file or physical disk offsets. + * + * \param[in] policy the policy + * \param[in] nrq the request to add + * + * \retval 0 request successfully added + * \retval != 0 error + */ +static int nrs_orr_req_add(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq) +{ + struct nrs_orr_data *orrd; + struct nrs_orr_object *orro; + int rc; + + orro = container_of(nrs_request_resource(nrq), + struct nrs_orr_object, oo_res); + orrd = container_of(nrs_request_resource(nrq)->res_parent, + struct nrs_orr_data, od_res); + + if (orro->oo_quantum == 0 || orro->oo_round < orrd->od_round || + (orro->oo_active == 0 && orro->oo_quantum > 0)) { + + /** + * If there are no pending requests for the object/OST, but some + * of its quantum still remains unused, which implies we did not + * get a chance to schedule up to its maximum allowed batch size + * of requests in the previous round this object/OST + * participated in, schedule this next request on a new round; + * this avoids fragmentation of request batches caused by + * intermittent inactivity on the object/OST, at the expense of + * potentially slightly increased service time for the request + * batch this request will be a part of. + */ + if (orro->oo_active == 0 && orro->oo_quantum > 0) + orro->oo_round++; + + /** A new scheduling round has commenced */ + if (orro->oo_round < orrd->od_round) + orro->oo_round = orrd->od_round; + + /** I was not the last object/OST that scheduled a request */ + if (orro->oo_sequence < orrd->od_sequence) + orro->oo_sequence = ++orrd->od_sequence; + /** + * Reset the quantum if we have reached the maximum quantum + * size for this batch, or even if we have not managed to + * complete a batch size up to its maximum allowed size. + * XXX: Accessed unlocked + */ + orro->oo_quantum = orrd->od_quantum; + } + + nrq->nr_u.orr.or_round = orro->oo_round; + nrq->nr_u.orr.or_sequence = orro->oo_sequence; + + rc = cfs_binheap_insert(orrd->od_binheap, &nrq->nr_node); + if (rc == 0) { + orro->oo_active++; + if (--orro->oo_quantum == 0) + orro->oo_round++; + } + return rc; +} + +/** + * Removes request \a nrq from an ORR/TRR \a policy instance's set of queued + * requests. + * + * \param[in] policy the policy + * \param[in] nrq the request to remove + */ +static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq) +{ + struct nrs_orr_data *orrd; + struct nrs_orr_object *orro; + bool is_root; + + orro = container_of(nrs_request_resource(nrq), + struct nrs_orr_object, oo_res); + orrd = container_of(nrs_request_resource(nrq)->res_parent, + struct nrs_orr_data, od_res); + + LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round); + + is_root = &nrq->nr_node == cfs_binheap_root(orrd->od_binheap); + + cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node); + orro->oo_active--; + + /** + * If we just deleted the node at the root of the binheap, we may have + * to adjust round numbers.
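 + * (The root is the request that nrs_orr_req_get() would return next, so + * removing it may expose a request from a later round at the new root.)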
+ */ + if (unlikely(is_root)) { + /** Peek at the next request to be served */ + cfs_binheap_node_t *node = cfs_binheap_root(orrd->od_binheap); + + /** No more requests */ + if (unlikely(node == NULL)) { + orrd->od_round++; + } else { + nrq = container_of(node, struct ptlrpc_nrs_request, + nr_node); + + if (orrd->od_round < nrq->nr_u.orr.or_round) + orrd->od_round = nrq->nr_u.orr.or_round; + } + } +} + +/** + * Called right after the request \a nrq finishes being handled by ORR policy + * instance \a policy. + * + * \param[in] policy the policy that handled the request + * \param[in] nrq the request that was handled + */ +static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq) +{ + /** NB: resource control, credits etc can be added here */ + if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR, + NRS_POL_NAME_MAX) == 0) + CDEBUG(D_RPCTRACE, + "NRS: finished handling %s request for object with FID " + DFID", from OST with index %u, with round "LPU64"\n", + NRS_POL_NAME_ORR, PFID(&nrq->nr_u.orr.or_key.ok_fid), + nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round); + else + CDEBUG(D_RPCTRACE, + "NRS: finished handling %s request from OST with index %u," + " with round "LPU64"\n", + NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx, + nrq->nr_u.orr.or_round); +} + +/** + * lprocfs interface + */ + +#ifdef LPROCFS + +/** + * This allows us to bundle the policy name into the lprocfs_vars::data pointer + * so that lprocfs read/write functions can be used by both the ORR and TRR + * policies. + */ +struct nrs_lprocfs_orr_data { + struct ptlrpc_service *svc; + char *name; +} lprocfs_orr_data = { + .name = NRS_POL_NAME_ORR +}, lprocfs_trr_data = { + .name = NRS_POL_NAME_TRR +}; + +/** + * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size) + * for ORR/TRR policy instances on both the regular and high-priority NRS head + * of a service, as long as a policy instance is not in the + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this + * state are skipped later by nrs_orr_ctl(). + * + * Quantum values are in # of RPCs, and the output is in YAML format. + * + * For example: + * + * reg_quantum:256 + * hp_quantum:8 + * + * XXX: the CRR-N version of this, ptlrpc_lprocfs_rd_nrs_crrn_quantum(), is + * almost identical; it can be reworked and then reused for ORR/TRR. + */ +static int ptlrpc_lprocfs_rd_nrs_orr_quantum(char *page, char **start, + off_t off, int count, int *eof, + void *data) +{ + struct nrs_lprocfs_orr_data *orr_data = data; + struct ptlrpc_service *svc = orr_data->svc; + __u16 quantum; + int rc; + int rc2 = 0; + + /** + * Perform two separate calls to this as only one of the NRS heads' + * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state. + */ + rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG, + orr_data->name, + NRS_CTL_ORR_RD_QUANTUM, + true, &quantum); + if (rc == 0) { + *eof = 1; + rc2 = snprintf(page, count, NRS_LPROCFS_QUANTUM_NAME_REG + "%-5d\n", quantum); + /** + * Ignore -ENODEV as the regular NRS head's policy may be in the + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state. + */ + } else if (rc != -ENODEV) { + return rc; + } + + /** + * We know that the ost_io service, which is the only service the + * ORR/TRR policies are compatible with, does have an HP NRS head, but + * it may be best to guard against a possible change of this in the + * future.
+ */ + if (!nrs_svc_has_hp(svc)) + goto no_hp; + + rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP, + orr_data->name, NRS_CTL_ORR_RD_QUANTUM, + true, &quantum); + if (rc == 0) { + *eof = 1; + rc2 += snprintf(page + rc2, count - rc2, + NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum); + /** + * Ignore -ENODEV as the high priority NRS head's policy may be + * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state. + */ + } else if (rc != -ENODEV) { + return rc; + } + +no_hp: + + return rc2 ? : rc; +} + +/** + * Sets the value of the Round Robin quantum (i.e. the maximum batch size) + * for ORR/TRR policy instances of a service. The user can set the quantum size + * for the regular and high priority NRS head separately by specifying each + * value, or both together in a single invocation. + * + * For example: + * + * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=reg_quantum:64, to set the + * request quantum size of the ORR policy instance on the regular NRS head of + * the ost_io service to 64 + * + * lctl set_param ost.OSS.ost_io.nrs_trr_quantum=hp_quantum:8 to set the request + * quantum size of the TRR policy instance on the high priority NRS head of the + * ost_io service to 8 + * + * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=32, to set the request + * quantum size of the ORR policy instance on both the regular and the high + * priority NRS head of the ost_io service to 32 + * + * Policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state + * are skipped later by nrs_orr_ctl(). + * + * XXX: the CRR-N version of this, ptlrpc_lprocfs_wr_nrs_crrn_quantum(), is + * almost identical; it can be reworked and then reused for ORR/TRR. + */ +static int ptlrpc_lprocfs_wr_nrs_orr_quantum(struct file *file, + const char *buffer, + unsigned long count, void *data) +{ + struct nrs_lprocfs_orr_data *orr_data = data; + struct ptlrpc_service *svc = orr_data->svc; + enum ptlrpc_nrs_queue_type queue = 0; + char kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD]; + char *val; + long quantum_reg; + long quantum_hp; + /** lprocfs_find_named_value() modifies its argument, so keep a copy */ + unsigned long count_copy; + int rc = 0; + int rc2 = 0; + + if (count > (sizeof(kernbuf) - 1)) + return -EINVAL; + + if (cfs_copy_from_user(kernbuf, buffer, count)) + return -EFAULT; + + kernbuf[count] = '\0'; + + count_copy = count; + + /** + * Check if the regular quantum value has been specified + */ + val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG, + &count_copy); + if (val != kernbuf) { + quantum_reg = simple_strtol(val, NULL, 10); + + queue |= PTLRPC_NRS_QUEUE_REG; + } + + count_copy = count; + + /** + * Check if the high priority quantum value has been specified + */ + val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP, + &count_copy); + if (val != kernbuf) { + if (!nrs_svc_has_hp(svc)) + return -ENODEV; + + quantum_hp = simple_strtol(val, NULL, 10); + + queue |= PTLRPC_NRS_QUEUE_HP; + } + + /** + * If none of the queues has been specified, look for a valid numerical + * value + */ + if (queue == 0) { + if (!isdigit(kernbuf[0])) + return -EINVAL; + + quantum_reg = simple_strtol(kernbuf, NULL, 10); + + queue = PTLRPC_NRS_QUEUE_REG; + + if (nrs_svc_has_hp(svc)) { + queue |= PTLRPC_NRS_QUEUE_HP; + quantum_hp = quantum_reg; + } + } + + if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) && + ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) || + (((queue & PTLRPC_NRS_QUEUE_HP) != 0) && + ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0)))) + return
-EINVAL; + + /** + * We change the values on regular and HP NRS heads separately, so that + * we do not exit early from ptlrpc_nrs_policy_control() with an error + * returned by nrs_policy_ctl_locked(), in cases where the user has not + * started the policy on either the regular or HP NRS head; i.e. we are + * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned + * only if the operation fails with -ENODEV on all heads that have been + * specified by the command; if at least one operation succeeds, + * success is returned. + */ + if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) { + rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG, + orr_data->name, + NRS_CTL_ORR_WR_QUANTUM, false, + &quantum_reg); + if ((rc < 0 && rc != -ENODEV) || + (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG)) + return rc; + } + + if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) { + rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP, + orr_data->name, + NRS_CTL_ORR_WR_QUANTUM, false, + &quantum_hp); + if ((rc2 < 0 && rc2 != -ENODEV) || + (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP)) + return rc2; + } + + return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count; +} + +#define LPROCFS_NRS_OFF_NAME_REG "reg_offset_type:" +#define LPROCFS_NRS_OFF_NAME_HP "hp_offset_type:" + +#define LPROCFS_NRS_OFF_NAME_PHYSICAL "physical" +#define LPROCFS_NRS_OFF_NAME_LOGICAL "logical" + +/** + * Retrieves the offset type used by ORR/TRR policy instances on both the + * regular and high-priority NRS head of a service, as long as a policy + * instance is not in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; + * policy instances in this state are skipped later by nrs_orr_ctl(). + * + * Offset type information is a (physical|logical) string, and output is + * in YAML format. + * + * For example: + * + * reg_offset_type:physical + * hp_offset_type:logical + */ +static int ptlrpc_lprocfs_rd_nrs_orr_offset_type(char *page, char **start, + off_t off, int count, int *eof, + void *data) +{ + struct nrs_lprocfs_orr_data *orr_data = data; + struct ptlrpc_service *svc = orr_data->svc; + bool physical; + int rc; + int rc2 = 0; + + /** + * Perform two separate calls to this as only one of the NRS heads' + * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED + * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state. + */ + rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG, + orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE, + true, &physical); + if (rc == 0) { + *eof = 1; + rc2 = snprintf(page, count, + LPROCFS_NRS_OFF_NAME_REG"%s\n", + physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL : + LPROCFS_NRS_OFF_NAME_LOGICAL); + /** + * Ignore -ENODEV as the regular NRS head's policy may be in the + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state. + */ + } else if (rc != -ENODEV) { + return rc; + } + + /** + * We know that the ost_io service, which is the only service the + * ORR/TRR policies are compatible with, does have an HP NRS head, but + * it may be best to guard against a possible change of this in the + * future. + */ + if (!nrs_svc_has_hp(svc)) + goto no_hp; + + rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP, + orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE, + true, &physical); + if (rc == 0) { + *eof = 1; + rc2 += snprintf(page + rc2, count - rc2, + LPROCFS_NRS_OFF_NAME_HP"%s\n", + physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL : + LPROCFS_NRS_OFF_NAME_LOGICAL); + /** + * Ignore -ENODEV as the high priority NRS head's policy may be + * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
+ */ + } else if (rc != -ENODEV) { + return rc; + } + +no_hp: + + return rc2 ? : rc; +} + +/** + * Max valid command string is the size of the labels, plus "physical" twice, + * plus a separating ' ' + */ +#define LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD \ + sizeof(LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL " " \ + LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL) + +/** + * Sets the type of offsets used to order RPCs in ORR/TRR policy instances. The + * user can set offset type for the regular or high priority NRS head + * separately by specifying each value, or both together in a single invocation. + * + * For example: + * + * lctl set_param ost.OSS.ost_io.nrs_orr_offset_type= + * reg_offset_type:physical, to enable the ORR policy instance on the regular + * NRS head of the ost_io service to use physical disk offset ordering. + * + * lctl set_param ost.OSS.ost_io.nrs_trr_offset_type=logical, to enable the TRR + * policy instances on both the regular and high priority NRS heads of the + * ost_io service to use logical file offset ordering. + * + * Policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state + * are skipped later by nrs_orr_ctl(). + */ +static int ptlrpc_lprocfs_wr_nrs_orr_offset_type(struct file *file, + const char *buffer, + unsigned long count, + void *data) +{ + struct nrs_lprocfs_orr_data *orr_data = data; + struct ptlrpc_service *svc = orr_data->svc; + enum ptlrpc_nrs_queue_type queue = 0; + char kernbuf[LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD]; + char *val_reg; + char *val_hp; + bool physical_reg; + bool physical_hp; + unsigned long count_copy; + int rc = 0; + int rc2 = 0; + + if (count > (sizeof(kernbuf) - 1)) + return -EINVAL; + + if (cfs_copy_from_user(kernbuf, buffer, count)) + return -EFAULT; + + kernbuf[count] = '\0'; + + count_copy = count; + + /** + * Check if the regular offset type has been specified + */ + val_reg = lprocfs_find_named_value(kernbuf, + LPROCFS_NRS_OFF_NAME_REG, + &count_copy); + if (val_reg != kernbuf) + queue |= PTLRPC_NRS_QUEUE_REG; + + count_copy = count; + + /** + * Check if the high priority offset type has been specified + */ + val_hp = lprocfs_find_named_value(kernbuf, LPROCFS_NRS_OFF_NAME_HP, + &count_copy); + if (val_hp != kernbuf) { + if (!nrs_svc_has_hp(svc)) + return -ENODEV; + + queue |= PTLRPC_NRS_QUEUE_HP; + } + + /** + * If none of the queues has been specified, there may be a valid + * command string at the start of the buffer. + */ + if (queue == 0) { + queue = PTLRPC_NRS_QUEUE_REG; + + if (nrs_svc_has_hp(svc)) + queue |= PTLRPC_NRS_QUEUE_HP; + } + + if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) { + if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_PHYSICAL, + sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0) + physical_reg = true; + else if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_LOGICAL, + sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0) + physical_reg = false; + else + return -EINVAL; + } + + if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) { + if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_PHYSICAL, + sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0) + physical_hp = true; + else if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_LOGICAL, + sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0) + physical_hp = false; + else + return -EINVAL; + } + + /** + * We change the values on regular and HP NRS heads separately, so that + * we do not exit early from ptlrpc_nrs_policy_control() with an error + * returned by nrs_policy_ctl_locked(), in cases where the user has not + * started the policy on either the regular or HP NRS head; i.e.
we are + * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned + * only if the operation fails with -ENODEV on all heads that have been + * specified by the command; if at least one operation succeeds, + * success is returned. + */ + if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) { + rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG, + orr_data->name, + NRS_CTL_ORR_WR_OFF_TYPE, false, + &physical_reg); + if ((rc < 0 && rc != -ENODEV) || + (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG)) + return rc; + } + + if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) { + rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP, + orr_data->name, + NRS_CTL_ORR_WR_OFF_TYPE, false, + &physical_hp); + if ((rc2 < 0 && rc2 != -ENODEV) || + (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP)) + return rc2; + } + + return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count; +} + +#define NRS_LPROCFS_REQ_SUPP_NAME_REG "reg_supported:" +#define NRS_LPROCFS_REQ_SUPP_NAME_HP "hp_supported:" + +#define LPROCFS_NRS_SUPP_NAME_READS "reads" +#define LPROCFS_NRS_SUPP_NAME_WRITES "writes" +#define LPROCFS_NRS_SUPP_NAME_READWRITES "reads_and_writes" + +/** + * Translates enum nrs_orr_supp values to a corresponding string. + */ +static const char *nrs_orr_supp2str(enum nrs_orr_supp supp) +{ + switch(supp) { + default: + LBUG(); + case NOS_OST_READ: + return LPROCFS_NRS_SUPP_NAME_READS; + case NOS_OST_WRITE: + return LPROCFS_NRS_SUPP_NAME_WRITES; + case NOS_OST_RW: + return LPROCFS_NRS_SUPP_NAME_READWRITES; + } +} + +/** + * Translates strings to the corresponding enum nrs_orr_supp value + */ +static enum nrs_orr_supp nrs_orr_str2supp(const char *val) +{ + if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READWRITES, + sizeof(LPROCFS_NRS_SUPP_NAME_READWRITES) - 1) == 0) + return NOS_OST_RW; + else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READS, + sizeof(LPROCFS_NRS_SUPP_NAME_READS) - 1) == 0) + return NOS_OST_READ; + else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_WRITES, + sizeof(LPROCFS_NRS_SUPP_NAME_WRITES) - 1) == 0) + return NOS_OST_WRITE; + else + return -EINVAL; +} + +/** + * Retrieves the type of RPCs handled at the point of invocation by ORR/TRR + * policy instances on both the regular and high-priority NRS head of a service, + * as long as a policy instance is not in the + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this + * state are skipped later by nrs_orr_ctl(). + * + * Supported RPC type information is a (reads|writes|reads_and_writes) string, + * and output is in YAML format. + * + * For example: + * + * reg_supported:reads + * hp_supported:reads_and_writes + */ +static int ptlrpc_lprocfs_rd_nrs_orr_supported(char *page, char **start, + off_t off, int count, int *eof, + void *data) +{ + struct nrs_lprocfs_orr_data *orr_data = data; + struct ptlrpc_service *svc = orr_data->svc; + enum nrs_orr_supp supported; + int rc; + int rc2 = 0; + + /** + * Perform two separate calls to this as only one of the NRS heads' + * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED + * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state. + */ + rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG, + orr_data->name, + NRS_CTL_ORR_RD_SUPP_REQ, true, + &supported); + + if (rc == 0) { + *eof = 1; + rc2 = snprintf(page, count, + NRS_LPROCFS_REQ_SUPP_NAME_REG"%s\n", + nrs_orr_supp2str(supported)); + /** + * Ignore -ENODEV as the regular NRS head's policy may be in the + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state. 
+ */ + } else if (rc != -ENODEV) { + return rc; + } + + /** + * We know the ost_io service which is the only one ORR/TRR policies are + * compatible with, do have an HP NRS head, but it may be best to guard + * against a possible change of this in the future. + */ + if (!nrs_svc_has_hp(svc)) + goto no_hp; + + rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP, + orr_data->name, + NRS_CTL_ORR_RD_SUPP_REQ, true, + &supported); + if (rc == 0) { + *eof = 1; + rc2 += snprintf(page + rc2, count - rc2, + NRS_LPROCFS_REQ_SUPP_NAME_HP"%s\n", + nrs_orr_supp2str(supported)); + /** + * Ignore -ENODEV as the high priority NRS head's policy may be + * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state. + */ + } else if (rc != -ENODEV) { + return rc; + } + +no_hp: + + return rc2 ? : rc; +} + +/** + * Max valid command string is the size of the labels, plus "reads_and_writes" + * twice, plus a separating ' ' + */ +#define LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD \ + sizeof(NRS_LPROCFS_REQ_SUPP_NAME_REG LPROCFS_NRS_SUPP_NAME_READWRITES \ + NRS_LPROCFS_REQ_SUPP_NAME_HP LPROCFS_NRS_SUPP_NAME_READWRITES \ + " ") + +/** + * Sets the type of RPCs handled by ORR/TRR policy instances. The user can + * modify this setting for the regular or high priority NRS heads separately, or + * both together in a single invocation. + * + * For example: + * + * lctl set_param ost.OSS.ost_io.nrs_orr_supported= + * "reg_supported:reads", to enable the ORR policy instance on the regular NRS + * head of the ost_io service to handle OST_READ RPCs. + * + * lctl set_param ost.OSS.ost_io.nrs_trr_supported=reads_and_writes, to enable + * the TRR policy instances on both the regular ang high priority NRS heads of + * the ost_io service to use handle OST_READ and OST_WRITE RPCs. + * + * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state are + * are skipped later by nrs_orr_ctl(). + */ +static int ptlrpc_lprocfs_wr_nrs_orr_supported(struct file *file, + const char *buffer, + unsigned long count, void *data) +{ + struct nrs_lprocfs_orr_data *orr_data = data; + struct ptlrpc_service *svc = orr_data->svc; + enum ptlrpc_nrs_queue_type queue = 0; + char kernbuf[LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD]; + char *val_reg; + char *val_hp; + enum nrs_orr_supp supp_reg; + enum nrs_orr_supp supp_hp; + unsigned long count_copy; + int rc = 0; + int rc2 = 0; + + if (count > (sizeof(kernbuf) - 1)) + return -EINVAL; + + if (cfs_copy_from_user(kernbuf, buffer, count)) + return -EFAULT; + + kernbuf[count] = '\0'; + + count_copy = count; + + /** + * Check if the regular supported requests setting has been specified + */ + val_reg = lprocfs_find_named_value(kernbuf, + NRS_LPROCFS_REQ_SUPP_NAME_REG, + &count_copy); + if (val_reg != kernbuf) + queue |= PTLRPC_NRS_QUEUE_REG; + + count_copy = count; + + /** + * Check if the high priority supported requests setting has been + * specified + */ + val_hp = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_REQ_SUPP_NAME_HP, + &count_copy); + if (val_hp != kernbuf) { + if (!nrs_svc_has_hp(svc)) + return -ENODEV; + + queue |= PTLRPC_NRS_QUEUE_HP; + } + + /** + * If none of the queues has been specified, there may be a valid + * command string at the start of the buffer. 
+	 */
+	if (queue == 0) {
+		queue = PTLRPC_NRS_QUEUE_REG;
+
+		if (nrs_svc_has_hp(svc))
+			queue |= PTLRPC_NRS_QUEUE_HP;
+	}
+
+	if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
+		supp_reg = nrs_orr_str2supp(val_reg);
+		if (supp_reg == -EINVAL)
+			return -EINVAL;
+	}
+
+	if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
+		supp_hp = nrs_orr_str2supp(val_hp);
+		if (supp_hp == -EINVAL)
+			return -EINVAL;
+	}
+
+	/**
+	 * We change the values on regular and HP NRS heads separately, so that
+	 * we do not exit early from ptlrpc_nrs_policy_control() with an error
+	 * returned by nrs_policy_ctl_locked(), in cases where the user has not
+	 * started the policy on either the regular or HP NRS head; i.e. we are
+	 * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
+	 * only if the operation fails with -ENODEV on all heads that have been
+	 * specified by the command; if at least one operation succeeds,
+	 * success is returned.
+	 */
+	if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
+		rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
+					       orr_data->name,
+					       NRS_CTL_ORR_WR_SUPP_REQ, false,
+					       &supp_reg);
+		if ((rc < 0 && rc != -ENODEV) ||
+		    (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
+			return rc;
+	}
+
+	if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
+		rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
+						orr_data->name,
+						NRS_CTL_ORR_WR_SUPP_REQ, false,
+						&supp_hp);
+		if ((rc2 < 0 && rc2 != -ENODEV) ||
+		    (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
+			return rc2;
+	}
+
+	return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
+}
+
+int nrs_orr_lprocfs_init(struct ptlrpc_service *svc)
+{
+	int rc;
+	int i;
+
+	struct lprocfs_vars nrs_orr_lprocfs_vars[] = {
+		{ .name = "nrs_orr_quantum",
+		  .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_quantum,
+		  .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_quantum },
+		{ .name = "nrs_orr_offset_type",
+		  .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_offset_type,
+		  .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_offset_type },
+		{ .name = "nrs_orr_supported",
+		  .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_supported,
+		  .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_supported },
+		{ NULL }
+	};
+
+	if (svc->srv_procroot == NULL)
+		return 0;
+
+	lprocfs_orr_data.svc = svc;
+
+	for (i = 0; i < ARRAY_SIZE(nrs_orr_lprocfs_vars); i++)
+		nrs_orr_lprocfs_vars[i].data = &lprocfs_orr_data;
+
+	rc = lprocfs_add_vars(svc->srv_procroot, nrs_orr_lprocfs_vars, NULL);
+
+	return rc;
+}
+
+void nrs_orr_lprocfs_fini(struct ptlrpc_service *svc)
+{
+	if (svc->srv_procroot == NULL)
+		return;
+
+	lprocfs_remove_proc_entry("nrs_orr_quantum", svc->srv_procroot);
+	lprocfs_remove_proc_entry("nrs_orr_offset_type", svc->srv_procroot);
+	lprocfs_remove_proc_entry("nrs_orr_supported", svc->srv_procroot);
+}
+
+#endif /* LPROCFS */
+
+static const struct ptlrpc_nrs_pol_ops nrs_orr_ops = {
+	.op_policy_init = nrs_orr_init,
+	.op_policy_start = nrs_orr_start,
+	.op_policy_stop = nrs_orr_stop,
+	.op_policy_ctl = nrs_orr_ctl,
+	.op_res_get = nrs_orr_res_get,
+	.op_res_put = nrs_orr_res_put,
+	.op_req_get = nrs_orr_req_get,
+	.op_req_enqueue = nrs_orr_req_add,
+	.op_req_dequeue = nrs_orr_req_del,
+	.op_req_stop = nrs_orr_req_stop,
+#ifdef LPROCFS
+	.op_lprocfs_init = nrs_orr_lprocfs_init,
+	.op_lprocfs_fini = nrs_orr_lprocfs_fini,
+#endif
+};
+
+struct ptlrpc_nrs_pol_conf nrs_conf_orr = {
+	.nc_name = NRS_POL_NAME_ORR,
+	.nc_ops = &nrs_orr_ops,
+	.nc_compat = nrs_policy_compat_one,
+	.nc_compat_svc_name = "ost_io",
+};
+
+/**
+ * TRR, Target-based Round Robin policy
+ *
+ * TRR reuses much of the functionality and data structures of ORR
+ */
+
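Both policies attach to the NRS framework purely through their ptlrpc_nrs_pol_conf descriptors (nrs_conf_orr above, nrs_conf_trr below). As a minimal, hypothetical sketch of how two such descriptors could be brought into service at initialization time; the ptlrpc_nrs_policy_register()/ptlrpc_nrs_policy_unregister() entry points are assumed here and are not shown in this patch excerpt:

	/* Illustrative only: register both descriptors, unwinding the first
	 * policy if the second fails to register. Entry-point names are
	 * assumptions, not code from this patch; struct ptlrpc_nrs_pol_conf
	 * comes from lustre_net.h.
	 */
	extern struct ptlrpc_nrs_pol_conf nrs_conf_orr;
	extern struct ptlrpc_nrs_pol_conf nrs_conf_trr;

	static int nrs_orr_trr_register_sketch(void)
	{
		int rc;

		/* ORR: groups brw RPCs by backend-fs object (FID) */
		rc = ptlrpc_nrs_policy_register(&nrs_conf_orr);
		if (rc != 0)
			return rc;

		/* TRR: same engine, but groups brw RPCs by OST index */
		rc = ptlrpc_nrs_policy_register(&nrs_conf_trr);
		if (rc != 0)
			ptlrpc_nrs_policy_unregister(&nrs_conf_orr);

		return rc;
	}

Since both descriptors set nc_compat to nrs_policy_compat_one with nc_compat_svc_name "ost_io", the NRS core will only ever instantiate these policies on the ost_io service.
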
+#ifdef LPROCFS
+
+int nrs_trr_lprocfs_init(struct ptlrpc_service *svc)
+{
+	int rc;
+	int i;
+
+	struct lprocfs_vars nrs_trr_lprocfs_vars[] = {
+		{ .name = "nrs_trr_quantum",
+		  .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_quantum,
+		  .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_quantum },
+		{ .name = "nrs_trr_offset_type",
+		  .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_offset_type,
+		  .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_offset_type },
+		{ .name = "nrs_trr_supported",
+		  .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_supported,
+		  .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_supported },
+		{ NULL }
+	};
+
+	if (svc->srv_procroot == NULL)
+		return 0;
+
+	lprocfs_trr_data.svc = svc;
+
+	for (i = 0; i < ARRAY_SIZE(nrs_trr_lprocfs_vars); i++)
+		nrs_trr_lprocfs_vars[i].data = &lprocfs_trr_data;
+
+	rc = lprocfs_add_vars(svc->srv_procroot, nrs_trr_lprocfs_vars, NULL);
+
+	return rc;
+}
+
+void nrs_trr_lprocfs_fini(struct ptlrpc_service *svc)
+{
+	if (svc->srv_procroot == NULL)
+		return;
+
+	lprocfs_remove_proc_entry("nrs_trr_quantum", svc->srv_procroot);
+	lprocfs_remove_proc_entry("nrs_trr_offset_type", svc->srv_procroot);
+	lprocfs_remove_proc_entry("nrs_trr_supported", svc->srv_procroot);
+}
+
+#endif /* LPROCFS */
+
+/**
+ * Reuse much of the ORR functionality for TRR.
+ */
+static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
+	.op_policy_init = nrs_orr_init,
+	.op_policy_start = nrs_orr_start,
+	.op_policy_stop = nrs_orr_stop,
+	.op_policy_ctl = nrs_orr_ctl,
+	.op_res_get = nrs_orr_res_get,
+	.op_res_put = nrs_orr_res_put,
+	.op_req_get = nrs_orr_req_get,
+	.op_req_enqueue = nrs_orr_req_add,
+	.op_req_dequeue = nrs_orr_req_del,
+	.op_req_stop = nrs_orr_req_stop,
+#ifdef LPROCFS
+	.op_lprocfs_init = nrs_trr_lprocfs_init,
+	.op_lprocfs_fini = nrs_trr_lprocfs_fini,
+#endif
+};
+
+struct ptlrpc_nrs_pol_conf nrs_conf_trr = {
+	.nc_name = NRS_POL_NAME_TRR,
+	.nc_ops = &nrs_trr_ops,
+	.nc_compat = nrs_policy_compat_one,
+	.nc_compat_svc_name = "ost_io",
+};
+
+/** @} ORR/TRR policy */
+
+/** @} nrs */
+
+#endif /* HAVE_SERVER_SUPPORT */
diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h
index 45d9fed..153a3ba 100644
--- a/lustre/ptlrpc/ptlrpc_internal.h
+++ b/lustre/ptlrpc/ptlrpc_internal.h
@@ -213,7 +213,7 @@ struct ptlrpc_nrs_policy *nrs_request_policy(struct ptlrpc_nrs_request *nrq)
 #define NRS_LPROCFS_QUANTUM_NAME_HP "hp_quantum:"
 
 /**
- * the maximum size of nrs_crrn_client::cc_quantum
+ * the maximum size of nrs_crrn_client::cc_quantum and nrs_orr_data::od_quantum.
  */
 #define LPROCFS_NRS_QUANTUM_MAX 65535
 
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index c141f49..c56780c 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -1778,7 +1778,8 @@ got_request:
  * ptlrpc_server_handle_req later on.
  */
 static int
-ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt)
+ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
+			    struct ptlrpc_thread *thread)
 {
 	struct ptlrpc_service *svc = svcpt->scp_service;
 	struct ptlrpc_request *req;
@@ -1898,6 +1899,8 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt)
 		goto err_req;
 	}
 
+	req->rq_svc_thread = thread;
+
 	ptlrpc_at_add_timed(req);
 
 	/* Move it over to the request processing queue */
@@ -2239,7 +2242,7 @@ liblustre_check_services (void *arg)
 		svcpt->scp_nthrs_running++;
 
 		do {
-			rc = ptlrpc_server_handle_req_in(svcpt);
+			rc = ptlrpc_server_handle_req_in(svcpt, NULL);
 			rc |= ptlrpc_server_handle_reply(svcpt);
 			rc |= ptlrpc_at_check_timed(svcpt);
 			rc |= ptlrpc_server_handle_request(svcpt, NULL);
@@ -2501,7 +2504,10 @@ static int ptlrpc_main(void *arg)
 
 		/* Process all incoming reqs before handling any */
 		if (ptlrpc_server_request_incoming(svcpt)) {
-			ptlrpc_server_handle_req_in(svcpt);
+			lu_context_enter(&env->le_ctx);
+			ptlrpc_server_handle_req_in(svcpt, thread);
+			lu_context_exit(&env->le_ctx);
+
 			/* but limit ourselves in case of flood */
 			if (counter++ < 100)
 				continue;
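
All of the lprocfs write handlers above share the same return-value convention, which is easy to lose track of across the hunks; the standalone sketch below condenses it. The function name and parameters are illustrative, not part of the patch: rc_reg/rc_hp stand in for the per-head ptlrpc_nrs_policy_control() results, and count for the number of bytes consumed from the user buffer.

	#include <errno.h>
	#include <stdbool.h>

	/* Condensed model of the handlers' return logic: a hard error on any
	 * selected head fails the write; -ENODEV (policy stopped) is tolerated
	 * per head, and is only propagated when every head selected by the
	 * command reported it.
	 */
	static int nrs_wr_result_model(bool reg_selected, int rc_reg,
				       bool hp_selected, int rc_hp, int count)
	{
		if (reg_selected && rc_reg < 0 && rc_reg != -ENODEV)
			return rc_reg;
		if (hp_selected && rc_hp < 0 && rc_hp != -ENODEV)
			return rc_hp;

		/* -ENODEV only if the policy was stopped on all selected
		 * heads; one success is enough to report the whole write
		 * as successful
		 */
		if ((!reg_selected || rc_reg == -ENODEV) &&
		    (!hp_selected || rc_hp == -ENODEV))
			return -ENODEV;

		return count;
	}

So, for example, a write that targets both heads while only the regular policy instance is started still succeeds and returns count, whereas the same write with both instances stopped returns -ENODEV.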