4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2013, 2017, Intel Corporation.
25 * Copyright 2012 Xyratex Technology Limited
28 * lustre/ptlrpc/nrs_orr.c
30 * Network Request Scheduler (NRS) ORR and TRR policies
32 * Request scheduling in a Round-Robin manner over backend-fs objects and OSTs
35 * Author: Liang Zhen <liang@whamcloud.com>
36 * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
43 #define DEBUG_SUBSYSTEM S_RPC
44 #include <linux/delay.h>
46 #include <obd_support.h>
47 #include <obd_class.h>
48 #include <lustre_net.h>
49 #include <lustre_req_layout.h>
50 #include <lustre_compat.h>
51 #include "ptlrpc_internal.h"
54 * \name ORR/TRR policy
56 * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies
58 * ORR performs batched Round Robin scheduling of brw RPCs, based on the FID of
59 * the backend-fs object that the brw RPC pertains to; the TRR policy performs
60 * batched Round Robin scheduling of brw RPCs, based on the OST index that the
61 * RPC pertains to. Both policies also order RPCs in each batch in ascending
62 * offset order, which is lprocfs-tunable between logical file offsets, and
63 * physical disk offsets, as reported by fiemap.
65 * The TRR policy reuses much of the functionality of ORR. These two scheduling
66 * algorithms could alternatively be implemented under a single NRS policy, that
67 * uses an lprocfs tunable in order to switch between the two types of
68 * scheduling behaviour. The two algorithms have been implemented as separate
69 * policies for reasons of clarity to the user, and to avoid issues that would
70 * otherwise arise at the point of switching between behaviours in the case of
71 * having a single policy, such as resource cleanup for nrs_orr_object
72 * instances. It is possible that this may need to be re-examined in the future,
73 * along with potentially coalescing other policies that perform batched request
74 * scheduling in a Round-Robin manner, all into one policy.
79 #define NRS_POL_NAME_ORR "orr"
80 #define NRS_POL_NAME_TRR "trr"
83 * Checks if the RPC type of \a nrq is currently handled by an ORR/TRR policy
85 * \param[in] orrd the ORR/TRR policy scheduler instance
86 * \param[in] nrq the request
87 * \param[out] opcode the opcode is saved here, just in order to avoid calling
88 * lustre_msg_get_opc() again later
90 * \retval true request type is supported by the policy instance
91 * \retval false request type is not supported by the policy instance
93 static bool nrs_orr_req_supported(struct nrs_orr_data *orrd,
94 struct ptlrpc_nrs_request *nrq, __u32 *opcode)
96 struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
98 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
/*
 * A racy read of od_supp is tolerated here: it only selects which RPC
 * types this policy instance handles, so a stale value at worst routes
 * one request to the fallback policy.
 */
102 * XXX: nrs_orr_data::od_supp accessed unlocked.
/* NOTE(review): these two assignments are presumably the OST_READ and
 * OST_WRITE arms of an opcode switch (elided from this view) — confirm
 * against the full source. */
