1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2017, Intel Corporation.
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_orr.c
29  *
30  * Network Request Scheduler (NRS) ORR and TRR policies
31  *
32  * Request scheduling in a Round-Robin manner over backend-fs objects and OSTs
33  * respectively
34  *
35  * Author: Liang Zhen <liang@whamcloud.com>
36  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
37  */
38
39 /**
40  * \addtogroup nrs
41  * @{
42  */
43 #define DEBUG_SUBSYSTEM S_RPC
44 #include <obd_support.h>
45 #include <obd_class.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "ptlrpc_internal.h"
49
50 /**
51  * \name ORR/TRR policy
52  *
53  * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies
54  *
55  * ORR performs batched Round Robin scheduling of brw RPCs, based on the FID of
56  * the backend-fs object that the brw RPC pertains to; the TRR policy performs
57  * batched Round Robin scheduling of brw RPCs, based on the OST index that the
58  * RPC pertains to. Both policies also order RPCs in each batch in ascending
59  * offset order; the offset type is lprocfs-tunable between logical file
60  * offsets and physical disk offsets, as reported by fiemap.
61  *
62  * The TRR policy reuses much of the functionality of ORR. These two scheduling
63  * algorithms could alternatively be implemented under a single NRS policy that
64  * uses an lprocfs tunable in order to switch between the two types of
65  * scheduling behaviour. The two algorithms have been implemented as separate
66  * policies for reasons of clarity to the user, and to avoid issues that would
67  * otherwise arise at the point of switching between behaviours in the case of
68  * having a single policy, such as resource cleanup for nrs_orr_object
69  * instances. It is possible that this may need to be re-examined in the future,
70  * along with potentially coalescing other policies that perform batched request
71  * scheduling in a Round-Robin manner, all into one policy.
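 *
 * An ORR or TRR policy instance is typically enabled at runtime via lprocfs,
 * e.g. with a command along the lines of
 * "lctl set_param ost.OSS.ost_io.nrs_policies=orr"; this is only an
 * illustrative sketch, and the exact parameter path depends on the setup.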
72  *
73  * @{
74  */
75
76 #define NRS_POL_NAME_ORR        "orr"
77 #define NRS_POL_NAME_TRR        "trr"
78
79 /**
80  * Checks if the RPC type of \a nrq is currently handled by an ORR/TRR policy
81  *
82  * \param[in]  orrd   the ORR/TRR policy scheduler instance
83  * \param[in]  nrq    the request
84  * \param[out] opcode the opcode is saved here, just in order to avoid calling
85  *                    lustre_msg_get_opc() again later
86  *
87  * \retval true  request type is supported by the policy instance
88  * \retval false request type is not supported by the policy instance
89  */
90 static bool nrs_orr_req_supported(struct nrs_orr_data *orrd,
91                                   struct ptlrpc_nrs_request *nrq, __u32 *opcode)
92 {
93         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
94                                                    rq_nrq);
95         __u32                   opc = lustre_msg_get_opc(req->rq_reqmsg);
96         bool                    rc = false;
97
98         /**
99          * XXX: nrs_orr_data::od_supp accessed unlocked.
100          */
101         switch (opc) {
102         case OST_READ:
103                 rc = orrd->od_supp & NOS_OST_READ;
104                 break;
105         case OST_WRITE:
106                 rc = orrd->od_supp & NOS_OST_WRITE;
107                 break;
108         }
109
110         if (rc)
111                 *opcode = opc;
112
113         return rc;
114 }
115
116 /**
117  * Returns the ORR/TRR key fields for the request \a nrq in \a key.
118  *
119  * \param[in]  orrd the ORR/TRR policy scheduler instance
120  * \param[in]  nrq  the request
121  * \param[in]  opc  the request's opcode
122  * \param[in]  name the policy name
123  * \param[out] key  fields of the key are returned here.
124  *
125  * \retval 0   key filled successfully
126  * \retval < 0 error
127  */
128 static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
129                             struct ptlrpc_nrs_request *nrq, __u32 opc,
130                             char *name, struct nrs_orr_key *key)
131 {
132         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
133                                                    rq_nrq);
134         struct ost_body        *body;
135         __u32                   ost_idx;
136         bool                    is_orr = strncmp(name, NRS_POL_NAME_ORR,
137                                                  NRS_POL_NAME_MAX) == 0;
138
139         LASSERT(req != NULL);
140
141         /**
142          * This is an attempt to fill in the request key fields while
143          * moving a request from the regular to the high-priority NRS
144          * head (via ldlm_lock_reorder_req()), but the request key has
145          * been adequately filled when nrs_orr_res_get() was called through
146          * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR/TRR
147          * policy, so there is nothing to do.
148          */
149         if ((is_orr && nrq->nr_u.orr.or_orr_set) ||
150             (!is_orr && nrq->nr_u.orr.or_trr_set)) {
151                 *key = nrq->nr_u.orr.or_key;
152                 return 0;
153         }
154
155         /* Bounce unconnected requests to the default policy. */
156         if (req->rq_export == NULL)
157                 return -ENOTCONN;
158
159         if (nrq->nr_u.orr.or_orr_set || nrq->nr_u.orr.or_trr_set)
160                 memset(&nrq->nr_u.orr.or_key, 0, sizeof(nrq->nr_u.orr.or_key));
161
162         ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
163
164         if (is_orr) {
165                 int     rc;
166                 /**
167                  * The request pill for OST_READ and OST_WRITE requests is
168                  * initialized in the ost_io service's
169                  * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
170                  * so no need to redo it here.
171                  */
172                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
173                 if (body == NULL)
174                         RETURN(-EFAULT);
175
176                 rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
177                 if (rc < 0)
178                         return rc;
179
180                 nrq->nr_u.orr.or_orr_set = 1;
181         } else {
182                 key->ok_idx = ost_idx;
183                 nrq->nr_u.orr.or_trr_set = 1;
184         }
185
186         return 0;
187 }
188
189 /**
190  * Populates the range values in \a range with logical offsets obtained via
191  * \a nb.
192  *
193  * \param[in]  nb       niobuf_remote struct array for this request
194  * \param[in]  niocount count of niobuf_remote structs for this request
195  * \param[out] range    the offset range is returned here
196  */
197 static void nrs_orr_range_fill_logical(struct niobuf_remote *nb, int niocount,
198                                        struct nrs_orr_req_range *range)
199 {
200         /* Should we do this at page boundaries ? */
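        /*
         * For example, with 4 KiB pages an I/O covering bytes 5000..8999
         * yields or_start = 4096 and or_end = 12287, i.e. the range is
         * widened outwards to the enclosing page boundaries.
         */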
201         range->or_start = nb[0].rnb_offset & PAGE_MASK;
202         range->or_end = (nb[niocount - 1].rnb_offset +
203                          nb[niocount - 1].rnb_len - 1) | ~PAGE_MASK;
204 }
205
206 /**
207  * We obtain information just for a single extent, as the request can only be in
208  * a single place in the binary heap anyway.
209  */
210 #define ORR_NUM_EXTENTS 1
211
212 /**
213  * Converts the logical file offset range in \a range, to a physical disk offset
214  * range in \a range, for a request. Uses obd_get_info() in order to carry out a
215  * fiemap call and obtain backend-fs extent information. The returned range
216  * holds physical disk offsets, in bytes.
217  *
218  * \param[in]     nrq   the request
219  * \param[in]     oa    obdo struct for this request
220  * \param[in,out] range the offset range in bytes; logical range in, physical
221  *                      range out
222  *
223  * \retval 0    physical offsets obtained successfully
224  * \retval < 0 error
225  */
226 static int nrs_orr_range_fill_physical(struct ptlrpc_nrs_request *nrq,
227                                        struct obdo *oa,
228                                        struct nrs_orr_req_range *range)
229 {
230         struct ptlrpc_request     *req = container_of(nrq,
231                                                       struct ptlrpc_request,
232                                                       rq_nrq);
233         char                       fiemap_buf[offsetof(struct fiemap,
234                                                   fm_extents[ORR_NUM_EXTENTS])];
235         struct fiemap              *fiemap = (struct fiemap *)fiemap_buf;
236         struct ll_fiemap_info_key  key;
237         loff_t                     start;
238         loff_t                     end;
239         int                        rc;
240
241         key = (typeof(key)) {
242                 .lfik_name = KEY_FIEMAP,
243                 .lfik_oa = *oa,
244                 .lfik_fiemap = {
245                         .fm_start = range->or_start,
246                         .fm_length = range->or_end - range->or_start,
247                         .fm_extent_count = ORR_NUM_EXTENTS
248                 }
249         };
250
251         rc = obd_get_info(req->rq_svc_thread->t_env, req->rq_export,
252                           sizeof(key), &key, NULL, fiemap);
253         if (rc < 0)
254                 GOTO(out, rc);
255
256         if (fiemap->fm_mapped_extents == 0 ||
257             fiemap->fm_mapped_extents > ORR_NUM_EXTENTS)
258                 GOTO(out, rc = -EFAULT);
259
260         /**
261          * Calculate the physical offset ranges for the request from the extent
262          * information and the logical request offsets.
263          */
264         start = fiemap->fm_extents[0].fe_physical + range->or_start -
265                 fiemap->fm_extents[0].fe_logical;
266         end = start + range->or_end - range->or_start;
267
268         range->or_start = start;
269         range->or_end = end;
270
271         nrq->nr_u.orr.or_physical_set = 1;
272 out:
273         return rc;
274 }
275
276 /**
277  * Sets the offset range the request covers; either in logical file
278  * offsets or in physical disk offsets.
279  *
280  * \param[in] nrq        the request
281  * \param[in] orrd       the ORR/TRR policy scheduler instance
282  * \param[in] opc        the request's opcode
283  * \param[in] moving_req is the request in the process of moving onto the
284  *                       high-priority NRS head?
285  *
286  * \retval 0    range filled successfully
287  * \retval != 0 error
288  */
289 static int nrs_orr_range_fill(struct ptlrpc_nrs_request *nrq,
290                               struct nrs_orr_data *orrd, __u32 opc,
291                               bool moving_req)
292 {
293         struct ptlrpc_request       *req = container_of(nrq,
294                                                         struct ptlrpc_request,
295                                                         rq_nrq);
296         struct obd_ioobj            *ioo;
297         struct niobuf_remote        *nb;
298         struct ost_body             *body;
299         struct nrs_orr_req_range     range;
300         int                          niocount;
301         int                          rc = 0;
302
303         /**
304          * If we are scheduling using physical disk offsets, but we have filled
305          * the offset information in the request previously
306          * (i.e. ldlm_lock_reorder_req() is moving the request to the
307          * high-priority NRS head), there is no need to do anything, and we can
308          * exit. Moreover than the lack of need, we would be unable to perform
309          * exit. Beyond there being no need, we would also be unable to perform
310          * the obd_get_info() call required in nrs_orr_range_fill_physical(),
311          * because ldlm_lock_reorder_req() calls into here while holding a
312          * potentially sleeping operation.
313          */
314         if (orrd->od_physical && nrq->nr_u.orr.or_physical_set)
315                 return 0;
316
317         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
318         if (ioo == NULL)
319                 GOTO(out, rc = -EFAULT);
320
321         niocount = ioo->ioo_bufcnt;
322
323         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
324         if (nb == NULL)
325                 GOTO(out, rc = -EFAULT);
326
327         /**
328          * Use logical information from niobuf_remote structures.
329          */
330         nrs_orr_range_fill_logical(nb, niocount, &range);
331
332         /**
333          * Obtain physical offsets if selected, and this is an OST_READ
334          * RPC. We do not enter this block if moving_req is set, which indicates
335          * that the request is being moved to the high-priority NRS head by
336          * ldlm_lock_reorder_req(), as that function calls in here while holding
337          * a spinlock, and nrs_orr_range_fill_physical() can sleep, so we just
338          * use logical file offsets for the range values for such requests.
339          */
340         if (orrd->od_physical && opc == OST_READ && !moving_req) {
341                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
342                 if (body == NULL)
343                         GOTO(out, rc = -EFAULT);
344
345                 /**
346                  * Translate to physical block offsets from backend filesystem
347                  * extents.
348                  * Ignore return values; if obtaining the physical offsets
349                  * fails, use the logical offsets.
350                  */
351                 nrs_orr_range_fill_physical(nrq, &body->oa, &range);
352         }
353
354         nrq->nr_u.orr.or_range = range;
355 out:
356         return rc;
357 }
358
359 /**
360  * Generates a character string that can be used in order to register uniquely
361  * named libcfs_hash and slab objects for ORR/TRR policy instances. The
362  * character string is unique per policy instance, as it includes the policy's
363  * name, the CPT number, and a {reg|hp} token, and there is one policy instance
364  * per NRS head on each CPT, and the policy is only compatible with the ost_io
365  * service.
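 * For example, the ORR policy instance on the regular NRS head of CPT 0 is
 * named "nrs_orr_reg_0".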
366  *
367  * \param[in] policy the policy instance
368  * \param[out] name  the character array that will hold the generated name
369  */
370 static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
371 {
372         snprintf(name, NRS_ORR_OBJ_NAME_MAX, "%s%s%s%d",
373                  "nrs_", policy->pol_desc->pd_name,
374                  policy->pol_nrs->nrs_queue_type == PTLRPC_NRS_QUEUE_REG ?
375                  "_reg_" : "_hp_", nrs_pol2cptid(policy));
376 }
377
378 /**
379  * ORR/TRR hash operations
380  */
381 #define NRS_ORR_BITS            24
382 #define NRS_ORR_BKT_BITS        12
383 #define NRS_ORR_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
384
385 #define NRS_TRR_BITS            4
386 #define NRS_TRR_BKT_BITS        2
387 #define NRS_TRR_HASH_FLAGS      CFS_HASH_SPIN_BKTLOCK
388
389 static unsigned
390 nrs_orr_hop_hash(struct cfs_hash *hs, const void *key, unsigned mask)
391 {
392         return cfs_hash_djb2_hash(key, sizeof(struct nrs_orr_key), mask);
393 }
394
395 static void *nrs_orr_hop_key(struct hlist_node *hnode)
396 {
397         struct nrs_orr_object *orro = hlist_entry(hnode,
398                                                       struct nrs_orr_object,
399                                                       oo_hnode);
400         return &orro->oo_key;
401 }
402
403 static int nrs_orr_hop_keycmp(const void *key, struct hlist_node *hnode)
404 {
405         struct nrs_orr_object *orro = hlist_entry(hnode,
406                                                       struct nrs_orr_object,
407                                                       oo_hnode);
408
409         return lu_fid_eq(&orro->oo_key.ok_fid,
410                          &((struct nrs_orr_key *)key)->ok_fid);
411 }
412
413 static void *nrs_orr_hop_object(struct hlist_node *hnode)
414 {
415         return hlist_entry(hnode, struct nrs_orr_object, oo_hnode);
416 }
417
418 static void nrs_orr_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
419 {
420         struct nrs_orr_object *orro = hlist_entry(hnode,
421                                                       struct nrs_orr_object,
422                                                       oo_hnode);
423         orro->oo_ref++;
424 }
425
426 /**
427  * Removes an nrs_orr_object from the hash and frees its memory, if the object
428  * has no active users.
428  * no active users.
429  */
430 static void nrs_orr_hop_put_free(struct cfs_hash *hs, struct hlist_node *hnode)
431 {
432         struct nrs_orr_object *orro = hlist_entry(hnode,
433                                                       struct nrs_orr_object,
434                                                       oo_hnode);
435         struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
436                                                    struct nrs_orr_data, od_res);
437         struct cfs_hash_bd     bd;
438
439         cfs_hash_bd_get_and_lock(hs, &orro->oo_key, &bd, 1);
440
441         if (--orro->oo_ref > 1) {
442                 cfs_hash_bd_unlock(hs, &bd, 1);
443
444                 return;
445         }
446         LASSERT(orro->oo_ref == 1);
447
448         cfs_hash_bd_del_locked(hs, &bd, hnode);
449         cfs_hash_bd_unlock(hs, &bd, 1);
450
451         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
452 }
453
454 static void nrs_orr_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
455 {
456         struct nrs_orr_object *orro = hlist_entry(hnode,
457                                                       struct nrs_orr_object,
458                                                       oo_hnode);
459         orro->oo_ref--;
460 }
461
462 static int nrs_trr_hop_keycmp(const void *key, struct hlist_node *hnode)
463 {
464         struct nrs_orr_object *orro = hlist_entry(hnode,
465                                                       struct nrs_orr_object,
466                                                       oo_hnode);
467
468         return orro->oo_key.ok_idx == ((struct nrs_orr_key *)key)->ok_idx;
469 }
470
471 static void nrs_trr_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
472 {
473         struct nrs_orr_object *orro = hlist_entry(hnode,
474                                                       struct nrs_orr_object,
475                                                       oo_hnode);
476         struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
477                                                    struct nrs_orr_data, od_res);
478
479         LASSERTF(orro->oo_ref == 0,
480                  "Busy NRS TRR policy object for OST with index %u, with %ld "
481                  "refs\n", orro->oo_key.ok_idx, orro->oo_ref);
482
483         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
484 }
485
486 static struct cfs_hash_ops nrs_orr_hash_ops = {
487         .hs_hash        = nrs_orr_hop_hash,
488         .hs_key         = nrs_orr_hop_key,
489         .hs_keycmp      = nrs_orr_hop_keycmp,
490         .hs_object      = nrs_orr_hop_object,
491         .hs_get         = nrs_orr_hop_get,
492         .hs_put         = nrs_orr_hop_put_free,
493         .hs_put_locked  = nrs_orr_hop_put,
494 };
495
496 static struct cfs_hash_ops nrs_trr_hash_ops = {
497         .hs_hash        = nrs_orr_hop_hash,
498         .hs_key         = nrs_orr_hop_key,
499         .hs_keycmp      = nrs_trr_hop_keycmp,
500         .hs_object      = nrs_orr_hop_object,
501         .hs_get         = nrs_orr_hop_get,
502         .hs_put         = nrs_orr_hop_put,
503         .hs_put_locked  = nrs_orr_hop_put,
504         .hs_exit        = nrs_trr_hop_exit,
505 };
506
507 #define NRS_ORR_QUANTUM_DFLT    256
508
509 /**
510  * Binary heap predicate.
511  *
512  * Uses
513  * ptlrpc_nrs_request::nr_u::orr::or_round,
514  * ptlrpc_nrs_request::nr_u::orr::or_sequence, and
515  * ptlrpc_nrs_request::nr_u::orr::or_range to compare two binheap nodes and
516  * produce a binary predicate that indicates their relative priority, so that
517  * the binary heap can perform the necessary sorting operations.
518  *
519  * \param[in] e1 the first binheap node to compare
520  * \param[in] e2 the second binheap node to compare
521  *
522  * \retval 0 e1 > e2
523  * \retval 1 e1 < e2
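 *
 * In effect, queued requests are dispatched in ascending order of the tuple
 * (or_round, or_sequence, or_range.or_start, or_range.or_end).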
524  */
525 static int
526 orr_req_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
527 {
528         struct ptlrpc_nrs_request *nrq1;
529         struct ptlrpc_nrs_request *nrq2;
530
531         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
532         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
533
534         /**
535          * Requests have been scheduled against a different scheduling round.
536          */
537         if (nrq1->nr_u.orr.or_round < nrq2->nr_u.orr.or_round)
538                 return 1;
539         else if (nrq1->nr_u.orr.or_round > nrq2->nr_u.orr.or_round)
540                 return 0;
541
542         /**
543          * Requests have been scheduled against the same scheduling round, but
544          * belong to a different batch, i.e. they pertain to a different
545          * backend-fs object (for ORR policy instances) or OST (for TRR policy
546          * instances).
547          */
548         if (nrq1->nr_u.orr.or_sequence < nrq2->nr_u.orr.or_sequence)
549                 return 1;
550         else if (nrq1->nr_u.orr.or_sequence > nrq2->nr_u.orr.or_sequence)
551                 return 0;
552
553         /**
554          * If round numbers and sequence numbers are equal, the two requests
555          * have been scheduled on the same round, and belong to the same batch,
556          * which means they pertain to the same backend-fs object (if this is an
557          * ORR policy instance), or to the same OST (if this is a TRR policy
558          * instance), so these requests should be sorted by ascending offset
559          * order.
560          */
561         if (nrq1->nr_u.orr.or_range.or_start <
562             nrq2->nr_u.orr.or_range.or_start) {
563                 return 1;
564         } else if (nrq1->nr_u.orr.or_range.or_start >
565                  nrq2->nr_u.orr.or_range.or_start) {
566                 return 0;
567         } else {
568                 /**
569                  * Requests start from the same offset; Dispatch the shorter one
570                  * first; perhaps slightly more chances of hitting caches like
571                  * this.
572                  */
573                 return nrq1->nr_u.orr.or_range.or_end <
574                        nrq2->nr_u.orr.or_range.or_end;
575         }
576 }
577
578 /**
579  * ORR binary heap operations
580  */
581 static struct cfs_binheap_ops nrs_orr_heap_ops = {
582         .hop_enter      = NULL,
583         .hop_exit       = NULL,
584         .hop_compare    = orr_req_compare,
585 };
586
587 /**
588  * Prints a warning message if an ORR/TRR policy is started on a service with
589  * more than one CPT.  Not printed on the console for now, since we don't
590  * have any performance metrics in the first place, and it is annoying.
591  *
592  * \param[in] policy the policy instance
593  *
594  * \retval 0 success
595  */
596 static int nrs_orr_init(struct ptlrpc_nrs_policy *policy)
597 {
598         if (policy->pol_nrs->nrs_svcpt->scp_service->srv_ncpts > 1)
599                 CDEBUG(D_CONFIG, "%s: The %s NRS policy was registered on a "
600                       "service with multiple service partitions. This policy "
601                       "may perform better with a single partition.\n",
602                       policy->pol_nrs->nrs_svcpt->scp_service->srv_name,
603                       policy->pol_desc->pd_name);
604
605         return 0;
606 }
607
608 /**
609  * Called when an ORR/TRR policy instance is started.
610  *
611  * \param[in] policy the policy
612  *
613  * \retval -ENOMEM OOM error
614  * \retval 0       success
615  */
616 static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg)
617 {
618         struct nrs_orr_data    *orrd;
619         struct cfs_hash_ops            *ops;
620         unsigned                cur_bits;
621         unsigned                max_bits;
622         unsigned                bkt_bits;
623         unsigned                flags;
624         int                     rc = 0;
625         ENTRY;
626
627         OBD_CPT_ALLOC_PTR(orrd, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
628         if (orrd == NULL)
629                 RETURN(-ENOMEM);
630
631         /*
632          * Binary heap instance for sorted incoming requests.
633          */
634         orrd->od_binheap = cfs_binheap_create(&nrs_orr_heap_ops,
635                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
636                                               nrs_pol2cptab(policy),
637                                               nrs_pol2cptid(policy));
638         if (orrd->od_binheap == NULL)
639                 GOTO(out_orrd, rc = -ENOMEM);
640
641         nrs_orr_genobjname(policy, orrd->od_objname);
642
643         /**
644          * Slab cache for NRS ORR/TRR objects.
645          */
646         orrd->od_cache = kmem_cache_create(orrd->od_objname,
647                                            sizeof(struct nrs_orr_object),
648                                            0, 0, NULL);
649         if (orrd->od_cache == NULL)
650                 GOTO(out_binheap, rc = -ENOMEM);
651
652         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
653                     NRS_POL_NAME_MAX) == 0) {
654                 ops = &nrs_orr_hash_ops;
655                 cur_bits = NRS_ORR_BITS;
656                 max_bits = NRS_ORR_BITS;
657                 bkt_bits = NRS_ORR_BKT_BITS;
658                 flags = NRS_ORR_HASH_FLAGS;
659         } else {
660                 ops = &nrs_trr_hash_ops;
661                 cur_bits = NRS_TRR_BITS;
662                 max_bits = NRS_TRR_BITS;
663                 bkt_bits = NRS_TRR_BKT_BITS;
664                 flags = NRS_TRR_HASH_FLAGS;
665         }
666
667         /**
668          * Hash for finding objects by struct nrs_orr_key.
669          * XXX: For TRR, it might be better to avoid using libcfs_hash?
670          * All that needs to be resolved are OST indices, and they
671          * will stay relatively stable during an OSS node's lifetime.
672          */
673         orrd->od_obj_hash = cfs_hash_create(orrd->od_objname, cur_bits,
674                                             max_bits, bkt_bits, 0,
675                                             CFS_HASH_MIN_THETA,
676                                             CFS_HASH_MAX_THETA, ops, flags);
677         if (orrd->od_obj_hash == NULL)
678                 GOTO(out_cache, rc = -ENOMEM);
679
680         /* XXX: Fields accessed unlocked */
681         orrd->od_quantum = NRS_ORR_QUANTUM_DFLT;
682         orrd->od_supp = NOS_DFLT;
683         orrd->od_physical = true;
684         /**
685          * Set to 1 so that the test inside nrs_orr_req_add() can evaluate to
686          * true.
687          */
688         orrd->od_sequence = 1;
689
690         policy->pol_private = orrd;
691
692         RETURN(rc);
693
694 out_cache:
695         kmem_cache_destroy(orrd->od_cache);
696 out_binheap:
697         cfs_binheap_destroy(orrd->od_binheap);
698 out_orrd:
699         OBD_FREE_PTR(orrd);
700
701         RETURN(rc);
702 }
703
704 /**
705  * Called when an ORR/TRR policy instance is stopped.
706  *
707  * Called when the policy has been instructed to transition to the
708  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more
709  * pending requests to serve.
710  *
711  * \param[in] policy the policy
712  */
713 static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy)
714 {
715         struct nrs_orr_data *orrd = policy->pol_private;
716         ENTRY;
717
718         LASSERT(orrd != NULL);
719         LASSERT(orrd->od_binheap != NULL);
720         LASSERT(orrd->od_obj_hash != NULL);
721         LASSERT(orrd->od_cache != NULL);
722         LASSERT(cfs_binheap_is_empty(orrd->od_binheap));
723
724         cfs_binheap_destroy(orrd->od_binheap);
725         cfs_hash_putref(orrd->od_obj_hash);
726         kmem_cache_destroy(orrd->od_cache);
727
728         OBD_FREE_PTR(orrd);
729 }
730
731 /**
732  * Performs a policy-specific ctl function on ORR/TRR policy instances; similar
733  * to ioctl.
734  *
735  * \param[in]     policy the policy instance
736  * \param[in]     opc    the opcode
737  * \param[in,out] arg    used for passing parameters and information
738  *
739  * \pre assert_spin_locked(&policy->pol_nrs->nrs_lock)
740  * \post assert_spin_locked(&policy->pol_nrs->nrs_lock)
741  *
742  * \retval 0   operation carried out successfully
743  * \retval -ve error
744  */
745 static int nrs_orr_ctl(struct ptlrpc_nrs_policy *policy,
746                        enum ptlrpc_nrs_ctl opc, void *arg)
747 {
748         assert_spin_locked(&policy->pol_nrs->nrs_lock);
749
750         switch((enum nrs_ctl_orr)opc) {
751         default:
752                 RETURN(-EINVAL);
753
754         case NRS_CTL_ORR_RD_QUANTUM: {
755                 struct nrs_orr_data     *orrd = policy->pol_private;
756
757                 *(__u16 *)arg = orrd->od_quantum;
758                 }
759                 break;
760
761         case NRS_CTL_ORR_WR_QUANTUM: {
762                 struct nrs_orr_data     *orrd = policy->pol_private;
763
764                 orrd->od_quantum = *(__u16 *)arg;
765                 LASSERT(orrd->od_quantum != 0);
766                 }
767                 break;
768
769         case NRS_CTL_ORR_RD_OFF_TYPE: {
770                 struct nrs_orr_data     *orrd = policy->pol_private;
771
772                 *(bool *)arg = orrd->od_physical;
773                 }
774                 break;
775
776         case NRS_CTL_ORR_WR_OFF_TYPE: {
777                 struct nrs_orr_data     *orrd = policy->pol_private;
778
779                 orrd->od_physical = *(bool *)arg;
780                 }
781                 break;
782
783         case NRS_CTL_ORR_RD_SUPP_REQ: {
784                 struct nrs_orr_data     *orrd = policy->pol_private;
785
786                 *(enum nrs_orr_supp *)arg = orrd->od_supp;
787                 }
788                 break;
789
790         case NRS_CTL_ORR_WR_SUPP_REQ: {
791                 struct nrs_orr_data     *orrd = policy->pol_private;
792
793                 orrd->od_supp = *(enum nrs_orr_supp *)arg;
794                 LASSERT((orrd->od_supp & NOS_OST_RW) != 0);
795                 }
796                 break;
797         }
798         RETURN(0);
799 }
800
801 /**
802  * Obtains resources for ORR/TRR policy instances. The top-level resource lives
803  * inside \e nrs_orr_data and the second-level resource inside
804  * \e nrs_orr_object instances.
805  *
806  * \param[in]  policy     the policy for which resources are being taken for
807  *                        request \a nrq
808  * \param[in]  nrq        the request for which resources are being taken
809  * \param[in]  parent     parent resource, embedded in nrs_orr_data for the
810  *                        ORR/TRR policies
811  * \param[out] resp       used to return resource references
812  * \param[in]  moving_req signifies limited caller context; used to perform
813  *                        memory allocations in an atomic context in this
814  *                        policy
815  *
816  * \retval 0   we are returning a top-level, parent resource, one that is
817  *             embedded in an nrs_orr_data object
818  * \retval 1   we are returning a bottom-level resource, one that is embedded
819  *             in an nrs_orr_object object
820  *
821  * \see nrs_resource_get_safe()
822  */
823 static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
824                            struct ptlrpc_nrs_request *nrq,
825                            const struct ptlrpc_nrs_resource *parent,
826                            struct ptlrpc_nrs_resource **resp, bool moving_req)
827 {
828         struct nrs_orr_data            *orrd;
829         struct nrs_orr_object          *orro;
830         struct nrs_orr_object          *tmp;
831         struct nrs_orr_key              key = { { { 0 } } };
832         __u32                           opc;
833         int                             rc = 0;
834
835         /**
836          * struct nrs_orr_data is requested.
837          */
838         if (parent == NULL) {
839                 *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
840                 return 0;
841         }
842
843         orrd = container_of(parent, struct nrs_orr_data, od_res);
844
845         /**
846          * If the request type is not supported, fail the enqueuing; the RPC
847          * will be handled by the fallback NRS policy.
848          */
849         if (!nrs_orr_req_supported(orrd, nrq, &opc))
850                 return -1;
851
852         /**
853          * Fill in the key for the request; OST FID for ORR policy instances,
854          * and OST index for TRR policy instances.
855          */
856         rc = nrs_orr_key_fill(orrd, nrq, opc, policy->pol_desc->pd_name, &key);
857         if (rc < 0)
858                 RETURN(rc);
859
860         /**
861          * Set the offset range the request covers
862          */
863         rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
864         if (rc < 0)
865                 RETURN(rc);
866
867         orro = cfs_hash_lookup(orrd->od_obj_hash, &key);
868         if (orro != NULL)
869                 goto out;
870
871         OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
872                                    nrs_pol2cptab(policy), nrs_pol2cptid(policy),
873                                    moving_req ? GFP_ATOMIC : GFP_NOFS);
874         if (orro == NULL)
875                 RETURN(-ENOMEM);
876
877         orro->oo_key = key;
878         orro->oo_ref = 1;
879
880         tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key,
881                                       &orro->oo_hnode);
882         if (tmp != orro) {
883                 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
884                 orro = tmp;
885         }
886 out:
887         /**
888          * For debugging purposes
889          */
890         nrq->nr_u.orr.or_key = orro->oo_key;
891
892         *resp = &orro->oo_res;
893
894         return 1;
895 }
896
897 /**
898  * Called when releasing references to the resource hierarchy obtained for a
899  * request for scheduling using ORR/TRR policy instances.
900  *
901  * \param[in] policy   the policy the resource belongs to
902  * \param[in] res      the resource to be released
903  */
904 static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy,
905                             const struct ptlrpc_nrs_resource *res)
906 {
907         struct nrs_orr_data     *orrd;
908         struct nrs_orr_object   *orro;
909
910         /**
911          * Do nothing for freeing parent, nrs_orr_data resources.
912          */
913         if (res->res_parent == NULL)
914                 return;
915
916         orro = container_of(res, struct nrs_orr_object, oo_res);
917         orrd = container_of(res->res_parent, struct nrs_orr_data, od_res);
918
919         cfs_hash_put(orrd->od_obj_hash, &orro->oo_hnode);
920 }
921
922 /**
923  * Called when polling an ORR/TRR policy instance for a request so that it can
924  * be served. Returns the request that is at the root of the binary heap, as
925  * that is the one with the lowest sort key and thus the next to be served
926  * (i.e. the libcfs binary heap is a min-heap).
927  *
928  * \param[in] policy the policy instance being polled
929  * \param[in] peek   when set, signifies that we just want to examine the
930  *                   request, and not handle it, so the request is not removed
931  *                   from the policy.
932  * \param[in] force  force the policy to return a request; unused in this policy
933  *
934  * \retval the request to be handled
935  * \retval NULL no request available
936  *
937  * \see ptlrpc_nrs_req_get_nolock()
938  * \see nrs_request_get()
939  */
940 static
941 struct ptlrpc_nrs_request *nrs_orr_req_get(struct ptlrpc_nrs_policy *policy,
942                                            bool peek, bool force)
943 {
944         struct nrs_orr_data       *orrd = policy->pol_private;
945         struct cfs_binheap_node   *node = cfs_binheap_root(orrd->od_binheap);
946         struct ptlrpc_nrs_request *nrq;
947
948         nrq = unlikely(node == NULL) ? NULL :
949               container_of(node, struct ptlrpc_nrs_request, nr_node);
950
951         if (likely(!peek && nrq != NULL)) {
952                 struct nrs_orr_object *orro;
953
954                 orro = container_of(nrs_request_resource(nrq),
955                                     struct nrs_orr_object, oo_res);
956
957                 LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
958
959                 cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
960                 orro->oo_active--;
961
962                 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
963                                  NRS_POL_NAME_MAX) == 0)
964                         CDEBUG(D_RPCTRACE,
965                                "NRS: starting to handle %s request for object "
966                                "with FID "DFID", from OST with index %u, with "
967                                "round %llu\n", NRS_POL_NAME_ORR,
968                                PFID(&orro->oo_key.ok_fid),
969                                nrq->nr_u.orr.or_key.ok_idx,
970                                nrq->nr_u.orr.or_round);
971                 else
972                         CDEBUG(D_RPCTRACE,
973                                "NRS: starting to handle %s request from OST "
974                                "with index %u, with round %llu\n",
975                                NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
976                                nrq->nr_u.orr.or_round);
977
978                 /** Peek at the next request to be served */
979                 node = cfs_binheap_root(orrd->od_binheap);
980
981                 /** No more requests */
982                 if (unlikely(node == NULL)) {
983                         orrd->od_round++;
984                 } else {
985                         struct ptlrpc_nrs_request *next;
986
987                         next = container_of(node, struct ptlrpc_nrs_request,
988                                             nr_node);
989
990                         if (orrd->od_round < next->nr_u.orr.or_round)
991                                 orrd->od_round = next->nr_u.orr.or_round;
992                 }
993         }
994
995         return nrq;
996 }
997
998 /**
999  * Sort-adds request \a nrq to an ORR/TRR \a policy instance's set of queued
1000  * requests in the policy's binary heap.
1001  *
1002  * A scheduling round is a stream of requests that have been sorted in batches
1003  * according to the backend-fs object (for ORR policy instances) or OST (for TRR
1004  * policy instances) that they pertain to (as identified by its IDIF FID or OST
1005  * index respectively); there can be only one batch for each object or OST in
1006  * each round. The batches are of maximum size nrs_orr_data::od_quantum. When a
1007  * new request arrives for scheduling for an object or OST that has exhausted
1008  * its quantum in its current round, the request will be scheduled on the next
1009  * scheduling round. Requests are allowed to be scheduled against a round until
1010  * all requests for the round are serviced, so an object or OST might miss a
1011  * round if requests are not scheduled for it for a long enough period of time.
1012  * Objects or OSTs that miss a round will continue with having their next
1013  * request scheduled, starting at the round that requests are being dispatched
1014  * for, at the time of arrival of this request.
1015  *
1016  * Requests are tagged with the round number and a sequence number; the sequence
1017  * number indicates the relative ordering amongst the batches of requests in a
1018  * round, and is identical for all requests in a batch, as is the round number.
1019  * The round and sequence numbers are used by orr_req_compare() via
1020  * nrs_orr_data::od_binheap in order to maintain an ordered set of rounds, with
1021  * each round consisting of an ordered set of batches of requests, and each
1022  * batch consisting of an ordered set of requests according to their logical
1023  * file or physical disk offsets.
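 *
 * As a rough example, with nrs_orr_data::od_quantum set to 4, six consecutive
 * requests arriving for the same object would be scheduled as a batch of 4 on
 * the object's current round, with the remaining 2 requests scheduled on the
 * following round.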
1024  *
1025  * \param[in] policy the policy
1026  * \param[in] nrq    the request to add
1027  *
1028  * \retval 0    request successfully added
1029  * \retval != 0 error
1030  */
1031 static int nrs_orr_req_add(struct ptlrpc_nrs_policy *policy,
1032                            struct ptlrpc_nrs_request *nrq)
1033 {
1034         struct nrs_orr_data     *orrd;
1035         struct nrs_orr_object   *orro;
1036         int                      rc;
1037
1038         orro = container_of(nrs_request_resource(nrq),
1039                             struct nrs_orr_object, oo_res);
1040         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1041                             struct nrs_orr_data, od_res);
1042
1043         if (orro->oo_quantum == 0 || orro->oo_round < orrd->od_round ||
1044             (orro->oo_active == 0 && orro->oo_quantum > 0)) {
1045
1046                 /**
1047                  * If there are no pending requests for the object/OST, but some
1048                  * of its quantum still remains unused, which implies we did not
1049                  * get a chance to schedule up to its maximum allowed batch size
1050                  * of requests in the previous round this object/OST
1051                  * participated in, schedule this next request on a new round;
1052                  * this avoids fragmentation of request batches caused by
1053                  * intermittent inactivity on the object/OST, at the expense of
1054                  * potentially slightly increased service time for the request
1055                  * batch this request will be a part of.
1056                  */
1057                 if (orro->oo_active == 0 && orro->oo_quantum > 0)
1058                         orro->oo_round++;
1059
1060                 /** A new scheduling round has commenced */
1061                 if (orro->oo_round < orrd->od_round)
1062                         orro->oo_round = orrd->od_round;
1063
1064                 /** I was not the last object/OST that scheduled a request */
1065                 if (orro->oo_sequence < orrd->od_sequence)
1066                         orro->oo_sequence = ++orrd->od_sequence;
1067                 /**
1068                  * Reset the quantum if we have reached the maximum quantum
1069                  * size for this batch, or even if we have not managed to
1070                  * complete a batch size up to its maximum allowed size.
1071                  * XXX: Accessed unlocked
1072                  */
1073                 orro->oo_quantum = orrd->od_quantum;
1074         }
1075
1076         nrq->nr_u.orr.or_round = orro->oo_round;
1077         nrq->nr_u.orr.or_sequence = orro->oo_sequence;
1078
1079         rc = cfs_binheap_insert(orrd->od_binheap, &nrq->nr_node);
1080         if (rc == 0) {
1081                 orro->oo_active++;
1082                 if (--orro->oo_quantum == 0)
1083                         orro->oo_round++;
1084         }
1085         return rc;
1086 }
1087
1088 /**
1089  * Removes request \a nrq from an ORR/TRR \a policy instance's set of queued
1090  * requests.
1091  *
1092  * \param[in] policy the policy
1093  * \param[in] nrq    the request to remove
1094  */
1095 static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy,
1096                             struct ptlrpc_nrs_request *nrq)
1097 {
1098         struct nrs_orr_data     *orrd;
1099         struct nrs_orr_object   *orro;
1100         bool                     is_root;
1101
1102         orro = container_of(nrs_request_resource(nrq),
1103                             struct nrs_orr_object, oo_res);
1104         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1105                             struct nrs_orr_data, od_res);
1106
1107         LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
1108
1109         is_root = &nrq->nr_node == cfs_binheap_root(orrd->od_binheap);
1110
1111         cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
1112         orro->oo_active--;
1113
1114         /**
1115          * If we just deleted the node at the root of the binheap, we may have
1116          * to adjust round numbers.
1117          */
1118         if (unlikely(is_root)) {
1119                 /** Peek at the next request to be served */
1120                 struct cfs_binheap_node *node = cfs_binheap_root(orrd->od_binheap);
1121
1122                 /** No more requests */
1123                 if (unlikely(node == NULL)) {
1124                         orrd->od_round++;
1125                 } else {
1126                         nrq = container_of(node, struct ptlrpc_nrs_request,
1127                                            nr_node);
1128
1129                         if (orrd->od_round < nrq->nr_u.orr.or_round)
1130                                 orrd->od_round = nrq->nr_u.orr.or_round;
1131                 }
1132         }
1133 }
1134
1135 /**
1136  * Called right after the request \a nrq finishes being handled by ORR policy
1137  * instance \a policy.
1138  *
1139  * \param[in] policy the policy that handled the request
1140  * \param[in] nrq    the request that was handled
1141  */
1142 static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy,
1143                              struct ptlrpc_nrs_request *nrq)
1144 {
1145         /** NB: resource control, credits etc can be added here */
1146         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
1147                     NRS_POL_NAME_MAX) == 0)
1148                 CDEBUG(D_RPCTRACE,
1149                        "NRS: finished handling %s request for object with FID "
1150                        DFID", from OST with index %u, with round %llu\n",
1151                        NRS_POL_NAME_ORR, PFID(&nrq->nr_u.orr.or_key.ok_fid),
1152                        nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
1153         else
1154                 CDEBUG(D_RPCTRACE,
1155                        "NRS: finished handling %s request from OST with index %u,"
1156                        " with round %llu\n",
1157                        NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
1158                        nrq->nr_u.orr.or_round);
1159 }
1160
1161 /**
1162  * debugfs interface
1163  */
1164
1165 /**
1166  * This allows the policy name to be bundled into the lprocfs_vars::data pointer
1167  * so that lprocfs read/write functions can be used by both the ORR and TRR
1168  * policies.
1169  */
1170 static struct nrs_lprocfs_orr_data {
1171         struct ptlrpc_service   *svc;
1172         char                    *name;
1173 } lprocfs_orr_data = {
1174         .name = NRS_POL_NAME_ORR
1175 }, lprocfs_trr_data = {
1176         .name = NRS_POL_NAME_TRR
1177 };
1178
1179 /**
1180  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
1181  * for ORR/TRR policy instances on both the regular and high-priority NRS head
1182  * of a service, as long as a policy instance is not in the
1183  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1184  * state are skipped later by nrs_orr_ctl().
1185  *
1186  * Quantum values are in # of RPCs, and the output is in YAML format.
1187  *
1188  * For example:
1189  *
1190  *      reg_quantum:256
1191  *      hp_quantum:8
1192  *
1193  * XXX: the CRR-N version of this, ptlrpc_lprocfs_rd_nrs_crrn_quantum() is
1194  * almost identical; it can be reworked and then reused for ORR/TRR.
1195  */
1196 static int
1197 ptlrpc_lprocfs_nrs_orr_quantum_seq_show(struct seq_file *m, void *data)
1198 {
1199         struct nrs_lprocfs_orr_data *orr_data = m->private;
1200         struct ptlrpc_service       *svc = orr_data->svc;
1201         __u16                        quantum;
1202         int                          rc;
1203
1204         /**
1205          * Perform two separate calls to this as only one of the NRS heads'
1206          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
1207          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1208          */
1209         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1210                                        orr_data->name,
1211                                        NRS_CTL_ORR_RD_QUANTUM,
1212                                        true, &quantum);
1213         if (rc == 0) {
1214                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG "%-5d\n", quantum);
1215                 /**
1216                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1217                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1218                  */
1219         } else if (rc != -ENODEV) {
1220                 return rc;
1221         }
1222
1223         /**
1224  * We know the ost_io service, which is the only one ORR/TRR policies are
1225  * compatible with, does have an HP NRS head, but it may be best to guard
1226          * against a possible change of this in the future.
1227          */
1228         if (!nrs_svc_has_hp(svc))
1229                 goto no_hp;
1230
1231         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1232                                        orr_data->name, NRS_CTL_ORR_RD_QUANTUM,
1233                                        true, &quantum);
1234         if (rc == 0) {
1235                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
1236                 /**
1237                  * Ignore -ENODEV as the high priority NRS head's policy may be
1238                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1239                  */
1240         } else if (rc != -ENODEV) {
1241                 return rc;
1242         }
1243
1244 no_hp:
1245
1246         return rc;
1247 }
1248
1249 /**
1250  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
1251  * for ORR/TRR policy instances of a service. The user can set the quantum size
1252  * for the regular and high priority NRS head separately by specifying each
1253  * value, or both together in a single invocation.
1254  *
1255  * For example:
1256  *
1257  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=reg_quantum:64, to set the
1258  * request quantum size of the ORR policy instance on the regular NRS head of
1259  * the ost_io service to 64
1260  *
1261  * lctl set_param ost.OSS.ost_io.nrs_trr_quantum=hp_quantum:8 to set the request
1262  * quantum size of the TRR policy instance on the high priority NRS head of the
1263  * ost_io service to 8
1264  *
1265  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=32, to set the request
1266  * quantum size of the ORR policy instance on both the regular and the high
1267  * priority NRS head of the ost_io service to 32
1268  *
1269  * Policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1270  * are skipped later by nrs_orr_ctl().
1271  *
1272  * XXX: the CRR-N version of this, ptlrpc_lprocfs_wr_nrs_crrn_quantum() is
1273  * almost identical; it can be reworked and then reused for ORR/TRR.
1274  */
1275 static ssize_t
1276 ptlrpc_lprocfs_nrs_orr_quantum_seq_write(struct file *file,
1277                                          const char __user *buffer,
1278                                          size_t count, loff_t *off)
1279 {
1280         struct seq_file             *m = file->private_data;
1281         struct nrs_lprocfs_orr_data *orr_data = m->private;
1282         struct ptlrpc_service       *svc = orr_data->svc;
1283         enum ptlrpc_nrs_queue_type   queue = 0;
1284         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
1285         char                        *val;
1286         long                         quantum_reg;
1287         long                         quantum_hp;
1288         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
1289         size_t                       count_copy;
1290         int                          rc = 0;
1291         int                          rc2 = 0;
1292
1293         if (count > (sizeof(kernbuf) - 1))
1294                 return -EINVAL;
1295
1296         if (copy_from_user(kernbuf, buffer, count))
1297                 return -EFAULT;
1298
1299         kernbuf[count] = '\0';
1300
1301         count_copy = count;
1302
1303         /**
1304          * Check if the regular quantum value has been specified
1305          */
1306         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
1307                                        &count_copy);
1308         if (val != kernbuf) {
1309                 rc = kstrtol(val, 10, &quantum_reg);
1310                 if (rc)
1311                         return rc;
1312                 queue |= PTLRPC_NRS_QUEUE_REG;
1313         }
1314
1315         count_copy = count;
1316
1317         /**
1318          * Check if the high priority quantum value has been specified
1319          */
1320         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
1321                                        &count_copy);
1322         if (val != kernbuf) {
1323                 if (!nrs_svc_has_hp(svc))
1324                         return -ENODEV;
1325
1326                 rc = kstrtol(val, 10, &quantum_hp);
1327                 if (rc)
1328                         return rc;
1329
1330                 queue |= PTLRPC_NRS_QUEUE_HP;
1331         }
1332
1333         /**
1334          * If none of the queues has been specified, look for a valid numerical
1335          * value
1336          */
1337         if (queue == 0) {
1338                 rc = kstrtol(kernbuf, 10, &quantum_reg);
1339                 if (rc)
1340                         return rc;
1341
1342                 queue = PTLRPC_NRS_QUEUE_REG;
1343
1344                 if (nrs_svc_has_hp(svc)) {
1345                         queue |= PTLRPC_NRS_QUEUE_HP;
1346                         quantum_hp = quantum_reg;
1347                 }
1348         }
1349
1350         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
1351             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
1352             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
1353             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
1354                 return -EINVAL;
1355
1356         /**
1357          * We change the values on regular and HP NRS heads separately, so that
1358          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1359          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1360          * started the policy on either the regular or HP NRS head; i.e. we are
1361          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1362          * only if the operation fails with -ENODEV on all heads that have been
1363          * specified by the command; if at least one operation succeeds,
1364          * success is returned.
1365          */
1366         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1367                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1368                                                orr_data->name,
1369                                                NRS_CTL_ORR_WR_QUANTUM, false,
1370                                                &quantum_reg);
1371                 if ((rc < 0 && rc != -ENODEV) ||
1372                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1373                         return rc;
1374         }
1375
1376         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1377                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1378                                                 orr_data->name,
1379                                                 NRS_CTL_ORR_WR_QUANTUM, false,
1380                                                 &quantum_hp);
1381                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1382                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1383                         return rc2;
1384         }
1385
1386         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1387 }
1388
1389 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_quantum);
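
/*
 * Illustrative sketch only, kept out of the build with #if 0: the write
 * handler above accepts a value tagged with NRS_LPROCFS_QUANTUM_NAME_REG,
 * one tagged with NRS_LPROCFS_QUANTUM_NAME_HP, both at once, or a bare
 * number that is applied to every available NRS head.  The hypothetical
 * helper below (the name is illustrative, not part of this file) restates
 * the numeric parsing and range check the handler performs.
 */
#if 0
static int nrs_orr_example_parse_quantum(const char *buf, long *quantum)
{
        int rc;

        /* Same parsing and range check as in the write handler above. */
        rc = kstrtol(buf, 10, quantum);
        if (rc)
                return rc;
        if (*quantum <= 0 || *quantum > LPROCFS_NRS_QUANTUM_MAX)
                return -EINVAL;
        return 0;
}
#endif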
1390
1391 #define LPROCFS_NRS_OFF_NAME_REG                "reg_offset_type:"
1392 #define LPROCFS_NRS_OFF_NAME_HP                 "hp_offset_type:"
1393
1394 #define LPROCFS_NRS_OFF_NAME_PHYSICAL           "physical"
1395 #define LPROCFS_NRS_OFF_NAME_LOGICAL            "logical"
1396
1397 /**
1398  * Retrieves the offset type used by ORR/TRR policy instances on both the
1399  * regular and high-priority NRS head of a service, as long as a policy
1400  * instance is not in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state;
1401  * policy instances in this state are skipped later by nrs_orr_ctl().
1402  *
1403  * Offset type information is a (physical|logical) string, and output is
1404  * in YAML format.
1405  *
1406  * For example:
1407  *
1408  *      reg_offset_type:physical
1409  *      hp_offset_type:logical
1410  */
1411 static int
1412 ptlrpc_lprocfs_nrs_orr_offset_type_seq_show(struct seq_file *m, void *data)
1413 {
1414         struct nrs_lprocfs_orr_data *orr_data = m->private;
1415         struct ptlrpc_service       *svc = orr_data->svc;
1416         bool                         physical;
1417         int                          rc;
1418
1419         /**
1420          * Perform two separate calls to this as only one of the NRS heads'
1421          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1422          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1423          */
1424         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1425                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1426                                        true, &physical);
1427         if (rc == 0) {
1428                 seq_printf(m, LPROCFS_NRS_OFF_NAME_REG"%s\n",
1429                            physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1430                            LPROCFS_NRS_OFF_NAME_LOGICAL);
1431                 /**
1432                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1433                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1434                  */
1435         } else if (rc != -ENODEV) {
1436                 return rc;
1437         }
1438
1439         /**
1440  * We know that the ost_io service, which is the only service ORR/TRR
1441  * policies are compatible with, does have an HP NRS head, but it may be
1442  * best to guard against a possible change of this in the future.
1443          */
1444         if (!nrs_svc_has_hp(svc))
1445                 goto no_hp;
1446
1447         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1448                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1449                                        true, &physical);
1450         if (rc == 0) {
1451                 seq_printf(m, LPROCFS_NRS_OFF_NAME_HP"%s\n",
1452                            physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1453                            LPROCFS_NRS_OFF_NAME_LOGICAL);
1454                 /**
1455                  * Ignore -ENODEV as the high priority NRS head's policy may be
1456                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1457                  */
1458         } else if (rc != -ENODEV) {
1459                 return rc;
1460         }
1461
1462 no_hp:
1463         return rc;
1464 }
1465
1466 /**
1467  * Max valid command string is the size of the labels, plus "physical" twice,
1468  * plus a separating ' '.
1469  */
1470 #define LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD                                        \
1471         sizeof(LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL " "      \
1472                LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL)
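
/*
 * For reference, a non-built sketch of the longest command the macro above is
 * sized for; the string is assembled from the same label and value tokens used
 * by the handlers in this file (the variable name is illustrative only).
 */
#if 0
/* Expands to "reg_offset_type:physical hp_offset_type:physical". */
static const char example_offset_type_cmd[] =
        LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL " "
        LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL;
#endif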
1473
1474 /**
1475  * Sets the type of offsets used to order RPCs in ORR/TRR policy instances. The
1476  * user can set offset type for the regular or high priority NRS head
1477  * separately by specifying each value, or both together in a single invocation.
1478  *
1479  * For example:
1480  *
1481  * lctl set_param ost.OSS.ost_io.nrs_orr_offset_type=
1482  * reg_offset_type:physical, to enable the ORR policy instance on the regular
1483  * NRS head of the ost_io service to use physical disk offset ordering.
1484  *
1485  * lctl set_param ost.OSS.ost_io.nrs_trr_offset_type=logical, to enable the TRR
1486  * policy instances on both the regular and high priority NRS heads of the
1487  * ost_io service to use logical file offset ordering.
1488  *
1489  * Policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1490  * are skipped later by nrs_orr_ctl().
1491  */
1492 static ssize_t
1493 ptlrpc_lprocfs_nrs_orr_offset_type_seq_write(struct file *file,
1494                                              const char __user *buffer,
1495                                              size_t count,
1496                                              loff_t *off)
1497 {
1498         struct seq_file             *m = file->private_data;
1499         struct nrs_lprocfs_orr_data *orr_data = m->private;
1500         struct ptlrpc_service       *svc = orr_data->svc;
1501         enum ptlrpc_nrs_queue_type   queue = 0;
1502         char                         kernbuf[LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD];
1503         char                        *val_reg;
1504         char                        *val_hp;
1505         bool                         physical_reg;
1506         bool                         physical_hp;
1507         size_t                       count_copy;
1508         int                          rc = 0;
1509         int                          rc2 = 0;
1510
1511         if (count > (sizeof(kernbuf) - 1))
1512                 return -EINVAL;
1513
1514         if (copy_from_user(kernbuf, buffer, count))
1515                 return -EFAULT;
1516
1517         kernbuf[count] = '\0';
1518
1519         count_copy = count;
1520
1521         /**
1522          * Check if the regular offset type has been specified
1523          */
1524         val_reg = lprocfs_find_named_value(kernbuf,
1525                                            LPROCFS_NRS_OFF_NAME_REG,
1526                                            &count_copy);
1527         if (val_reg != kernbuf)
1528                 queue |= PTLRPC_NRS_QUEUE_REG;
1529
1530         count_copy = count;
1531
1532         /**
1533          * Check if the high priority offset type has been specified
1534          */
1535         val_hp = lprocfs_find_named_value(kernbuf, LPROCFS_NRS_OFF_NAME_HP,
1536                                           &count_copy);
1537         if (val_hp != kernbuf) {
1538                 if (!nrs_svc_has_hp(svc))
1539                         return -ENODEV;
1540
1541                 queue |= PTLRPC_NRS_QUEUE_HP;
1542         }
1543
1544         /**
1545          * If none of the queues has been specified, there may be a valid
1546          * command string at the start of the buffer.
1547          */
1548         if (queue == 0) {
1549                 queue = PTLRPC_NRS_QUEUE_REG;
1550
1551                 if (nrs_svc_has_hp(svc))
1552                         queue |= PTLRPC_NRS_QUEUE_HP;
1553         }
1554
1555         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1556                 if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1557                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1558                         physical_reg = true;
1559                 else if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_LOGICAL,
1560                          sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1561                         physical_reg = false;
1562                 else
1563                         return -EINVAL;
1564         }
1565
1566         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1567                 if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1568                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1569                         physical_hp = true;
1570                 else if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_LOGICAL,
1571                                  sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1572                         physical_hp = false;
1573                 else
1574                         return -EINVAL;
1575         }
1576
1577         /**
1578          * We change the values on regular and HP NRS heads separately, so that
1579          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1580          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1581          * started the policy on either the regular or HP NRS head; i.e. we are
1582          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1583          * only if the operation fails with -ENODEV on all heads that have been
1584          * specified by the command; if at least one operation succeeds,
1585          * success is returned.
1586          */
1587         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1588                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1589                                                orr_data->name,
1590                                                NRS_CTL_ORR_WR_OFF_TYPE, false,
1591                                                &physical_reg);
1592                 if ((rc < 0 && rc != -ENODEV) ||
1593                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1594                         return rc;
1595         }
1596
1597         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1598                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1599                                                 orr_data->name,
1600                                                 NRS_CTL_ORR_WR_OFF_TYPE, false,
1601                                                 &physical_hp);
1602                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1603                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1604                         return rc2;
1605         }
1606
1607         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1608 }
1609
1610 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_offset_type);
1611
1612 #define NRS_LPROCFS_REQ_SUPP_NAME_REG           "reg_supported:"
1613 #define NRS_LPROCFS_REQ_SUPP_NAME_HP            "hp_supported:"
1614
1615 #define LPROCFS_NRS_SUPP_NAME_READS             "reads"
1616 #define LPROCFS_NRS_SUPP_NAME_WRITES            "writes"
1617 #define LPROCFS_NRS_SUPP_NAME_READWRITES        "reads_and_writes"
1618
1619 /**
1620  * Translates enum nrs_orr_supp values to a corresponding string.
1621  */
1622 static const char *nrs_orr_supp2str(enum nrs_orr_supp supp)
1623 {
1624         switch (supp) {
1625         default:
1626                 LBUG();
1627         case NOS_OST_READ:
1628                 return LPROCFS_NRS_SUPP_NAME_READS;
1629         case NOS_OST_WRITE:
1630                 return LPROCFS_NRS_SUPP_NAME_WRITES;
1631         case NOS_OST_RW:
1632                 return LPROCFS_NRS_SUPP_NAME_READWRITES;
1633         }
1634 }
1635
1636 /**
1637  * Translates strings to the corresponding enum nrs_orr_supp value
1638  */
1639 static enum nrs_orr_supp nrs_orr_str2supp(const char *val)
1640 {
1641         if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READWRITES,
1642                     sizeof(LPROCFS_NRS_SUPP_NAME_READWRITES) - 1) == 0)
1643                 return NOS_OST_RW;
1644         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READS,
1645                          sizeof(LPROCFS_NRS_SUPP_NAME_READS) - 1) == 0)
1646                 return NOS_OST_READ;
1647         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_WRITES,
1648                          sizeof(LPROCFS_NRS_SUPP_NAME_WRITES) - 1) == 0)
1649                 return NOS_OST_WRITE;
1650         else
1651                 return -EINVAL;
1652 }
1653
1654 /**
1655  * Retrieves the type of RPCs handled at the point of invocation by ORR/TRR
1656  * policy instances on both the regular and high-priority NRS head of a service,
1657  * as long as a policy instance is not in the
1658  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1659  * state are skipped later by nrs_orr_ctl().
1660  *
1661  * Supported RPC type information is a (reads|writes|reads_and_writes) string,
1662  * and output is in YAML format.
1663  *
1664  * For example:
1665  *
1666  *      reg_supported:reads
1667  *      hp_supported:reads_and_writes
1668  */
1669 static int
1670 ptlrpc_lprocfs_nrs_orr_supported_seq_show(struct seq_file *m, void *data)
1671 {
1672         struct nrs_lprocfs_orr_data *orr_data = m->private;
1673         struct ptlrpc_service       *svc = orr_data->svc;
1674         enum nrs_orr_supp            supported;
1675         int                          rc;
1676
1677         /**
1678          * Perform two separate calls to this as only one of the NRS heads'
1679          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1680          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1681          */
1682         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1683                                        orr_data->name,
1684                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1685                                        &supported);
1686
1687         if (rc == 0) {
1688                 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_REG"%s\n",
1689                            nrs_orr_supp2str(supported));
1690                 /**
1691                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1692                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1693                  */
1694         } else if (rc != -ENODEV) {
1695                 return rc;
1696         }
1697
1698         /**
1699  * We know that the ost_io service, which is the only service ORR/TRR
1700  * policies are compatible with, does have an HP NRS head, but it may be
1701  * best to guard against a possible change of this in the future.
1702          */
1703         if (!nrs_svc_has_hp(svc))
1704                 goto no_hp;
1705
1706         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1707                                        orr_data->name,
1708                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1709                                        &supported);
1710         if (rc == 0) {
1711                 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_HP"%s\n",
1712                            nrs_orr_supp2str(supported));
1713                 /**
1714                  * Ignore -ENODEV as the high priority NRS head's policy may be
1715                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1716                  */
1717         } else if (rc != -ENODEV) {
1718                 return rc;
1719         }
1720
1721 no_hp:
1722
1723         return rc;
1724 }
1725
1726 /**
1727  * Max valid command string is the size of the labels, plus "reads_and_writes"
1728  * twice, plus a separating ' '
1729  */
1730 #define LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD                                        \
1731         sizeof(NRS_LPROCFS_REQ_SUPP_NAME_REG LPROCFS_NRS_SUPP_NAME_READWRITES  \
1732                NRS_LPROCFS_REQ_SUPP_NAME_HP LPROCFS_NRS_SUPP_NAME_READWRITES   \
1733                " ")
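
/*
 * As with the offset type commands, a non-built sketch of the longest
 * "supported" command the buffer sized by the macro above has to hold
 * (the variable name is illustrative only).
 */
#if 0
/* Expands to "reg_supported:reads_and_writes hp_supported:reads_and_writes". */
static const char example_supported_cmd[] =
        NRS_LPROCFS_REQ_SUPP_NAME_REG LPROCFS_NRS_SUPP_NAME_READWRITES " "
        NRS_LPROCFS_REQ_SUPP_NAME_HP LPROCFS_NRS_SUPP_NAME_READWRITES;
#endif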
1734
1735 /**
1736  * Sets the type of RPCs handled by ORR/TRR policy instances. The user can
1737  * modify this setting for the regular or high priority NRS heads separately, or
1738  * both together in a single invocation.
1739  *
1740  * For example:
1741  *
1742  * lctl set_param ost.OSS.ost_io.nrs_orr_supported=
1743  * "reg_supported:reads", to enable the ORR policy instance on the regular NRS
1744  * head of the ost_io service to handle OST_READ RPCs.
1745  *
1746  * lctl set_param ost.OSS.ost_io.nrs_trr_supported=reads_and_writes, to enable
1747  * the TRR policy instances on both the regular and high priority NRS heads of
1748  * the ost_io service to handle OST_READ and OST_WRITE RPCs.
1749  *
1750  * Policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1751  * are skipped later by nrs_orr_ctl().
1752  */
1753 static ssize_t
1754 ptlrpc_lprocfs_nrs_orr_supported_seq_write(struct file *file,
1755                                            const char __user *buffer,
1756                                            size_t count,
1757                                            loff_t *off)
1758 {
1759         struct seq_file             *m = file->private_data;
1760         struct nrs_lprocfs_orr_data *orr_data = m->private;
1761         struct ptlrpc_service       *svc = orr_data->svc;
1762         enum ptlrpc_nrs_queue_type   queue = 0;
1763         char                         kernbuf[LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD];
1764         char                        *val_reg;
1765         char                        *val_hp;
1766         enum nrs_orr_supp            supp_reg;
1767         enum nrs_orr_supp            supp_hp;
1768         size_t                       count_copy;
1769         int                          rc = 0;
1770         int                          rc2 = 0;
1771
1772         if (count > (sizeof(kernbuf) - 1))
1773                 return -EINVAL;
1774
1775         if (copy_from_user(kernbuf, buffer, count))
1776                 return -EFAULT;
1777
1778         kernbuf[count] = '\0';
1779
1780         count_copy = count;
1781
1782         /**
1783          * Check if the regular supported requests setting has been specified
1784          */
1785         val_reg = lprocfs_find_named_value(kernbuf,
1786                                            NRS_LPROCFS_REQ_SUPP_NAME_REG,
1787                                            &count_copy);
1788         if (val_reg != kernbuf)
1789                 queue |= PTLRPC_NRS_QUEUE_REG;
1790
1791         count_copy = count;
1792
1793         /**
1794          * Check if the high priority supported requests setting has been
1795          * specified
1796          */
1797         val_hp = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_REQ_SUPP_NAME_HP,
1798                                           &count_copy);
1799         if (val_hp != kernbuf) {
1800                 if (!nrs_svc_has_hp(svc))
1801                         return -ENODEV;
1802
1803                 queue |= PTLRPC_NRS_QUEUE_HP;
1804         }
1805
1806         /**
1807          * If none of the queues has been specified, there may be a valid
1808          * command string at the start of the buffer.
1809          */
1810         if (queue == 0) {
1811                 queue = PTLRPC_NRS_QUEUE_REG;
1812
1813                 if (nrs_svc_has_hp(svc))
1814                         queue |= PTLRPC_NRS_QUEUE_HP;
1815         }
1816
1817         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1818                 supp_reg = nrs_orr_str2supp(val_reg);
1819                 if (supp_reg == -EINVAL)
1820                         return -EINVAL;
1821         }
1822
1823         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1824                 supp_hp = nrs_orr_str2supp(val_hp);
1825                 if (supp_hp == -EINVAL)
1826                         return -EINVAL;
1827         }
1828
1829         /**
1830          * We change the values on regular and HP NRS heads separately, so that
1831          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1832          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1833          * started the policy on either the regular or HP NRS head; i.e. we are
1834          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1835          * only if the operation fails with -ENODEV on all heads that have been
1836          * specified by the command; if at least one operation succeeds,
1837          * success is returned.
1838          */
1839         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1840                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1841                                                orr_data->name,
1842                                                NRS_CTL_ORR_WR_SUPP_REQ, false,
1843                                                &supp_reg);
1844                 if ((rc < 0 && rc != -ENODEV) ||
1845                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1846                         return rc;
1847         }
1848
1849         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1850                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1851                                                 orr_data->name,
1852                                                 NRS_CTL_ORR_WR_SUPP_REQ, false,
1853                                                 &supp_hp);
1854                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1855                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1856                         return rc2;
1857         }
1858
1859         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1860 }
1861
1862 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_supported);
1863
1864 static int nrs_orr_lprocfs_init(struct ptlrpc_service *svc)
1865 {
1866         int     i;
1867
1868         struct ldebugfs_vars nrs_orr_lprocfs_vars[] = {
1869                 { .name         = "nrs_orr_quantum",
1870                   .fops         = &ptlrpc_lprocfs_nrs_orr_quantum_fops  },
1871                 { .name         = "nrs_orr_offset_type",
1872                   .fops         = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1873                 { .name         = "nrs_orr_supported",
1874                   .fops         = &ptlrpc_lprocfs_nrs_orr_supported_fops },
1875                 { NULL }
1876         };
1877
1878         if (!svc->srv_debugfs_entry)
1879                 return 0;
1880
1881         lprocfs_orr_data.svc = svc;
1882
1883         for (i = 0; i < ARRAY_SIZE(nrs_orr_lprocfs_vars); i++)
1884                 nrs_orr_lprocfs_vars[i].data = &lprocfs_orr_data;
1885
1886         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_orr_lprocfs_vars, NULL);
1887
1888         return 0;
1889 }
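
/*
 * With the variables registered above, and given that ORR is only compatible
 * with the ost_io service, the tunables used in the examples earlier in this
 * file are expected to surface roughly as:
 *
 *      ost.OSS.ost_io.nrs_orr_quantum
 *      ost.OSS.ost_io.nrs_orr_offset_type
 *      ost.OSS.ost_io.nrs_orr_supported
 */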
1890
1891 static const struct ptlrpc_nrs_pol_ops nrs_orr_ops = {
1892         .op_policy_init         = nrs_orr_init,
1893         .op_policy_start        = nrs_orr_start,
1894         .op_policy_stop         = nrs_orr_stop,
1895         .op_policy_ctl          = nrs_orr_ctl,
1896         .op_res_get             = nrs_orr_res_get,
1897         .op_res_put             = nrs_orr_res_put,
1898         .op_req_get             = nrs_orr_req_get,
1899         .op_req_enqueue         = nrs_orr_req_add,
1900         .op_req_dequeue         = nrs_orr_req_del,
1901         .op_req_stop            = nrs_orr_req_stop,
1902         .op_lprocfs_init        = nrs_orr_lprocfs_init,
1903 };
1904
1905 struct ptlrpc_nrs_pol_conf nrs_conf_orr = {
1906         .nc_name                = NRS_POL_NAME_ORR,
1907         .nc_ops                 = &nrs_orr_ops,
1908         .nc_compat              = nrs_policy_compat_one,
1909         .nc_compat_svc_name     = "ost_io",
1910 };
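
/*
 * How this configuration is consumed lies outside this file; below is a
 * minimal, non-built sketch, assuming the NRS core registers each policy
 * configuration at startup via ptlrpc_nrs_policy_register() (the function
 * name of the example itself is purely illustrative).
 */
#if 0
static int nrs_orr_example_register(void)
{
        /* nrs_conf_orr names the policy, its operations and the single
         * compatible service ("ost_io"); registration makes "orr"
         * selectable on that service's NRS heads. */
        return ptlrpc_nrs_policy_register(&nrs_conf_orr);
}
#endif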
1911
1912 /**
1913  * TRR, Target-based Round Robin policy
1914  *
1915  * TRR reuses many of the functions and data structures of ORR
1916  */
1917 static int nrs_trr_lprocfs_init(struct ptlrpc_service *svc)
1918 {
1919         int     i;
1920
1921         struct ldebugfs_vars nrs_trr_lprocfs_vars[] = {
1922                 { .name         = "nrs_trr_quantum",
1923                   .fops         = &ptlrpc_lprocfs_nrs_orr_quantum_fops },
1924                 { .name         = "nrs_trr_offset_type",
1925                   .fops         = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1926                 { .name         = "nrs_trr_supported",
1927                   .fops         = &ptlrpc_lprocfs_nrs_orr_supported_fops },
1928                 { NULL }
1929         };
1930
1931         if (!svc->srv_debugfs_entry)
1932                 return 0;
1933
1934         lprocfs_trr_data.svc = svc;
1935
1936         for (i = 0; i < ARRAY_SIZE(nrs_trr_lprocfs_vars); i++)
1937                 nrs_trr_lprocfs_vars[i].data = &lprocfs_trr_data;
1938
1939         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_trr_lprocfs_vars, NULL);
1940
1941         return 0;
1942 }
1943
1944 /**
1945  * Reuse much of the ORR functionality for TRR.
1946  */
1947 static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
1948         .op_policy_init         = nrs_orr_init,
1949         .op_policy_start        = nrs_orr_start,
1950         .op_policy_stop         = nrs_orr_stop,
1951         .op_policy_ctl          = nrs_orr_ctl,
1952         .op_res_get             = nrs_orr_res_get,
1953         .op_res_put             = nrs_orr_res_put,
1954         .op_req_get             = nrs_orr_req_get,
1955         .op_req_enqueue         = nrs_orr_req_add,
1956         .op_req_dequeue         = nrs_orr_req_del,
1957         .op_req_stop            = nrs_orr_req_stop,
1958         .op_lprocfs_init        = nrs_trr_lprocfs_init,
1959 };
1960
1961 struct ptlrpc_nrs_pol_conf nrs_conf_trr = {
1962         .nc_name                = NRS_POL_NAME_TRR,
1963         .nc_ops                 = &nrs_trr_ops,
1964         .nc_compat              = nrs_policy_compat_one,
1965         .nc_compat_svc_name     = "ost_io",
1966 };
1967
1968 /** @} ORR/TRR policy */
1969
1970 /** @} nrs */