4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2013, Intel Corporation.
25 * Copyright 2012 Xyratex Technology Limited
28 * lustre/ptlrpc/nrs_orr.c
30 * Network Request Scheduler (NRS) ORR and TRR policies
32 * Request scheduling in a Round-Robin manner over backend-fs objects and OSTs
35 * Author: Liang Zhen <liang@whamcloud.com>
36 * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
38 #ifdef HAVE_SERVER_SUPPORT
44 #define DEBUG_SUBSYSTEM S_RPC
45 #include <obd_support.h>
46 #include <obd_class.h>
47 #include <lustre_net.h>
48 #include <lustre/lustre_idl.h>
49 #include <lustre_req_layout.h>
50 #include "ptlrpc_internal.h"
53 * \name ORR/TRR policy
55 * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies
57 * ORR performs batched Round Robin scheduling of brw RPCs, based on the FID of
58 * the backend-fs object that the brw RPC pertains to; the TRR policy performs
59 * batched Round Robin scheduling of brw RPCs, based on the OST index that the
60 * RPC pertains to. Both policies also order RPCs in each batch in ascending
61 * offset order, which is lprocfs-tunable between logical file offsets, and
62 * physical disk offsets, as reported by fiemap.
64 * The TRR policy reuses much of the functionality of ORR. These two scheduling
65 * algorithms could alternatively be implemented under a single NRS policy, that
66 * uses an lprocfs tunable in order to switch between the two types of
67 * scheduling behaviour. The two algorithms have been implemented as separate
68 * policies for reasons of clarity to the user, and to avoid issues that would
69 * otherwise arise at the point of switching between behaviours in the case of
70 * having a single policy, such as resource cleanup for nrs_orr_object
71 * instances. It is possible that this may need to be re-examined in the future,
72 * along with potentially coalescing other policies that perform batched request
73 * scheduling in a Round-Robin manner, all into one policy.
78 #define NRS_POL_NAME_ORR "orr"
79 #define NRS_POL_NAME_TRR "trr"
82 * Checks if the RPC type of \a nrq is currently handled by an ORR/TRR policy
84 * \param[in] orrd the ORR/TRR policy scheduler instance
85 * \param[in] nrq the request
86 * \param[out] opcode the opcode is saved here, just in order to avoid calling
87 * lustre_msg_get_opc() again later
89 * \retval true request type is supported by the policy instance
90 * \retval false request type is not supported by the policy instance
92 static bool nrs_orr_req_supported(struct nrs_orr_data *orrd,
93 struct ptlrpc_nrs_request *nrq, __u32 *opcode)
95 struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
97 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
101 * XXX: nrs_orr_data::od_supp accessed unlocked.
105 rc = orrd->od_supp & NOS_OST_READ;
108 rc = orrd->od_supp & NOS_OST_WRITE;
119 * Returns the ORR/TRR key fields for the request \a nrq in \a key.
121 * \param[in] orrd the ORR/TRR policy scheduler instance
122 * \param[in] nrq the request
123 * \param[in] opc the request's opcode
124 * \param[in] name the policy name
125 * \param[out] key fields of the key are returned here.
127 * \retval 0 key filled successfully
130 static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
131 struct ptlrpc_nrs_request *nrq, __u32 opc,
132 char *name, struct nrs_orr_key *key)
134 struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
136 struct ost_body *body;
138 bool is_orr = strncmp(name, NRS_POL_NAME_ORR,
139 NRS_POL_NAME_MAX) == 0;
141 LASSERT(req != NULL);
144 * This is an attempt to fill in the request key fields while
145 * moving a request from the regular to the high-priority NRS
146 * head (via ldlm_lock_reorder_req()), but the request key has
147 * been adequately filled when nrs_orr_res_get() was called through
148 * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR/TRR
149 * policy, so there is nothing to do.
151 if ((is_orr && nrq->nr_u.orr.or_orr_set) ||
152 (!is_orr && nrq->nr_u.orr.or_trr_set)) {
153 *key = nrq->nr_u.orr.or_key;
157 if (nrq->nr_u.orr.or_orr_set || nrq->nr_u.orr.or_trr_set)
158 memset(&nrq->nr_u.orr.or_key, 0, sizeof(nrq->nr_u.orr.or_key));
160 ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
165 * The request pill for OST_READ and OST_WRITE requests is
166 * initialized in the ost_io service's
167 * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
168 * so no need to redo it here.
170 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
174 rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
178 nrq->nr_u.orr.or_orr_set = 1;
180 key->ok_idx = ost_idx;
181 nrq->nr_u.orr.or_trr_set = 1;
188 * Populates the range values in \a range with logical offsets obtained via
191 * \param[in] nb niobuf_remote struct array for this request
192 * \param[in] niocount count of niobuf_remote structs for this request
193 * \param[out] range the offset range is returned here
195 static void nrs_orr_range_fill_logical(struct niobuf_remote *nb, int niocount,
196 struct nrs_orr_req_range *range)
198 /* Should we do this at page boundaries ? */
199 range->or_start = nb[0].offset & CFS_PAGE_MASK;
200 range->or_end = (nb[niocount - 1].offset +
201 nb[niocount - 1].len - 1) | ~CFS_PAGE_MASK;
205 * We obtain information just for a single extent, as the request can only be in
206 * a single place in the binary heap anyway.
208 #define ORR_NUM_EXTENTS 1
211 * Converts the logical file offset range in \a range, to a physical disk offset
212 * range in \a range, for a request. Uses obd_get_info() in order to carry out a
213 * fiemap call and obtain backend-fs extent information. The returned range is
214 * in physical block numbers.
216 * \param[in] nrq the request
217 * \param[in] oa obdo struct for this request
218 * \param[in,out] range the offset range in bytes; logical range in, physical
221 * \retval 0 physical offsets obtained successfully
224 static int nrs_orr_range_fill_physical(struct ptlrpc_nrs_request *nrq,
226 struct nrs_orr_req_range *range)
228 struct ptlrpc_request *req = container_of(nrq,
229 struct ptlrpc_request,
231 char fiemap_buf[offsetof(struct ll_user_fiemap,
232 fm_extents[ORR_NUM_EXTENTS])];
233 struct ll_user_fiemap *fiemap = (struct ll_user_fiemap *)fiemap_buf;
234 struct ll_fiemap_info_key key;
239 key = (typeof(key)) {
243 .fm_start = range->or_start,
244 .fm_length = range->or_end - range->or_start,
245 .fm_extent_count = ORR_NUM_EXTENTS
249 rc = obd_get_info(req->rq_svc_thread->t_env, req->rq_export,
250 sizeof(key), &key, NULL, fiemap, NULL);
254 if (fiemap->fm_mapped_extents == 0 ||
255 fiemap->fm_mapped_extents > ORR_NUM_EXTENTS)
256 GOTO(out, rc = -EFAULT);
259 * Calculate the physical offset ranges for the request from the extent
260 * information and the logical request offsets.
262 start = fiemap->fm_extents[0].fe_physical + range->or_start -
263 fiemap->fm_extents[0].fe_logical;
264 end = start + range->or_end - range->or_start;
266 range->or_start = start;
269 nrq->nr_u.orr.or_physical_set = 1;
275 * Sets the offset range the request covers; either in logical file
276 * offsets or in physical disk offsets.
278 * \param[in] nrq the request
279 * \param[in] orrd the ORR/TRR policy scheduler instance
280 * \param[in] opc the request's opcode
281 * \param[in] moving_req is the request in the process of moving onto the
282 * high-priority NRS head?
284 * \retval 0 range filled successfully
287 static int nrs_orr_range_fill(struct ptlrpc_nrs_request *nrq,
288 struct nrs_orr_data *orrd, __u32 opc,
291 struct ptlrpc_request *req = container_of(nrq,
292 struct ptlrpc_request,
294 struct obd_ioobj *ioo;
295 struct niobuf_remote *nb;
296 struct ost_body *body;
297 struct nrs_orr_req_range range;
302 * If we are scheduling using physical disk offsets, but we have filled
303 * the offset information in the request previously
304 * (i.e. ldlm_lock_reorder_req() is moving the request to the
305 * high-priority NRS head), there is no need to do anything, and we can
306 * exit. Moreover than the lack of need, we would be unable to perform
307 * the obd_get_info() call required in nrs_orr_range_fill_physical(),
308 * because ldlm_lock_reorder_lock() calls into here while holding a
309 * spinlock, and retrieving fiemap information via obd_get_info() is a
310 * potentially sleeping operation.
312 if (orrd->od_physical && nrq->nr_u.orr.or_physical_set)
315 ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
317 GOTO(out, rc = -EFAULT);
319 niocount = ioo->ioo_bufcnt;
321 nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
323 GOTO(out, rc = -EFAULT);
326 * Use logical information from niobuf_remote structures.
328 nrs_orr_range_fill_logical(nb, niocount, &range);
331 * Obtain physical offsets if selected, and this is an OST_READ RPC
332 * RPC. We do not enter this block if moving_req is set which indicates
333 * that the request is being moved to the high-priority NRS head by
334 * ldlm_lock_reorder_req(), as that function calls in here while holding
335 * a spinlock, and nrs_orr_range_physical() can sleep, so we just use
336 * logical file offsets for the range values for such requests.
338 if (orrd->od_physical && opc == OST_READ && !moving_req) {
339 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
341 GOTO(out, rc = -EFAULT);
344 * Translate to physical block offsets from backend filesystem
346 * Ignore return values; if obtaining the physical offsets
347 * fails, use the logical offsets.
349 nrs_orr_range_fill_physical(nrq, &body->oa, &range);
352 nrq->nr_u.orr.or_range = range;
358 * Generates a character string that can be used in order to register uniquely
359 * named libcfs_hash and slab objects for ORR/TRR policy instances. The
360 * character string is unique per policy instance, as it includes the policy's
361 * name, the CPT number, and a {reg|hp} token, and there is one policy instance
362 * per NRS head on each CPT, and the policy is only compatible with the ost_io
365 * \param[in] policy the policy instance
366 * \param[out] name the character array that will hold the generated name
368 static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
370 snprintf(name, NRS_ORR_OBJ_NAME_MAX, "%s%s%s%d",
371 "nrs_", policy->pol_desc->pd_name,
372 policy->pol_nrs->nrs_queue_type == PTLRPC_NRS_QUEUE_REG ?
373 "_reg_" : "_hp_", nrs_pol2cptid(policy));
377 * ORR/TRR hash operations
379 #define NRS_ORR_BITS 24
380 #define NRS_ORR_BKT_BITS 12
381 #define NRS_ORR_HASH_FLAGS (CFS_HASH_RW_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
383 #define NRS_TRR_BITS 4
384 #define NRS_TRR_BKT_BITS 2
385 #define NRS_TRR_HASH_FLAGS CFS_HASH_RW_BKTLOCK
387 static unsigned nrs_orr_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask)
389 return cfs_hash_djb2_hash(key, sizeof(struct nrs_orr_key), mask);
392 static void *nrs_orr_hop_key(cfs_hlist_node_t *hnode)
394 struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
395 struct nrs_orr_object,
397 return &orro->oo_key;
400 static int nrs_orr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
402 struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
403 struct nrs_orr_object,
406 return lu_fid_eq(&orro->oo_key.ok_fid,
407 &((struct nrs_orr_key *)key)->ok_fid);
410 static void *nrs_orr_hop_object(cfs_hlist_node_t *hnode)
412 return cfs_hlist_entry(hnode, struct nrs_orr_object, oo_hnode);
415 static void nrs_orr_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
417 struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
418 struct nrs_orr_object,
420 cfs_atomic_inc(&orro->oo_ref);
424 * Removes an nrs_orr_object the hash and frees its memory, if the object has
427 static void nrs_orr_hop_put_free(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
429 struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
430 struct nrs_orr_object,
432 struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent,
433 struct nrs_orr_data, od_res);
434 cfs_hash_bd_t bds[2];
436 if (cfs_atomic_dec_return(&orro->oo_ref) > 1)
439 cfs_hash_lock(hs, 0);
440 cfs_hash_dual_bd_get_and_lock(hs, &orro->oo_key, bds, 1);
443 * Another thread may have won the race and taken a reference on the
446 if (cfs_atomic_read(&orro->oo_ref) > 1)
449 if (bds[1].bd_bucket == NULL)
450 cfs_hash_bd_del_locked(hs, &bds[0], hnode);
452 hnode = cfs_hash_dual_bd_finddel_locked(hs, bds, &orro->oo_key,
454 LASSERT(hnode != NULL);
456 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
460 cfs_hash_dual_bd_unlock(hs, bds, 1);
461 cfs_hash_unlock(hs, 0);
464 static void nrs_orr_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
466 struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
467 struct nrs_orr_object,
469 cfs_atomic_dec(&orro->oo_ref);
472 static int nrs_trr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
474 struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
475 struct nrs_orr_object,
478 return orro->oo_key.ok_idx == ((struct nrs_orr_key *)key)->ok_idx;
481 static void nrs_trr_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
483 struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
484 struct nrs_orr_object,
486 struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent,
487 struct nrs_orr_data, od_res);
489 LASSERTF(cfs_atomic_read(&orro->oo_ref) == 0,
490 "Busy NRS TRR policy object for OST with index %u, with %d "
491 "refs\n", orro->oo_key.ok_idx, cfs_atomic_read(&orro->oo_ref));
493 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
496 static cfs_hash_ops_t nrs_orr_hash_ops = {
497 .hs_hash = nrs_orr_hop_hash,
498 .hs_key = nrs_orr_hop_key,
499 .hs_keycmp = nrs_orr_hop_keycmp,
500 .hs_object = nrs_orr_hop_object,
501 .hs_get = nrs_orr_hop_get,
502 .hs_put = nrs_orr_hop_put_free,
503 .hs_put_locked = nrs_orr_hop_put,
506 static cfs_hash_ops_t nrs_trr_hash_ops = {
507 .hs_hash = nrs_orr_hop_hash,
508 .hs_key = nrs_orr_hop_key,
509 .hs_keycmp = nrs_trr_hop_keycmp,
510 .hs_object = nrs_orr_hop_object,
511 .hs_get = nrs_orr_hop_get,
512 .hs_put = nrs_orr_hop_put,
513 .hs_put_locked = nrs_orr_hop_put,
514 .hs_exit = nrs_trr_hop_exit,
517 #define NRS_ORR_QUANTUM_DFLT 256
520 * Binary heap predicate.
523 * ptlrpc_nrs_request::nr_u::orr::or_round,
524 * ptlrpc_nrs_request::nr_u::orr::or_sequence, and
525 * ptlrpc_nrs_request::nr_u::orr::or_range to compare two binheap nodes and
526 * produce a binary predicate that indicates their relative priority, so that
527 * the binary heap can perform the necessary sorting operations.
529 * \param[in] e1 the first binheap node to compare
530 * \param[in] e2 the second binheap node to compare
535 static int orr_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2)
537 struct ptlrpc_nrs_request *nrq1;
538 struct ptlrpc_nrs_request *nrq2;
540 nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
541 nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
544 * Requests have been scheduled against a different scheduling round.
546 if (nrq1->nr_u.orr.or_round < nrq2->nr_u.orr.or_round)
548 else if (nrq1->nr_u.orr.or_round > nrq2->nr_u.orr.or_round)
552 * Requests have been scheduled against the same scheduling round, but
553 * belong to a different batch, i.e. they pertain to a different
554 * backend-fs object (for ORR policy instances) or OST (for TRR policy
557 if (nrq1->nr_u.orr.or_sequence < nrq2->nr_u.crr.cr_sequence)
559 else if (nrq1->nr_u.orr.or_sequence > nrq2->nr_u.crr.cr_sequence)
563 * If round numbers and sequence numbers are equal, the two requests
564 * have been scheduled on the same round, and belong to the same batch,
565 * which means they pertain to the same backend-fs object (if this is an
566 * ORR policy instance), or to the same OST (if this is a TRR policy
567 * instance), so these requests should be sorted by ascending offset
570 if (nrq1->nr_u.orr.or_range.or_start <
571 nrq2->nr_u.orr.or_range.or_start) {
573 } else if (nrq1->nr_u.orr.or_range.or_start >
574 nrq2->nr_u.orr.or_range.or_start) {
578 * Requests start from the same offset; Dispatch the shorter one
579 * first; perhaps slightly more chances of hitting caches like
582 return nrq1->nr_u.orr.or_range.or_end <
583 nrq2->nr_u.orr.or_range.or_end;
588 * ORR binary heap operations
590 static cfs_binheap_ops_t nrs_orr_heap_ops = {
593 .hop_compare = orr_req_compare,
597 * Prints a warning message if an ORR/TRR policy is started on a service with
600 * \param[in] policy the policy instance
604 static int nrs_orr_init(struct ptlrpc_nrs_policy *policy)
606 if (policy->pol_nrs->nrs_svcpt->scp_service->srv_ncpts > 1) {
607 bool is_orr = strncmp(policy->pol_desc->pd_name,
608 NRS_POL_NAME_ORR, NRS_POL_NAME_MAX) == 0;
610 CWARN("A%s %s NRS policy has been registered on a PTLRPC "
611 "service which has more than one service partition. "
612 "Please be advised that this policy may perform better "
613 "on services with only one partition.\n",
614 is_orr ? "n" : "", policy->pol_desc->pd_name);
620 * Called when an ORR policy instance is started.
622 * \param[in] policy the policy
624 * \retval -ENOMEM OOM error
627 static int nrs_orr_start(struct ptlrpc_nrs_policy *policy)
629 struct nrs_orr_data *orrd;
638 OBD_CPT_ALLOC_PTR(orrd, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
643 * Binary heap instance for sorted incoming requests.
645 orrd->od_binheap = cfs_binheap_create(&nrs_orr_heap_ops,
646 CBH_FLAG_ATOMIC_GROW, 4096, NULL,
647 nrs_pol2cptab(policy),
648 nrs_pol2cptid(policy));
649 if (orrd->od_binheap == NULL)
650 GOTO(failed, rc = -ENOMEM);
652 nrs_orr_genobjname(policy, orrd->od_objname);
655 * Slab cache for NRS ORR/TRR objects.
657 orrd->od_cache = cfs_mem_cache_create(orrd->od_objname,
658 sizeof(struct nrs_orr_object),
660 if (orrd->od_cache == NULL)
661 GOTO(failed, rc = -ENOMEM);
663 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
664 NRS_POL_NAME_MAX) == 0) {
665 ops = &nrs_orr_hash_ops;
666 cur_bits = NRS_ORR_BITS;
667 max_bits = NRS_ORR_BITS;
668 bkt_bits = NRS_ORR_BKT_BITS;
669 flags = NRS_ORR_HASH_FLAGS;
671 ops = &nrs_trr_hash_ops;
672 cur_bits = NRS_TRR_BITS;
673 max_bits = NRS_TRR_BITS;
674 bkt_bits = NRS_TRR_BKT_BITS;
675 flags = NRS_TRR_HASH_FLAGS;
679 * Hash for finding objects by struct nrs_orr_key.
680 * XXX: For TRR, it might be better to avoid using libcfs_hash?
681 * All that needs to be resolved are OST indices, and they
682 * will stay relatively stable during an OSS node's lifetime.
684 orrd->od_obj_hash = cfs_hash_create(orrd->od_objname, cur_bits,
685 max_bits, bkt_bits, 0,
687 CFS_HASH_MAX_THETA, ops, flags);
688 if (orrd->od_obj_hash == NULL)
689 GOTO(failed, rc = -ENOMEM);
691 /* XXX: Fields accessed unlocked */
692 orrd->od_quantum = NRS_ORR_QUANTUM_DFLT;
693 orrd->od_supp = NOS_DFLT;
694 orrd->od_physical = true;
696 * Set to 1 so that the test inside nrs_orr_req_add() can evaluate to
699 orrd->od_sequence = 1;
701 policy->pol_private = orrd;
706 if (orrd->od_cache) {
707 rc = cfs_mem_cache_destroy(orrd->od_cache);
708 LASSERTF(rc == 0, "Could not destroy od_cache slab\n");
710 if (orrd->od_binheap != NULL)
711 cfs_binheap_destroy(orrd->od_binheap);
719 * Called when an ORR/TRR policy instance is stopped.
721 * Called when the policy has been instructed to transition to the
722 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more
723 * pending requests to serve.
725 * \param[in] policy the policy
727 static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy)
729 struct nrs_orr_data *orrd = policy->pol_private;
732 LASSERT(orrd != NULL);
733 LASSERT(orrd->od_binheap != NULL);
734 LASSERT(orrd->od_obj_hash != NULL);
735 LASSERT(orrd->od_cache != NULL);
736 LASSERT(cfs_binheap_is_empty(orrd->od_binheap));
738 cfs_binheap_destroy(orrd->od_binheap);
739 cfs_hash_putref(orrd->od_obj_hash);
740 cfs_mem_cache_destroy(orrd->od_cache);
746 * Performs a policy-specific ctl function on ORR/TRR policy instances; similar
749 * \param[in] policy the policy instance
750 * \param[in] opc the opcode
751 * \param[in,out] arg used for passing parameters and information
753 * \pre spin_is_locked(&policy->pol_nrs->->nrs_lock)
754 * \post spin_is_locked(&policy->pol_nrs->->nrs_lock)
756 * \retval 0 operation carried successfully
759 int nrs_orr_ctl(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
762 LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock));
768 case NRS_CTL_ORR_RD_QUANTUM: {
769 struct nrs_orr_data *orrd = policy->pol_private;
771 *(__u16 *)arg = orrd->od_quantum;
775 case NRS_CTL_ORR_WR_QUANTUM: {
776 struct nrs_orr_data *orrd = policy->pol_private;
778 orrd->od_quantum = *(__u16 *)arg;
779 LASSERT(orrd->od_quantum != 0);
783 case NRS_CTL_ORR_RD_OFF_TYPE: {
784 struct nrs_orr_data *orrd = policy->pol_private;
786 *(bool *)arg = orrd->od_physical;
790 case NRS_CTL_ORR_WR_OFF_TYPE: {
791 struct nrs_orr_data *orrd = policy->pol_private;
793 orrd->od_physical = *(bool *)arg;
797 case NRS_CTL_ORR_RD_SUPP_REQ: {
798 struct nrs_orr_data *orrd = policy->pol_private;
800 *(enum nrs_orr_supp *)arg = orrd->od_supp;
804 case NRS_CTL_ORR_WR_SUPP_REQ: {
805 struct nrs_orr_data *orrd = policy->pol_private;
807 orrd->od_supp = *(enum nrs_orr_supp *)arg;
808 LASSERT((orrd->od_supp & NOS_OST_RW) != 0);
816 * Obtains resources for ORR/TRR policy instances. The top-level resource lives
817 * inside \e nrs_orr_data and the second-level resource inside
818 * \e nrs_orr_object instances.
820 * \param[in] policy the policy for which resources are being taken for
822 * \param[in] nrq the request for which resources are being taken
823 * \param[in] parent parent resource, embedded in nrs_orr_data for the
825 * \param[out] resp used to return resource references
826 * \param[in] moving_req signifies limited caller context; used to perform
827 * memory allocations in an atomic context in this
830 * \retval 0 we are returning a top-level, parent resource, one that is
831 * embedded in an nrs_orr_data object
832 * \retval 1 we are returning a bottom-level resource, one that is embedded
833 * in an nrs_orr_object object
835 * \see nrs_resource_get_safe()
837 int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
838 struct ptlrpc_nrs_request *nrq,
839 const struct ptlrpc_nrs_resource *parent,
840 struct ptlrpc_nrs_resource **resp, bool moving_req)
842 struct nrs_orr_data *orrd;
843 struct nrs_orr_object *orro;
844 struct nrs_orr_object *tmp;
845 struct nrs_orr_key key = { { { 0 } } };
850 * struct nrs_orr_data is requested.
852 if (parent == NULL) {
853 *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
857 orrd = container_of(parent, struct nrs_orr_data, od_res);
860 * If the request type is not supported, fail the enqueuing; the RPC
861 * will be handled by the fallback NRS policy.
863 if (!nrs_orr_req_supported(orrd, nrq, &opc))
867 * Fill in the key for the request; OST FID for ORR policy instances,
868 * and OST index for TRR policy instances.
870 rc = nrs_orr_key_fill(orrd, nrq, opc, policy->pol_desc->pd_name, &key);
875 * Set the offset range the request covers
877 rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
881 orro = cfs_hash_lookup(orrd->od_obj_hash, &key);
885 OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
886 nrs_pol2cptab(policy), nrs_pol2cptid(policy),
887 (moving_req ? CFS_ALLOC_ATOMIC :
893 cfs_atomic_set(&orro->oo_ref, 1);
895 tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key,
898 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
903 * For debugging purposes
905 nrq->nr_u.orr.or_key = orro->oo_key;
907 *resp = &orro->oo_res;
913 * Called when releasing references to the resource hierachy obtained for a
914 * request for scheduling using ORR/TRR policy instances
916 * \param[in] policy the policy the resource belongs to
917 * \param[in] res the resource to be released
919 static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy,
920 const struct ptlrpc_nrs_resource *res)
922 struct nrs_orr_data *orrd;
923 struct nrs_orr_object *orro;
926 * Do nothing for freeing parent, nrs_orr_data resources.
928 if (res->res_parent == NULL)
931 orro = container_of(res, struct nrs_orr_object, oo_res);
932 orrd = container_of(res->res_parent, struct nrs_orr_data, od_res);
934 cfs_hash_put(orrd->od_obj_hash, &orro->oo_hnode);
938 * Called when polling an ORR/TRR policy instance for a request so that it can
939 * be served. Returns the request that is at the root of the binary heap, as
940 * that is the lowest priority one (i.e. libcfs_heap is an implementation of a
943 * \param[in] policy the policy instance being polled
944 * \param[in] peek when set, signifies that we just want to examine the
945 * request, and not handle it, so the request is not removed
947 * \param[in] force force the policy to return a request; unused in this policy
949 * \retval the request to be handled
950 * \retval NULL no request available
952 * \see ptlrpc_nrs_req_get_nolock()
953 * \see nrs_request_get()
956 struct ptlrpc_nrs_request *nrs_orr_req_get(struct ptlrpc_nrs_policy *policy,
957 bool peek, bool force)
959 struct nrs_orr_data *orrd = policy->pol_private;
960 cfs_binheap_node_t *node = cfs_binheap_root(orrd->od_binheap);
961 struct ptlrpc_nrs_request *nrq;
963 nrq = unlikely(node == NULL) ? NULL :
964 container_of(node, struct ptlrpc_nrs_request, nr_node);
966 if (likely(!peek && nrq != NULL)) {
967 struct nrs_orr_object *orro;
969 orro = container_of(nrs_request_resource(nrq),
970 struct nrs_orr_object, oo_res);
972 LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
974 cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
977 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
978 NRS_POL_NAME_MAX) == 0)
980 "NRS: starting to handle %s request for object "
981 "with FID "DFID", from OST with index %u, with "
982 "round "LPU64"\n", NRS_POL_NAME_ORR,
983 PFID(&orro->oo_key.ok_fid),
984 nrq->nr_u.orr.or_key.ok_idx,
985 nrq->nr_u.orr.or_round);
988 "NRS: starting to handle %s request from OST "
989 "with index %u, with round "LPU64"\n",
990 NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
991 nrq->nr_u.orr.or_round);
993 /** Peek at the next request to be served */
994 node = cfs_binheap_root(orrd->od_binheap);
996 /** No more requests */
997 if (unlikely(node == NULL)) {
1000 struct ptlrpc_nrs_request *next;
1002 next = container_of(node, struct ptlrpc_nrs_request,
1005 if (orrd->od_round < next->nr_u.orr.or_round)
1006 orrd->od_round = next->nr_u.orr.or_round;
1014 * Sort-adds request \a nrq to an ORR/TRR \a policy instance's set of queued
1015 * requests in the policy's binary heap.
1017 * A scheduling round is a stream of requests that have been sorted in batches
1018 * according to the backend-fs object (for ORR policy instances) or OST (for TRR
1019 * policy instances) that they pertain to (as identified by its IDIF FID or OST
1020 * index respectively); there can be only one batch for each object or OST in
1021 * each round. The batches are of maximum size nrs_orr_data:od_quantum. When a
1022 * new request arrives for scheduling for an object or OST that has exhausted
1023 * its quantum in its current round, the request will be scheduled on the next
1024 * scheduling round. Requests are allowed to be scheduled against a round until
1025 * all requests for the round are serviced, so an object or OST might miss a
1026 * round if requests are not scheduled for it for a long enough period of time.
1027 * Objects or OSTs that miss a round will continue with having their next
1028 * request scheduled, starting at the round that requests are being dispatched
1029 * for, at the time of arrival of this request.
1031 * Requests are tagged with the round number and a sequence number; the sequence
1032 * number indicates the relative ordering amongst the batches of requests in a
1033 * round, and is identical for all requests in a batch, as is the round number.
1034 * The round and sequence numbers are used by orr_req_compare() in order to use
1035 * nrs_orr_data::od_binheap in order to maintain an ordered set of rounds, with
1036 * each round consisting of an ordered set of batches of requests, and each
1037 * batch consisting of an ordered set of requests according to their logical
1038 * file or physical disk offsets.
1040 * \param[in] policy the policy
1041 * \param[in] nrq the request to add
1043 * \retval 0 request successfully added
1044 * \retval != 0 error
1046 static int nrs_orr_req_add(struct ptlrpc_nrs_policy *policy,
1047 struct ptlrpc_nrs_request *nrq)
1049 struct nrs_orr_data *orrd;
1050 struct nrs_orr_object *orro;
1053 orro = container_of(nrs_request_resource(nrq),
1054 struct nrs_orr_object, oo_res);
1055 orrd = container_of(nrs_request_resource(nrq)->res_parent,
1056 struct nrs_orr_data, od_res);
1058 if (orro->oo_quantum == 0 || orro->oo_round < orrd->od_round ||
1059 (orro->oo_active == 0 && orro->oo_quantum > 0)) {
1062 * If there are no pending requests for the object/OST, but some
1063 * of its quantum still remains unused, which implies we did not
1064 * get a chance to schedule up to its maximum allowed batch size
1065 * of requests in the previous round this object/OST
1066 * participated in, schedule this next request on a new round;
1067 * this avoids fragmentation of request batches caused by
1068 * intermittent inactivity on the object/OST, at the expense of
1069 * potentially slightly increased service time for the request
1070 * batch this request will be a part of.
1072 if (orro->oo_active == 0 && orro->oo_quantum > 0)
1075 /** A new scheduling round has commenced */
1076 if (orro->oo_round < orrd->od_round)
1077 orro->oo_round = orrd->od_round;
1079 /** I was not the last object/OST that scheduled a request */
1080 if (orro->oo_sequence < orrd->od_sequence)
1081 orro->oo_sequence = ++orrd->od_sequence;
1083 * Reset the quantum if we have reached the maximum quantum
1084 * size for this batch, or even if we have not managed to
1085 * complete a batch size up to its maximum allowed size.
1086 * XXX: Accessed unlocked
1088 orro->oo_quantum = orrd->od_quantum;
1091 nrq->nr_u.crr.cr_round = orro->oo_round;
1092 nrq->nr_u.crr.cr_sequence = orro->oo_sequence;
1094 rc = cfs_binheap_insert(orrd->od_binheap, &nrq->nr_node);
1097 if (--orro->oo_quantum == 0)
1104 * Removes request \a nrq from an ORR/TRR \a policy instance's set of queued
1107 * \param[in] policy the policy
1108 * \param[in] nrq the request to remove
1110 static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy,
1111 struct ptlrpc_nrs_request *nrq)
1113 struct nrs_orr_data *orrd;
1114 struct nrs_orr_object *orro;
1117 orro = container_of(nrs_request_resource(nrq),
1118 struct nrs_orr_object, oo_res);
1119 orrd = container_of(nrs_request_resource(nrq)->res_parent,
1120 struct nrs_orr_data, od_res);
1122 LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
1124 is_root = &nrq->nr_node == cfs_binheap_root(orrd->od_binheap);
1126 cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
1130 * If we just deleted the node at the root of the binheap, we may have
1131 * to adjust round numbers.
1133 if (unlikely(is_root)) {
1134 /** Peek at the next request to be served */
1135 cfs_binheap_node_t *node = cfs_binheap_root(orrd->od_binheap);
1137 /** No more requests */
1138 if (unlikely(node == NULL)) {
1141 nrq = container_of(node, struct ptlrpc_nrs_request,
1144 if (orrd->od_round < nrq->nr_u.orr.or_round)
1145 orrd->od_round = nrq->nr_u.orr.or_round;
1151 * Called right after the request \a nrq finishes being handled by ORR policy
1152 * instance \a policy.
1154 * \param[in] policy the policy that handled the request
1155 * \param[in] nrq the request that was handled
/*
 * Called right after request \a nrq finishes being handled by the policy
 * instance \a policy. Only emits a debug message; the message format is
 * chosen by comparing the policy's name against NRS_POL_NAME_ORR, since this
 * handler is shared between the ORR and TRR policies.
 *
 * \param[in] policy the policy that handled the request
 * \param[in] nrq    the request that was handled
 */
1157 static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy,
1158 struct ptlrpc_nrs_request *nrq)
1160 /** NB: resource control, credits etc can be added here */
1161 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
1162 NRS_POL_NAME_MAX) == 0)
/* ORR: identify the request by backend-fs object FID and OST index */
1164 "NRS: finished handling %s request for object with FID "
1165 DFID", from OST with index %u, with round "LPU64"\n",
1166 NRS_POL_NAME_ORR, PFID(&nrq->nr_u.orr.or_key.ok_fid),
1167 nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
/* TRR: identify the request by OST index only */
1170 "NRS: finished handling %s request from OST with index %u,"
1171 " with round "LPU64"\n",
1172 NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
1173 nrq->nr_u.orr.or_round);
1183 * This allows to bundle the policy name into the lprocfs_vars::data pointer
1184 * so that lprocfs read/write functions can be used by both the ORR and TRR
/*
 * Bundles a service pointer with a policy name so a single set of lprocfs
 * read/write handlers can serve both the ORR and TRR policies; one static
 * instance exists per policy and is wired into lprocfs_vars::data by
 * nrs_orr_lprocfs_init()/nrs_trr_lprocfs_init().
 */
1187 struct nrs_lprocfs_orr_data {
1188 struct ptlrpc_service *svc;
1190 } lprocfs_orr_data = {
1191 .name = NRS_POL_NAME_ORR
1192 }, lprocfs_trr_data = {
1193 .name = NRS_POL_NAME_TRR
1197 * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
1198 * for ORR/TRR policy instances on both the regular and high-priority NRS head
1199 * of a service, as long as a policy instance is not in the
1200 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1201 * state are skipped later by nrs_orr_ctl().
1203 * Quantum values are in # of RPCs, and the output is in YAML format.
1210 * XXX: the CRR-N version of this, ptlrpc_lprocfs_rd_nrs_crrn_quantum() is
1211 * almost identical; it can be reworked and then reused for ORR/TRR.
/*
 * lprocfs read handler: prints the Round Robin quantum (maximum batch size,
 * in # of RPCs) for the ORR/TRR policy instance on the regular NRS head and,
 * if the service has one, the high-priority NRS head; output is YAML.
 * -ENODEV from either head is ignored (policy stopped on that head).
 */
1213 static int ptlrpc_lprocfs_rd_nrs_orr_quantum(char *page, char **start,
1214 off_t off, int count, int *eof,
1217 struct nrs_lprocfs_orr_data *orr_data = data;
1218 struct ptlrpc_service *svc = orr_data->svc;
1224 * Perform two separate calls to this as only one of the NRS heads'
1225 * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
1226 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1228 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1230 NRS_CTL_ORR_RD_QUANTUM,
1234 rc2 = snprintf(page, count, NRS_LPROCFS_QUANTUM_NAME_REG
1237 * Ignore -ENODEV as the regular NRS head's policy may be in the
1238 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1240 } else if (rc != -ENODEV) {
1245 * We know the ost_io service which is the only one ORR/TRR policies are
1246 * compatible with, do have an HP NRS head, but it may be best to guard
1247 * against a possible change of this in the future.
1249 if (!nrs_svc_has_hp(svc))
/* Repeat the query for the high-priority NRS head */
1252 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1253 orr_data->name, NRS_CTL_ORR_RD_QUANTUM,
1257 rc2 += snprintf(page + rc2, count - rc2,
1258 NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
1260 * Ignore -ENODEV as the high priority NRS head's policy may be
1261 * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1263 } else if (rc != -ENODEV) {
1273 * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
1274 * for ORR/TRR policy instances of a service. The user can set the quantum size
1275 * for the regular and high priority NRS head separately by specifying each
1276 * value, or both together in a single invocation.
1280 * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=req_quantum:64, to set the
1281 * request quantum size of the ORR policy instance on the regular NRS head of
1282 * the ost_io service to 64
1284 * lctl set_param ost.OSS.ost_io.nrs_trr_quantum=hp_quantum:8 to set the request
1285 * quantum size of the TRR policy instance on the high priority NRS head of the
1286 * ost_io service to 8
1288 * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=32, to set both the request
1289 * quantum size of the ORR policy instance on both the regular and the high
1290 * priority NRS head of the ost_io service to 32
1292 * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1293 * are skipped later by nrs_orr_ctl().
1295 * XXX: the CRR-N version of this, ptlrpc_lprocfs_wr_nrs_crrn_quantum() is
1296 * almost identical; it can be reworked and then reused for ORR/TRR.
/*
 * lprocfs write handler: sets the Round Robin quantum for the ORR/TRR policy
 * instance on the regular and/or high-priority NRS head. The command may
 * name one head ("reg_quantum:N"/"hp_quantum:N"), both, or give a bare
 * number which is applied to all available heads. Values are validated
 * against (0, LPROCFS_NRS_QUANTUM_MAX]. Returns \a count on success, or
 * -ENODEV only if every addressed head reported -ENODEV.
 */
1298 static int ptlrpc_lprocfs_wr_nrs_orr_quantum(struct file *file,
1300 unsigned long count, void *data)
1302 struct nrs_lprocfs_orr_data *orr_data = data;
1303 struct ptlrpc_service *svc = orr_data->svc;
1304 enum ptlrpc_nrs_queue_type queue = 0;
1305 char kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
1309 /** lprocfs_find_named_value() modifies its argument, so keep a copy */
1310 unsigned long count_copy;
/* Reject commands that cannot fit in kernbuf, then copy and NUL-terminate */
1314 if (count > (sizeof(kernbuf) - 1))
1317 if (cfs_copy_from_user(kernbuf, buffer, count))
1320 kernbuf[count] = '\0';
1325 * Check if the regular quantum value has been specified
1327 val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
1329 if (val != kernbuf) {
1330 quantum_reg = simple_strtol(val, NULL, 10);
1332 queue |= PTLRPC_NRS_QUEUE_REG;
1338 * Check if the high priority quantum value has been specified
1340 val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
1342 if (val != kernbuf) {
/* "hp_quantum:" is only meaningful for services with an HP NRS head */
1343 if (!nrs_svc_has_hp(svc))
1346 quantum_hp = simple_strtol(val, NULL, 10);
1348 queue |= PTLRPC_NRS_QUEUE_HP;
1352 * If none of the queues has been specified, look for a valid numerical
1356 if (!isdigit(kernbuf[0]))
/* Bare number: apply to the regular head, and the HP head if present */
1359 quantum_reg = simple_strtol(kernbuf, NULL, 10);
1361 queue = PTLRPC_NRS_QUEUE_REG;
1363 if (nrs_svc_has_hp(svc)) {
1364 queue |= PTLRPC_NRS_QUEUE_HP;
1365 quantum_hp = quantum_reg;
/* Range-check each quantum that will actually be applied */
1369 if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
1370 ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
1371 (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
1372 ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
1376 * We change the values on regular and HP NRS heads separately, so that
1377 * we do not exit early from ptlrpc_nrs_policy_control() with an error
1378 * returned by nrs_policy_ctl_locked(), in cases where the user has not
1379 * started the policy on either the regular or HP NRS head; i.e. we are
1380 * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1381 * only if the operation fails with -ENODEV on all heads that have been
1382 * specified by the command; if at least one operation succeeds,
1383 * success is returned.
1385 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1386 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1388 NRS_CTL_ORR_WR_QUANTUM, false,
/* -ENODEV is fatal only when the regular head was the sole target */
1390 if ((rc < 0 && rc != -ENODEV) ||
1391 (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1395 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1396 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1398 NRS_CTL_ORR_WR_QUANTUM, false,
1400 if ((rc2 < 0 && rc2 != -ENODEV) ||
1401 (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
/* Success if at least one head accepted the new quantum */
1405 return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
/* Label prefixes and value strings used by the nrs_*_offset_type lprocfs
 * command syntax, e.g. "reg_offset_type:physical". */
1408 #define LPROCFS_NRS_OFF_NAME_REG "reg_offset_type:"
1409 #define LPROCFS_NRS_OFF_NAME_HP "hp_offset_type:"
1411 #define LPROCFS_NRS_OFF_NAME_PHYSICAL "physical"
1412 #define LPROCFS_NRS_OFF_NAME_LOGICAL "logical"
1415 * Retrieves the offset type used by ORR/TRR policy instances on both the
1416 * regular and high-priority NRS head of a service, as long as a policy
1417 * instance is not in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state;
1418 * policy instances in this state are skipped later by nrs_orr_ctl().
1420 * Offset type information is a (physical|logical) string, and output is
1425 * reg_offset_type:physical
1426 * hp_offset_type:logical
/*
 * lprocfs read handler: prints whether the ORR/TRR policy instance on each
 * NRS head orders RPC batches by physical disk offset or logical file
 * offset; output is YAML ("reg_offset_type:physical" etc.). -ENODEV from
 * either head is ignored (policy stopped on that head).
 */
1428 static int ptlrpc_lprocfs_rd_nrs_orr_offset_type(char *page, char **start,
1429 off_t off, int count, int *eof,
1432 struct nrs_lprocfs_orr_data *orr_data = data;
1433 struct ptlrpc_service *svc = orr_data->svc;
1439 * Perform two separate calls to this as only one of the NRS heads'
1440 * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1441 * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1443 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1444 orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1448 rc2 = snprintf(page, count,
1449 LPROCFS_NRS_OFF_NAME_REG"%s\n",
1450 physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1451 LPROCFS_NRS_OFF_NAME_LOGICAL);
1453 * Ignore -ENODEV as the regular NRS head's policy may be in the
1454 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1456 } else if (rc != -ENODEV) {
1461 * We know the ost_io service which is the only one ORR/TRR policies are
1462 * compatible with, do have an HP NRS head, but it may be best to guard
1463 * against a possible change of this in the future.
1465 if (!nrs_svc_has_hp(svc))
/* Repeat the query for the high-priority NRS head */
1468 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1469 orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1473 rc2 += snprintf(page + rc2, count - rc2,
1474 LPROCFS_NRS_OFF_NAME_HP"%s\n",
1475 physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1476 LPROCFS_NRS_OFF_NAME_LOGICAL);
1478 * Ignore -ENODEV as the high priority NRS head's policy may be
1479 * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1481 } else if (rc != -ENODEV) {
1491 * Max valid command string is the size of the labels, plus "physical" twice,
1492 * plus a separating ' '
/* Upper bound on a valid offset-type command: both labels plus the longest
 * value ("physical") twice, plus a separating ' '; sizeof includes the
 * terminating NUL of the concatenated literal. */
1494 #define LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD \
1495 sizeof(LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL " " \
1496 LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL)
1499 * Sets the type of offsets used to order RPCs in ORR/TRR policy instances. The
1500 * user can set offset type for the regular or high priority NRS head
1501 * separately by specifying each value, or both together in a single invocation.
1505 * lctl set_param ost.OSS.ost_io.nrs_orr_offset_type=
1506 * reg_offset_type:physical, to enable the ORR policy instance on the regular
1507 * NRS head of the ost_io service to use physical disk offset ordering.
1509 * lctl set_param ost.OSS.ost_io.nrs_trr_offset_type=logical, to enable the TRR
1510 * policy instances on both the regular and high priority NRS heads of the
1511 * ost_io service to use logical file offset ordering.
1513 * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1514 * are skipped later by nrs_orr_ctl().
/*
 * lprocfs write handler: selects physical or logical offset ordering for the
 * ORR/TRR policy instance on the regular and/or high-priority NRS head. The
 * command may name one head ("reg_offset_type:..."/"hp_offset_type:..."),
 * both, or give a bare "physical"/"logical" value applied to all available
 * heads. Returns \a count on success, or -ENODEV only if every addressed
 * head reported -ENODEV.
 */
1516 static int ptlrpc_lprocfs_wr_nrs_orr_offset_type(struct file *file,
1518 unsigned long count,
1521 struct nrs_lprocfs_orr_data *orr_data = data;
1522 struct ptlrpc_service *svc = orr_data->svc;
1523 enum ptlrpc_nrs_queue_type queue = 0;
1524 char kernbuf[LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD];
1529 unsigned long count_copy;
/* Reject commands that cannot fit in kernbuf, then copy and NUL-terminate */
1533 if (count > (sizeof(kernbuf) - 1))
1536 if (cfs_copy_from_user(kernbuf, buffer, count))
1539 kernbuf[count] = '\0';
1544 * Check if the regular offset type has been specified
1546 val_reg = lprocfs_find_named_value(kernbuf,
1547 LPROCFS_NRS_OFF_NAME_REG,
1549 if (val_reg != kernbuf)
1550 queue |= PTLRPC_NRS_QUEUE_REG;
1555 * Check if the high priority offset type has been specified
1557 val_hp = lprocfs_find_named_value(kernbuf, LPROCFS_NRS_OFF_NAME_HP,
1559 if (val_hp != kernbuf) {
/* "hp_offset_type:" is only meaningful with an HP NRS head */
1560 if (!nrs_svc_has_hp(svc))
1563 queue |= PTLRPC_NRS_QUEUE_HP;
1567 * If none of the queues has been specified, there may be a valid
1568 * command string at the start of the buffer.
1571 queue = PTLRPC_NRS_QUEUE_REG;
1573 if (nrs_svc_has_hp(svc))
1574 queue |= PTLRPC_NRS_QUEUE_HP;
/* Parse the "physical"/"logical" value for each addressed head */
1577 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1578 if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1579 sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1580 physical_reg = true;
1581 else if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_LOGICAL,
1582 sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1583 physical_reg = false;
1588 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1589 if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1590 sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1592 else if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_LOGICAL,
1593 sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1594 physical_hp = false;
1600 * We change the values on regular and HP NRS heads separately, so that
1601 * we do not exit early from ptlrpc_nrs_policy_control() with an error
1602 * returned by nrs_policy_ctl_locked(), in cases where the user has not
1603 * started the policy on either the regular or HP NRS head; i.e. we are
1604 * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1605 * only if the operation fails with -ENODEV on all heads that have been
1606 * specified by the command; if at least one operation succeeds,
1607 * success is returned.
1609 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1610 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1612 NRS_CTL_ORR_WR_OFF_TYPE, false,
1614 if ((rc < 0 && rc != -ENODEV) ||
1615 (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1619 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1620 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1622 NRS_CTL_ORR_WR_OFF_TYPE, false,
1624 if ((rc2 < 0 && rc2 != -ENODEV) ||
1625 (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
/* Success if at least one head accepted the new offset type */
1629 return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
/* Label prefixes and value strings used by the nrs_*_supported lprocfs
 * command syntax, e.g. "reg_supported:reads_and_writes". */
1632 #define NRS_LPROCFS_REQ_SUPP_NAME_REG "reg_supported:"
1633 #define NRS_LPROCFS_REQ_SUPP_NAME_HP "hp_supported:"
1635 #define LPROCFS_NRS_SUPP_NAME_READS "reads"
1636 #define LPROCFS_NRS_SUPP_NAME_WRITES "writes"
1637 #define LPROCFS_NRS_SUPP_NAME_READWRITES "reads_and_writes"
1640 * Translates enum nrs_orr_supp values to a corresponding string.
/*
 * Translates an enum nrs_orr_supp value to its lprocfs string
 * representation ("reads", "writes" or "reads_and_writes").
 */
1642 static const char *nrs_orr_supp2str(enum nrs_orr_supp supp)
1648 return LPROCFS_NRS_SUPP_NAME_READS;
1650 return LPROCFS_NRS_SUPP_NAME_WRITES;
1652 return LPROCFS_NRS_SUPP_NAME_READWRITES;
1657 * Translates strings to the corresponding enum nrs_orr_supp value
/*
 * Translates an lprocfs string to the corresponding enum nrs_orr_supp
 * value. "reads_and_writes" is tested first because a plain prefix match
 * on "reads" would otherwise shadow it.
 */
1659 static enum nrs_orr_supp nrs_orr_str2supp(const char *val)
1661 if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READWRITES,
1662 sizeof(LPROCFS_NRS_SUPP_NAME_READWRITES) - 1) == 0)
1664 else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READS,
1665 sizeof(LPROCFS_NRS_SUPP_NAME_READS) - 1) == 0)
1666 return NOS_OST_READ;
1667 else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_WRITES,
1668 sizeof(LPROCFS_NRS_SUPP_NAME_WRITES) - 1) == 0)
1669 return NOS_OST_WRITE;
1675 * Retrieves the type of RPCs handled at the point of invocation by ORR/TRR
1676 * policy instances on both the regular and high-priority NRS head of a service,
1677 * as long as a policy instance is not in the
1678 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1679 * state are skipped later by nrs_orr_ctl().
1681 * Supported RPC type information is a (reads|writes|reads_and_writes) string,
1682 * and output is in YAML format.
1686 * reg_supported:reads
1687 * hp_supported:reads_and_writes
/*
 * lprocfs read handler: prints which RPC types (reads, writes, or both) the
 * ORR/TRR policy instance on each NRS head currently handles; output is
 * YAML ("reg_supported:reads" etc.). -ENODEV from either head is ignored
 * (policy stopped on that head).
 */
1689 static int ptlrpc_lprocfs_rd_nrs_orr_supported(char *page, char **start,
1690 off_t off, int count, int *eof,
1693 struct nrs_lprocfs_orr_data *orr_data = data;
1694 struct ptlrpc_service *svc = orr_data->svc;
1695 enum nrs_orr_supp supported;
1700 * Perform two separate calls to this as only one of the NRS heads'
1701 * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1702 * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1704 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1706 NRS_CTL_ORR_RD_SUPP_REQ, true,
1711 rc2 = snprintf(page, count,
1712 NRS_LPROCFS_REQ_SUPP_NAME_REG"%s\n",
1713 nrs_orr_supp2str(supported));
1715 * Ignore -ENODEV as the regular NRS head's policy may be in the
1716 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1718 } else if (rc != -ENODEV) {
1723 * We know the ost_io service which is the only one ORR/TRR policies are
1724 * compatible with, do have an HP NRS head, but it may be best to guard
1725 * against a possible change of this in the future.
1727 if (!nrs_svc_has_hp(svc))
/* Repeat the query for the high-priority NRS head */
1730 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1732 NRS_CTL_ORR_RD_SUPP_REQ, true,
1736 rc2 += snprintf(page + rc2, count - rc2,
1737 NRS_LPROCFS_REQ_SUPP_NAME_HP"%s\n",
1738 nrs_orr_supp2str(supported));
1740 * Ignore -ENODEV as the high priority NRS head's policy may be
1741 * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1743 } else if (rc != -ENODEV) {
1753 * Max valid command string is the size of the labels, plus "reads_and_writes"
1754 * twice, plus a separating ' '
/* Upper bound on a valid supported-RPCs command: both labels plus the
 * longest value ("reads_and_writes") twice, plus a separating ' '. */
1756 #define LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD \
1757 sizeof(NRS_LPROCFS_REQ_SUPP_NAME_REG LPROCFS_NRS_SUPP_NAME_READWRITES \
1758 NRS_LPROCFS_REQ_SUPP_NAME_HP LPROCFS_NRS_SUPP_NAME_READWRITES \
1762 * Sets the type of RPCs handled by ORR/TRR policy instances. The user can
1763 * modify this setting for the regular or high priority NRS heads separately, or
1764 * both together in a single invocation.
1768 * lctl set_param ost.OSS.ost_io.nrs_orr_supported=
1769 * "reg_supported:reads", to enable the ORR policy instance on the regular NRS
1770 * head of the ost_io service to handle OST_READ RPCs.
1772 * lctl set_param ost.OSS.ost_io.nrs_trr_supported=reads_and_writes, to enable
1773 * the TRR policy instances on both the regular and high priority NRS heads of
1774 * the ost_io service to handle OST_READ and OST_WRITE RPCs.
1776 * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1777 * are skipped later by nrs_orr_ctl().
/*
 * lprocfs write handler: sets which RPC types the ORR/TRR policy instance
 * handles on the regular and/or high-priority NRS head. The command may
 * name one head ("reg_supported:..."/"hp_supported:..."), both, or give a
 * bare value applied to all available heads; values are parsed with
 * nrs_orr_str2supp(). Returns \a count on success, or -ENODEV only if every
 * addressed head reported -ENODEV.
 */
1779 static int ptlrpc_lprocfs_wr_nrs_orr_supported(struct file *file,
1781 unsigned long count, void *data)
1783 struct nrs_lprocfs_orr_data *orr_data = data;
1784 struct ptlrpc_service *svc = orr_data->svc;
1785 enum ptlrpc_nrs_queue_type queue = 0;
1786 char kernbuf[LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD];
1789 enum nrs_orr_supp supp_reg;
1790 enum nrs_orr_supp supp_hp;
1791 unsigned long count_copy;
/* Reject commands that cannot fit in kernbuf, then copy and NUL-terminate */
1795 if (count > (sizeof(kernbuf) - 1))
1798 if (cfs_copy_from_user(kernbuf, buffer, count))
1801 kernbuf[count] = '\0';
1806 * Check if the regular supported requests setting has been specified
1808 val_reg = lprocfs_find_named_value(kernbuf,
1809 NRS_LPROCFS_REQ_SUPP_NAME_REG,
1811 if (val_reg != kernbuf)
1812 queue |= PTLRPC_NRS_QUEUE_REG;
1817 * Check if the high priority supported requests setting has been
1820 val_hp = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_REQ_SUPP_NAME_HP,
1822 if (val_hp != kernbuf) {
/* "hp_supported:" is only meaningful with an HP NRS head */
1823 if (!nrs_svc_has_hp(svc))
1826 queue |= PTLRPC_NRS_QUEUE_HP;
1830 * If none of the queues has been specified, there may be a valid
1831 * command string at the start of the buffer.
1834 queue = PTLRPC_NRS_QUEUE_REG;
1836 if (nrs_svc_has_hp(svc))
1837 queue |= PTLRPC_NRS_QUEUE_HP;
/* Translate and validate the value for each addressed head */
1840 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1841 supp_reg = nrs_orr_str2supp(val_reg);
1842 if (supp_reg == -EINVAL)
1846 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1847 supp_hp = nrs_orr_str2supp(val_hp);
1848 if (supp_hp == -EINVAL)
1853 * We change the values on regular and HP NRS heads separately, so that
1854 * we do not exit early from ptlrpc_nrs_policy_control() with an error
1855 * returned by nrs_policy_ctl_locked(), in cases where the user has not
1856 * started the policy on either the regular or HP NRS head; i.e. we are
1857 * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1858 * only if the operation fails with -ENODEV on all heads that have been
1859 * specified by the command; if at least one operation succeeds,
1860 * success is returned.
1862 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1863 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1865 NRS_CTL_ORR_WR_SUPP_REQ, false,
1867 if ((rc < 0 && rc != -ENODEV) ||
1868 (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1872 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1873 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1875 NRS_CTL_ORR_WR_SUPP_REQ, false,
1877 if ((rc2 < 0 && rc2 != -ENODEV) ||
1878 (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
/* Success if at least one head accepted the new setting */
1882 return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
/*
 * Registers the ORR policy's lprocfs entries (nrs_orr_quantum,
 * nrs_orr_offset_type, nrs_orr_supported) under the service's proc root,
 * wiring each entry's data pointer to lprocfs_orr_data so the shared
 * handlers act on the ORR policy of \a svc. No-op when the service has no
 * proc root.
 *
 * \param[in] svc the service to register entries for
 */
1885 int nrs_orr_lprocfs_init(struct ptlrpc_service *svc)
1890 struct lprocfs_vars nrs_orr_lprocfs_vars[] = {
1891 { .name = "nrs_orr_quantum",
1892 .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_quantum,
1893 .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_quantum },
1894 { .name = "nrs_orr_offset_type",
1895 .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_offset_type,
1896 .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_offset_type },
1897 { .name = "nrs_orr_supported",
1898 .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_supported,
1899 .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_supported },
1903 if (svc->srv_procroot == NULL)
1906 lprocfs_orr_data.svc = svc;
1908 for (i = 0; i < ARRAY_SIZE(nrs_orr_lprocfs_vars); i++)
1909 nrs_orr_lprocfs_vars[i].data = &lprocfs_orr_data;
1911 rc = lprocfs_add_vars(svc->srv_procroot, nrs_orr_lprocfs_vars, NULL);
/*
 * Removes the ORR policy's lprocfs entries registered by
 * nrs_orr_lprocfs_init(); no-op when the service has no proc root.
 *
 * \param[in] svc the service to remove entries from
 */
1916 void nrs_orr_lprocfs_fini(struct ptlrpc_service *svc)
1918 if (svc->srv_procroot == NULL)
1921 lprocfs_remove_proc_entry("nrs_orr_quantum", svc->srv_procroot);
1922 lprocfs_remove_proc_entry("nrs_orr_offset_type", svc->srv_procroot);
1923 lprocfs_remove_proc_entry("nrs_orr_supported", svc->srv_procroot);
1926 #endif /* LPROCFS */
/*
 * ORR policy operations table; the scheduling callbacks are the shared
 * nrs_orr_* implementations.
 */
1928 static const struct ptlrpc_nrs_pol_ops nrs_orr_ops = {
1929 .op_policy_init = nrs_orr_init,
1930 .op_policy_start = nrs_orr_start,
1931 .op_policy_stop = nrs_orr_stop,
1932 .op_policy_ctl = nrs_orr_ctl,
1933 .op_res_get = nrs_orr_res_get,
1934 .op_res_put = nrs_orr_res_put,
1935 .op_req_get = nrs_orr_req_get,
1936 .op_req_enqueue = nrs_orr_req_add,
1937 .op_req_dequeue = nrs_orr_req_del,
1938 .op_req_stop = nrs_orr_req_stop,
1940 .op_lprocfs_init = nrs_orr_lprocfs_init,
1941 .op_lprocfs_fini = nrs_orr_lprocfs_fini,
/*
 * ORR policy configuration: registered by name, compatible only with the
 * ost_io service.
 */
1945 struct ptlrpc_nrs_pol_conf nrs_conf_orr = {
1946 .nc_name = NRS_POL_NAME_ORR,
1947 .nc_ops = &nrs_orr_ops,
1948 .nc_compat = nrs_policy_compat_one,
1949 .nc_compat_svc_name = "ost_io",
1953 * TRR, Target-based Round Robin policy
1955 * TRR reuses much of the functions and data structures of ORR
/*
 * Registers the TRR policy's lprocfs entries (nrs_trr_quantum,
 * nrs_trr_offset_type, nrs_trr_supported) under the service's proc root.
 * The same read/write handlers as ORR are reused; passing lprocfs_trr_data
 * as each entry's data pointer makes them act on the TRR policy instead.
 * No-op when the service has no proc root.
 *
 * \param[in] svc the service to register entries for
 */
1960 int nrs_trr_lprocfs_init(struct ptlrpc_service *svc)
1965 struct lprocfs_vars nrs_trr_lprocfs_vars[] = {
1966 { .name = "nrs_trr_quantum",
1967 .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_quantum,
1968 .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_quantum },
1969 { .name = "nrs_trr_offset_type",
1970 .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_offset_type,
1971 .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_offset_type },
1972 { .name = "nrs_trr_supported",
1973 .read_fptr = ptlrpc_lprocfs_rd_nrs_orr_supported,
1974 .write_fptr = ptlrpc_lprocfs_wr_nrs_orr_supported },
1978 if (svc->srv_procroot == NULL)
1981 lprocfs_trr_data.svc = svc;
1983 for (i = 0; i < ARRAY_SIZE(nrs_trr_lprocfs_vars); i++)
1984 nrs_trr_lprocfs_vars[i].data = &lprocfs_trr_data;
1986 rc = lprocfs_add_vars(svc->srv_procroot, nrs_trr_lprocfs_vars, NULL);
/*
 * Removes the TRR policy's lprocfs entries registered by
 * nrs_trr_lprocfs_init(); no-op when the service has no proc root.
 *
 * \param[in] svc the service to remove entries from
 */
1991 void nrs_trr_lprocfs_fini(struct ptlrpc_service *svc)
1993 if (svc->srv_procroot == NULL)
1996 lprocfs_remove_proc_entry("nrs_trr_quantum", svc->srv_procroot);
1997 lprocfs_remove_proc_entry("nrs_trr_offset_type", svc->srv_procroot);
1998 lprocfs_remove_proc_entry("nrs_trr_supported", svc->srv_procroot);
2001 #endif /* LPROCFS */
2004 * Reuse much of the ORR functionality for TRR.
/*
 * TRR policy operations table: identical to nrs_orr_ops except for the
 * lprocfs init/fini callbacks, which register the nrs_trr_* proc entries.
 */
2006 static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
2007 .op_policy_init = nrs_orr_init,
2008 .op_policy_start = nrs_orr_start,
2009 .op_policy_stop = nrs_orr_stop,
2010 .op_policy_ctl = nrs_orr_ctl,
2011 .op_res_get = nrs_orr_res_get,
2012 .op_res_put = nrs_orr_res_put,
2013 .op_req_get = nrs_orr_req_get,
2014 .op_req_enqueue = nrs_orr_req_add,
2015 .op_req_dequeue = nrs_orr_req_del,
2016 .op_req_stop = nrs_orr_req_stop,
2018 .op_lprocfs_init = nrs_trr_lprocfs_init,
2019 .op_lprocfs_fini = nrs_trr_lprocfs_fini,
/*
 * TRR policy configuration: registered by name, compatible only with the
 * ost_io service (mirrors nrs_conf_orr).
 */
2023 struct ptlrpc_nrs_pol_conf nrs_conf_trr = {
2024 .nc_name = NRS_POL_NAME_TRR,
2025 .nc_ops = &nrs_trr_ops,
2026 .nc_compat = nrs_policy_compat_one,
2027 .nc_compat_svc_name = "ost_io",
2030 /** @} ORR/TRR policy */
2034 #endif /* HAVE_SERVER_SUPPORT */