106 rc = orrd->od_supp & NOS_OST_READ;
109 rc = orrd->od_supp & NOS_OST_WRITE;
120 * Returns the ORR/TRR key fields for the request \a nrq in \a key.
122 * \param[in] orrd the ORR/TRR policy scheduler instance
123 * \param[in] nrq the request
124 * \param[in] opc the request's opcode
125 * \param[in] name the policy name
126 * \param[out] key fields of the key are returned here.
128 * \retval 0 key filled successfully
131 static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
132 struct ptlrpc_nrs_request *nrq, __u32 opc,
133 char *name, struct nrs_orr_key *key)
135 struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
137 struct ost_body *body;
141 LASSERT(req != NULL);
144 * This is an attempt to fill in the request key fields while
145 * moving a request from the regular to the high-priority NRS
146 * head (via ldlm_lock_reorder_req()), but the request key has
147 * been adequately filled when nrs_orr_res_get() was called through
148 * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR
149 * policy, so there is nothing to do.
/* Key already cached on the request; just copy it out. */
151 if (nrq->nr_u.orr.or_orr_set) {
152 *key = nrq->nr_u.orr.or_key;
156 /* Bounce unconnected requests to the default policy. */
157 if (req->rq_export == NULL)
/* OST index of the server this export belongs to; used below to build
 * the object FID from the ostid. */
160 ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
163 * The request pill for OST_READ and OST_WRITE requests is
164 * initialized in the ost_io service's
165 * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
166 * so no need to redo it here.
168 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* Translate the on-wire object id into the FID that keys the hash. */
172 rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
/* Cache the key so a later move to the HP head skips this work. */
176 nrq->nr_u.orr.or_orr_set = 1;
182 * Populates the range values in \a range with logical offsets obtained via
185 * \param[in] nb niobuf_remote struct array for this request
186 * \param[in] niocount count of niobuf_remote structs for this request
187 * \param[out] range the offset range is returned here
189 static void nrs_orr_range_fill_logical(struct niobuf_remote *nb, int niocount,
190 struct nrs_orr_req_range *range)
192 /* Should we do this at page boundaries ? */
/* Start: first niobuf offset rounded down to a page boundary.
 * End: last byte of the last niobuf, rounded up to the end of its page. */
193 range->or_start = nb[0].rnb_offset & PAGE_MASK;
194 range->or_end = (nb[niocount - 1].rnb_offset +
195 nb[niocount - 1].rnb_len - 1) | ~PAGE_MASK;
199 * We obtain information just for a single extent, as the request can only be in
200 * a single place in the binary heap anyway.
202 #define ORR_NUM_EXTENTS 1
205 * Converts the logical file offset range in \a range, to a physical disk offset
206 * range in \a range, for a request. Uses obd_get_info() in order to carry out a
207 * fiemap call and obtain backend-fs extent information. The returned range is
208 * in physical block numbers.
210 * \param[in] nrq the request
211 * \param[in] oa obdo struct for this request
212 * \param[in,out] range the offset range in bytes; logical range in, physical
215 * \retval 0 physical offsets obtained successfully
218 static int nrs_orr_range_fill_physical(struct ptlrpc_nrs_request *nrq,
220 struct nrs_orr_req_range *range)
222 struct ptlrpc_request *req = container_of(nrq,
223 struct ptlrpc_request,
/* Stack buffer sized for a struct fiemap header plus exactly
 * ORR_NUM_EXTENTS (== 1) trailing fiemap_extent entries. */
225 char fiemap_buf[offsetof(struct fiemap,
226 fm_extents[ORR_NUM_EXTENTS])];
227 struct fiemap *fiemap = (struct fiemap *)fiemap_buf;
228 struct ll_fiemap_info_key key;
/* Build the fiemap query covering the request's logical range. */
233 key = (typeof(key)) {
234 .lfik_name = KEY_FIEMAP,
237 .fm_start = range->or_start,
238 .fm_length = range->or_end - range->or_start,
239 .fm_extent_count = ORR_NUM_EXTENTS
/* May sleep; callers must not hold spinlocks (see nrs_orr_range_fill). */
243 rc = obd_get_info(req->rq_svc_thread->t_env, req->rq_export,
244 sizeof(key), &key, NULL, fiemap);
/* No extent (hole/unmapped) or more than we asked for: cannot map. */
248 if (fiemap->fm_mapped_extents == 0 ||
249 fiemap->fm_mapped_extents > ORR_NUM_EXTENTS)
250 GOTO(out, rc = -EFAULT);
253 * Calculate the physical offset ranges for the request from the extent
254 * information and the logical request offsets.
256 start = fiemap->fm_extents[0].fe_physical + range->or_start -
257 fiemap->fm_extents[0].fe_logical;
258 end = start + range->or_end - range->or_start;
260 range->or_start = start;
/* Remember that physical offsets were filled, so a later re-enqueue
 * (e.g. moving to the HP head) can skip the sleeping fiemap call. */
263 nrq->nr_u.orr.or_physical_set = 1;
269 * Sets the offset range the request covers; either in logical file
270 * offsets or in physical disk offsets.
272 * \param[in] nrq the request
273 * \param[in] orrd the ORR/TRR policy scheduler instance
274 * \param[in] opc the request's opcode
275 * \param[in] moving_req is the request in the process of moving onto the
276 * high-priority NRS head?
278 * \retval 0 range filled successfully
281 static int nrs_orr_range_fill(struct ptlrpc_nrs_request *nrq,
282 struct nrs_orr_data *orrd, __u32 opc,
285 struct ptlrpc_request *req = container_of(nrq,
286 struct ptlrpc_request,
288 struct obd_ioobj *ioo;
289 struct niobuf_remote *nb;
290 struct ost_body *body;
291 struct nrs_orr_req_range range;
296 * If we are scheduling using physical disk offsets, but we have filled
297 * the offset information in the request previously
298 * (i.e. ldlm_lock_reorder_req() is moving the request to the
299 * high-priority NRS head), there is no need to do anything, and we can
300 * exit. Moreover, beyond the lack of need, we would be unable to perform
301 * the obd_get_info() call required in nrs_orr_range_fill_physical(),
302 * because ldlm_lock_reorder_lock() calls into here while holding a
303 * spinlock, and retrieving fiemap information via obd_get_info() is a
304 * potentially sleeping operation.
306 if (orrd->od_physical && nrq->nr_u.orr.or_physical_set)
309 ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
311 GOTO(out, rc = -EFAULT);
313 niocount = ioo->ioo_bufcnt;
315 nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
317 GOTO(out, rc = -EFAULT);
320 * Use logical information from niobuf_remote structures.
322 nrs_orr_range_fill_logical(nb, niocount, &range);
325 * Obtain physical offsets if selected, and this is an OST_READ
326 * RPC. We do not enter this block if moving_req is set, which indicates
327 * that the request is being moved to the high-priority NRS head by
328 * ldlm_lock_reorder_req(), as that function calls in here while holding
329 * a spinlock, and nrs_orr_range_physical() can sleep, so we just use
330 * logical file offsets for the range values for such requests.
332 if (orrd->od_physical && opc == OST_READ && !moving_req) {
333 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
335 GOTO(out, rc = -EFAULT);
338 * Translate to physical block offsets from backend filesystem
340 * Ignore return values; if obtaining the physical offsets
341 * fails, use the logical offsets.
343 nrs_orr_range_fill_physical(nrq, &body->oa, &range);
/* Publish the (logical or physical) range on the request; consumed by
 * orr_req_compare() when ordering requests within a batch. */
346 nrq->nr_u.orr.or_range = range;
352 * Generates a character string that can be used in order to register uniquely
353 * named slab objects for ORR/TRR policy instances. The character string is
354 * unique per policy instance, as it includes the policy's name, the CPT number,
355 * and a {reg|hp} token, and there is one policy instance per NRS head on each
356 * CPT, and the policy is only compatible with the ost_io service.
358 * \param[in] policy the policy instance
359 * \param[out] name the character array that will hold the generated name
361 static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
/* Name format: "nrs_<policy>_{reg|hp}_<cptid>", bounded by
 * NRS_ORR_OBJ_NAME_MAX; unique per NRS head per CPT. */
363 snprintf(name, NRS_ORR_OBJ_NAME_MAX, "%s%s%s%d",
364 "nrs_", policy->pol_desc->pd_name,
365 policy->pol_nrs->nrs_queue_type == PTLRPC_NRS_QUEUE_REG ?
366 "_reg_" : "_hp_", nrs_pol2cptid(policy));
370 * ORR/TRR hash operations
/* rhashtable hash function: mixes both halves of the key FID (f_oid and
 * f_seq) into the seed so distinct objects spread across buckets. */
372 static u32 nrs_orr_hashfn(const void *data, u32 len, u32 seed)
374 const struct nrs_orr_key *key = data;
376 seed = cfs_hash_32(seed ^ key->ok_fid.f_oid, 32);
377 seed ^= cfs_hash_64(key->ok_fid.f_seq, 32);
/* rhashtable compare callback: 0 when the object's FID matches the
 * lookup key, -ESRCH otherwise (any non-zero value means "no match"). */
381 static int nrs_orr_cmpfn(struct rhashtable_compare_arg *arg, const void *obj)
383 const struct nrs_orr_object *orro = obj;
384 const struct nrs_orr_key *key = arg->key;
386 return lu_fid_eq(&orro->oo_key.ok_fid, &key->ok_fid) ? 0 : -ESRCH;
/* Teardown callback for rhashtable_free_and_destroy(): frees any object
 * still left in the hash when the ORR policy instance stops. */
389 static void nrs_orr_hash_exit(void *vobj, void *data)
391 struct nrs_orr_object *orro = vobj;
392 struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent,
393 struct nrs_orr_data, od_res);
395 /* We shouldn't reach here but just in case. nrs_xxx_res_put
396 * should have freed orro.
398 LASSERTF(refcount_read(&orro->oo_ref) == 0,
399 "Busy NRS ORR policy object for OST with index %u, with %d refs\n",
400 orro->oo_key.ok_idx, refcount_read(&orro->oo_ref));
402 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
/* rhashtable layout for ORR objects: keyed by the lu_fid embedded at the
 * start of nrs_orr_object::oo_key, linked via oo_rhead. */
405 static const struct rhashtable_params nrs_orr_hash_params = {
406 .key_len = sizeof(struct lu_fid),
407 .key_offset = offsetof(struct nrs_orr_object, oo_key),
408 .head_offset = offsetof(struct nrs_orr_object, oo_rhead),
409 .hashfn = nrs_orr_hashfn,
410 .obj_cmpfn = nrs_orr_cmpfn,
413 #define NRS_ORR_QUANTUM_DFLT 256
416 * Binary heap predicate.
419 * ptlrpc_nrs_request::nr_u::orr::or_round,
420 * ptlrpc_nrs_request::nr_u::orr::or_sequence, and
421 * ptlrpc_nrs_request::nr_u::orr::or_range to compare two binheap nodes and
422 * produce a binary predicate that indicates their relative priority, so that
423 * the binary heap can perform the necessary sorting operations.
425 * \param[in] e1 the first binheap node to compare
426 * \param[in] e2 the second binheap node to compare
432 orr_req_compare(struct binheap_node *e1, struct binheap_node *e2)
434 struct ptlrpc_nrs_request *nrq1;
435 struct ptlrpc_nrs_request *nrq2;
437 nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
438 nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
/* Priority order: round, then batch sequence, then start offset, then
 * end offset — earlier/lower values win the heap comparison. */
441 * Requests have been scheduled against a different scheduling round.
443 if (nrq1->nr_u.orr.or_round < nrq2->nr_u.orr.or_round)
445 else if (nrq1->nr_u.orr.or_round > nrq2->nr_u.orr.or_round)
449 * Requests have been scheduled against the same scheduling round, but
450 * belong to a different batch, i.e. they pertain to a different
451 * backend-fs object (for ORR policy instances) or OST (for TRR policy
454 if (nrq1->nr_u.orr.or_sequence < nrq2->nr_u.orr.or_sequence)
456 else if (nrq1->nr_u.orr.or_sequence > nrq2->nr_u.orr.or_sequence)
460 * If round numbers and sequence numbers are equal, the two requests
461 * have been scheduled on the same round, and belong to the same batch,
462 * which means they pertain to the same backend-fs object (if this is an
463 * ORR policy instance), or to the same OST (if this is a TRR policy
464 * instance), so these requests should be sorted by ascending offset
467 if (nrq1->nr_u.orr.or_range.or_start <
468 nrq2->nr_u.orr.or_range.or_start) {
470 } else if (nrq1->nr_u.orr.or_range.or_start >
471 nrq2->nr_u.orr.or_range.or_start) {
475 * Requests start from the same offset; Dispatch the shorter one
476 * first; perhaps slightly more chances of hitting caches like
479 return nrq1->nr_u.orr.or_range.or_end <
480 nrq2->nr_u.orr.or_range.or_end;
485 * ORR binary heap operations
/* Binheap callbacks: only a comparator is needed; the heap keeps the
 * highest-priority (per orr_req_compare()) request at its root. */
487 static struct binheap_ops nrs_orr_heap_ops = {
490 .hop_compare = orr_req_compare,
494 * Prints a warning message if an ORR/TRR policy is started on a service with
495 * more than one CPT. Not printed on the console for now, since we don't
496 * have any performance metrics in the first place, and it is annoying.
498 * \param[in] policy the policy instance
502 static int nrs_orr_init(struct ptlrpc_nrs_policy *policy)
/* Purely advisory: log (not console) a hint that ORR/TRR may behave
 * better on single-partition services, since each CPT runs its own
 * independent policy instance. */
504 if (policy->pol_nrs->nrs_svcpt->scp_service->srv_ncpts > 1)
505 CDEBUG(D_CONFIG, "%s: The %s NRS policy was registered on a "
506 "service with multiple service partitions. This policy "
507 "may perform better with a single partition.\n",
508 policy->pol_nrs->nrs_svcpt->scp_service->srv_name,
509 policy->pol_desc->pd_name);
515 * Called when an ORR policy instance is started.
517 * \param[in] policy the policy
519 * \retval -ENOMEM OOM error
522 static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg)
524 struct nrs_orr_data *orrd;
/* Allocate the per-instance state on the policy's own CPT. */
528 OBD_CPT_ALLOC_PTR(orrd, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
533 * Binary heap instance for sorted incoming requests.
535 orrd->od_binheap = binheap_create(&nrs_orr_heap_ops,
536 CBH_FLAG_ATOMIC_GROW, 4096, NULL,
537 nrs_pol2cptab(policy),
538 nrs_pol2cptid(policy));
539 if (orrd->od_binheap == NULL)
540 GOTO(out_orrd, rc = -ENOMEM);
/* Unique slab name ("nrs_<pol>_{reg|hp}_<cpt>") for this instance. */
542 nrs_orr_genobjname(policy, orrd->od_objname);
545 * Slab cache for NRS ORR/TRR objects.
547 orrd->od_cache = kmem_cache_create(orrd->od_objname,
548 sizeof(struct nrs_orr_object),
550 if (orrd->od_cache == NULL)
551 GOTO(out_binheap, rc = -ENOMEM);
554 * Use a hash for finding objects by struct nrs_orr_key.
555 * For TRR we use Xarray instead since items are resolved
556 * using the OST indices, and they will stay relatively
557 * stable during an OSS node's lifetime.
559 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
560 NRS_POL_NAME_MAX) == 0) {
561 rc = rhashtable_init(&orrd->od_obj_hash, &nrs_orr_hash_params);
565 xa_init(&orrd->od_trr_objs);
568 /* XXX: Fields accessed unlocked */
569 orrd->od_quantum = NRS_ORR_QUANTUM_DFLT;
570 orrd->od_supp = NOS_DFLT;
571 orrd->od_physical = true;
573 * Set to 1 so that the test inside nrs_orr_req_add() can evaluate to
576 orrd->od_sequence = 1;
578 policy->pol_private = orrd;
/* Error unwind: release in reverse order of acquisition. */
583 kmem_cache_destroy(orrd->od_cache);
585 binheap_destroy(orrd->od_binheap);
593 * Called when an ORR/TRR policy instance is stopped.
595 * Called when the policy has been instructed to transition to the
596 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more
597 * pending requests to serve.
599 * \param[in] policy the policy
601 static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy)
603 struct nrs_orr_data *orrd = policy->pol_private;
/* The policy only stops once its queue is drained, so the heap must be
 * empty here; the LASSERTs document that contract. */
606 LASSERT(orrd != NULL);
607 LASSERT(orrd->od_binheap != NULL);
608 LASSERT(orrd->od_cache != NULL);
609 LASSERT(binheap_is_empty(orrd->od_binheap));
611 binheap_destroy(orrd->od_binheap);
/* TRR keeps its objects in an XArray; ORR in an rhashtable. Tear down
 * whichever container this instance created in nrs_orr_start(). */
612 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_TRR,
613 NRS_POL_NAME_MAX) == 0) {
614 struct nrs_orr_object *orro;
617 xa_for_each(&orrd->od_trr_objs, i, orro) {
618 xa_erase(&orrd->od_trr_objs, i);
619 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
621 xa_destroy(&orrd->od_trr_objs);
/* nrs_orr_hash_exit() frees any object still left in the hash. */
623 rhashtable_free_and_destroy(&orrd->od_obj_hash,
624 nrs_orr_hash_exit, NULL);
627 kmem_cache_destroy(orrd->od_cache);
633 * Performs a policy-specific ctl function on ORR/TRR policy instances; similar
636 * \param[in] policy the policy instance
637 * \param[in] opc the opcode
638 * \param[in,out] arg used for passing parameters and information
640 * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
641 * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
643 * \retval 0 operation carried successfully
646 static int nrs_orr_ctl(struct ptlrpc_nrs_policy *policy,
647 enum ptlrpc_nrs_ctl opc, void *arg)
/* Caller must hold nrs_lock (see \pre/\post above); that is what makes
 * these unlocked od_* reads/writes safe against each other. */
649 assert_spin_locked(&policy->pol_nrs->nrs_lock);
/* Read/write the per-batch quantum (max requests per object/OST batch). */
655 case NRS_CTL_ORR_RD_QUANTUM: {
656 struct nrs_orr_data *orrd = policy->pol_private;
658 *(__u16 *)arg = orrd->od_quantum;
662 case NRS_CTL_ORR_WR_QUANTUM: {
663 struct nrs_orr_data *orrd = policy->pol_private;
665 orrd->od_quantum = *(__u16 *)arg;
666 LASSERT(orrd->od_quantum != 0);
/* Read/write the offset type: physical (fiemap) vs logical file offsets. */
670 case NRS_CTL_ORR_RD_OFF_TYPE: {
671 struct nrs_orr_data *orrd = policy->pol_private;
673 *(bool *)arg = orrd->od_physical;
677 case NRS_CTL_ORR_WR_OFF_TYPE: {
678 struct nrs_orr_data *orrd = policy->pol_private;
680 orrd->od_physical = *(bool *)arg;
/* Read/write the supported request mask (reads, writes, or both). */
684 case NRS_CTL_ORR_RD_SUPP_REQ: {
685 struct nrs_orr_data *orrd = policy->pol_private;
687 *(enum nrs_orr_supp *)arg = orrd->od_supp;
691 case NRS_CTL_ORR_WR_SUPP_REQ: {
692 struct nrs_orr_data *orrd = policy->pol_private;
694 orrd->od_supp = *(enum nrs_orr_supp *)arg;
695 LASSERT((orrd->od_supp & NOS_OST_RW) != 0);
703 * Obtains resources for ORR/TRR policy instances. The top-level resource lives
704 * inside \e nrs_orr_data and the second-level resource inside
705 * \e nrs_orr_object instances.
707 * \param[in] policy the policy for which resources are being taken for
709 * \param[in] nrq the request for which resources are being taken
710 * \param[in] parent parent resource, embedded in nrs_orr_data for the
712 * \param[out] resp used to return resource references
713 * \param[in] moving_req signifies limited caller context; used to perform
714 * memory allocations in an atomic context in this
717 * \retval 0 we are returning a top-level, parent resource, one that is
718 * embedded in an nrs_orr_data object
719 * \retval 1 we are returning a bottom-level resource, one that is embedded
720 * in an nrs_orr_object object
722 * \see nrs_resource_get_safe()
724 static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
725 struct ptlrpc_nrs_request *nrq,
726 const struct ptlrpc_nrs_resource *parent,
727 struct ptlrpc_nrs_resource **resp, bool moving_req)
729 struct nrs_orr_data *orrd;
730 struct nrs_orr_object *orro, *new_orro;
731 struct nrs_orr_key key = { { { 0 } } };
736 * struct nrs_orr_data is requested.
738 if (parent == NULL) {
739 *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
743 orrd = container_of(parent, struct nrs_orr_data, od_res);
746 * If the request type is not supported, fail the enqueuing; the RPC
747 * will be handled by the fallback NRS policy.
749 if (!nrs_orr_req_supported(orrd, nrq, &opc))
753 * Fill in the key for the request; OST FID for ORR policy instances,
754 * and OST index for TRR policy instances.
756 rc = nrs_orr_key_fill(orrd, nrq, opc, policy->pol_desc->pd_name, &key);
761 * Set the offset range the request covers
763 rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
767 /* Handle the ORR case which involves looking up the orro in the
768 * hashtable. If not found then insert it. Unlike TRR the orro can
769 * be deleted in parallel during the life cycle of the object.
/* Fast path: object exists and is still live (ref not yet zero). */
772 orro = rhashtable_lookup_fast(&orrd->od_obj_hash, &key,
773 nrs_orr_hash_params);
774 if (orro && refcount_inc_not_zero(&orro->oo_ref))
/* Slow path: allocate a candidate object; GFP_ATOMIC when called from
 * the spinlocked HP-move path (moving_req), GFP_NOFS otherwise. */
779 OBD_SLAB_CPT_ALLOC_PTR_GFP(new_orro, orrd->od_cache,
780 nrs_pol2cptab(policy), nrs_pol2cptid(policy),
781 moving_req ? GFP_ATOMIC : GFP_NOFS);
782 if (new_orro == NULL)
785 new_orro->oo_key = key;
786 refcount_set(&new_orro->oo_ref, 1);
/* Atomic lookup-or-insert: returns NULL on successful insert, the
 * existing object on collision, or an ERR_PTR on failure. */
789 orro = rhashtable_lookup_get_insert_fast(&orrd->od_obj_hash,
791 nrs_orr_hash_params);
792 /* insertion successful */
793 if (likely(orro == NULL)) {
798 /* A returned non-error orro means it already exists */
799 rc = IS_ERR(orro) ? PTR_ERR(orro) : 0;
800 if (!rc && refcount_inc_not_zero(&orro->oo_ref)) {
801 OBD_SLAB_FREE_PTR(new_orro, orrd->od_cache);
806 /* oo_ref == 0, orro will be freed */
810 /* hash table could be resizing. */
811 if (rc == -ENOMEM || rc == -EBUSY) {
815 OBD_SLAB_FREE_PTR(new_orro, orrd->od_cache);
822 * For debugging purposes
824 nrq->nr_u.orr.or_key = orro->oo_key;
826 *resp = &orro->oo_res;
/* RCU callback: frees an ORR object after a grace period, so concurrent
 * rhashtable lookups (see nrs_orr_res_get()) can never touch freed
 * memory. Scheduled from nrs_orr_res_put() via call_rcu(). */
831 static void nrs_orr_object_free(struct rcu_head *head)
833 struct nrs_orr_object *orro = container_of(head, struct nrs_orr_object,
835 struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent,
836 struct nrs_orr_data, od_res);
838 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
842 * Called when releasing references to the resource hierarchy obtained for a
843 * request for scheduling using ORR/TRR policy instances
845 * \param[in] policy the policy the resource belongs to
846 * \param[in] res the resource to be released
848 static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy,
849 const struct ptlrpc_nrs_resource *res)
851 struct nrs_orr_data *orrd;
852 struct nrs_orr_object *orro;
855 * Do nothing for freeing parent, nrs_orr_data resources.
857 if (res->res_parent == NULL)
860 orro = container_of(res, struct nrs_orr_object, oo_res);
/* Drop our reference; only the last putter tears the object down. */
861 if (!refcount_dec_and_test(&orro->oo_ref))
/* Last reference: unlink from the hash, then defer the actual free to
 * an RCU grace period so in-flight lookups remain safe. */
864 orrd = container_of(orro->oo_res.res_parent, struct nrs_orr_data, od_res);
865 rhashtable_remove_fast(&orrd->od_obj_hash, &orro->oo_rhead,
866 nrs_orr_hash_params);
867 call_rcu(&orro->oo_rcu_head, nrs_orr_object_free);
871 * Obtains resources for TRR policy instances. The top-level resource lives
872 * inside \e nrs_orr_data and the second-level resource inside
873 * \e nrs_orr_object instances.
875 * @policy the policy for which resources are being taken for
877 * @nrq the request for which resources are being taken
878 * @parent parent resource, embedded in nrs_orr_data for the
880 * @resp used to return resource references
881 * @moving_req signifies limited caller context; used to perform
882 * memory allocations in an atomic context in this
885 * RETURN 0 we are returning a top-level, parent resource, one that is
886 * embedded in an nrs_orr_data object
887 * 1 we are returning a bottom-level resource, one that is embedded
888 * in an nrs_orr_object object
890 * \see nrs_resource_get_safe()
892 static int nrs_trr_res_get(struct ptlrpc_nrs_policy *policy,
893 struct ptlrpc_nrs_request *nrq,
894 const struct ptlrpc_nrs_resource *parent,
895 struct ptlrpc_nrs_resource **resp, bool moving_req)
897 struct nrs_orr_key key = { { { 0 } } };
898 struct nrs_orr_object *orro;
899 struct nrs_orr_data *orrd;
904 * struct nrs_orr_data is requested.
907 *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
911 orrd = container_of(parent, struct nrs_orr_data, od_res);
914 * If the request type is not supported, fail the enqueuing; the RPC
915 * will be handled by the fallback NRS policy.
917 if (!nrs_orr_req_supported(orrd, nrq, &opc))
921 * This is an attempt to fill in the request key fields while
922 * moving a request from the regular to the high-priority NRS
923 * head (via ldlm_lock_reorder_req()), but the request key has
924 * been adequately filled when nrs_trr_res_get() was called through
925 * ptlrpc_nrs_req_initialize() for the regular NRS head's TRR
926 * policy, so there is nothing to do.
928 if (!nrq->nr_u.orr.or_trr_set) {
929 struct ptlrpc_request *req;
931 /* Bounce unconnected requests to the default policy. */
932 req = container_of(nrq, struct ptlrpc_request, rq_nrq);
/* TRR keys by the OST index only, not by object FID. */
936 key.ok_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
937 nrq->nr_u.orr.or_trr_set = 1;
/* Key was cached on a previous res_get for this request; reuse it. */
939 key = nrq->nr_u.orr.or_key;
943 * Set the offset range the request covers
945 rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
949 /* For TRR we just attempt to find the orro via the ok_idx.
950 * If not found we insert it into the Xarray.
953 orro = xa_load(&orrd->od_trr_objs, key.ok_idx);
/* Not present: allocate and insert. GFP_ATOMIC when called from the
 * spinlocked HP-move path (moving_req), GFP_NOFS otherwise. */
957 OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
958 nrs_pol2cptab(policy), nrs_pol2cptid(policy),
959 moving_req ? GFP_ATOMIC : GFP_NOFS);
964 rc = ll_xa_insert(&orrd->od_trr_objs, key.ok_idx, orro,
965 moving_req ? GFP_ATOMIC : GFP_NOFS);
/* Insert failed (e.g. a racing inserter won); drop our copy. */
967 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
976 * For debugging purposes
978 nrq->nr_u.orr.or_key = orro->oo_key;
980 *resp = &orro->oo_res;
986 * Called when polling an ORR/TRR policy instance for a request so that it can
987 * be served. Returns the request that is at the root of the binary heap, as
988 * that is the lowest priority one (i.e. binheap is an implementation of a
991 * \param[in] policy the policy instance being polled
992 * \param[in] peek when set, signifies that we just want to examine the
993 * request, and not handle it, so the request is not removed
995 * \param[in] force force the policy to return a request; unused in this policy
997 * \retval the request to be handled
998 * \retval NULL no request available
1000 * \see ptlrpc_nrs_req_get_nolock()
1001 * \see nrs_request_get()
1004 struct ptlrpc_nrs_request *nrs_orr_req_get(struct ptlrpc_nrs_policy *policy,
1005 bool peek, bool force)
1007 struct nrs_orr_data *orrd = policy->pol_private;
1008 struct binheap_node *node = binheap_root(orrd->od_binheap);
1009 struct ptlrpc_nrs_request *nrq;
/* Heap root is the highest-priority request (see orr_req_compare()). */
1011 nrq = unlikely(node == NULL) ? NULL :
1012 container_of(node, struct ptlrpc_nrs_request, nr_node);
/* Unless only peeking, dequeue the request and advance bookkeeping. */
1014 if (likely(!peek && nrq != NULL)) {
1015 struct nrs_orr_object *orro;
1017 orro = container_of(nrs_request_resource(nrq),
1018 struct nrs_orr_object, oo_res);
1020 LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
1022 binheap_remove(orrd->od_binheap, &nrq->nr_node);
/* ORR logs the object FID as well; TRR only has an OST index. */
1025 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
1026 NRS_POL_NAME_MAX) == 0)
1028 "NRS: starting to handle %s request for object "
1029 "with FID "DFID", from OST with index %u, with "
1030 "round %llu\n", NRS_POL_NAME_ORR,
1031 PFID(&orro->oo_key.ok_fid),
1032 nrq->nr_u.orr.or_key.ok_idx,
1033 nrq->nr_u.orr.or_round);
1036 "NRS: starting to handle %s request from OST "
1037 "with index %u, with round %llu\n",
1038 NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
1039 nrq->nr_u.orr.or_round);
1041 /** Peek at the next request to be served */
1042 node = binheap_root(orrd->od_binheap);
1044 /** No more requests */
1045 if (unlikely(node == NULL)) {
1048 struct ptlrpc_nrs_request *next;
1050 next = container_of(node, struct ptlrpc_nrs_request,
/* Advance the global round to the new root's round, so new arrivals
 * are scheduled against the round currently being dispatched. */
1053 if (orrd->od_round < next->nr_u.orr.or_round)
1054 orrd->od_round = next->nr_u.orr.or_round;
1062 * Sort-adds request \a nrq to an ORR/TRR \a policy instance's set of queued
1063 * requests in the policy's binary heap.
1065 * A scheduling round is a stream of requests that have been sorted in batches
1066 * according to the backend-fs object (for ORR policy instances) or OST (for TRR
1067 * policy instances) that they pertain to (as identified by its IDIF FID or OST
1068 * index respectively); there can be only one batch for each object or OST in
1069 * each round. The batches are of maximum size nrs_orr_data:od_quantum. When a
1070 * new request arrives for scheduling for an object or OST that has exhausted
1071 * its quantum in its current round, the request will be scheduled on the next
1072 * scheduling round. Requests are allowed to be scheduled against a round until
1073 * all requests for the round are serviced, so an object or OST might miss a
1074 * round if requests are not scheduled for it for a long enough period of time.
1075 * Objects or OSTs that miss a round will continue with having their next
1076 * request scheduled, starting at the round that requests are being dispatched
1077 * for, at the time of arrival of this request.
1079 * Requests are tagged with the round number and a sequence number; the sequence
1080 * number indicates the relative ordering amongst the batches of requests in a
1081 * round, and is identical for all requests in a batch, as is the round number.
1082 * The round and sequence numbers are used by orr_req_compare() in order to use
1083 * nrs_orr_data::od_binheap in order to maintain an ordered set of rounds, with
1084 * each round consisting of an ordered set of batches of requests, and each
1085 * batch consisting of an ordered set of requests according to their logical
1086 * file or physical disk offsets.
1088 * \param[in] policy the policy
1089 * \param[in] nrq the request to add
1091 * \retval 0 request successfully added
1092 * \retval != 0 error
1094 static int nrs_orr_req_add(struct ptlrpc_nrs_policy *policy,
1095 struct ptlrpc_nrs_request *nrq)
1097 struct nrs_orr_data *orrd;
1098 struct nrs_orr_object *orro;
1101 orro = container_of(nrs_request_resource(nrq),
1102 struct nrs_orr_object, oo_res);
1103 orrd = container_of(nrs_request_resource(nrq)->res_parent,
1104 struct nrs_orr_data, od_res);
/* Start a fresh batch when: the quantum is exhausted, the object has
 * fallen behind the global round, or the object went idle mid-batch. */
1106 if (orro->oo_quantum == 0 || orro->oo_round < orrd->od_round ||
1107 (orro->oo_active == 0 && orro->oo_quantum > 0)) {
1110 * If there are no pending requests for the object/OST, but some
1111 * of its quantum still remains unused, which implies we did not
1112 * get a chance to schedule up to its maximum allowed batch size
1113 * of requests in the previous round this object/OST
1114 * participated in, schedule this next request on a new round;
1115 * this avoids fragmentation of request batches caused by
1116 * intermittent inactivity on the object/OST, at the expense of
1117 * potentially slightly increased service time for the request
1118 * batch this request will be a part of.
1120 if (orro->oo_active == 0 && orro->oo_quantum > 0)
1123 /** A new scheduling round has commenced */
1124 if (orro->oo_round < orrd->od_round)
1125 orro->oo_round = orrd->od_round;
1127 /** I was not the last object/OST that scheduled a request */
1128 if (orro->oo_sequence < orrd->od_sequence)
1129 orro->oo_sequence = ++orrd->od_sequence;
1131 * Reset the quantum if we have reached the maximum quantum
1132 * size for this batch, or even if we have not managed to
1133 * complete a batch size up to its maximum allowed size.
1134 * XXX: Accessed unlocked
1136 orro->oo_quantum = orrd->od_quantum;
/* Tag the request with its round/sequence for orr_req_compare(). */
1139 nrq->nr_u.orr.or_round = orro->oo_round;
1140 nrq->nr_u.orr.or_sequence = orro->oo_sequence;
1142 rc = binheap_insert(orrd->od_binheap, &nrq->nr_node);
/* Batch full: the next request for this object starts a new batch. */
1145 if (--orro->oo_quantum == 0)
1152 * Removes request \a nrq from an ORR/TRR \a policy instance's set of queued
1155 * \param[in] policy the policy
1156 * \param[in] nrq the request to remove
/*
 * NOTE(review): line-numbered listing with gaps; the 'bool is_root'
 * declaration, the enclosing braces, and the function's tail (including the
 * early-out when the heap becomes empty) are not visible in this chunk.
 */
1158 static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy,
1159 struct ptlrpc_nrs_request *nrq)
1161 struct nrs_orr_data *orrd;
1162 struct nrs_orr_object *orro;
1165 orro = container_of(nrs_request_resource(nrq),
1166 struct nrs_orr_object, oo_res);
1167 orrd = container_of(nrs_request_resource(nrq)->res_parent,
1168 struct nrs_orr_data, od_res);
/* A queued request can never be stamped with a round later than its
 * object's current round. */
1170 LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
/* Remember whether we are removing the heap root: only then can the
 * policy-wide round number need adjusting. */
1172 is_root = &nrq->nr_node == binheap_root(orrd->od_binheap);
1174 binheap_remove(orrd->od_binheap, &nrq->nr_node);
1178 * If we just deleted the node at the root of the binheap, we may have
1179 * to adjust round numbers.
1181 if (unlikely(is_root)) {
1182 /** Peek at the next request to be served */
1183 struct binheap_node *node = binheap_root(orrd->od_binheap);
1185 /** No more requests */
1186 if (unlikely(node == NULL)) {
1189 nrq = container_of(node, struct ptlrpc_nrs_request,
/* Advance the policy's round to match the new root, so new arrivals are
 * scheduled relative to the request that will actually be served next. */
1192 if (orrd->od_round < nrq->nr_u.orr.or_round)
1193 orrd->od_round = nrq->nr_u.orr.or_round;
1199 * Called right after the request @nrq finishes being handled by ORR policy
1200 * instance \a policy.
1202 * @policy the policy that handled the request
1203 * @nrq the request that was handled
/*
 * NOTE(review): the CDEBUG() (or similar) macro invocation line that takes
 * the format string below is missing from this listing, as are the function
 * braces.
 */
1205 static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy,
1206 struct ptlrpc_nrs_request *nrq)
1208 /** NB: resource control, credits etc can be added here */
1210 "NRS: finished handling ORR request for object with FID "DFID", from OST with index %u, with round %llu\n",
1211 PFID(&nrq->nr_u.orr.or_key.ok_fid), nrq->nr_u.orr.or_key.ok_idx,
1212 nrq->nr_u.orr.or_round;
1216 * Called right after the request @nrq finishes being handled by TRR policy
1219 * @policy the policy that handled the request
1220 * @nrq the request that was handled
/*
 * NOTE(review): as with nrs_orr_req_stop() above, the debug-macro invocation
 * line carrying the format string below is absent from this listing.
 */
1222 static void nrs_trr_req_stop(struct ptlrpc_nrs_policy *policy,
1223 struct ptlrpc_nrs_request *nrq)
1225 /** NB: resource control, credits etc can be added here */
1227 "NRS: finished handling TRR request from OST with index %u, with round %llu\n",
1228 nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
1236 * This allows to bundle the policy name into the lprocfs_vars::data pointer
1237 * so that lprocfs read/write functions can be used by both the ORR and TRR
/*
 * NOTE(review): the struct's name member (presumably 'char *name;', original
 * line 1242) is not visible in this listing; the initializers below set
 * '.name', so the member must exist in the real file.
 */
1240 static struct nrs_lprocfs_orr_data {
1241 struct ptlrpc_service *svc;
1243 } lprocfs_orr_data = {
1244 .name = NRS_POL_NAME_ORR
1245 }, lprocfs_trr_data = {
1246 .name = NRS_POL_NAME_TRR
1250 * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
1251 * for ORR/TRR policy instances on both the regular and high-priority NRS head
1252 * of a service, as long as a policy instance is not in the
1253 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1254 * state are skipped later by nrs_orr_ctl().
1256 * Quantum values are in # of RPCs, and the output is in YAML format.
1263 * XXX: the CRR-N version of this, ptlrpc_lprocfs_rd_nrs_crrn_quantum() is
1264 * almost identical; it can be reworked and then reused for ORR/TRR.
/*
 * NOTE(review): numbered listing with gaps; the 'static int' return-type
 * line, the local declarations of 'quantum' and 'rc', several call
 * arguments, error gotos and the final return are not visible here.
 */
1267 ptlrpc_lprocfs_nrs_orr_quantum_seq_show(struct seq_file *m, void *data)
1269 struct nrs_lprocfs_orr_data *orr_data = m->private;
1270 struct ptlrpc_service *svc = orr_data->svc;
1275 * Perform two separate calls to this as only one of the NRS heads'
1276 * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
1277 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1279 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1281 NRS_CTL_ORR_RD_QUANTUM,
1284 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG "%-5d\n", quantum);
1286 * Ignore -ENODEV as the regular NRS head's policy may be in the
1287 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1289 } else if (rc != -ENODEV) {
1294 * We know the ost_io service which is the only one ORR/TRR policies are
1295 * compatible with, do have an HP NRS head, but it may be best to guard
1296 * against a possible change of this in the future.
1298 if (!nrs_svc_has_hp(svc))
1301 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1302 orr_data->name, NRS_CTL_ORR_RD_QUANTUM,
1305 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
1307 * Ignore -ENODEV as the high priority NRS head's policy may be
1308 * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1310 } else if (rc != -ENODEV) {
1320 * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
1321 * for ORR/TRR policy instances of a service. The user can set the quantum size
1322 * for the regular and high priority NRS head separately by specifying each
1323 * value, or both together in a single invocation.
1327 * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=req_quantum:64, to set the
1328 * request quantum size of the ORR policy instance on the regular NRS head of
1329 * the ost_io service to 64
1331 * lctl set_param ost.OSS.ost_io.nrs_trr_quantum=hp_quantum:8 to set the request
1332 * quantum size of the TRR policy instance on the high priority NRS head of the
1333 * ost_io service to 8
1335 * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=32, to set both the request
1336 * quantum size of the ORR policy instance on both the regular and the high
1337 * priority NRS head of the ost_io service to 32
1339 * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1340 * are skipped later by nrs_orr_ctl().
1342 * XXX: the CRR-N version of this, ptlrpc_lprocfs_wr_nrs_crrn_quantum() is
1343 * almost identical; it can be reworked and then reused for ORR/TRR.
/*
 * NOTE(review): numbered listing with gaps; the 'static ssize_t' line, the
 * declarations of quantum_reg/quantum_hp/rc/rc2/val, the error returns
 * (-EINVAL/-EFAULT paths), and some call arguments are not visible here.
 */
1346 ptlrpc_lprocfs_nrs_orr_quantum_seq_write(struct file *file,
1347 const char __user *buffer,
1348 size_t count, loff_t *off)
1350 struct seq_file *m = file->private_data;
1351 struct nrs_lprocfs_orr_data *orr_data = m->private;
1352 struct ptlrpc_service *svc = orr_data->svc;
1353 enum ptlrpc_nrs_queue_type queue = 0;
1354 char kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
1358 /** lprocfs_find_named_value() modifies its argument, so keep a copy */
/* Reject input that would not fit in the on-stack buffer, then copy and
 * NUL-terminate it. */
1363 if (count > (sizeof(kernbuf) - 1))
1366 if (copy_from_user(kernbuf, buffer, count))
1369 kernbuf[count] = '\0';
1374 * Check if the regular quantum value has been specified
1376 val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
1378 if (val != kernbuf) {
1379 rc = kstrtol(val, 10, &quantum_reg);
1382 queue |= PTLRPC_NRS_QUEUE_REG;
1388 * Check if the high priority quantum value has been specified
1390 val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
1392 if (val != kernbuf) {
1393 if (!nrs_svc_has_hp(svc))
1396 rc = kstrtol(val, 10, &quantum_hp);
1400 queue |= PTLRPC_NRS_QUEUE_HP;
1404 * If none of the queues has been specified, look for a valid numerical
1408 rc = kstrtol(kernbuf, 10, &quantum_reg);
/* A bare number applies to the regular head, and to the HP head too when
 * the service has one. */
1412 queue = PTLRPC_NRS_QUEUE_REG;
1414 if (nrs_svc_has_hp(svc)) {
1415 queue |= PTLRPC_NRS_QUEUE_HP;
1416 quantum_hp = quantum_reg;
/* Range-check every quantum that was actually specified: must be in
 * (0, LPROCFS_NRS_QUANTUM_MAX]. */
1420 if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
1421 ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
1422 (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
1423 ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
1427 * We change the values on regular and HP NRS heads separately, so that
1428 * we do not exit early from ptlrpc_nrs_policy_control() with an error
1429 * returned by nrs_policy_ctl_locked(), in cases where the user has not
1430 * started the policy on either the regular or HP NRS head; i.e. we are
1431 * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1432 * only if the operation fails with -ENODEV on all heads that have been
1433 * specified by the command; if at least one operation succeeds,
1434 * success is returned.
1436 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1437 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1439 NRS_CTL_ORR_WR_QUANTUM, false,
1441 if ((rc < 0 && rc != -ENODEV) ||
1442 (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1446 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1447 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1449 NRS_CTL_ORR_WR_QUANTUM, false,
1451 if ((rc2 < 0 && rc2 != -ENODEV) ||
1452 (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
/* Success means the whole write was consumed; -ENODEV only if every
 * addressed head reported it. */
1456 return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
/* Generates the seq_file file_operations for the quantum tunable. */
1459 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_quantum);
/* lprocfs token names for selecting/reporting the RPC offset-ordering type. */
1461 #define LPROCFS_NRS_OFF_NAME_REG "reg_offset_type:"
1462 #define LPROCFS_NRS_OFF_NAME_HP "hp_offset_type:"
1464 #define LPROCFS_NRS_OFF_NAME_PHYSICAL "physical"
1465 #define LPROCFS_NRS_OFF_NAME_LOGICAL "logical"
1468 * Retrieves the offset type used by ORR/TRR policy instances on both the
1469 * regular and high-priority NRS head of a service, as long as a policy
1470 * instance is not in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state;
1471 * policy instances in this state are skipped later by nrs_orr_ctl().
1473 * Offset type information is a (physical|logical) string, and output is
1478 * reg_offset_type:physical
1479 * hp_offset_type:logical
/*
 * NOTE(review): numbered listing with gaps; the return-type line, the local
 * declarations of 'physical' and 'rc', and the function's tail are not
 * visible here.
 */
1482 ptlrpc_lprocfs_nrs_orr_offset_type_seq_show(struct seq_file *m, void *data)
1484 struct nrs_lprocfs_orr_data *orr_data = m->private;
1485 struct ptlrpc_service *svc = orr_data->svc;
1490 * Perform two separate calls to this as only one of the NRS heads'
1491 * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1492 * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1494 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1495 orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1498 seq_printf(m, LPROCFS_NRS_OFF_NAME_REG"%s\n",
1499 physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1500 LPROCFS_NRS_OFF_NAME_LOGICAL);
1502 * Ignore -ENODEV as the regular NRS head's policy may be in the
1503 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1505 } else if (rc != -ENODEV) {
1510 * We know the ost_io service which is the only one ORR/TRR policies are
1511 * compatible with, do have an HP NRS head, but it may be best to guard
1512 * against a possible change of this in the future.
1514 if (!nrs_svc_has_hp(svc))
1517 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1518 orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1521 seq_printf(m, LPROCFS_NRS_OFF_NAME_HP"%s\n",
1522 physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1523 LPROCFS_NRS_OFF_NAME_LOGICAL);
1525 * Ignore -ENODEV as the high priority NRS head's policy may be
1526 * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1528 } else if (rc != -ENODEV) {
1537 * Max valid command string is the size of the labels, plus "physical" twice.
1538 * plus a separating ' '
1540 #define LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD \
1541 sizeof(LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL " " \
1542 LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL)
1545 * Sets the type of offsets used to order RPCs in ORR/TRR policy instances. The
1546 * user can set offset type for the regular or high priority NRS head
1547 * separately by specifying each value, or both together in a single invocation.
1551 * lctl set_param ost.OSS.ost_io.nrs_orr_offset_type=
1552 * reg_offset_type:physical, to enable the ORR policy instance on the regular
1553 * NRS head of the ost_io service to use physical disk offset ordering.
1555 * lctl set_param ost.OSS.ost_io.nrs_trr_offset_type=logical, to enable the TRR
1556 * policy instances on both the regular and high priority NRS heads of the
1557 * ost_io service to use logical file offset ordering.
1559 * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1560 * are skipped later by nrs_orr_ctl().
/*
 * NOTE(review): numbered listing with gaps; the return-type line, the
 * declarations of val_reg/val_hp/physical_reg/physical_hp/rc/rc2, the
 * -EINVAL/-EFAULT error returns and the 'physical_hp = true' assignment
 * (after the first strncmp below) are not visible here.
 */
1563 ptlrpc_lprocfs_nrs_orr_offset_type_seq_write(struct file *file,
1564 const char __user *buffer,
1568 struct seq_file *m = file->private_data;
1569 struct nrs_lprocfs_orr_data *orr_data = m->private;
1570 struct ptlrpc_service *svc = orr_data->svc;
1571 enum ptlrpc_nrs_queue_type queue = 0;
1572 char kernbuf[LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD];
/* Bounds-check the user buffer, then copy and NUL-terminate it. */
1581 if (count > (sizeof(kernbuf) - 1))
1584 if (copy_from_user(kernbuf, buffer, count))
1587 kernbuf[count] = '\0';
1592 * Check if the regular offset type has been specified
1594 val_reg = lprocfs_find_named_value(kernbuf,
1595 LPROCFS_NRS_OFF_NAME_REG,
1597 if (val_reg != kernbuf)
1598 queue |= PTLRPC_NRS_QUEUE_REG;
1603 * Check if the high priority offset type has been specified
1605 val_hp = lprocfs_find_named_value(kernbuf, LPROCFS_NRS_OFF_NAME_HP,
1607 if (val_hp != kernbuf) {
1608 if (!nrs_svc_has_hp(svc))
1611 queue |= PTLRPC_NRS_QUEUE_HP;
1615 * If none of the queues has been specified, there may be a valid
1616 * command string at the start of the buffer.
1619 queue = PTLRPC_NRS_QUEUE_REG;
1621 if (nrs_svc_has_hp(svc))
1622 queue |= PTLRPC_NRS_QUEUE_HP;
/* Parse the value string(s) into booleans: "physical" or "logical". */
1625 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1626 if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1627 sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1628 physical_reg = true;
1629 else if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_LOGICAL,
1630 sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1631 physical_reg = false;
1636 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1637 if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1638 sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1640 else if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_LOGICAL,
1641 sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1642 physical_hp = false;
1648 * We change the values on regular and HP NRS heads separately, so that
1649 * we do not exit early from ptlrpc_nrs_policy_control() with an error
1650 * returned by nrs_policy_ctl_locked(), in cases where the user has not
1651 * started the policy on either the regular or HP NRS head; i.e. we are
1652 * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1653 * only if the operation fails with -ENODEV on all heads that have been
1654 * specified by the command; if at least one operation succeeds,
1655 * success is returned.
1657 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1658 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1660 NRS_CTL_ORR_WR_OFF_TYPE, false,
1662 if ((rc < 0 && rc != -ENODEV) ||
1663 (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1667 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1668 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1670 NRS_CTL_ORR_WR_OFF_TYPE, false,
1672 if ((rc2 < 0 && rc2 != -ENODEV) ||
1673 (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1677 return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
/* Generates the seq_file file_operations for the offset-type tunable. */
1680 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_offset_type);
/* lprocfs token names for the supported-RPC-type tunable. */
1682 #define NRS_LPROCFS_REQ_SUPP_NAME_REG "reg_supported:"
1683 #define NRS_LPROCFS_REQ_SUPP_NAME_HP "hp_supported:"
1685 #define LPROCFS_NRS_SUPP_NAME_READS "reads"
1686 #define LPROCFS_NRS_SUPP_NAME_WRITES "writes"
1687 #define LPROCFS_NRS_SUPP_NAME_READWRITES "reads_and_writes"
1690 * Translates enum nrs_orr_supp values to a corresponding string.
/*
 * NOTE(review): the switch statement and its case labels (presumably
 * NOS_OST_READ / NOS_OST_WRITE / NOS_OST_RW) are missing from this listing;
 * only the return statements survive.
 */
1692 static const char *nrs_orr_supp2str(enum nrs_orr_supp supp)
1698 return LPROCFS_NRS_SUPP_NAME_READS;
1700 return LPROCFS_NRS_SUPP_NAME_WRITES;
1702 return LPROCFS_NRS_SUPP_NAME_READWRITES;
1707 * Translates strings to the corresponding enum nrs_orr_supp value
/*
 * NOTE(review): the return for the "reads_and_writes" branch and the final
 * failure return (presumably -EINVAL, matching the checks in
 * ..._supported_seq_write below) are not visible in this listing. The
 * "reads_and_writes" comparison must come first, since "reads" is a prefix
 * of it.
 */
1709 static enum nrs_orr_supp nrs_orr_str2supp(const char *val)
1711 if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READWRITES,
1712 sizeof(LPROCFS_NRS_SUPP_NAME_READWRITES) - 1) == 0)
1714 else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READS,
1715 sizeof(LPROCFS_NRS_SUPP_NAME_READS) - 1) == 0)
1716 return NOS_OST_READ;
1717 else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_WRITES,
1718 sizeof(LPROCFS_NRS_SUPP_NAME_WRITES) - 1) == 0)
1719 return NOS_OST_WRITE;
1725 * Retrieves the type of RPCs handled at the point of invocation by ORR/TRR
1726 * policy instances on both the regular and high-priority NRS head of a service,
1727 * as long as a policy instance is not in the
1728 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1729 * state are skipped later by nrs_orr_ctl().
1731 * Supported RPC type information is a (reads|writes|reads_and_writes) string,
1732 * and output is in YAML format.
1736 * reg_supported:reads
1737 * hp_supported:reads_and_writes
/*
 * NOTE(review): numbered listing with gaps; the return-type line, the 'rc'
 * declaration, some call arguments and the function tail are not visible
 * here.
 */
1740 ptlrpc_lprocfs_nrs_orr_supported_seq_show(struct seq_file *m, void *data)
1742 struct nrs_lprocfs_orr_data *orr_data = m->private;
1743 struct ptlrpc_service *svc = orr_data->svc;
1744 enum nrs_orr_supp supported;
1748 * Perform two separate calls to this as only one of the NRS heads'
1749 * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1750 * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1752 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1754 NRS_CTL_ORR_RD_SUPP_REQ, true,
1758 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_REG"%s\n",
1759 nrs_orr_supp2str(supported));
1761 * Ignore -ENODEV as the regular NRS head's policy may be in the
1762 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1764 } else if (rc != -ENODEV) {
1769 * We know the ost_io service which is the only one ORR/TRR policies are
1770 * compatible with, do have an HP NRS head, but it may be best to guard
1771 * against a possible change of this in the future.
1773 if (!nrs_svc_has_hp(svc))
1776 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1778 NRS_CTL_ORR_RD_SUPP_REQ, true,
1781 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_HP"%s\n",
1782 nrs_orr_supp2str(supported));
1784 * Ignore -ENODEV as the high priority NRS head's policy may be
1785 * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1787 } else if (rc != -ENODEV) {
1797 * Max valid command string is the size of the labels, plus "reads_and_writes"
1798 * twice, plus a separating ' '
1800 #define LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD \
1801 sizeof(NRS_LPROCFS_REQ_SUPP_NAME_REG LPROCFS_NRS_SUPP_NAME_READWRITES \
1802 NRS_LPROCFS_REQ_SUPP_NAME_HP LPROCFS_NRS_SUPP_NAME_READWRITES \
1806 * Sets the type of RPCs handled by ORR/TRR policy instances. The user can
1807 * modify this setting for the regular or high priority NRS heads separately, or
1808 * both together in a single invocation.
1812 * lctl set_param ost.OSS.ost_io.nrs_orr_supported=
1813 * "reg_supported:reads", to enable the ORR policy instance on the regular NRS
1814 * head of the ost_io service to handle OST_READ RPCs.
1816 * lctl set_param ost.OSS.ost_io.nrs_trr_supported=reads_and_writes, to enable
1817 * the TRR policy instances on both the regular and high priority NRS heads of
1818 * the ost_io service to handle OST_READ and OST_WRITE RPCs.
1820 * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1821 * are skipped later by nrs_orr_ctl().
/*
 * NOTE(review): numbered listing with gaps; the return-type line, the
 * val_reg/val_hp/rc/rc2 declarations, the -EINVAL/-EFAULT error returns and
 * some call arguments are not visible here.
 */
1824 ptlrpc_lprocfs_nrs_orr_supported_seq_write(struct file *file,
1825 const char __user *buffer,
1829 struct seq_file *m = file->private_data;
1830 struct nrs_lprocfs_orr_data *orr_data = m->private;
1831 struct ptlrpc_service *svc = orr_data->svc;
1832 enum ptlrpc_nrs_queue_type queue = 0;
1833 char kernbuf[LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD];
1836 enum nrs_orr_supp supp_reg;
1837 enum nrs_orr_supp supp_hp;
/* Bounds-check the user buffer, then copy and NUL-terminate it. */
1842 if (count > (sizeof(kernbuf) - 1))
1845 if (copy_from_user(kernbuf, buffer, count))
1848 kernbuf[count] = '\0';
1853 * Check if the regular supported requests setting has been specified
1855 val_reg = lprocfs_find_named_value(kernbuf,
1856 NRS_LPROCFS_REQ_SUPP_NAME_REG,
1858 if (val_reg != kernbuf)
1859 queue |= PTLRPC_NRS_QUEUE_REG;
1864 * Check if the high priority supported requests setting has been
1867 val_hp = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_REQ_SUPP_NAME_HP,
1869 if (val_hp != kernbuf) {
1870 if (!nrs_svc_has_hp(svc))
1873 queue |= PTLRPC_NRS_QUEUE_HP;
1877 * If none of the queues has been specified, there may be a valid
1878 * command string at the start of the buffer.
1881 queue = PTLRPC_NRS_QUEUE_REG;
1883 if (nrs_svc_has_hp(svc))
1884 queue |= PTLRPC_NRS_QUEUE_HP;
/* Translate each specified value string into an enum nrs_orr_supp;
 * nrs_orr_str2supp() reports unrecognized strings as -EINVAL. */
1887 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1888 supp_reg = nrs_orr_str2supp(val_reg);
1889 if (supp_reg == -EINVAL)
1893 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1894 supp_hp = nrs_orr_str2supp(val_hp);
1895 if (supp_hp == -EINVAL)
1900 * We change the values on regular and HP NRS heads separately, so that
1901 * we do not exit early from ptlrpc_nrs_policy_control() with an error
1902 * returned by nrs_policy_ctl_locked(), in cases where the user has not
1903 * started the policy on either the regular or HP NRS head; i.e. we are
1904 * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1905 * only if the operation fails with -ENODEV on all heads that have been
1906 * specified by the command; if at least one operation succeeds,
1907 * success is returned.
1909 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1910 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1912 NRS_CTL_ORR_WR_SUPP_REQ, false,
1914 if ((rc < 0 && rc != -ENODEV) ||
1915 (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1919 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1920 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1922 NRS_CTL_ORR_WR_SUPP_REQ, false,
1924 if ((rc2 < 0 && rc2 != -ENODEV) ||
1925 (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1929 return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
/* Generates the seq_file file_operations for the supported-RPC tunable. */
1932 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_supported);
/*
 * Registers the ORR policy's debugfs tunables (quantum, offset type,
 * supported RPC types) under the service's debugfs directory.
 * NOTE(review): the loop index declaration, braces and the return
 * statement(s) are not visible in this listing.
 */
1934 static int nrs_orr_lprocfs_init(struct ptlrpc_service *svc)
1938 struct ldebugfs_vars nrs_orr_lprocfs_vars[] = {
1939 { .name = "nrs_orr_quantum",
1940 .fops = &ptlrpc_lprocfs_nrs_orr_quantum_fops },
1941 { .name = "nrs_orr_offset_type",
1942 .fops = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1943 { .name = "nrs_orr_supported",
1944 .fops = &ptlrpc_lprocfs_nrs_orr_supported_fops },
/* Nothing to register when the service has no debugfs entry. */
1948 if (!svc->srv_debugfs_entry)
/* Point the shared lprocfs handlers at this service, and tag every var
 * with the ORR bundle so the handlers know which policy name to use. */
1951 lprocfs_orr_data.svc = svc;
1953 for (i = 0; i < ARRAY_SIZE(nrs_orr_lprocfs_vars); i++)
1954 nrs_orr_lprocfs_vars[i].data = &lprocfs_orr_data;
1956 ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_orr_lprocfs_vars, NULL);
/* Operations vector for the ORR policy. NOTE(review): the closing '};' line
 * is not visible in this listing. */
1961 static const struct ptlrpc_nrs_pol_ops nrs_orr_ops = {
1962 .op_policy_init = nrs_orr_init,
1963 .op_policy_start = nrs_orr_start,
1964 .op_policy_stop = nrs_orr_stop,
1965 .op_policy_ctl = nrs_orr_ctl,
1966 .op_res_get = nrs_orr_res_get,
1967 .op_res_put = nrs_orr_res_put,
1968 .op_req_get = nrs_orr_req_get,
1969 .op_req_enqueue = nrs_orr_req_add,
1970 .op_req_dequeue = nrs_orr_req_del,
1971 .op_req_stop = nrs_orr_req_stop,
1972 .op_lprocfs_init = nrs_orr_lprocfs_init,
/* Registration descriptor for the ORR policy; compatible only with the
 * ost_io service. NOTE(review): the closing '};' line is not visible here. */
1975 struct ptlrpc_nrs_pol_conf nrs_conf_orr = {
1976 .nc_name = NRS_POL_NAME_ORR,
1977 .nc_ops = &nrs_orr_ops,
1978 .nc_compat = nrs_policy_compat_one,
1979 .nc_compat_svc_name = "ost_io",
1983 * TRR, Target-based Round Robin policy
1985 * TRR reuses much of the functions and data structures of ORR
/*
 * Registers the TRR policy's debugfs tunables; mirrors nrs_orr_lprocfs_init()
 * but tags the vars with the TRR bundle so the shared handlers report/modify
 * the TRR instance. NOTE(review): the loop index declaration, braces and the
 * return statement(s) are not visible in this listing.
 */
1987 static int nrs_trr_lprocfs_init(struct ptlrpc_service *svc)
1991 struct ldebugfs_vars nrs_trr_lprocfs_vars[] = {
1992 { .name = "nrs_trr_quantum",
1993 .fops = &ptlrpc_lprocfs_nrs_orr_quantum_fops },
1994 { .name = "nrs_trr_offset_type",
1995 .fops = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1996 { .name = "nrs_trr_supported",
1997 .fops = &ptlrpc_lprocfs_nrs_orr_supported_fops },
2001 if (!svc->srv_debugfs_entry)
2004 lprocfs_trr_data.svc = svc;
2006 for (i = 0; i < ARRAY_SIZE(nrs_trr_lprocfs_vars); i++)
2007 nrs_trr_lprocfs_vars[i].data = &lprocfs_trr_data;
2009 ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_trr_lprocfs_vars, NULL);
2015 * Reuse much of the ORR functionality for TRR.
/*
 * Operations vector for the TRR policy; only res_get, req_stop and
 * lprocfs_init differ from ORR.
 * NOTE(review): unlike nrs_orr_ops there is no .op_res_put entry visible
 * between .op_res_get and .op_req_get; upstream nrs_trr_ops reuses
 * nrs_orr_res_put — confirm whether the line exists in the real file or is
 * genuinely absent (which would leak resource references). The closing '};'
 * is also not visible in this listing.
 */
2017 static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
2018 .op_policy_init = nrs_orr_init,
2019 .op_policy_start = nrs_orr_start,
2020 .op_policy_stop = nrs_orr_stop,
2021 .op_policy_ctl = nrs_orr_ctl,
2022 .op_res_get = nrs_trr_res_get,
2023 .op_req_get = nrs_orr_req_get,
2024 .op_req_enqueue = nrs_orr_req_add,
2025 .op_req_dequeue = nrs_orr_req_del,
2026 .op_req_stop = nrs_trr_req_stop,
2027 .op_lprocfs_init = nrs_trr_lprocfs_init,
/* Registration descriptor for the TRR policy; compatible only with the
 * ost_io service. NOTE(review): the closing '};' line is not visible here. */
2030 struct ptlrpc_nrs_pol_conf nrs_conf_trr = {
2031 .nc_name = NRS_POL_NAME_TRR,
2032 .nc_ops = &nrs_trr_ops,
2033 .nc_compat = nrs_policy_compat_one,
2034 .nc_compat_svc_name = "ost_io",
2037 /** @} ORR/TRR policy */