1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2017, Intel Corporation.
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_orr.c
29  *
30  * Network Request Scheduler (NRS) ORR and TRR policies
31  *
32  * Request scheduling in a Round-Robin manner over backend-fs objects and OSTs
33  * respectively
34  *
35  * Author: Liang Zhen <liang@whamcloud.com>
36  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
37  */
38 #ifdef HAVE_SERVER_SUPPORT
39
40 /**
41  * \addtogroup nrs
42  * @{
43  */
44 #define DEBUG_SUBSYSTEM S_RPC
45 #include <obd_support.h>
46 #include <obd_class.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include "ptlrpc_internal.h"
50
51 /**
52  * \name ORR/TRR policy
53  *
54  * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies
55  *
56  * ORR performs batched Round Robin scheduling of brw RPCs, based on the FID of
57  * the backend-fs object that the brw RPC pertains to; the TRR policy performs
58  * batched Round Robin scheduling of brw RPCs, based on the OST index that the
59  * RPC pertains to. Both policies also order RPCs in each batch in ascending
60  * offset order; the offset type is lprocfs-tunable between logical file
61  * offsets and physical disk offsets, as reported by fiemap.
62  *
63  * The TRR policy reuses much of the functionality of ORR. These two scheduling
64  * algorithms could alternatively be implemented under a single NRS policy that
65  * uses an lprocfs tunable to switch between the two types of
66  * scheduling behaviour. The two algorithms have been implemented as separate
67  * policies for reasons of clarity to the user, and to avoid issues that would
68  * otherwise arise at the point of switching between behaviours in the case of
69  * having a single policy, such as resource cleanup for nrs_orr_object
70  * instances. It is possible that this may need to be re-examined in the future,
71  * along with potentially coalescing other policies that perform batched request
72  * scheduling in a Round-Robin manner, all into one policy.
73  *
74  * @{
75  */
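/*
 * Illustrative usage sketch (the parameter names are believed to correspond
 * to the lprocfs/debugfs entries registered for these policies, but should be
 * treated as an example rather than an authoritative reference); an
 * administrator would typically enable and tune a policy on the OSS with
 * something like:
 *
 *   lctl set_param ost.OSS.ost_io.nrs_policies="orr"
 *   lctl set_param ost.OSS.ost_io.nrs_orr_quantum=reg_quantum:64
 *   lctl set_param ost.OSS.ost_io.nrs_orr_offset_type=physical
 */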
76
77 #define NRS_POL_NAME_ORR        "orr"
78 #define NRS_POL_NAME_TRR        "trr"
79
80 /**
81  * Checks if the RPC type of \a nrq is currently handled by an ORR/TRR policy
82  *
83  * \param[in]  orrd   the ORR/TRR policy scheduler instance
84  * \param[in]  nrq    the request
85  * \param[out] opcode the opcode is saved here, just in order to avoid calling
86  *                    lustre_msg_get_opc() again later
87  *
88  * \retval true  request type is supported by the policy instance
89  * \retval false request type is not supported by the policy instance
90  */
91 static bool nrs_orr_req_supported(struct nrs_orr_data *orrd,
92                                   struct ptlrpc_nrs_request *nrq, __u32 *opcode)
93 {
94         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
95                                                    rq_nrq);
96         __u32                   opc = lustre_msg_get_opc(req->rq_reqmsg);
97         bool                    rc = false;
98
99         /**
100          * XXX: nrs_orr_data::od_supp accessed unlocked.
101          */
102         switch (opc) {
103         case OST_READ:
104                 rc = orrd->od_supp & NOS_OST_READ;
105                 break;
106         case OST_WRITE:
107                 rc = orrd->od_supp & NOS_OST_WRITE;
108                 break;
109         }
110
111         if (rc)
112                 *opcode = opc;
113
114         return rc;
115 }
116
117 /**
118  * Returns the ORR/TRR key fields for the request \a nrq in \a key.
119  *
120  * \param[in]  orrd the ORR/TRR policy scheduler instance
121  * \param[in]  nrq  the request
122  * \param[in]  opc  the request's opcode
123  * \param[in]  name the policy name
124  * \param[out] key  fields of the key are returned here.
125  *
126  * \retval 0   key filled successfully
127  * \retval < 0 error
128  */
129 static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
130                             struct ptlrpc_nrs_request *nrq, __u32 opc,
131                             char *name, struct nrs_orr_key *key)
132 {
133         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
134                                                    rq_nrq);
135         struct ost_body        *body;
136         __u32                   ost_idx;
137         bool                    is_orr = strncmp(name, NRS_POL_NAME_ORR,
138                                                  NRS_POL_NAME_MAX) == 0;
139
140         LASSERT(req != NULL);
141
142         /**
143          * This is an attempt to fill in the request key fields while
144          * moving a request from the regular to the high-priority NRS
145          * head (via ldlm_lock_reorder_req()), but the request key has
146          * been adequately filled when nrs_orr_res_get() was called through
147          * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR/TRR
148          * policy, so there is nothing to do.
149          */
150         if ((is_orr && nrq->nr_u.orr.or_orr_set) ||
151             (!is_orr && nrq->nr_u.orr.or_trr_set)) {
152                 *key = nrq->nr_u.orr.or_key;
153                 return 0;
154         }
155
156         /* Bounce unconnected requests to the default policy. */
157         if (req->rq_export == NULL)
158                 return -ENOTCONN;
159
160         if (nrq->nr_u.orr.or_orr_set || nrq->nr_u.orr.or_trr_set)
161                 memset(&nrq->nr_u.orr.or_key, 0, sizeof(nrq->nr_u.orr.or_key));
162
163         ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
164
165         if (is_orr) {
166                 int     rc;
167                 /**
168                  * The request pill for OST_READ and OST_WRITE requests is
169                  * initialized in the ost_io service's
170                  * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
171                  * so no need to redo it here.
172                  */
173                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
174                 if (body == NULL)
175                         RETURN(-EFAULT);
176
177                 rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
178                 if (rc < 0)
179                         return rc;
180
181                 nrq->nr_u.orr.or_orr_set = 1;
182         } else {
183                 key->ok_idx = ost_idx;
184                 nrq->nr_u.orr.or_trr_set = 1;
185         }
186
187         return 0;
188 }
189
190 /**
191  * Populates the range values in \a range with logical offsets obtained via
192  * \a nb.
193  *
194  * \param[in]  nb       niobuf_remote struct array for this request
195  * \param[in]  niocount count of niobuf_remote structs for this request
196  * \param[out] range    the offset range is returned here
197  */
198 static void nrs_orr_range_fill_logical(struct niobuf_remote *nb, int niocount,
199                                        struct nrs_orr_req_range *range)
200 {
201         /* Should we do this at page boundaries ? */
202         range->or_start = nb[0].rnb_offset & PAGE_MASK;
203         range->or_end = (nb[niocount - 1].rnb_offset +
204                          nb[niocount - 1].rnb_len - 1) | ~PAGE_MASK;
205 }
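/*
 * A worked example, assuming 4 KiB pages (PAGE_MASK == ~0xfffULL): a request
 * whose only niobuf has rnb_offset = 5000 and rnb_len = 3000 gets
 * or_start = 5000 & PAGE_MASK = 4096 and
 * or_end = (5000 + 3000 - 1) | ~PAGE_MASK = 8191, i.e. the enclosing
 * page-aligned byte range [4096, 8191].
 */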
206
207 /**
208  * We obtain information just for a single extent, as the request can only be in
209  * a single place in the binary heap anyway.
210  */
211 #define ORR_NUM_EXTENTS 1
212
213 /**
214  * Converts the logical file offset range in \a range, to a physical disk offset
215  * range in \a range, for a request. Uses obd_get_info() in order to carry out a
216  * fiemap call and obtain backend-fs extent information. The returned range is
217  * in physical block numbers.
218  *
219  * \param[in]     nrq   the request
220  * \param[in]     oa    obdo struct for this request
221  * \param[in,out] range the offset range in bytes; logical range in, physical
222  *                      range out
223  *
224  * \retval 0    physical offsets obtained successfully
225  * \retval < 0 error
226  */
227 static int nrs_orr_range_fill_physical(struct ptlrpc_nrs_request *nrq,
228                                        struct obdo *oa,
229                                        struct nrs_orr_req_range *range)
230 {
231         struct ptlrpc_request     *req = container_of(nrq,
232                                                       struct ptlrpc_request,
233                                                       rq_nrq);
234         char                       fiemap_buf[offsetof(struct fiemap,
235                                                   fm_extents[ORR_NUM_EXTENTS])];
236         struct fiemap              *fiemap = (struct fiemap *)fiemap_buf;
237         struct ll_fiemap_info_key  key;
238         loff_t                     start;
239         loff_t                     end;
240         int                        rc;
241
242         key = (typeof(key)) {
243                 .lfik_name = KEY_FIEMAP,
244                 .lfik_oa = *oa,
245                 .lfik_fiemap = {
246                         .fm_start = range->or_start,
247                         .fm_length = range->or_end - range->or_start,
248                         .fm_extent_count = ORR_NUM_EXTENTS
249                 }
250         };
251
252         rc = obd_get_info(req->rq_svc_thread->t_env, req->rq_export,
253                           sizeof(key), &key, NULL, fiemap);
254         if (rc < 0)
255                 GOTO(out, rc);
256
257         if (fiemap->fm_mapped_extents == 0 ||
258             fiemap->fm_mapped_extents > ORR_NUM_EXTENTS)
259                 GOTO(out, rc = -EFAULT);
260
261         /**
262          * Calculate the physical offset ranges for the request from the extent
263          * information and the logical request offsets.
264          */
265         start = fiemap->fm_extents[0].fe_physical + range->or_start -
266                 fiemap->fm_extents[0].fe_logical;
267         end = start + range->or_end - range->or_start;
268
269         range->or_start = start;
270         range->or_end = end;
271
272         nrq->nr_u.orr.or_physical_set = 1;
273 out:
274         return rc;
275 }
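/*
 * A worked example with hypothetical values: if fiemap reports a single
 * extent with fe_logical = 0 and fe_physical = 1048576, a logical range of
 * or_start = 4096, or_end = 8191 translates to
 *   start = 1048576 + 4096 - 0 = 1052672
 *   end   = 1052672 + (8191 - 4096) = 1056767
 * so the request is subsequently sorted by its physical disk offsets.
 */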
276
277 /**
278  * Sets the offset range the request covers; either in logical file
279  * offsets or in physical disk offsets.
280  *
281  * \param[in] nrq        the request
282  * \param[in] orrd       the ORR/TRR policy scheduler instance
283  * \param[in] opc        the request's opcode
284  * \param[in] moving_req is the request in the process of moving onto the
285  *                       high-priority NRS head?
286  *
287  * \retval 0    range filled successfully
288  * \retval != 0 error
289  */
290 static int nrs_orr_range_fill(struct ptlrpc_nrs_request *nrq,
291                               struct nrs_orr_data *orrd, __u32 opc,
292                               bool moving_req)
293 {
294         struct ptlrpc_request       *req = container_of(nrq,
295                                                         struct ptlrpc_request,
296                                                         rq_nrq);
297         struct obd_ioobj            *ioo;
298         struct niobuf_remote        *nb;
299         struct ost_body             *body;
300         struct nrs_orr_req_range     range;
301         int                          niocount;
302         int                          rc = 0;
303
304         /**
305          * If we are scheduling using physical disk offsets, but we have filled
306          * the offset information in the request previously
307          * (i.e. ldlm_lock_reorder_req() is moving the request to the
308          * high-priority NRS head), there is no need to do anything, and we can
309          * exit. Besides there being no need, we would also be unable to perform
310          * the obd_get_info() call required in nrs_orr_range_fill_physical(),
311          * because ldlm_lock_reorder_req() calls into here while holding a
312          * spinlock, and retrieving fiemap information via obd_get_info() is a
313          * potentially sleeping operation.
314          */
315         if (orrd->od_physical && nrq->nr_u.orr.or_physical_set)
316                 return 0;
317
318         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
319         if (ioo == NULL)
320                 GOTO(out, rc = -EFAULT);
321
322         niocount = ioo->ioo_bufcnt;
323
324         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
325         if (nb == NULL)
326                 GOTO(out, rc = -EFAULT);
327
328         /**
329          * Use logical information from niobuf_remote structures.
330          */
331         nrs_orr_range_fill_logical(nb, niocount, &range);
332
333         /**
334          * Obtain physical offsets if selected, and this is an OST_READ RPC.
335          * We do not enter this block if moving_req is set, which indicates
336          * that the request is being moved to the high-priority NRS head by
337          * ldlm_lock_reorder_req(), as that function calls into here while
338          * holding a spinlock, and nrs_orr_range_fill_physical() can sleep, so
339          * we just use logical file offsets for the range values of such requests.
340          */
341         if (orrd->od_physical && opc == OST_READ && !moving_req) {
342                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
343                 if (body == NULL)
344                         GOTO(out, rc = -EFAULT);
345
346                 /**
347                  * Translate to physical block offsets from backend filesystem
348                  * extents.
349                  * Ignore return values; if obtaining the physical offsets
350                  * fails, use the logical offsets.
351                  */
352                 nrs_orr_range_fill_physical(nrq, &body->oa, &range);
353         }
354
355         nrq->nr_u.orr.or_range = range;
356 out:
357         return rc;
358 }
359
360 /**
361  * Generates a character string that can be used in order to register uniquely
362  * named libcfs_hash and slab objects for ORR/TRR policy instances. The
363  * character string is unique per policy instance, as it includes the policy's
364  * name, the CPT number, and a {reg|hp} token, and there is one policy instance
365  * per NRS head on each CPT, and the policy is only compatible with the ost_io
366  * service.
367  *
368  * \param[in] policy the policy instance
369  * \param[out] name  the character array that will hold the generated name
370  */
371 static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
372 {
373         snprintf(name, NRS_ORR_OBJ_NAME_MAX, "%s%s%s%d",
374                  "nrs_", policy->pol_desc->pd_name,
375                  policy->pol_nrs->nrs_queue_type == PTLRPC_NRS_QUEUE_REG ?
376                  "_reg_" : "_hp_", nrs_pol2cptid(policy));
377 }
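/*
 * For example, the ORR policy instance on the regular NRS head of CPT 0 is
 * given the object name "nrs_orr_reg_0", while a TRR instance on the
 * high-priority NRS head of CPT 1 is given "nrs_trr_hp_1".
 */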
378
379 /**
380  * ORR/TRR hash operations
381  */
382 #define NRS_ORR_BITS            24
383 #define NRS_ORR_BKT_BITS        12
384 #define NRS_ORR_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
385
386 #define NRS_TRR_BITS            4
387 #define NRS_TRR_BKT_BITS        2
388 #define NRS_TRR_HASH_FLAGS      CFS_HASH_SPIN_BKTLOCK
389
390 static unsigned
391 nrs_orr_hop_hash(struct cfs_hash *hs, const void *key, unsigned mask)
392 {
393         return cfs_hash_djb2_hash(key, sizeof(struct nrs_orr_key), mask);
394 }
395
396 static void *nrs_orr_hop_key(struct hlist_node *hnode)
397 {
398         struct nrs_orr_object *orro = hlist_entry(hnode,
399                                                       struct nrs_orr_object,
400                                                       oo_hnode);
401         return &orro->oo_key;
402 }
403
404 static int nrs_orr_hop_keycmp(const void *key, struct hlist_node *hnode)
405 {
406         struct nrs_orr_object *orro = hlist_entry(hnode,
407                                                       struct nrs_orr_object,
408                                                       oo_hnode);
409
410         return lu_fid_eq(&orro->oo_key.ok_fid,
411                          &((struct nrs_orr_key *)key)->ok_fid);
412 }
413
414 static void *nrs_orr_hop_object(struct hlist_node *hnode)
415 {
416         return hlist_entry(hnode, struct nrs_orr_object, oo_hnode);
417 }
418
419 static void nrs_orr_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
420 {
421         struct nrs_orr_object *orro = hlist_entry(hnode,
422                                                       struct nrs_orr_object,
423                                                       oo_hnode);
424         orro->oo_ref++;
425 }
426
427 /**
428  * Removes an nrs_orr_object from the hash and frees its memory, if the object
429  * has no active users.
430  */
431 static void nrs_orr_hop_put_free(struct cfs_hash *hs, struct hlist_node *hnode)
432 {
433         struct nrs_orr_object *orro = hlist_entry(hnode,
434                                                       struct nrs_orr_object,
435                                                       oo_hnode);
436         struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
437                                                    struct nrs_orr_data, od_res);
438         struct cfs_hash_bd     bd;
439
440         cfs_hash_bd_get_and_lock(hs, &orro->oo_key, &bd, 1);
441
442         if (--orro->oo_ref > 1) {
443                 cfs_hash_bd_unlock(hs, &bd, 1);
444
445                 return;
446         }
447         LASSERT(orro->oo_ref == 1);
448
449         cfs_hash_bd_del_locked(hs, &bd, hnode);
450         cfs_hash_bd_unlock(hs, &bd, 1);
451
452         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
453 }
454
455 static void nrs_orr_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
456 {
457         struct nrs_orr_object *orro = hlist_entry(hnode,
458                                                       struct nrs_orr_object,
459                                                       oo_hnode);
460         orro->oo_ref--;
461 }
462
463 static int nrs_trr_hop_keycmp(const void *key, struct hlist_node *hnode)
464 {
465         struct nrs_orr_object *orro = hlist_entry(hnode,
466                                                       struct nrs_orr_object,
467                                                       oo_hnode);
468
469         return orro->oo_key.ok_idx == ((struct nrs_orr_key *)key)->ok_idx;
470 }
471
472 static void nrs_trr_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
473 {
474         struct nrs_orr_object *orro = hlist_entry(hnode,
475                                                       struct nrs_orr_object,
476                                                       oo_hnode);
477         struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
478                                                    struct nrs_orr_data, od_res);
479
480         LASSERTF(orro->oo_ref == 0,
481                  "Busy NRS TRR policy object for OST with index %u, with %ld "
482                  "refs\n", orro->oo_key.ok_idx, orro->oo_ref);
483
484         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
485 }
486
487 static struct cfs_hash_ops nrs_orr_hash_ops = {
488         .hs_hash        = nrs_orr_hop_hash,
489         .hs_key         = nrs_orr_hop_key,
490         .hs_keycmp      = nrs_orr_hop_keycmp,
491         .hs_object      = nrs_orr_hop_object,
492         .hs_get         = nrs_orr_hop_get,
493         .hs_put         = nrs_orr_hop_put_free,
494         .hs_put_locked  = nrs_orr_hop_put,
495 };
496
497 static struct cfs_hash_ops nrs_trr_hash_ops = {
498         .hs_hash        = nrs_orr_hop_hash,
499         .hs_key         = nrs_orr_hop_key,
500         .hs_keycmp      = nrs_trr_hop_keycmp,
501         .hs_object      = nrs_orr_hop_object,
502         .hs_get         = nrs_orr_hop_get,
503         .hs_put         = nrs_orr_hop_put,
504         .hs_put_locked  = nrs_orr_hop_put,
505         .hs_exit        = nrs_trr_hop_exit,
506 };
507
508 #define NRS_ORR_QUANTUM_DFLT    256
509
510 /**
511  * Binary heap predicate.
512  *
513  * Uses
514  * ptlrpc_nrs_request::nr_u::orr::or_round,
515  * ptlrpc_nrs_request::nr_u::orr::or_sequence, and
516  * ptlrpc_nrs_request::nr_u::orr::or_range to compare two binheap nodes and
517  * produce a binary predicate that indicates their relative priority, so that
518  * the binary heap can perform the necessary sorting operations.
519  *
520  * \param[in] e1 the first binheap node to compare
521  * \param[in] e2 the second binheap node to compare
522  *
523  * \retval 0 e1 > e2
524  * \retval 1 e1 < e2
525  */
526 static int
527 orr_req_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
528 {
529         struct ptlrpc_nrs_request *nrq1;
530         struct ptlrpc_nrs_request *nrq2;
531
532         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
533         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
534
535         /**
536          * Requests have been scheduled against a different scheduling round.
537          */
538         if (nrq1->nr_u.orr.or_round < nrq2->nr_u.orr.or_round)
539                 return 1;
540         else if (nrq1->nr_u.orr.or_round > nrq2->nr_u.orr.or_round)
541                 return 0;
542
543         /**
544          * Requests have been scheduled against the same scheduling round, but
545          * belong to a different batch, i.e. they pertain to a different
546          * backend-fs object (for ORR policy instances) or OST (for TRR policy
547          * instances).
548          */
549         if (nrq1->nr_u.orr.or_sequence < nrq2->nr_u.orr.or_sequence)
550                 return 1;
551         else if (nrq1->nr_u.orr.or_sequence > nrq2->nr_u.orr.or_sequence)
552                 return 0;
553
554         /**
555          * If round numbers and sequence numbers are equal, the two requests
556          * have been scheduled on the same round, and belong to the same batch,
557          * which means they pertain to the same backend-fs object (if this is an
558          * ORR policy instance), or to the same OST (if this is a TRR policy
559          * instance), so these requests should be sorted by ascending offset
560          * order.
561          */
562         if (nrq1->nr_u.orr.or_range.or_start <
563             nrq2->nr_u.orr.or_range.or_start) {
564                 return 1;
565         } else if (nrq1->nr_u.orr.or_range.or_start >
566                  nrq2->nr_u.orr.or_range.or_start) {
567                 return 0;
568         } else {
569                 /**
570                  * Requests start from the same offset; Dispatch the shorter one
571                  * first; perhaps slightly more chances of hitting caches like
572                  * this.
573                  */
574                 return nrq1->nr_u.orr.or_range.or_end <
575                        nrq2->nr_u.orr.or_range.or_end;
576         }
577 }
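/*
 * A worked example with hypothetical values: for two queued requests with
 *   nrq1: or_round = 3, or_sequence = 10, or_range = [0, 1048575]
 *   nrq2: or_round = 3, or_sequence = 10, or_range = [1048576, 2097151]
 * the rounds and sequence numbers are equal, so the comparison falls through
 * to the range start offsets and returns 1; nrq1 sorts towards the root of
 * the min-heap and is dispatched before nrq2.
 */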
578
579 /**
580  * ORR binary heap operations
581  */
582 static struct cfs_binheap_ops nrs_orr_heap_ops = {
583         .hop_enter      = NULL,
584         .hop_exit       = NULL,
585         .hop_compare    = orr_req_compare,
586 };
587
588 /**
589  * Prints a warning message if an ORR/TRR policy is started on a service with
590  * more than one CPT. The message is not printed on the console for now, since
591  * we do not have any performance metrics to justify it, and it would be annoying.
592  *
593  * \param[in] policy the policy instance
594  *
595  * \retval 0 success
596  */
597 static int nrs_orr_init(struct ptlrpc_nrs_policy *policy)
598 {
599         if (policy->pol_nrs->nrs_svcpt->scp_service->srv_ncpts > 1)
600                 CDEBUG(D_CONFIG, "%s: The %s NRS policy was registered on a "
601                       "service with multiple service partitions. This policy "
602                       "may perform better with a single partition.\n",
603                       policy->pol_nrs->nrs_svcpt->scp_service->srv_name,
604                       policy->pol_desc->pd_name);
605
606         return 0;
607 }
608
609 /**
610  * Called when an ORR policy instance is started.
611  *
612  * \param[in] policy the policy
613  *
614  * \retval -ENOMEM OOM error
615  * \retval 0       success
616  */
617 static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg)
618 {
619         struct nrs_orr_data    *orrd;
620         struct cfs_hash_ops            *ops;
621         unsigned                cur_bits;
622         unsigned                max_bits;
623         unsigned                bkt_bits;
624         unsigned                flags;
625         int                     rc = 0;
626         ENTRY;
627
628         OBD_CPT_ALLOC_PTR(orrd, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
629         if (orrd == NULL)
630                 RETURN(-ENOMEM);
631
632         /*
633          * Binary heap instance for sorted incoming requests.
634          */
635         orrd->od_binheap = cfs_binheap_create(&nrs_orr_heap_ops,
636                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
637                                               nrs_pol2cptab(policy),
638                                               nrs_pol2cptid(policy));
639         if (orrd->od_binheap == NULL)
640                 GOTO(out_orrd, rc = -ENOMEM);
641
642         nrs_orr_genobjname(policy, orrd->od_objname);
643
644         /**
645          * Slab cache for NRS ORR/TRR objects.
646          */
647         orrd->od_cache = kmem_cache_create(orrd->od_objname,
648                                            sizeof(struct nrs_orr_object),
649                                            0, 0, NULL);
650         if (orrd->od_cache == NULL)
651                 GOTO(out_binheap, rc = -ENOMEM);
652
653         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
654                     NRS_POL_NAME_MAX) == 0) {
655                 ops = &nrs_orr_hash_ops;
656                 cur_bits = NRS_ORR_BITS;
657                 max_bits = NRS_ORR_BITS;
658                 bkt_bits = NRS_ORR_BKT_BITS;
659                 flags = NRS_ORR_HASH_FLAGS;
660         } else {
661                 ops = &nrs_trr_hash_ops;
662                 cur_bits = NRS_TRR_BITS;
663                 max_bits = NRS_TRR_BITS;
664                 bkt_bits = NRS_TRR_BKT_BITS;
665                 flags = NRS_TRR_HASH_FLAGS;
666         }
667
668         /**
669          * Hash for finding objects by struct nrs_orr_key.
670          * XXX: For TRR, it might be better to avoid using libcfs_hash?
671          * All that needs to be resolved are OST indices, and they
672          * will stay relatively stable during an OSS node's lifetime.
673          */
674         orrd->od_obj_hash = cfs_hash_create(orrd->od_objname, cur_bits,
675                                             max_bits, bkt_bits, 0,
676                                             CFS_HASH_MIN_THETA,
677                                             CFS_HASH_MAX_THETA, ops, flags);
678         if (orrd->od_obj_hash == NULL)
679                 GOTO(out_cache, rc = -ENOMEM);
680
681         /* XXX: Fields accessed unlocked */
682         orrd->od_quantum = NRS_ORR_QUANTUM_DFLT;
683         orrd->od_supp = NOS_DFLT;
684         orrd->od_physical = true;
685         /**
686          * Set to 1 so that the test inside nrs_orr_req_add() can evaluate to
687          * true.
688          */
689         orrd->od_sequence = 1;
690
691         policy->pol_private = orrd;
692
693         RETURN(rc);
694
695 out_cache:
696         kmem_cache_destroy(orrd->od_cache);
697 out_binheap:
698         cfs_binheap_destroy(orrd->od_binheap);
699 out_orrd:
700         OBD_FREE_PTR(orrd);
701
702         RETURN(rc);
703 }
704
705 /**
706  * Called when an ORR/TRR policy instance is stopped.
707  *
708  * Called when the policy has been instructed to transition to the
709  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more
710  * pending requests to serve.
711  *
712  * \param[in] policy the policy
713  */
714 static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy)
715 {
716         struct nrs_orr_data *orrd = policy->pol_private;
717         ENTRY;
718
719         LASSERT(orrd != NULL);
720         LASSERT(orrd->od_binheap != NULL);
721         LASSERT(orrd->od_obj_hash != NULL);
722         LASSERT(orrd->od_cache != NULL);
723         LASSERT(cfs_binheap_is_empty(orrd->od_binheap));
724
725         cfs_binheap_destroy(orrd->od_binheap);
726         cfs_hash_putref(orrd->od_obj_hash);
727         kmem_cache_destroy(orrd->od_cache);
728
729         OBD_FREE_PTR(orrd);
730 }
731
732 /**
733  * Performs a policy-specific ctl function on ORR/TRR policy instances; similar
734  * to ioctl.
735  *
736  * \param[in]     policy the policy instance
737  * \param[in]     opc    the opcode
738  * \param[in,out] arg    used for passing parameters and information
739  *
740  * \pre assert_spin_locked(&policy->pol_nrs->nrs_lock)
741  * \post assert_spin_locked(&policy->pol_nrs->nrs_lock)
742  *
743  * \retval 0   operation carried out successfully
744  * \retval -ve error
745  */
746 static int nrs_orr_ctl(struct ptlrpc_nrs_policy *policy,
747                        enum ptlrpc_nrs_ctl opc, void *arg)
748 {
749         assert_spin_locked(&policy->pol_nrs->nrs_lock);
750
751         switch((enum nrs_ctl_orr)opc) {
752         default:
753                 RETURN(-EINVAL);
754
755         case NRS_CTL_ORR_RD_QUANTUM: {
756                 struct nrs_orr_data     *orrd = policy->pol_private;
757
758                 *(__u16 *)arg = orrd->od_quantum;
759                 }
760                 break;
761
762         case NRS_CTL_ORR_WR_QUANTUM: {
763                 struct nrs_orr_data     *orrd = policy->pol_private;
764
765                 orrd->od_quantum = *(__u16 *)arg;
766                 LASSERT(orrd->od_quantum != 0);
767                 }
768                 break;
769
770         case NRS_CTL_ORR_RD_OFF_TYPE: {
771                 struct nrs_orr_data     *orrd = policy->pol_private;
772
773                 *(bool *)arg = orrd->od_physical;
774                 }
775                 break;
776
777         case NRS_CTL_ORR_WR_OFF_TYPE: {
778                 struct nrs_orr_data     *orrd = policy->pol_private;
779
780                 orrd->od_physical = *(bool *)arg;
781                 }
782                 break;
783
784         case NRS_CTL_ORR_RD_SUPP_REQ: {
785                 struct nrs_orr_data     *orrd = policy->pol_private;
786
787                 *(enum nrs_orr_supp *)arg = orrd->od_supp;
788                 }
789                 break;
790
791         case NRS_CTL_ORR_WR_SUPP_REQ: {
792                 struct nrs_orr_data     *orrd = policy->pol_private;
793
794                 orrd->od_supp = *(enum nrs_orr_supp *)arg;
795                 LASSERT((orrd->od_supp & NOS_OST_RW) != 0);
796                 }
797                 break;
798         }
799         RETURN(0);
800 }
801
802 /**
803  * Obtains resources for ORR/TRR policy instances. The top-level resource lives
804  * inside \e nrs_orr_data and the second-level resource inside
805  * \e nrs_orr_object instances.
806  *
807  * \param[in]  policy     the policy for which resources are being taken for
808  *                        request \a nrq
809  * \param[in]  nrq        the request for which resources are being taken
810  * \param[in]  parent     parent resource, embedded in nrs_orr_data for the
811  *                        ORR/TRR policies
812  * \param[out] resp       used to return resource references
813  * \param[in]  moving_req signifies limited caller context; used to perform
814  *                        memory allocations in an atomic context in this
815  *                        policy
816  *
817  * \retval 0   we are returning a top-level, parent resource, one that is
818  *             embedded in an nrs_orr_data object
819  * \retval 1   we are returning a bottom-level resource, one that is embedded
820  *             in an nrs_orr_object object
821  *
822  * \see nrs_resource_get_safe()
823  */
824 static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
825                            struct ptlrpc_nrs_request *nrq,
826                            const struct ptlrpc_nrs_resource *parent,
827                            struct ptlrpc_nrs_resource **resp, bool moving_req)
828 {
829         struct nrs_orr_data            *orrd;
830         struct nrs_orr_object          *orro;
831         struct nrs_orr_object          *tmp;
832         struct nrs_orr_key              key = { { { 0 } } };
833         __u32                           opc;
834         int                             rc = 0;
835
836         /**
837          * struct nrs_orr_data is requested.
838          */
839         if (parent == NULL) {
840                 *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
841                 return 0;
842         }
843
844         orrd = container_of(parent, struct nrs_orr_data, od_res);
845
846         /**
847          * If the request type is not supported, fail the enqueuing; the RPC
848          * will be handled by the fallback NRS policy.
849          */
850         if (!nrs_orr_req_supported(orrd, nrq, &opc))
851                 return -1;
852
853         /**
854          * Fill in the key for the request; OST FID for ORR policy instances,
855          * and OST index for TRR policy instances.
856          */
857         rc = nrs_orr_key_fill(orrd, nrq, opc, policy->pol_desc->pd_name, &key);
858         if (rc < 0)
859                 RETURN(rc);
860
861         /**
862          * Set the offset range the request covers
863          */
864         rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
865         if (rc < 0)
866                 RETURN(rc);
867
868         orro = cfs_hash_lookup(orrd->od_obj_hash, &key);
869         if (orro != NULL)
870                 goto out;
871
872         OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
873                                    nrs_pol2cptab(policy), nrs_pol2cptid(policy),
874                                    moving_req ? GFP_ATOMIC : GFP_NOFS);
875         if (orro == NULL)
876                 RETURN(-ENOMEM);
877
878         orro->oo_key = key;
879         orro->oo_ref = 1;
880
881         tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key,
882                                       &orro->oo_hnode);
883         if (tmp != orro) {
884                 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
885                 orro = tmp;
886         }
887 out:
888         /**
889          * For debugging purposes
890          */
891         nrq->nr_u.orr.or_key = orro->oo_key;
892
893         *resp = &orro->oo_res;
894
895         return 1;
896 }
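/*
 * The resulting resource hierarchy for a supported brw request is thus
 * nrs_orr_data::od_res (one per policy instance) at the top level, and
 * nrs_orr_object::oo_res (one per backend-fs object for ORR, or per OST
 * index for TRR) at the bottom level; see nrs_resource_get_safe() for how
 * the two levels are walked.
 */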
897
898 /**
899  * Called when releasing references to the resource hierarchy obtained for a
900  * request for scheduling using ORR/TRR policy instances.
901  *
902  * \param[in] policy   the policy the resource belongs to
903  * \param[in] res      the resource to be released
904  */
905 static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy,
906                             const struct ptlrpc_nrs_resource *res)
907 {
908         struct nrs_orr_data     *orrd;
909         struct nrs_orr_object   *orro;
910
911         /**
912          * Do nothing for freeing parent, nrs_orr_data resources.
913          */
914         if (res->res_parent == NULL)
915                 return;
916
917         orro = container_of(res, struct nrs_orr_object, oo_res);
918         orrd = container_of(res->res_parent, struct nrs_orr_data, od_res);
919
920         cfs_hash_put(orrd->od_obj_hash, &orro->oo_hnode);
921 }
922
923 /**
924  * Called when polling an ORR/TRR policy instance for a request so that it can
925  * be served. Returns the request at the root of the binary heap, as that is
926  * the one to be served next (the binary heap is a min-heap ordered by round,
927  * sequence, and offset).
928  *
929  * \param[in] policy the policy instance being polled
930  * \param[in] peek   when set, signifies that we just want to examine the
931  *                   request, and not handle it, so the request is not removed
932  *                   from the policy.
933  * \param[in] force  force the policy to return a request; unused in this policy
934  *
935  * \retval the request to be handled
936  * \retval NULL no request available
937  *
938  * \see ptlrpc_nrs_req_get_nolock()
939  * \see nrs_request_get()
940  */
941 static
942 struct ptlrpc_nrs_request *nrs_orr_req_get(struct ptlrpc_nrs_policy *policy,
943                                            bool peek, bool force)
944 {
945         struct nrs_orr_data       *orrd = policy->pol_private;
946         struct cfs_binheap_node   *node = cfs_binheap_root(orrd->od_binheap);
947         struct ptlrpc_nrs_request *nrq;
948
949         nrq = unlikely(node == NULL) ? NULL :
950               container_of(node, struct ptlrpc_nrs_request, nr_node);
951
952         if (likely(!peek && nrq != NULL)) {
953                 struct nrs_orr_object *orro;
954
955                 orro = container_of(nrs_request_resource(nrq),
956                                     struct nrs_orr_object, oo_res);
957
958                 LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
959
960                 cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
961                 orro->oo_active--;
962
963                 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
964                                  NRS_POL_NAME_MAX) == 0)
965                         CDEBUG(D_RPCTRACE,
966                                "NRS: starting to handle %s request for object "
967                                "with FID "DFID", from OST with index %u, with "
968                                "round %llu\n", NRS_POL_NAME_ORR,
969                                PFID(&orro->oo_key.ok_fid),
970                                nrq->nr_u.orr.or_key.ok_idx,
971                                nrq->nr_u.orr.or_round);
972                 else
973                         CDEBUG(D_RPCTRACE,
974                                "NRS: starting to handle %s request from OST "
975                                "with index %u, with round %llu\n",
976                                NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
977                                nrq->nr_u.orr.or_round);
978
979                 /** Peek at the next request to be served */
980                 node = cfs_binheap_root(orrd->od_binheap);
981
982                 /** No more requests */
983                 if (unlikely(node == NULL)) {
984                         orrd->od_round++;
985                 } else {
986                         struct ptlrpc_nrs_request *next;
987
988                         next = container_of(node, struct ptlrpc_nrs_request,
989                                             nr_node);
990
991                         if (orrd->od_round < next->nr_u.orr.or_round)
992                                 orrd->od_round = next->nr_u.orr.or_round;
993                 }
994         }
995
996         return nrq;
997 }
998
999 /**
1000  * Sort-adds request \a nrq to an ORR/TRR \a policy instance's set of queued
1001  * requests in the policy's binary heap.
1002  *
1003  * A scheduling round is a stream of requests that have been sorted in batches
1004  * according to the backend-fs object (for ORR policy instances) or OST (for TRR
1005  * policy instances) that they pertain to (as identified by its IDIF FID or OST
1006  * index respectively); there can be only one batch for each object or OST in
1007  * each round. The batches are of maximum size nrs_orr_data::od_quantum. When a
1008  * new request arrives for scheduling for an object or OST that has exhausted
1009  * its quantum in its current round, the request will be scheduled on the next
1010  * scheduling round. Requests are allowed to be scheduled against a round until
1011  * all requests for the round are serviced, so an object or OST might miss a
1012  * round if requests are not scheduled for it for a long enough period of time.
1013  * Objects or OSTs that miss a round will continue with having their next
1014  * request scheduled, starting at the round that requests are being dispatched
1015  * for, at the time of arrival of this request.
1016  *
1017  * Requests are tagged with the round number and a sequence number; the sequence
1018  * number indicates the relative ordering amongst the batches of requests in a
1019  * round, and is identical for all requests in a batch, as is the round number.
1020  * The round and sequence numbers are used by orr_req_compare() in order to use
1021  * nrs_orr_data::od_binheap in order to maintain an ordered set of rounds, with
1022  * each round consisting of an ordered set of batches of requests, and each
1023  * batch consisting of an ordered set of requests according to their logical
1024  * file or physical disk offsets.
1025  *
1026  * \param[in] policy the policy
1027  * \param[in] nrq    the request to add
1028  *
1029  * \retval 0    request successfully added
1030  * \retval != 0 error
1031  */
1032 static int nrs_orr_req_add(struct ptlrpc_nrs_policy *policy,
1033                            struct ptlrpc_nrs_request *nrq)
1034 {
1035         struct nrs_orr_data     *orrd;
1036         struct nrs_orr_object   *orro;
1037         int                      rc;
1038
1039         orro = container_of(nrs_request_resource(nrq),
1040                             struct nrs_orr_object, oo_res);
1041         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1042                             struct nrs_orr_data, od_res);
1043
1044         if (orro->oo_quantum == 0 || orro->oo_round < orrd->od_round ||
1045             (orro->oo_active == 0 && orro->oo_quantum > 0)) {
1046
1047                 /**
1048                  * If there are no pending requests for the object/OST, but some
1049                  * of its quantum still remains unused, which implies we did not
1050                  * get a chance to schedule up to its maximum allowed batch size
1051                  * of requests in the previous round this object/OST
1052                  * participated in, schedule this next request on a new round;
1053                  * this avoids fragmentation of request batches caused by
1054                  * intermittent inactivity on the object/OST, at the expense of
1055                  * potentially slightly increased service time for the request
1056                  * batch this request will be a part of.
1057                  */
1058                 if (orro->oo_active == 0 && orro->oo_quantum > 0)
1059                         orro->oo_round++;
1060
1061                 /** A new scheduling round has commenced */
1062                 if (orro->oo_round < orrd->od_round)
1063                         orro->oo_round = orrd->od_round;
1064
1065                 /** I was not the last object/OST that scheduled a request */
1066                 if (orro->oo_sequence < orrd->od_sequence)
1067                         orro->oo_sequence = ++orrd->od_sequence;
1068                 /**
1069                  * Reset the quantum if we have reached the maximum quantum
1070                  * size for this batch, or even if we have not managed to
1071                  * complete a batch size up to its maximum allowed size.
1072                  * XXX: Accessed unlocked
1073                  */
1074                 orro->oo_quantum = orrd->od_quantum;
1075         }
1076
1077         nrq->nr_u.orr.or_round = orro->oo_round;
1078         nrq->nr_u.orr.or_sequence = orro->oo_sequence;
1079
1080         rc = cfs_binheap_insert(orrd->od_binheap, &nrq->nr_node);
1081         if (rc == 0) {
1082                 orro->oo_active++;
1083                 if (--orro->oo_quantum == 0)
1084                         orro->oo_round++;
1085         }
1086         return rc;
1087 }
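/*
 * An illustration with hypothetical values: with od_quantum = 2, three
 * back-to-back requests for the same object/OST are handled as follows.
 * The first request finds oo_quantum == 0, so the batch state is
 * (re)initialized and the request is tagged with the object's current
 * round; the second request is tagged with the same round and sequence,
 * and its insertion exhausts the quantum, advancing oo_round; the third
 * request therefore starts a new batch and is tagged with the next round
 * (its sequence number only changes if another object/OST has scheduled
 * a request in the meantime).
 */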
1088
1089 /**
1090  * Removes request \a nrq from an ORR/TRR \a policy instance's set of queued
1091  * requests.
1092  *
1093  * \param[in] policy the policy
1094  * \param[in] nrq    the request to remove
1095  */
1096 static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy,
1097                             struct ptlrpc_nrs_request *nrq)
1098 {
1099         struct nrs_orr_data     *orrd;
1100         struct nrs_orr_object   *orro;
1101         bool                     is_root;
1102
1103         orro = container_of(nrs_request_resource(nrq),
1104                             struct nrs_orr_object, oo_res);
1105         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1106                             struct nrs_orr_data, od_res);
1107
1108         LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
1109
1110         is_root = &nrq->nr_node == cfs_binheap_root(orrd->od_binheap);
1111
1112         cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
1113         orro->oo_active--;
1114
1115         /**
1116          * If we just deleted the node at the root of the binheap, we may have
1117          * to adjust round numbers.
1118          */
1119         if (unlikely(is_root)) {
1120                 /** Peek at the next request to be served */
1121                 struct cfs_binheap_node *node = cfs_binheap_root(orrd->od_binheap);
1122
1123                 /** No more requests */
1124                 if (unlikely(node == NULL)) {
1125                         orrd->od_round++;
1126                 } else {
1127                         nrq = container_of(node, struct ptlrpc_nrs_request,
1128                                            nr_node);
1129
1130                         if (orrd->od_round < nrq->nr_u.orr.or_round)
1131                                 orrd->od_round = nrq->nr_u.orr.or_round;
1132                 }
1133         }
1134 }
1135
1136 /**
1137  * Called right after the request \a nrq finishes being handled by ORR policy
1138  * instance \a policy.
1139  *
1140  * \param[in] policy the policy that handled the request
1141  * \param[in] nrq    the request that was handled
1142  */
1143 static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy,
1144                              struct ptlrpc_nrs_request *nrq)
1145 {
1146         /** NB: resource control, credits etc can be added here */
1147         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
1148                     NRS_POL_NAME_MAX) == 0)
1149                 CDEBUG(D_RPCTRACE,
1150                        "NRS: finished handling %s request for object with FID "
1151                        DFID", from OST with index %u, with round %llu\n",
1152                        NRS_POL_NAME_ORR, PFID(&nrq->nr_u.orr.or_key.ok_fid),
1153                        nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
1154         else
1155                 CDEBUG(D_RPCTRACE,
1156                        "NRS: finished handling %s request from OST with index %u,"
1157                        " with round %llu\n",
1158                        NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
1159                        nrq->nr_u.orr.or_round);
1160 }
1161
1162 /**
1163  * debugfs interface
1164  */
1165
1166 /**
1167  * This allows the policy name to be bundled into the lprocfs_vars::data
1168  * pointer so that lprocfs read/write functions can be used by both the ORR
1169  * and TRR policies.
1170  */
1171 static struct nrs_lprocfs_orr_data {
1172         struct ptlrpc_service   *svc;
1173         char                    *name;
1174 } lprocfs_orr_data = {
1175         .name = NRS_POL_NAME_ORR
1176 }, lprocfs_trr_data = {
1177         .name = NRS_POL_NAME_TRR
1178 };
1179
1180 /**
1181  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
1182  * for ORR/TRR policy instances on both the regular and high-priority NRS head
1183  * of a service, as long as a policy instance is not in the
1184  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1185  * state are skipped later by nrs_orr_ctl().
1186  *
1187  * Quantum values are in # of RPCs, and the output is in YAML format.
1188  *
1189  * For example:
1190  *
1191  *      reg_quantum:256
1192  *      hp_quantum:8
1193  *
1194  * XXX: the CRR-N version of this, ptlrpc_lprocfs_rd_nrs_crrn_quantum() is
1195  * almost identical; it can be reworked and then reused for ORR/TRR.
1196  */
1197 static int
1198 ptlrpc_lprocfs_nrs_orr_quantum_seq_show(struct seq_file *m, void *data)
1199 {
1200         struct nrs_lprocfs_orr_data *orr_data = m->private;
1201         struct ptlrpc_service       *svc = orr_data->svc;
1202         __u16                        quantum;
1203         int                          rc;
1204
1205         /**
1206          * Perform two separate calls to this as only one of the NRS heads'
1207          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
1208          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1209          */
1210         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1211                                        orr_data->name,
1212                                        NRS_CTL_ORR_RD_QUANTUM,
1213                                        true, &quantum);
1214         if (rc == 0) {
1215                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG "%-5d\n", quantum);
1216                 /**
1217                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1218                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1219                  */
1220         } else if (rc != -ENODEV) {
1221                 return rc;
1222         }
1223
1224         /**
1225          * We know that the ost_io service, which is the only one the ORR/TRR
1226          * policies are compatible with, does have an HP NRS head, but it may
1227          * be best to guard against a possible change of this in the future.
1228          */
1229         if (!nrs_svc_has_hp(svc))
1230                 goto no_hp;
1231
1232         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1233                                        orr_data->name, NRS_CTL_ORR_RD_QUANTUM,
1234                                        true, &quantum);
1235         if (rc == 0) {
1236                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
1237                 /**
1238                  * Ignore -ENODEV as the high priority NRS head's policy may be
1239                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1240                  */
1241         } else if (rc != -ENODEV) {
1242                 return rc;
1243         }
1244
1245 no_hp:
1246
1247         return rc;
1248 }
1249
1250 /**
1251  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
1252  * for ORR/TRR policy instances of a service. The user can set the quantum size
1253  * for the regular and high priority NRS head separately by specifying each
1254  * value, or both together in a single invocation.
1255  *
1256  * For example:
1257  *
1258  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=reg_quantum:64, to set the
1259  * request quantum size of the ORR policy instance on the regular NRS head of
1260  * the ost_io service to 64
1261  *
1262  * lctl set_param ost.OSS.ost_io.nrs_trr_quantum=hp_quantum:8 to set the request
1263  * quantum size of the TRR policy instance on the high priority NRS head of the
1264  * ost_io service to 8
1265  *
1266  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=32, to set both the request
1267  * quantum size of the ORR policy instance on both the regular and the high
1268  * priority NRS head of the ost_io service to 32
1269  *
1270  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1271  * are skipped later by nrs_orr_ctl().
1272  *
1273  * XXX: the CRR-N version of this, ptlrpc_lprocfs_wr_nrs_crrn_quantum() is
1274  * almost identical; it can be reworked and then reused for ORR/TRR.
1275  */
1276 static ssize_t
1277 ptlrpc_lprocfs_nrs_orr_quantum_seq_write(struct file *file,
1278                                          const char __user *buffer,
1279                                          size_t count, loff_t *off)
1280 {
1281         struct seq_file             *m = file->private_data;
1282         struct nrs_lprocfs_orr_data *orr_data = m->private;
1283         struct ptlrpc_service       *svc = orr_data->svc;
1284         enum ptlrpc_nrs_queue_type   queue = 0;
1285         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
1286         char                        *val;
1287         long                         quantum_reg;
1288         long                         quantum_hp;
1289         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
1290         size_t                       count_copy;
1291         int                          rc = 0;
1292         int                          rc2 = 0;
1293
1294         if (count > (sizeof(kernbuf) - 1))
1295                 return -EINVAL;
1296
1297         if (copy_from_user(kernbuf, buffer, count))
1298                 return -EFAULT;
1299
1300         kernbuf[count] = '\0';
1301
1302         count_copy = count;
1303
1304         /**
1305          * Check if the regular quantum value has been specified
1306          */
1307         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
1308                                        &count_copy);
1309         if (val != kernbuf) {
1310                 rc = kstrtol(val, 10, &quantum_reg);
1311                 if (rc)
1312                         return rc;
1313                 queue |= PTLRPC_NRS_QUEUE_REG;
1314         }
1315
1316         count_copy = count;
1317
1318         /**
1319          * Check if the high priority quantum value has been specified
1320          */
1321         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
1322                                        &count_copy);
1323         if (val != kernbuf) {
1324                 if (!nrs_svc_has_hp(svc))
1325                         return -ENODEV;
1326
1327                 rc = kstrtol(val, 10, &quantum_hp);
1328                 if (rc)
1329                         return rc;
1330
1331                 queue |= PTLRPC_NRS_QUEUE_HP;
1332         }
1333
1334         /**
1335          * If none of the queues has been specified, look for a valid numerical
1336          * value
1337          */
1338         if (queue == 0) {
1339                 rc = kstrtol(kernbuf, 10, &quantum_reg);
1340                 if (rc)
1341                         return rc;
1342
1343                 queue = PTLRPC_NRS_QUEUE_REG;
1344
1345                 if (nrs_svc_has_hp(svc)) {
1346                         queue |= PTLRPC_NRS_QUEUE_HP;
1347                         quantum_hp = quantum_reg;
1348                 }
1349         }
1350
1351         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
1352             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
1353             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
1354             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
1355                 return -EINVAL;
1356
1357         /**
1358          * We change the values on regular and HP NRS heads separately, so that
1359          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1360          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1361          * started the policy on either the regular or HP NRS head; i.e. we are
1362          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1363          * only if the operation fails with -ENODEV on all heads that have been
1364          * specified by the command; if at least one operation succeeds,
1365          * success is returned.
1366          */
1367         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1368                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1369                                                orr_data->name,
1370                                                NRS_CTL_ORR_WR_QUANTUM, false,
1371                                                &quantum_reg);
1372                 if ((rc < 0 && rc != -ENODEV) ||
1373                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1374                         return rc;
1375         }
1376
1377         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1378                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1379                                                 orr_data->name,
1380                                                 NRS_CTL_ORR_WR_QUANTUM, false,
1381                                                 &quantum_hp);
1382                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1383                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1384                         return rc2;
1385         }
1386
1387         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1388 }
1389
1390 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_quantum);
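
/*
 * A note on the glue here: LDEBUGFS_SEQ_FOPS() is assumed to emit the usual
 * seq_file boilerplate, producing the ptlrpc_lprocfs_nrs_orr_quantum_fops
 * instance that routes reads through the _seq_show() handler and writes
 * through the _seq_write() handler above; that _fops symbol is what
 * nrs_orr_lprocfs_init() and nrs_trr_lprocfs_init() below register with
 * debugfs.
 */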
1391
1392 #define LPROCFS_NRS_OFF_NAME_REG                "reg_offset_type:"
1393 #define LPROCFS_NRS_OFF_NAME_HP                 "hp_offset_type:"
1394
1395 #define LPROCFS_NRS_OFF_NAME_PHYSICAL           "physical"
1396 #define LPROCFS_NRS_OFF_NAME_LOGICAL            "logical"
1397
1398 /**
1399  * Retrieves the offset type used by ORR/TRR policy instances on both the
1400  * regular and high-priority NRS head of a service, as long as a policy
1401  * instance is not in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state;
1402  * policy instances in this state are skipped later by nrs_orr_ctl().
1403  *
1404  * Offset type information is a (physical|logical) string, and output is
1405  * in YAML format.
1406  *
1407  * For example:
1408  *
1409  *      reg_offset_type:physical
1410  *      hp_offset_type:logical
1411  */
1412 static int
1413 ptlrpc_lprocfs_nrs_orr_offset_type_seq_show(struct seq_file *m, void *data)
1414 {
1415         struct nrs_lprocfs_orr_data *orr_data = m->private;
1416         struct ptlrpc_service       *svc = orr_data->svc;
1417         bool                         physical;
1418         int                          rc;
1419
1420         /**
1421          * Perform two separate calls to this as only one of the NRS heads'
1422          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1423          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1424          */
1425         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1426                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1427                                        true, &physical);
1428         if (rc == 0) {
1429                 seq_printf(m, LPROCFS_NRS_OFF_NAME_REG"%s\n",
1430                            physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1431                            LPROCFS_NRS_OFF_NAME_LOGICAL);
1432                 /**
1433                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1434                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1435                  */
1436         } else if (rc != -ENODEV) {
1437                 return rc;
1438         }
1439
1440         /**
1441          * We know that the ost_io service, which is the only one the ORR/TRR
1442          * policies are compatible with, does have an HP NRS head, but it may
1443          * be best to guard against a possible change of this in the future.
1444          */
1445         if (!nrs_svc_has_hp(svc))
1446                 goto no_hp;
1447
1448         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1449                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1450                                        true, &physical);
1451         if (rc == 0) {
1452                 seq_printf(m, LPROCFS_NRS_OFF_NAME_HP"%s\n",
1453                            physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1454                            LPROCFS_NRS_OFF_NAME_LOGICAL);
1455                 /**
1456                  * Ignore -ENODEV as the high priority NRS head's policy may be
1457                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1458                  */
1459         } else if (rc != -ENODEV) {
1460                 return rc;
1461         }
1462
1463 no_hp:
1464         return rc;
1465 }
1466
1467 /**
1468  * Max valid command string is the size of the labels, plus "physical" twice,
1469  * plus a separating ' '.
1470  */
1471 #define LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD                                        \
1472         sizeof(LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL " "      \
1473                LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL)
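
/*
 * As a worked example of the sizing above, the longest command the buffer
 * must hold selects "physical" for both heads:
 *
 *      reg_offset_type:physical hp_offset_type:physical
 *
 * which is LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD - 1 characters, leaving room for
 * the terminating NUL added in ptlrpc_lprocfs_nrs_orr_offset_type_seq_write().
 */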
1474
1475 /**
1476  * Sets the type of offsets used to order RPCs in ORR/TRR policy instances. The
1477  * user can set offset type for the regular or high priority NRS head
1478  * separately by specifying each value, or both together in a single invocation.
1479  *
1480  * For example:
1481  *
1482  * lctl set_param ost.OSS.ost_io.nrs_orr_offset_type=
1483  * reg_offset_type:physical, to enable the ORR policy instance on the regular
1484  * NRS head of the ost_io service to use physical disk offset ordering.
1485  *
1486  * lctl set_param ost.OSS.ost_io.nrs_trr_offset_type=logical, to enable the TRR
1487  * policy instances on both the regular and high priority NRS heads of the
1488  * ost_io service to use logical file offset ordering.
1489  *
1490  * Policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1491  * are skipped later by nrs_orr_ctl().
1492  */
1493 static ssize_t
1494 ptlrpc_lprocfs_nrs_orr_offset_type_seq_write(struct file *file,
1495                                              const char __user *buffer,
1496                                              size_t count,
1497                                              loff_t *off)
1498 {
1499         struct seq_file             *m = file->private_data;
1500         struct nrs_lprocfs_orr_data *orr_data = m->private;
1501         struct ptlrpc_service       *svc = orr_data->svc;
1502         enum ptlrpc_nrs_queue_type   queue = 0;
1503         char                         kernbuf[LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD];
1504         char                        *val_reg;
1505         char                        *val_hp;
1506         bool                         physical_reg;
1507         bool                         physical_hp;
1508         size_t                       count_copy;
1509         int                          rc = 0;
1510         int                          rc2 = 0;
1511
1512         if (count > (sizeof(kernbuf) - 1))
1513                 return -EINVAL;
1514
1515         if (copy_from_user(kernbuf, buffer, count))
1516                 return -EFAULT;
1517
1518         kernbuf[count] = '\0';
1519
1520         count_copy = count;
1521
1522         /**
1523          * Check if the regular offset type has been specified
1524          */
1525         val_reg = lprocfs_find_named_value(kernbuf,
1526                                            LPROCFS_NRS_OFF_NAME_REG,
1527                                            &count_copy);
1528         if (val_reg != kernbuf)
1529                 queue |= PTLRPC_NRS_QUEUE_REG;
1530
1531         count_copy = count;
1532
1533         /**
1534          * Check if the high priority offset type has been specified
1535          */
1536         val_hp = lprocfs_find_named_value(kernbuf, LPROCFS_NRS_OFF_NAME_HP,
1537                                           &count_copy);
1538         if (val_hp != kernbuf) {
1539                 if (!nrs_svc_has_hp(svc))
1540                         return -ENODEV;
1541
1542                 queue |= PTLRPC_NRS_QUEUE_HP;
1543         }
1544
1545         /**
1546          * If none of the queues has been specified, there may be a valid
1547          * command string at the start of the buffer.
1548          */
1549         if (queue == 0) {
1550                 queue = PTLRPC_NRS_QUEUE_REG;
1551
1552                 if (nrs_svc_has_hp(svc))
1553                         queue |= PTLRPC_NRS_QUEUE_HP;
1554         }
1555
1556         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1557                 if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1558                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1559                         physical_reg = true;
1560                 else if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_LOGICAL,
1561                          sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1562                         physical_reg = false;
1563                 else
1564                         return -EINVAL;
1565         }
1566
1567         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1568                 if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1569                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1570                         physical_hp = true;
1571                 else if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_LOGICAL,
1572                                  sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1573                         physical_hp = false;
1574                 else
1575                         return -EINVAL;
1576         }
1577
1578         /**
1579          * We change the values on regular and HP NRS heads separately, so that
1580          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1581          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1582          * started the policy on either the regular or HP NRS head; i.e. we are
1583          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1584          * only if the operation fails with -ENODEV on all heads that have been
1585          * specified by the command; if at least one operation succeeds,
1586          * success is returned.
1587          */
1588         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1589                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1590                                                orr_data->name,
1591                                                NRS_CTL_ORR_WR_OFF_TYPE, false,
1592                                                &physical_reg);
1593                 if ((rc < 0 && rc != -ENODEV) ||
1594                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1595                         return rc;
1596         }
1597
1598         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1599                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1600                                                 orr_data->name,
1601                                                 NRS_CTL_ORR_WR_OFF_TYPE, false,
1602                                                 &physical_hp);
1603                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1604                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1605                         return rc2;
1606         }
1607
1608         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1609 }
1610
1611 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_offset_type);
1612
1613 #define NRS_LPROCFS_REQ_SUPP_NAME_REG           "reg_supported:"
1614 #define NRS_LPROCFS_REQ_SUPP_NAME_HP            "hp_supported:"
1615
1616 #define LPROCFS_NRS_SUPP_NAME_READS             "reads"
1617 #define LPROCFS_NRS_SUPP_NAME_WRITES            "writes"
1618 #define LPROCFS_NRS_SUPP_NAME_READWRITES        "reads_and_writes"
1619
1620 /**
1621  * Translates enum nrs_orr_supp values to a corresponding string.
1622  */
1623 static const char *nrs_orr_supp2str(enum nrs_orr_supp supp)
1624 {
1625         switch (supp) {
1626         default:
1627                 LBUG();
1628         case NOS_OST_READ:
1629                 return LPROCFS_NRS_SUPP_NAME_READS;
1630         case NOS_OST_WRITE:
1631                 return LPROCFS_NRS_SUPP_NAME_WRITES;
1632         case NOS_OST_RW:
1633                 return LPROCFS_NRS_SUPP_NAME_READWRITES;
1634         }
1635 }
1636
1637 /**
1638  * Translates strings to the corresponding enum nrs_orr_supp value
1639  */
1640 static enum nrs_orr_supp nrs_orr_str2supp(const char *val)
1641 {
1642         if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READWRITES,
1643                     sizeof(LPROCFS_NRS_SUPP_NAME_READWRITES) - 1) == 0)
1644                 return NOS_OST_RW;
1645         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READS,
1646                          sizeof(LPROCFS_NRS_SUPP_NAME_READS) - 1) == 0)
1647                 return NOS_OST_READ;
1648         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_WRITES,
1649                          sizeof(LPROCFS_NRS_SUPP_NAME_WRITES) - 1) == 0)
1650                 return NOS_OST_WRITE;
1651         else
1652                 return -EINVAL;
1653 }
1654
1655 /**
1656  * Retrieves the type of RPCs handled at the point of invocation by ORR/TRR
1657  * policy instances on both the regular and high-priority NRS head of a service,
1658  * as long as a policy instance is not in the
1659  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1660  * state are skipped later by nrs_orr_ctl().
1661  *
1662  * Supported RPC type information is a (reads|writes|reads_and_writes) string,
1663  * and output is in YAML format.
1664  *
1665  * For example:
1666  *
1667  *      reg_supported:reads
1668  *      hp_supported:reads_and_writes
1669  */
1670 static int
1671 ptlrpc_lprocfs_nrs_orr_supported_seq_show(struct seq_file *m, void *data)
1672 {
1673         struct nrs_lprocfs_orr_data *orr_data = m->private;
1674         struct ptlrpc_service       *svc = orr_data->svc;
1675         enum nrs_orr_supp            supported;
1676         int                          rc;
1677
1678         /**
1679          * Perform two separate calls to this as only one of the NRS heads'
1680          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1681          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1682          */
1683         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1684                                        orr_data->name,
1685                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1686                                        &supported);
1687
1688         if (rc == 0) {
1689                 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_REG"%s\n",
1690                            nrs_orr_supp2str(supported));
1691                 /**
1692                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1693                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1694                  */
1695         } else if (rc != -ENODEV) {
1696                 return rc;
1697         }
1698
1699         /**
1700          * We know that the ost_io service, which is the only one the ORR/TRR
1701          * policies are compatible with, does have an HP NRS head, but it may
1702          * be best to guard against a possible change of this in the future.
1703          */
1704         if (!nrs_svc_has_hp(svc))
1705                 goto no_hp;
1706
1707         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1708                                        orr_data->name,
1709                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1710                                        &supported);
1711         if (rc == 0) {
1712                 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_HP"%s\n",
1713                            nrs_orr_supp2str(supported));
1714                 /**
1715                  * Ignore -ENODEV as the high priority NRS head's policy may be
1716                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1717                  */
1718         } else if (rc != -ENODEV) {
1719                 return rc;
1720         }
1721
1722 no_hp:
1723
1724         return rc;
1725 }
1726
1727 /**
1728  * Max valid command string is the size of the labels, plus "reads_and_writes"
1729  * twice, plus a separating ' '
1730  */
1731 #define LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD                                        \
1732         sizeof(NRS_LPROCFS_REQ_SUPP_NAME_REG LPROCFS_NRS_SUPP_NAME_READWRITES  \
1733                NRS_LPROCFS_REQ_SUPP_NAME_HP LPROCFS_NRS_SUPP_NAME_READWRITES   \
1734                " ")
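
/*
 * As with the offset type buffer, the longest command this sizing has to
 * accommodate is "reads_and_writes" for both heads:
 *
 *      reg_supported:reads_and_writes hp_supported:reads_and_writes
 *
 * The macro happens to concatenate the separating ' ' at the end rather than
 * between the two halves; the resulting sizeof() is the same either way.
 */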
1735
1736 /**
1737  * Sets the type of RPCs handled by ORR/TRR policy instances. The user can
1738  * modify this setting for the regular or high priority NRS heads separately, or
1739  * both together in a single invocation.
1740  *
1741  * For example:
1742  *
1743  * lctl set_param ost.OSS.ost_io.nrs_orr_supported=
1744  * "reg_supported:reads", to enable the ORR policy instance on the regular NRS
1745  * head of the ost_io service to handle OST_READ RPCs.
1746  *
1747  * lctl set_param ost.OSS.ost_io.nrs_trr_supported=reads_and_writes, to enable
1748  * the TRR policy instances on both the regular and high priority NRS heads of
1749  * the ost_io service to handle OST_READ and OST_WRITE RPCs.
1750  *
1751  * Policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1752  * are skipped later by nrs_orr_ctl().
1753  */
1754 static ssize_t
1755 ptlrpc_lprocfs_nrs_orr_supported_seq_write(struct file *file,
1756                                            const char __user *buffer,
1757                                            size_t count,
1758                                            loff_t *off)
1759 {
1760         struct seq_file             *m = file->private_data;
1761         struct nrs_lprocfs_orr_data *orr_data = m->private;
1762         struct ptlrpc_service       *svc = orr_data->svc;
1763         enum ptlrpc_nrs_queue_type   queue = 0;
1764         char                         kernbuf[LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD];
1765         char                        *val_reg;
1766         char                        *val_hp;
1767         enum nrs_orr_supp            supp_reg;
1768         enum nrs_orr_supp            supp_hp;
1769         size_t                       count_copy;
1770         int                          rc = 0;
1771         int                          rc2 = 0;
1772
1773         if (count > (sizeof(kernbuf) - 1))
1774                 return -EINVAL;
1775
1776         if (copy_from_user(kernbuf, buffer, count))
1777                 return -EFAULT;
1778
1779         kernbuf[count] = '\0';
1780
1781         count_copy = count;
1782
1783         /**
1784          * Check if the regular supported requests setting has been specified
1785          */
1786         val_reg = lprocfs_find_named_value(kernbuf,
1787                                            NRS_LPROCFS_REQ_SUPP_NAME_REG,
1788                                            &count_copy);
1789         if (val_reg != kernbuf)
1790                 queue |= PTLRPC_NRS_QUEUE_REG;
1791
1792         count_copy = count;
1793
1794         /**
1795          * Check if the high priority supported requests setting has been
1796          * specified
1797          */
1798         val_hp = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_REQ_SUPP_NAME_HP,
1799                                           &count_copy);
1800         if (val_hp != kernbuf) {
1801                 if (!nrs_svc_has_hp(svc))
1802                         return -ENODEV;
1803
1804                 queue |= PTLRPC_NRS_QUEUE_HP;
1805         }
1806
1807         /**
1808          * If none of the queues has been specified, there may be a valid
1809          * command string at the start of the buffer.
1810          */
1811         if (queue == 0) {
1812                 queue = PTLRPC_NRS_QUEUE_REG;
1813
1814                 if (nrs_svc_has_hp(svc))
1815                         queue |= PTLRPC_NRS_QUEUE_HP;
1816         }
1817
1818         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1819                 supp_reg = nrs_orr_str2supp(val_reg);
1820                 if (supp_reg == -EINVAL)
1821                         return -EINVAL;
1822         }
1823
1824         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1825                 supp_hp = nrs_orr_str2supp(val_hp);
1826                 if (supp_hp == -EINVAL)
1827                         return -EINVAL;
1828         }
1829
1830         /**
1831          * We change the values on regular and HP NRS heads separately, so that
1832          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1833          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1834          * started the policy on either the regular or HP NRS head; i.e. we are
1835          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1836          * only if the operation fails with -ENODEV on all heads that have been
1837          * specified by the command; if at least one operation succeeds,
1838          * success is returned.
1839          */
1840         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1841                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1842                                                orr_data->name,
1843                                                NRS_CTL_ORR_WR_SUPP_REQ, false,
1844                                                &supp_reg);
1845                 if ((rc < 0 && rc != -ENODEV) ||
1846                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1847                         return rc;
1848         }
1849
1850         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1851                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1852                                                 orr_data->name,
1853                                                 NRS_CTL_ORR_WR_SUPP_REQ, false,
1854                                                 &supp_hp);
1855                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1856                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1857                         return rc2;
1858         }
1859
1860         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1861 }
1862
1863 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_supported);
1864
1865 static int nrs_orr_lprocfs_init(struct ptlrpc_service *svc)
1866 {
1867         int     i;
1868
1869         struct lprocfs_vars nrs_orr_lprocfs_vars[] = {
1870                 { .name         = "nrs_orr_quantum",
1871                   .fops         = &ptlrpc_lprocfs_nrs_orr_quantum_fops  },
1872                 { .name         = "nrs_orr_offset_type",
1873                   .fops         = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1874                 { .name         = "nrs_orr_supported",
1875                   .fops         = &ptlrpc_lprocfs_nrs_orr_supported_fops },
1876                 { NULL }
1877         };
1878
1879         if (IS_ERR_OR_NULL(svc->srv_debugfs_entry))
1880                 return 0;
1881
1882         lprocfs_orr_data.svc = svc;
1883
1884         for (i = 0; i < ARRAY_SIZE(nrs_orr_lprocfs_vars); i++)
1885                 nrs_orr_lprocfs_vars[i].data = &lprocfs_orr_data;
1886
1887         return ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_orr_lprocfs_vars,
1888                                  NULL);
1889 }
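
/*
 * The entries added above back the ost.OSS.ost_io.nrs_orr_quantum,
 * nrs_orr_offset_type and nrs_orr_supported parameters referenced in the
 * lctl examples earlier in this file.
 */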
1890
1891 static const struct ptlrpc_nrs_pol_ops nrs_orr_ops = {
1892         .op_policy_init         = nrs_orr_init,
1893         .op_policy_start        = nrs_orr_start,
1894         .op_policy_stop         = nrs_orr_stop,
1895         .op_policy_ctl          = nrs_orr_ctl,
1896         .op_res_get             = nrs_orr_res_get,
1897         .op_res_put             = nrs_orr_res_put,
1898         .op_req_get             = nrs_orr_req_get,
1899         .op_req_enqueue         = nrs_orr_req_add,
1900         .op_req_dequeue         = nrs_orr_req_del,
1901         .op_req_stop            = nrs_orr_req_stop,
1902         .op_lprocfs_init        = nrs_orr_lprocfs_init,
1903 };
1904
1905 struct ptlrpc_nrs_pol_conf nrs_conf_orr = {
1906         .nc_name                = NRS_POL_NAME_ORR,
1907         .nc_ops                 = &nrs_orr_ops,
1908         .nc_compat              = nrs_policy_compat_one,
1909         .nc_compat_svc_name     = "ost_io",
1910 };
1911
1912 /**
1913  * TRR, Target-based Round Robin policy
1914  *
1915  * TRR reuses much of the functionality and many of the data structures of ORR.
1916  */
1917 static int nrs_trr_lprocfs_init(struct ptlrpc_service *svc)
1918 {
1919         int     i;
1920
1921         struct lprocfs_vars nrs_trr_lprocfs_vars[] = {
1922                 { .name         = "nrs_trr_quantum",
1923                   .fops         = &ptlrpc_lprocfs_nrs_orr_quantum_fops },
1924                 { .name         = "nrs_trr_offset_type",
1925                   .fops         = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1926                 { .name         = "nrs_trr_supported",
1927                   .fops         = &ptlrpc_lprocfs_nrs_orr_supported_fops },
1928                 { NULL }
1929         };
1930
1931         if (IS_ERR_OR_NULL(svc->srv_debugfs_entry))
1932                 return 0;
1933
1934         lprocfs_trr_data.svc = svc;
1935
1936         for (i = 0; i < ARRAY_SIZE(nrs_trr_lprocfs_vars); i++)
1937                 nrs_trr_lprocfs_vars[i].data = &lprocfs_trr_data;
1938
1939         return ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_trr_lprocfs_vars,
1940                                  NULL);
1941 }
1942
1943 /**
1944  * Reuse much of the ORR functionality for TRR.
1945  */
1946 static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
1947         .op_policy_init         = nrs_orr_init,
1948         .op_policy_start        = nrs_orr_start,
1949         .op_policy_stop         = nrs_orr_stop,
1950         .op_policy_ctl          = nrs_orr_ctl,
1951         .op_res_get             = nrs_orr_res_get,
1952         .op_res_put             = nrs_orr_res_put,
1953         .op_req_get             = nrs_orr_req_get,
1954         .op_req_enqueue         = nrs_orr_req_add,
1955         .op_req_dequeue         = nrs_orr_req_del,
1956         .op_req_stop            = nrs_orr_req_stop,
1957         .op_lprocfs_init        = nrs_trr_lprocfs_init,
1958 };
1959
1960 struct ptlrpc_nrs_pol_conf nrs_conf_trr = {
1961         .nc_name                = NRS_POL_NAME_TRR,
1962         .nc_ops                 = &nrs_trr_ops,
1963         .nc_compat              = nrs_policy_compat_one,
1964         .nc_compat_svc_name     = "ost_io",
1965 };
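
/*
 * Both nrs_conf_orr and nrs_conf_trr are expected to be handed to the NRS
 * core during ptlrpc setup (presumably via ptlrpc_nrs_policy_register()),
 * which is what makes the "orr" and "trr" policies selectable on the ost_io
 * service.
 */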
1966
1967 /** @} ORR/TRR policy */
1968
1969 /** @} nrs */
1970
1971 #endif /* HAVE_SERVER_SUPPORT */