Whamcloud - gitweb
LU-6201 llite: remove duplicate fiemap defines
[fs/lustre-release.git] / lustre / ptlrpc / nrs_orr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_orr.c
29  *
30  * Network Request Scheduler (NRS) ORR and TRR policies
31  *
32  * Request scheduling in a Round-Robin manner over backend-fs objects and OSTs
33  * respectively
34  *
35  * Author: Liang Zhen <liang@whamcloud.com>
36  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
37  */
38 #ifdef HAVE_SERVER_SUPPORT
39
40 /**
41  * \addtogroup nrs
42  * @{
43  */
44 #define DEBUG_SUBSYSTEM S_RPC
45 #include <obd_support.h>
46 #include <obd_class.h>
47 #include <lustre_net.h>
48 #include <lustre/lustre_idl.h>
49 #include <lustre_req_layout.h>
50 #include "ptlrpc_internal.h"
51
52 /**
53  * \name ORR/TRR policy
54  *
55  * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies
56  *
57  * ORR performs batched Round Robin scheduling of brw RPCs, based on the FID of
58  * the backend-fs object that the brw RPC pertains to; the TRR policy performs
59  * batched Round Robin scheduling of brw RPCs, based on the OST index that the
60  * RPC pertains to. Both policies also order RPCs in each batch in ascending
61  * offset order, which is lprocfs-tunable between logical file offsets, and
62  * physical disk offsets, as reported by fiemap.
63  *
64  * The TRR policy reuses much of the functionality of ORR. These two scheduling
65  * algorithms could alternatively be implemented under a single NRS policy, that
66  * uses an lprocfs tunable in order to switch between the two types of
67  * scheduling behaviour. The two algorithms have been implemented as separate
68  * policies for reasons of clarity to the user, and to avoid issues that would
69  * otherwise arise at the point of switching between behaviours in the case of
70  * having a single policy, such as resource cleanup for nrs_orr_object
71  * instances. It is possible that this may need to be re-examined in the future,
72  * along with potentially coalescing other policies that perform batched request
73  * scheduling in a Round-Robin manner, all into one policy.
74  *
75  * @{
76  */
77
78 #define NRS_POL_NAME_ORR        "orr"
79 #define NRS_POL_NAME_TRR        "trr"
80
81 /**
82  * Checks if the RPC type of \a nrq is currently handled by an ORR/TRR policy
83  *
84  * \param[in]  orrd   the ORR/TRR policy scheduler instance
85  * \param[in]  nrq    the request
86  * \param[out] opcode the opcode is saved here, just in order to avoid calling
87  *                    lustre_msg_get_opc() again later
88  *
89  * \retval true  request type is supported by the policy instance
90  * \retval false request type is not supported by the policy instance
91  */
92 static bool nrs_orr_req_supported(struct nrs_orr_data *orrd,
93                                   struct ptlrpc_nrs_request *nrq, __u32 *opcode)
94 {
95         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
96                                                    rq_nrq);
97         __u32                   opc = lustre_msg_get_opc(req->rq_reqmsg);
98         bool                    rc = false;
99
100         /**
101          * XXX: nrs_orr_data::od_supp accessed unlocked.
102          */
103         switch (opc) {
104         case OST_READ:
105                 rc = orrd->od_supp & NOS_OST_READ;
106                 break;
107         case OST_WRITE:
108                 rc = orrd->od_supp & NOS_OST_WRITE;
109                 break;
110         }
111
112         if (rc)
113                 *opcode = opc;
114
115         return rc;
116 }
117
118 /**
119  * Returns the ORR/TRR key fields for the request \a nrq in \a key.
120  *
121  * \param[in]  orrd the ORR/TRR policy scheduler instance
122  * \param[in]  nrq  the request
123  * \param[in]  opc  the request's opcode
124  * \param[in]  name the policy name
125  * \param[out] key  fields of the key are returned here.
126  *
127  * \retval 0   key filled successfully
128  * \retval < 0 error
129  */
130 static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
131                             struct ptlrpc_nrs_request *nrq, __u32 opc,
132                             char *name, struct nrs_orr_key *key)
133 {
134         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
135                                                    rq_nrq);
136         struct ost_body        *body;
137         __u32                   ost_idx;
138         bool                    is_orr = strncmp(name, NRS_POL_NAME_ORR,
139                                                  NRS_POL_NAME_MAX) == 0;
140
141         LASSERT(req != NULL);
142
143         /**
144          * This is an attempt to fill in the request key fields while
145          * moving a request from the regular to the high-priority NRS
146          * head (via ldlm_lock_reorder_req()), but the request key has
147          * been adequately filled when nrs_orr_res_get() was called through
148          * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR/TRR
149          * policy, so there is nothing to do.
150          */
151         if ((is_orr && nrq->nr_u.orr.or_orr_set) ||
152             (!is_orr && nrq->nr_u.orr.or_trr_set)) {
153                 *key = nrq->nr_u.orr.or_key;
154                 return 0;
155         }
156
157         /* Bounce unconnected requests to the default policy. */
158         if (req->rq_export == NULL)
159                 return -ENOTCONN;
160
161         if (nrq->nr_u.orr.or_orr_set || nrq->nr_u.orr.or_trr_set)
162                 memset(&nrq->nr_u.orr.or_key, 0, sizeof(nrq->nr_u.orr.or_key));
163
164         ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
165
166         if (is_orr) {
167                 int     rc;
168                 /**
169                  * The request pill for OST_READ and OST_WRITE requests is
170                  * initialized in the ost_io service's
171                  * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
172                  * so no need to redo it here.
173                  */
174                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
175                 if (body == NULL)
176                         RETURN(-EFAULT);
177
178                 rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
179                 if (rc < 0)
180                         return rc;
181
182                 nrq->nr_u.orr.or_orr_set = 1;
183         } else {
184                 key->ok_idx = ost_idx;
185                 nrq->nr_u.orr.or_trr_set = 1;
186         }
187
188         return 0;
189 }
190
191 /**
192  * Populates the range values in \a range with logical offsets obtained via
193  * \a nb.
194  *
195  * \param[in]  nb       niobuf_remote struct array for this request
196  * \param[in]  niocount count of niobuf_remote structs for this request
197  * \param[out] range    the offset range is returned here
198  */
199 static void nrs_orr_range_fill_logical(struct niobuf_remote *nb, int niocount,
200                                        struct nrs_orr_req_range *range)
201 {
202         /* Should we do this at page boundaries ? */
203         range->or_start = nb[0].rnb_offset & PAGE_MASK;
204         range->or_end = (nb[niocount - 1].rnb_offset +
205                          nb[niocount - 1].rnb_len - 1) | ~PAGE_MASK;
206 }
207
208 /**
209  * We obtain information just for a single extent, as the request can only be in
210  * a single place in the binary heap anyway.
211  */
212 #define ORR_NUM_EXTENTS 1
213
214 /**
215  * Converts the logical file offset range in \a range, to a physical disk offset
216  * range in \a range, for a request. Uses obd_get_info() in order to carry out a
217  * fiemap call and obtain backend-fs extent information. The returned range is
218  * in physical block numbers.
219  *
220  * \param[in]     nrq   the request
221  * \param[in]     oa    obdo struct for this request
222  * \param[in,out] range the offset range in bytes; logical range in, physical
223  *                      range out
224  *
225  * \retval 0    physical offsets obtained successfully
226  * \retvall < 0 error
227  */
228 static int nrs_orr_range_fill_physical(struct ptlrpc_nrs_request *nrq,
229                                        struct obdo *oa,
230                                        struct nrs_orr_req_range *range)
231 {
232         struct ptlrpc_request     *req = container_of(nrq,
233                                                       struct ptlrpc_request,
234                                                       rq_nrq);
235         char                       fiemap_buf[offsetof(struct fiemap,
236                                                   fm_extents[ORR_NUM_EXTENTS])];
237         struct fiemap              *fiemap = (struct fiemap *)fiemap_buf;
238         struct ll_fiemap_info_key  key;
239         loff_t                     start;
240         loff_t                     end;
241         int                        rc;
242
243         key = (typeof(key)) {
244                 .lfik_name = KEY_FIEMAP,
245                 .lfik_oa = *oa,
246                 .lfik_fiemap = {
247                         .fm_start = range->or_start,
248                         .fm_length = range->or_end - range->or_start,
249                         .fm_extent_count = ORR_NUM_EXTENTS
250                 }
251         };
252
253         rc = obd_get_info(req->rq_svc_thread->t_env, req->rq_export,
254                           sizeof(key), &key, NULL, fiemap);
255         if (rc < 0)
256                 GOTO(out, rc);
257
258         if (fiemap->fm_mapped_extents == 0 ||
259             fiemap->fm_mapped_extents > ORR_NUM_EXTENTS)
260                 GOTO(out, rc = -EFAULT);
261
262         /**
263          * Calculate the physical offset ranges for the request from the extent
264          * information and the logical request offsets.
265          */
266         start = fiemap->fm_extents[0].fe_physical + range->or_start -
267                 fiemap->fm_extents[0].fe_logical;
268         end = start + range->or_end - range->or_start;
269
270         range->or_start = start;
271         range->or_end = end;
272
273         nrq->nr_u.orr.or_physical_set = 1;
274 out:
275         return rc;
276 }
277
278 /**
279  * Sets the offset range the request covers; either in logical file
280  * offsets or in physical disk offsets.
281  *
282  * \param[in] nrq        the request
283  * \param[in] orrd       the ORR/TRR policy scheduler instance
284  * \param[in] opc        the request's opcode
285  * \param[in] moving_req is the request in the process of moving onto the
286  *                       high-priority NRS head?
287  *
288  * \retval 0    range filled successfully
289  * \retval != 0 error
290  */
291 static int nrs_orr_range_fill(struct ptlrpc_nrs_request *nrq,
292                               struct nrs_orr_data *orrd, __u32 opc,
293                               bool moving_req)
294 {
295         struct ptlrpc_request       *req = container_of(nrq,
296                                                         struct ptlrpc_request,
297                                                         rq_nrq);
298         struct obd_ioobj            *ioo;
299         struct niobuf_remote        *nb;
300         struct ost_body             *body;
301         struct nrs_orr_req_range     range;
302         int                          niocount;
303         int                          rc = 0;
304
305         /**
306          * If we are scheduling using physical disk offsets, but we have filled
307          * the offset information in the request previously
308          * (i.e. ldlm_lock_reorder_req() is moving the request to the
309          * high-priority NRS head), there is no need to do anything, and we can
310          * exit. Moreover than the lack of need, we would be unable to perform
311          * the obd_get_info() call required in nrs_orr_range_fill_physical(),
312          * because ldlm_lock_reorder_lock() calls into here while holding a
313          * spinlock, and retrieving fiemap information via obd_get_info() is a
314          * potentially sleeping operation.
315          */
316         if (orrd->od_physical && nrq->nr_u.orr.or_physical_set)
317                 return 0;
318
319         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
320         if (ioo == NULL)
321                 GOTO(out, rc = -EFAULT);
322
323         niocount = ioo->ioo_bufcnt;
324
325         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
326         if (nb == NULL)
327                 GOTO(out, rc = -EFAULT);
328
329         /**
330          * Use logical information from niobuf_remote structures.
331          */
332         nrs_orr_range_fill_logical(nb, niocount, &range);
333
334         /**
335          * Obtain physical offsets if selected, and this is an OST_READ RPC
336          * RPC. We do not enter this block if moving_req is set which indicates
337          * that the request is being moved to the high-priority NRS head by
338          * ldlm_lock_reorder_req(), as that function calls in here while holding
339          * a spinlock, and nrs_orr_range_physical() can sleep, so we just use
340          * logical file offsets for the range values for such requests.
341          */
342         if (orrd->od_physical && opc == OST_READ && !moving_req) {
343                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
344                 if (body == NULL)
345                         GOTO(out, rc = -EFAULT);
346
347                 /**
348                  * Translate to physical block offsets from backend filesystem
349                  * extents.
350                  * Ignore return values; if obtaining the physical offsets
351                  * fails, use the logical offsets.
352                  */
353                 nrs_orr_range_fill_physical(nrq, &body->oa, &range);
354         }
355
356         nrq->nr_u.orr.or_range = range;
357 out:
358         return rc;
359 }
360
361 /**
362  * Generates a character string that can be used in order to register uniquely
363  * named libcfs_hash and slab objects for ORR/TRR policy instances. The
364  * character string is unique per policy instance, as it includes the policy's
365  * name, the CPT number, and a {reg|hp} token, and there is one policy instance
366  * per NRS head on each CPT, and the policy is only compatible with the ost_io
367  * service.
368  *
369  * \param[in] policy the policy instance
370  * \param[out] name  the character array that will hold the generated name
371  */
372 static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
373 {
374         snprintf(name, NRS_ORR_OBJ_NAME_MAX, "%s%s%s%d",
375                  "nrs_", policy->pol_desc->pd_name,
376                  policy->pol_nrs->nrs_queue_type == PTLRPC_NRS_QUEUE_REG ?
377                  "_reg_" : "_hp_", nrs_pol2cptid(policy));
378 }
379
380 /**
381  * ORR/TRR hash operations
382  */
383 #define NRS_ORR_BITS            24
384 #define NRS_ORR_BKT_BITS        12
385 #define NRS_ORR_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
386
387 #define NRS_TRR_BITS            4
388 #define NRS_TRR_BKT_BITS        2
389 #define NRS_TRR_HASH_FLAGS      CFS_HASH_SPIN_BKTLOCK
390
391 static unsigned nrs_orr_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask)
392 {
393         return cfs_hash_djb2_hash(key, sizeof(struct nrs_orr_key), mask);
394 }
395
396 static void *nrs_orr_hop_key(struct hlist_node *hnode)
397 {
398         struct nrs_orr_object *orro = hlist_entry(hnode,
399                                                       struct nrs_orr_object,
400                                                       oo_hnode);
401         return &orro->oo_key;
402 }
403
404 static int nrs_orr_hop_keycmp(const void *key, struct hlist_node *hnode)
405 {
406         struct nrs_orr_object *orro = hlist_entry(hnode,
407                                                       struct nrs_orr_object,
408                                                       oo_hnode);
409
410         return lu_fid_eq(&orro->oo_key.ok_fid,
411                          &((struct nrs_orr_key *)key)->ok_fid);
412 }
413
414 static void *nrs_orr_hop_object(struct hlist_node *hnode)
415 {
416         return hlist_entry(hnode, struct nrs_orr_object, oo_hnode);
417 }
418
419 static void nrs_orr_hop_get(cfs_hash_t *hs, struct hlist_node *hnode)
420 {
421         struct nrs_orr_object *orro = hlist_entry(hnode,
422                                                       struct nrs_orr_object,
423                                                       oo_hnode);
424         orro->oo_ref++;
425 }
426
427 /**
428  * Removes an nrs_orr_object the hash and frees its memory, if the object has
429  * no active users.
430  */
431 static void nrs_orr_hop_put_free(cfs_hash_t *hs, struct hlist_node *hnode)
432 {
433         struct nrs_orr_object *orro = hlist_entry(hnode,
434                                                       struct nrs_orr_object,
435                                                       oo_hnode);
436         struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
437                                                    struct nrs_orr_data, od_res);
438         cfs_hash_bd_t          bd;
439
440         cfs_hash_bd_get_and_lock(hs, &orro->oo_key, &bd, 1);
441
442         if (--orro->oo_ref > 1) {
443                 cfs_hash_bd_unlock(hs, &bd, 1);
444
445                 return;
446         }
447         LASSERT(orro->oo_ref == 1);
448
449         cfs_hash_bd_del_locked(hs, &bd, hnode);
450         cfs_hash_bd_unlock(hs, &bd, 1);
451
452         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
453 }
454
455 static void nrs_orr_hop_put(cfs_hash_t *hs, struct hlist_node *hnode)
456 {
457         struct nrs_orr_object *orro = hlist_entry(hnode,
458                                                       struct nrs_orr_object,
459                                                       oo_hnode);
460         orro->oo_ref--;
461 }
462
463 static int nrs_trr_hop_keycmp(const void *key, struct hlist_node *hnode)
464 {
465         struct nrs_orr_object *orro = hlist_entry(hnode,
466                                                       struct nrs_orr_object,
467                                                       oo_hnode);
468
469         return orro->oo_key.ok_idx == ((struct nrs_orr_key *)key)->ok_idx;
470 }
471
472 static void nrs_trr_hop_exit(cfs_hash_t *hs, struct hlist_node *hnode)
473 {
474         struct nrs_orr_object *orro = hlist_entry(hnode,
475                                                       struct nrs_orr_object,
476                                                       oo_hnode);
477         struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
478                                                    struct nrs_orr_data, od_res);
479
480         LASSERTF(orro->oo_ref == 0,
481                  "Busy NRS TRR policy object for OST with index %u, with %ld "
482                  "refs\n", orro->oo_key.ok_idx, orro->oo_ref);
483
484         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
485 }
486
487 static cfs_hash_ops_t nrs_orr_hash_ops = {
488         .hs_hash        = nrs_orr_hop_hash,
489         .hs_key         = nrs_orr_hop_key,
490         .hs_keycmp      = nrs_orr_hop_keycmp,
491         .hs_object      = nrs_orr_hop_object,
492         .hs_get         = nrs_orr_hop_get,
493         .hs_put         = nrs_orr_hop_put_free,
494         .hs_put_locked  = nrs_orr_hop_put,
495 };
496
497 static cfs_hash_ops_t nrs_trr_hash_ops = {
498         .hs_hash        = nrs_orr_hop_hash,
499         .hs_key         = nrs_orr_hop_key,
500         .hs_keycmp      = nrs_trr_hop_keycmp,
501         .hs_object      = nrs_orr_hop_object,
502         .hs_get         = nrs_orr_hop_get,
503         .hs_put         = nrs_orr_hop_put,
504         .hs_put_locked  = nrs_orr_hop_put,
505         .hs_exit        = nrs_trr_hop_exit,
506 };
507
508 #define NRS_ORR_QUANTUM_DFLT    256
509
510 /**
511  * Binary heap predicate.
512  *
513  * Uses
514  * ptlrpc_nrs_request::nr_u::orr::or_round,
515  * ptlrpc_nrs_request::nr_u::orr::or_sequence, and
516  * ptlrpc_nrs_request::nr_u::orr::or_range to compare two binheap nodes and
517  * produce a binary predicate that indicates their relative priority, so that
518  * the binary heap can perform the necessary sorting operations.
519  *
520  * \param[in] e1 the first binheap node to compare
521  * \param[in] e2 the second binheap node to compare
522  *
523  * \retval 0 e1 > e2
524  * \retval 1 e1 < e2
525  */
526 static int orr_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2)
527 {
528         struct ptlrpc_nrs_request *nrq1;
529         struct ptlrpc_nrs_request *nrq2;
530
531         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
532         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
533
534         /**
535          * Requests have been scheduled against a different scheduling round.
536          */
537         if (nrq1->nr_u.orr.or_round < nrq2->nr_u.orr.or_round)
538                 return 1;
539         else if (nrq1->nr_u.orr.or_round > nrq2->nr_u.orr.or_round)
540                 return 0;
541
542         /**
543          * Requests have been scheduled against the same scheduling round, but
544          * belong to a different batch, i.e. they pertain to a different
545          * backend-fs object (for ORR policy instances) or OST (for TRR policy
546          * instances).
547          */
548         if (nrq1->nr_u.orr.or_sequence < nrq2->nr_u.orr.or_sequence)
549                 return 1;
550         else if (nrq1->nr_u.orr.or_sequence > nrq2->nr_u.orr.or_sequence)
551                 return 0;
552
553         /**
554          * If round numbers and sequence numbers are equal, the two requests
555          * have been scheduled on the same round, and belong to the same batch,
556          * which means they pertain to the same backend-fs object (if this is an
557          * ORR policy instance), or to the same OST (if this is a TRR policy
558          * instance), so these requests should be sorted by ascending offset
559          * order.
560          */
561         if (nrq1->nr_u.orr.or_range.or_start <
562             nrq2->nr_u.orr.or_range.or_start) {
563                 return 1;
564         } else if (nrq1->nr_u.orr.or_range.or_start >
565                  nrq2->nr_u.orr.or_range.or_start) {
566                 return 0;
567         } else {
568                 /**
569                  * Requests start from the same offset; Dispatch the shorter one
570                  * first; perhaps slightly more chances of hitting caches like
571                  * this.
572                  */
573                 return nrq1->nr_u.orr.or_range.or_end <
574                        nrq2->nr_u.orr.or_range.or_end;
575         }
576 }
577
578 /**
579  * ORR binary heap operations
580  */
581 static cfs_binheap_ops_t nrs_orr_heap_ops = {
582         .hop_enter      = NULL,
583         .hop_exit       = NULL,
584         .hop_compare    = orr_req_compare,
585 };
586
587 /**
588  * Prints a warning message if an ORR/TRR policy is started on a service with
589  * more than one CPT.  Not printed on the console for now, since we don't
590  * have any performance metrics in the first place, and it is annoying.
591  *
592  * \param[in] policy the policy instance
593  *
594  * \retval 0 success
595  */
596 static int nrs_orr_init(struct ptlrpc_nrs_policy *policy)
597 {
598         if (policy->pol_nrs->nrs_svcpt->scp_service->srv_ncpts > 1)
599                 CDEBUG(D_CONFIG, "%s: The %s NRS policy was registered on a "
600                       "service with multiple service partitions. This policy "
601                       "may perform better with a single partition.\n",
602                       policy->pol_nrs->nrs_svcpt->scp_service->srv_name,
603                       policy->pol_desc->pd_name);
604
605         return 0;
606 }
607
608 /**
609  * Called when an ORR policy instance is started.
610  *
611  * \param[in] policy the policy
612  *
613  * \retval -ENOMEM OOM error
614  * \retval 0       success
615  */
616 static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg)
617 {
618         struct nrs_orr_data    *orrd;
619         cfs_hash_ops_t         *ops;
620         unsigned                cur_bits;
621         unsigned                max_bits;
622         unsigned                bkt_bits;
623         unsigned                flags;
624         int                     rc = 0;
625         ENTRY;
626
627         OBD_CPT_ALLOC_PTR(orrd, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
628         if (orrd == NULL)
629                 RETURN(-ENOMEM);
630
631         /*
632          * Binary heap instance for sorted incoming requests.
633          */
634         orrd->od_binheap = cfs_binheap_create(&nrs_orr_heap_ops,
635                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
636                                               nrs_pol2cptab(policy),
637                                               nrs_pol2cptid(policy));
638         if (orrd->od_binheap == NULL)
639                 GOTO(failed, rc = -ENOMEM);
640
641         nrs_orr_genobjname(policy, orrd->od_objname);
642
643         /**
644          * Slab cache for NRS ORR/TRR objects.
645          */
646         orrd->od_cache = kmem_cache_create(orrd->od_objname,
647                                            sizeof(struct nrs_orr_object),
648                                            0, 0, NULL);
649         if (orrd->od_cache == NULL)
650                 GOTO(failed, rc = -ENOMEM);
651
652         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
653                     NRS_POL_NAME_MAX) == 0) {
654                 ops = &nrs_orr_hash_ops;
655                 cur_bits = NRS_ORR_BITS;
656                 max_bits = NRS_ORR_BITS;
657                 bkt_bits = NRS_ORR_BKT_BITS;
658                 flags = NRS_ORR_HASH_FLAGS;
659         } else {
660                 ops = &nrs_trr_hash_ops;
661                 cur_bits = NRS_TRR_BITS;
662                 max_bits = NRS_TRR_BITS;
663                 bkt_bits = NRS_TRR_BKT_BITS;
664                 flags = NRS_TRR_HASH_FLAGS;
665         }
666
667         /**
668          * Hash for finding objects by struct nrs_orr_key.
669          * XXX: For TRR, it might be better to avoid using libcfs_hash?
670          * All that needs to be resolved are OST indices, and they
671          * will stay relatively stable during an OSS node's lifetime.
672          */
673         orrd->od_obj_hash = cfs_hash_create(orrd->od_objname, cur_bits,
674                                             max_bits, bkt_bits, 0,
675                                             CFS_HASH_MIN_THETA,
676                                             CFS_HASH_MAX_THETA, ops, flags);
677         if (orrd->od_obj_hash == NULL)
678                 GOTO(failed, rc = -ENOMEM);
679
680         /* XXX: Fields accessed unlocked */
681         orrd->od_quantum = NRS_ORR_QUANTUM_DFLT;
682         orrd->od_supp = NOS_DFLT;
683         orrd->od_physical = true;
684         /**
685          * Set to 1 so that the test inside nrs_orr_req_add() can evaluate to
686          * true.
687          */
688         orrd->od_sequence = 1;
689
690         policy->pol_private = orrd;
691
692         RETURN(rc);
693
694 failed:
695         if (orrd->od_cache) {
696                 kmem_cache_destroy(orrd->od_cache);
697                 LASSERTF(rc == 0, "Could not destroy od_cache slab\n");
698         }
699         if (orrd->od_binheap != NULL)
700                 cfs_binheap_destroy(orrd->od_binheap);
701
702         OBD_FREE_PTR(orrd);
703
704         RETURN(rc);
705 }
706
707 /**
708  * Called when an ORR/TRR policy instance is stopped.
709  *
710  * Called when the policy has been instructed to transition to the
711  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more
712  * pending requests to serve.
713  *
714  * \param[in] policy the policy
715  */
716 static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy)
717 {
718         struct nrs_orr_data *orrd = policy->pol_private;
719         ENTRY;
720
721         LASSERT(orrd != NULL);
722         LASSERT(orrd->od_binheap != NULL);
723         LASSERT(orrd->od_obj_hash != NULL);
724         LASSERT(orrd->od_cache != NULL);
725         LASSERT(cfs_binheap_is_empty(orrd->od_binheap));
726
727         cfs_binheap_destroy(orrd->od_binheap);
728         cfs_hash_putref(orrd->od_obj_hash);
729         kmem_cache_destroy(orrd->od_cache);
730
731         OBD_FREE_PTR(orrd);
732 }
733
734 /**
735  * Performs a policy-specific ctl function on ORR/TRR policy instances; similar
736  * to ioctl.
737  *
738  * \param[in]     policy the policy instance
739  * \param[in]     opc    the opcode
740  * \param[in,out] arg    used for passing parameters and information
741  *
742  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
743  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
744  *
745  * \retval 0   operation carried successfully
746  * \retval -ve error
747  */
748 static int nrs_orr_ctl(struct ptlrpc_nrs_policy *policy,
749                        enum ptlrpc_nrs_ctl opc, void *arg)
750 {
751         assert_spin_locked(&policy->pol_nrs->nrs_lock);
752
753         switch((enum nrs_ctl_orr)opc) {
754         default:
755                 RETURN(-EINVAL);
756
757         case NRS_CTL_ORR_RD_QUANTUM: {
758                 struct nrs_orr_data     *orrd = policy->pol_private;
759
760                 *(__u16 *)arg = orrd->od_quantum;
761                 }
762                 break;
763
764         case NRS_CTL_ORR_WR_QUANTUM: {
765                 struct nrs_orr_data     *orrd = policy->pol_private;
766
767                 orrd->od_quantum = *(__u16 *)arg;
768                 LASSERT(orrd->od_quantum != 0);
769                 }
770                 break;
771
772         case NRS_CTL_ORR_RD_OFF_TYPE: {
773                 struct nrs_orr_data     *orrd = policy->pol_private;
774
775                 *(bool *)arg = orrd->od_physical;
776                 }
777                 break;
778
779         case NRS_CTL_ORR_WR_OFF_TYPE: {
780                 struct nrs_orr_data     *orrd = policy->pol_private;
781
782                 orrd->od_physical = *(bool *)arg;
783                 }
784                 break;
785
786         case NRS_CTL_ORR_RD_SUPP_REQ: {
787                 struct nrs_orr_data     *orrd = policy->pol_private;
788
789                 *(enum nrs_orr_supp *)arg = orrd->od_supp;
790                 }
791                 break;
792
793         case NRS_CTL_ORR_WR_SUPP_REQ: {
794                 struct nrs_orr_data     *orrd = policy->pol_private;
795
796                 orrd->od_supp = *(enum nrs_orr_supp *)arg;
797                 LASSERT((orrd->od_supp & NOS_OST_RW) != 0);
798                 }
799                 break;
800         }
801         RETURN(0);
802 }
803
804 /**
805  * Obtains resources for ORR/TRR policy instances. The top-level resource lives
806  * inside \e nrs_orr_data and the second-level resource inside
807  * \e nrs_orr_object instances.
808  *
809  * \param[in]  policy     the policy for which resources are being taken for
810  *                        request \a nrq
811  * \param[in]  nrq        the request for which resources are being taken
812  * \param[in]  parent     parent resource, embedded in nrs_orr_data for the
813  *                        ORR/TRR policies
814  * \param[out] resp       used to return resource references
815  * \param[in]  moving_req signifies limited caller context; used to perform
816  *                        memory allocations in an atomic context in this
817  *                        policy
818  *
819  * \retval 0   we are returning a top-level, parent resource, one that is
820  *             embedded in an nrs_orr_data object
821  * \retval 1   we are returning a bottom-level resource, one that is embedded
822  *             in an nrs_orr_object object
823  *
824  * \see nrs_resource_get_safe()
825  */
826 static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
827                            struct ptlrpc_nrs_request *nrq,
828                            const struct ptlrpc_nrs_resource *parent,
829                            struct ptlrpc_nrs_resource **resp, bool moving_req)
830 {
831         struct nrs_orr_data            *orrd;
832         struct nrs_orr_object          *orro;
833         struct nrs_orr_object          *tmp;
834         struct nrs_orr_key              key = { { { 0 } } };
835         __u32                           opc;
836         int                             rc = 0;
837
838         /**
839          * struct nrs_orr_data is requested.
840          */
841         if (parent == NULL) {
842                 *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
843                 return 0;
844         }
845
846         orrd = container_of(parent, struct nrs_orr_data, od_res);
847
848         /**
849          * If the request type is not supported, fail the enqueuing; the RPC
850          * will be handled by the fallback NRS policy.
851          */
852         if (!nrs_orr_req_supported(orrd, nrq, &opc))
853                 return -1;
854
855         /**
856          * Fill in the key for the request; OST FID for ORR policy instances,
857          * and OST index for TRR policy instances.
858          */
859         rc = nrs_orr_key_fill(orrd, nrq, opc, policy->pol_desc->pd_name, &key);
860         if (rc < 0)
861                 RETURN(rc);
862
863         /**
864          * Set the offset range the request covers
865          */
866         rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
867         if (rc < 0)
868                 RETURN(rc);
869
870         orro = cfs_hash_lookup(orrd->od_obj_hash, &key);
871         if (orro != NULL)
872                 goto out;
873
874         OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
875                                    nrs_pol2cptab(policy), nrs_pol2cptid(policy),
876                                    moving_req ? GFP_ATOMIC : GFP_NOFS);
877         if (orro == NULL)
878                 RETURN(-ENOMEM);
879
880         orro->oo_key = key;
881         orro->oo_ref = 1;
882
883         tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key,
884                                       &orro->oo_hnode);
885         if (tmp != orro) {
886                 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
887                 orro = tmp;
888         }
889 out:
890         /**
891          * For debugging purposes
892          */
893         nrq->nr_u.orr.or_key = orro->oo_key;
894
895         *resp = &orro->oo_res;
896
897         return 1;
898 }
899
900 /**
901  * Called when releasing references to the resource hierachy obtained for a
902  * request for scheduling using ORR/TRR policy instances
903  *
904  * \param[in] policy   the policy the resource belongs to
905  * \param[in] res      the resource to be released
906  */
907 static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy,
908                             const struct ptlrpc_nrs_resource *res)
909 {
910         struct nrs_orr_data     *orrd;
911         struct nrs_orr_object   *orro;
912
913         /**
914          * Do nothing for freeing parent, nrs_orr_data resources.
915          */
916         if (res->res_parent == NULL)
917                 return;
918
919         orro = container_of(res, struct nrs_orr_object, oo_res);
920         orrd = container_of(res->res_parent, struct nrs_orr_data, od_res);
921
922         cfs_hash_put(orrd->od_obj_hash, &orro->oo_hnode);
923 }
924
925 /**
926  * Called when polling an ORR/TRR policy instance for a request so that it can
927  * be served. Returns the request that is at the root of the binary heap, as
928  * that is the lowest priority one (i.e. libcfs_heap is an implementation of a
929  * min-heap)
930  *
931  * \param[in] policy the policy instance being polled
932  * \param[in] peek   when set, signifies that we just want to examine the
933  *                   request, and not handle it, so the request is not removed
934  *                   from the policy.
935  * \param[in] force  force the policy to return a request; unused in this policy
936  *
937  * \retval the request to be handled
938  * \retval NULL no request available
939  *
940  * \see ptlrpc_nrs_req_get_nolock()
941  * \see nrs_request_get()
942  */
943 static
944 struct ptlrpc_nrs_request *nrs_orr_req_get(struct ptlrpc_nrs_policy *policy,
945                                            bool peek, bool force)
946 {
947         struct nrs_orr_data       *orrd = policy->pol_private;
948         cfs_binheap_node_t        *node = cfs_binheap_root(orrd->od_binheap);
949         struct ptlrpc_nrs_request *nrq;
950
951         nrq = unlikely(node == NULL) ? NULL :
952               container_of(node, struct ptlrpc_nrs_request, nr_node);
953
954         if (likely(!peek && nrq != NULL)) {
955                 struct nrs_orr_object *orro;
956
957                 orro = container_of(nrs_request_resource(nrq),
958                                     struct nrs_orr_object, oo_res);
959
960                 LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
961
962                 cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
963                 orro->oo_active--;
964
965                 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
966                                  NRS_POL_NAME_MAX) == 0)
967                         CDEBUG(D_RPCTRACE,
968                                "NRS: starting to handle %s request for object "
969                                "with FID "DFID", from OST with index %u, with "
970                                "round "LPU64"\n", NRS_POL_NAME_ORR,
971                                PFID(&orro->oo_key.ok_fid),
972                                nrq->nr_u.orr.or_key.ok_idx,
973                                nrq->nr_u.orr.or_round);
974                 else
975                         CDEBUG(D_RPCTRACE,
976                                "NRS: starting to handle %s request from OST "
977                                "with index %u, with round "LPU64"\n",
978                                NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
979                                nrq->nr_u.orr.or_round);
980
981                 /** Peek at the next request to be served */
982                 node = cfs_binheap_root(orrd->od_binheap);
983
984                 /** No more requests */
985                 if (unlikely(node == NULL)) {
986                         orrd->od_round++;
987                 } else {
988                         struct ptlrpc_nrs_request *next;
989
990                         next = container_of(node, struct ptlrpc_nrs_request,
991                                             nr_node);
992
993                         if (orrd->od_round < next->nr_u.orr.or_round)
994                                 orrd->od_round = next->nr_u.orr.or_round;
995                 }
996         }
997
998         return nrq;
999 }
1000
1001 /**
1002  * Sort-adds request \a nrq to an ORR/TRR \a policy instance's set of queued
1003  * requests in the policy's binary heap.
1004  *
1005  * A scheduling round is a stream of requests that have been sorted in batches
1006  * according to the backend-fs object (for ORR policy instances) or OST (for TRR
1007  * policy instances) that they pertain to (as identified by its IDIF FID or OST
1008  * index respectively); there can be only one batch for each object or OST in
1009  * each round. The batches are of maximum size nrs_orr_data:od_quantum. When a
1010  * new request arrives for scheduling for an object or OST that has exhausted
1011  * its quantum in its current round, the request will be scheduled on the next
1012  * scheduling round. Requests are allowed to be scheduled against a round until
1013  * all requests for the round are serviced, so an object or OST might miss a
1014  * round if requests are not scheduled for it for a long enough period of time.
1015  * Objects or OSTs that miss a round will continue with having their next
1016  * request scheduled, starting at the round that requests are being dispatched
1017  * for, at the time of arrival of this request.
1018  *
1019  * Requests are tagged with the round number and a sequence number; the sequence
1020  * number indicates the relative ordering amongst the batches of requests in a
1021  * round, and is identical for all requests in a batch, as is the round number.
1022  * The round and sequence numbers are used by orr_req_compare() in order to use
1023  * nrs_orr_data::od_binheap in order to maintain an ordered set of rounds, with
1024  * each round consisting of an ordered set of batches of requests, and each
1025  * batch consisting of an ordered set of requests according to their logical
1026  * file or physical disk offsets.
1027  *
1028  * \param[in] policy the policy
1029  * \param[in] nrq    the request to add
1030  *
1031  * \retval 0    request successfully added
1032  * \retval != 0 error
1033  */
1034 static int nrs_orr_req_add(struct ptlrpc_nrs_policy *policy,
1035                            struct ptlrpc_nrs_request *nrq)
1036 {
1037         struct nrs_orr_data     *orrd;
1038         struct nrs_orr_object   *orro;
1039         int                      rc;
1040
1041         orro = container_of(nrs_request_resource(nrq),
1042                             struct nrs_orr_object, oo_res);
1043         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1044                             struct nrs_orr_data, od_res);
1045
1046         if (orro->oo_quantum == 0 || orro->oo_round < orrd->od_round ||
1047             (orro->oo_active == 0 && orro->oo_quantum > 0)) {
1048
1049                 /**
1050                  * If there are no pending requests for the object/OST, but some
1051                  * of its quantum still remains unused, which implies we did not
1052                  * get a chance to schedule up to its maximum allowed batch size
1053                  * of requests in the previous round this object/OST
1054                  * participated in, schedule this next request on a new round;
1055                  * this avoids fragmentation of request batches caused by
1056                  * intermittent inactivity on the object/OST, at the expense of
1057                  * potentially slightly increased service time for the request
1058                  * batch this request will be a part of.
1059                  */
1060                 if (orro->oo_active == 0 && orro->oo_quantum > 0)
1061                         orro->oo_round++;
1062
1063                 /** A new scheduling round has commenced */
1064                 if (orro->oo_round < orrd->od_round)
1065                         orro->oo_round = orrd->od_round;
1066
1067                 /** I was not the last object/OST that scheduled a request */
1068                 if (orro->oo_sequence < orrd->od_sequence)
1069                         orro->oo_sequence = ++orrd->od_sequence;
1070                 /**
1071                  * Reset the quantum if we have reached the maximum quantum
1072                  * size for this batch, or even if we have not managed to
1073                  * complete a batch size up to its maximum allowed size.
1074                  * XXX: Accessed unlocked
1075                  */
1076                 orro->oo_quantum = orrd->od_quantum;
1077         }
1078
1079         nrq->nr_u.orr.or_round = orro->oo_round;
1080         nrq->nr_u.orr.or_sequence = orro->oo_sequence;
1081
1082         rc = cfs_binheap_insert(orrd->od_binheap, &nrq->nr_node);
1083         if (rc == 0) {
1084                 orro->oo_active++;
1085                 if (--orro->oo_quantum == 0)
1086                         orro->oo_round++;
1087         }
1088         return rc;
1089 }
1090
1091 /**
1092  * Removes request \a nrq from an ORR/TRR \a policy instance's set of queued
1093  * requests.
1094  *
1095  * \param[in] policy the policy
1096  * \param[in] nrq    the request to remove
1097  */
1098 static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy,
1099                             struct ptlrpc_nrs_request *nrq)
1100 {
1101         struct nrs_orr_data     *orrd;
1102         struct nrs_orr_object   *orro;
1103         bool                     is_root;
1104
1105         orro = container_of(nrs_request_resource(nrq),
1106                             struct nrs_orr_object, oo_res);
1107         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1108                             struct nrs_orr_data, od_res);
1109
1110         LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
1111
1112         is_root = &nrq->nr_node == cfs_binheap_root(orrd->od_binheap);
1113
1114         cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
1115         orro->oo_active--;
1116
1117         /**
1118          * If we just deleted the node at the root of the binheap, we may have
1119          * to adjust round numbers.
1120          */
1121         if (unlikely(is_root)) {
1122                 /** Peek at the next request to be served */
1123                 cfs_binheap_node_t *node = cfs_binheap_root(orrd->od_binheap);
1124
1125                 /** No more requests */
1126                 if (unlikely(node == NULL)) {
1127                         orrd->od_round++;
1128                 } else {
1129                         nrq = container_of(node, struct ptlrpc_nrs_request,
1130                                            nr_node);
1131
1132                         if (orrd->od_round < nrq->nr_u.orr.or_round)
1133                                 orrd->od_round = nrq->nr_u.orr.or_round;
1134                 }
1135         }
1136 }
1137
1138 /**
1139  * Called right after the request \a nrq finishes being handled by ORR policy
1140  * instance \a policy.
1141  *
1142  * \param[in] policy the policy that handled the request
1143  * \param[in] nrq    the request that was handled
1144  */
1145 static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy,
1146                              struct ptlrpc_nrs_request *nrq)
1147 {
1148         /** NB: resource control, credits etc can be added here */
1149         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
1150                     NRS_POL_NAME_MAX) == 0)
1151                 CDEBUG(D_RPCTRACE,
1152                        "NRS: finished handling %s request for object with FID "
1153                        DFID", from OST with index %u, with round "LPU64"\n",
1154                        NRS_POL_NAME_ORR, PFID(&nrq->nr_u.orr.or_key.ok_fid),
1155                        nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
1156         else
1157                 CDEBUG(D_RPCTRACE,
1158                        "NRS: finished handling %s request from OST with index %u,"
1159                        " with round "LPU64"\n",
1160                        NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
1161                        nrq->nr_u.orr.or_round);
1162 }
1163
1164 /**
1165  * lprocfs interface
1166  */
1167
1168 #ifdef CONFIG_PROC_FS
1169
1170 /**
1171  * This allows to bundle the policy name into the lprocfs_vars::data pointer
1172  * so that lprocfs read/write functions can be used by both the ORR and TRR
1173  * policies.
1174  */
1175 static struct nrs_lprocfs_orr_data {
1176         struct ptlrpc_service   *svc;
1177         char                    *name;
1178 } lprocfs_orr_data = {
1179         .name = NRS_POL_NAME_ORR
1180 }, lprocfs_trr_data = {
1181         .name = NRS_POL_NAME_TRR
1182 };
1183
1184 /**
1185  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
1186  * for ORR/TRR policy instances on both the regular and high-priority NRS head
1187  * of a service, as long as a policy instance is not in the
1188  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1189  * state are skipped later by nrs_orr_ctl().
1190  *
1191  * Quantum values are in # of RPCs, and the output is in YAML format.
1192  *
1193  * For example:
1194  *
1195  *      reg_quantum:256
1196  *      hp_quantum:8
1197  *
1198  * XXX: the CRR-N version of this, ptlrpc_lprocfs_rd_nrs_crrn_quantum() is
1199  * almost identical; it can be reworked and then reused for ORR/TRR.
1200  */
1201 static int
1202 ptlrpc_lprocfs_nrs_orr_quantum_seq_show(struct seq_file *m, void *data)
1203 {
1204         struct nrs_lprocfs_orr_data *orr_data = m->private;
1205         struct ptlrpc_service       *svc = orr_data->svc;
1206         __u16                        quantum;
1207         int                          rc;
1208
1209         /**
1210          * Perform two separate calls to this as only one of the NRS heads'
1211          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
1212          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1213          */
1214         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1215                                        orr_data->name,
1216                                        NRS_CTL_ORR_RD_QUANTUM,
1217                                        true, &quantum);
1218         if (rc == 0) {
1219                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG "%-5d\n", quantum);
1220                 /**
1221                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1222                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1223                  */
1224         } else if (rc != -ENODEV) {
1225                 return rc;
1226         }
1227
1228         /**
1229          * We know the ost_io service which is the only one ORR/TRR policies are
1230          * compatible with, do have an HP NRS head, but it may be best to guard
1231          * against a possible change of this in the future.
1232          */
1233         if (!nrs_svc_has_hp(svc))
1234                 goto no_hp;
1235
1236         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1237                                        orr_data->name, NRS_CTL_ORR_RD_QUANTUM,
1238                                        true, &quantum);
1239         if (rc == 0) {
1240                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
1241                 /**
1242                  * Ignore -ENODEV as the high priority NRS head's policy may be
1243                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1244                  */
1245         } else if (rc != -ENODEV) {
1246                 return rc;
1247         }
1248
1249 no_hp:
1250
1251         return rc;
1252 }
1253
1254 /**
1255  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
1256  * for ORR/TRR policy instances of a service. The user can set the quantum size
1257  * for the regular and high priority NRS head separately by specifying each
1258  * value, or both together in a single invocation.
1259  *
1260  * For example:
1261  *
1262  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=req_quantum:64, to set the
1263  * request quantum size of the ORR policy instance on the regular NRS head of
1264  * the ost_io service to 64
1265  *
1266  * lctl set_param ost.OSS.ost_io.nrs_trr_quantum=hp_quantum:8 to set the request
1267  * quantum size of the TRR policy instance on the high priority NRS head of the
1268  * ost_io service to 8
1269  *
1270  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=32, to set both the request
1271  * quantum size of the ORR policy instance on both the regular and the high
1272  * priority NRS head of the ost_io service to 32
1273  *
1274  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1275  * are skipped later by nrs_orr_ctl().
1276  *
1277  * XXX: the CRR-N version of this, ptlrpc_lprocfs_wr_nrs_crrn_quantum() is
1278  * almost identical; it can be reworked and then reused for ORR/TRR.
1279  */
1280 static ssize_t
1281 ptlrpc_lprocfs_nrs_orr_quantum_seq_write(struct file *file, const char *buffer,
1282                                          size_t count, loff_t *off)
1283 {
1284         struct seq_file             *m = file->private_data;
1285         struct nrs_lprocfs_orr_data *orr_data = m->private;
1286         struct ptlrpc_service       *svc = orr_data->svc;
1287         enum ptlrpc_nrs_queue_type   queue = 0;
1288         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
1289         char                        *val;
1290         long                         quantum_reg;
1291         long                         quantum_hp;
1292         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
1293         size_t                       count_copy;
1294         int                          rc = 0;
1295         int                          rc2 = 0;
1296
1297         if (count > (sizeof(kernbuf) - 1))
1298                 return -EINVAL;
1299
1300         if (copy_from_user(kernbuf, buffer, count))
1301                 return -EFAULT;
1302
1303         kernbuf[count] = '\0';
1304
1305         count_copy = count;
1306
1307         /**
1308          * Check if the regular quantum value has been specified
1309          */
1310         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
1311                                        &count_copy);
1312         if (val != kernbuf) {
1313                 quantum_reg = simple_strtol(val, NULL, 10);
1314
1315                 queue |= PTLRPC_NRS_QUEUE_REG;
1316         }
1317
1318         count_copy = count;
1319
1320         /**
1321          * Check if the high priority quantum value has been specified
1322          */
1323         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
1324                                        &count_copy);
1325         if (val != kernbuf) {
1326                 if (!nrs_svc_has_hp(svc))
1327                         return -ENODEV;
1328
1329                 quantum_hp = simple_strtol(val, NULL, 10);
1330
1331                 queue |= PTLRPC_NRS_QUEUE_HP;
1332         }
1333
1334         /**
1335          * If none of the queues has been specified, look for a valid numerical
1336          * value
1337          */
1338         if (queue == 0) {
1339                 if (!isdigit(kernbuf[0]))
1340                         return -EINVAL;
1341
1342                 quantum_reg = simple_strtol(kernbuf, NULL, 10);
1343
1344                 queue = PTLRPC_NRS_QUEUE_REG;
1345
1346                 if (nrs_svc_has_hp(svc)) {
1347                         queue |= PTLRPC_NRS_QUEUE_HP;
1348                         quantum_hp = quantum_reg;
1349                 }
1350         }
1351
1352         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
1353             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
1354             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
1355             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
1356                 return -EINVAL;
1357
1358         /**
1359          * We change the values on regular and HP NRS heads separately, so that
1360          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1361          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1362          * started the policy on either the regular or HP NRS head; i.e. we are
1363          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1364          * only if the operation fails with -ENODEV on all heads that have been
1365          * specified by the command; if at least one operation succeeds,
1366          * success is returned.
1367          */
1368         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1369                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1370                                                orr_data->name,
1371                                                NRS_CTL_ORR_WR_QUANTUM, false,
1372                                                &quantum_reg);
1373                 if ((rc < 0 && rc != -ENODEV) ||
1374                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1375                         return rc;
1376         }
1377
1378         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1379                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1380                                                 orr_data->name,
1381                                                 NRS_CTL_ORR_WR_QUANTUM, false,
1382                                                 &quantum_hp);
1383                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1384                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1385                         return rc2;
1386         }
1387
1388         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1389 }
1390 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_quantum);
1391
/** Labels that prefix the offset type of the regular and the HP NRS head */
#define LPROCFS_NRS_OFF_NAME_REG        "reg_offset_type:"
#define LPROCFS_NRS_OFF_NAME_HP         "hp_offset_type:"

/** The two offset type values */
#define LPROCFS_NRS_OFF_NAME_PHYSICAL   "physical"
#define LPROCFS_NRS_OFF_NAME_LOGICAL    "logical"
1397
1398 /**
1399  * Retrieves the offset type used by ORR/TRR policy instances on both the
1400  * regular and high-priority NRS head of a service, as long as a policy
1401  * instance is not in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state;
1402  * policy instances in this state are skipped later by nrs_orr_ctl().
1403  *
1404  * Offset type information is a (physical|logical) string, and output is
1405  * in YAML format.
1406  *
1407  * For example:
1408  *
1409  *      reg_offset_type:physical
1410  *      hp_offset_type:logical
1411  */
1412 static int
1413 ptlrpc_lprocfs_nrs_orr_offset_type_seq_show(struct seq_file *m, void *data)
1414 {
1415         struct nrs_lprocfs_orr_data *orr_data = m->private;
1416         struct ptlrpc_service       *svc = orr_data->svc;
1417         bool                         physical;
1418         int                          rc;
1419
1420         /**
1421          * Perform two separate calls to this as only one of the NRS heads'
1422          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1423          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1424          */
1425         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1426                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1427                                        true, &physical);
1428         if (rc == 0) {
1429                 seq_printf(m, LPROCFS_NRS_OFF_NAME_REG"%s\n",
1430                            physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1431                            LPROCFS_NRS_OFF_NAME_LOGICAL);
1432                 /**
1433                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1434                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1435                  */
1436         } else if (rc != -ENODEV) {
1437                 return rc;
1438         }
1439
1440         /**
1441          * We know the ost_io service which is the only one ORR/TRR policies are
1442          * compatible with, do have an HP NRS head, but it may be best to guard
1443          * against a possible change of this in the future.
1444          */
1445         if (!nrs_svc_has_hp(svc))
1446                 goto no_hp;
1447
1448         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1449                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1450                                        true, &physical);
1451         if (rc == 0) {
1452                 seq_printf(m, LPROCFS_NRS_OFF_NAME_HP"%s\n",
1453                            physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1454                            LPROCFS_NRS_OFF_NAME_LOGICAL);
1455                 /**
1456                  * Ignore -ENODEV as the high priority NRS head's policy may be
1457                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1458                  */
1459         } else if (rc != -ENODEV) {
1460                 return rc;
1461         }
1462
1463 no_hp:
1464         return rc;
1465 }
1466
/**
 * Size of the longest valid command string, including the terminating NUL:
 * both labels, each followed by "physical" (the longer of the two values),
 * with a single ' ' separating the two settings.
 */
#define LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD                                        \
        (sizeof(LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL         \
                " "                                                            \
                LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL))
1474
1475 /**
1476  * Sets the type of offsets used to order RPCs in ORR/TRR policy instances. The
1477  * user can set offset type for the regular or high priority NRS head
1478  * separately by specifying each value, or both together in a single invocation.
1479  *
1480  * For example:
1481  *
1482  * lctl set_param ost.OSS.ost_io.nrs_orr_offset_type=
1483  * reg_offset_type:physical, to enable the ORR policy instance on the regular
1484  * NRS head of the ost_io service to use physical disk offset ordering.
1485  *
1486  * lctl set_param ost.OSS.ost_io.nrs_trr_offset_type=logical, to enable the TRR
1487  * policy instances on both the regular and high priority NRS heads of the
1488  * ost_io service to use logical file offset ordering.
1489  *
1490  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state are
1491  * skipped later by nrs_orr_ctl().
1492  */
1493 static ssize_t
1494 ptlrpc_lprocfs_nrs_orr_offset_type_seq_write(struct file *file,
1495                                              const char *buffer, size_t count,
1496                                              loff_t *off)
1497 {
1498         struct seq_file             *m = file->private_data;
1499         struct nrs_lprocfs_orr_data *orr_data = m->private;
1500         struct ptlrpc_service       *svc = orr_data->svc;
1501         enum ptlrpc_nrs_queue_type   queue = 0;
1502         char                         kernbuf[LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD];
1503         char                        *val_reg;
1504         char                        *val_hp;
1505         bool                         physical_reg;
1506         bool                         physical_hp;
1507         size_t                       count_copy;
1508         int                          rc = 0;
1509         int                          rc2 = 0;
1510
1511         if (count > (sizeof(kernbuf) - 1))
1512                 return -EINVAL;
1513
1514         if (copy_from_user(kernbuf, buffer, count))
1515                 return -EFAULT;
1516
1517         kernbuf[count] = '\0';
1518
1519         count_copy = count;
1520
1521         /**
1522          * Check if the regular offset type has been specified
1523          */
1524         val_reg = lprocfs_find_named_value(kernbuf,
1525                                            LPROCFS_NRS_OFF_NAME_REG,
1526                                            &count_copy);
1527         if (val_reg != kernbuf)
1528                 queue |= PTLRPC_NRS_QUEUE_REG;
1529
1530         count_copy = count;
1531
1532         /**
1533          * Check if the high priority offset type has been specified
1534          */
1535         val_hp = lprocfs_find_named_value(kernbuf, LPROCFS_NRS_OFF_NAME_HP,
1536                                           &count_copy);
1537         if (val_hp != kernbuf) {
1538                 if (!nrs_svc_has_hp(svc))
1539                         return -ENODEV;
1540
1541                 queue |= PTLRPC_NRS_QUEUE_HP;
1542         }
1543
1544         /**
1545          * If none of the queues has been specified, there may be a valid
1546          * command string at the start of the buffer.
1547          */
1548         if (queue == 0) {
1549                 queue = PTLRPC_NRS_QUEUE_REG;
1550
1551                 if (nrs_svc_has_hp(svc))
1552                         queue |= PTLRPC_NRS_QUEUE_HP;
1553         }
1554
1555         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1556                 if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1557                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1558                         physical_reg = true;
1559                 else if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_LOGICAL,
1560                          sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1561                         physical_reg = false;
1562                 else
1563                         return -EINVAL;
1564         }
1565
1566         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1567                 if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1568                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1569                         physical_hp = true;
1570                 else if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_LOGICAL,
1571                                  sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1572                         physical_hp = false;
1573                 else
1574                         return -EINVAL;
1575         }
1576
1577         /**
1578          * We change the values on regular and HP NRS heads separately, so that
1579          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1580          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1581          * started the policy on either the regular or HP NRS head; i.e. we are
1582          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1583          * only if the operation fails with -ENODEV on all heads that have been
1584          * specified by the command; if at least one operation succeeds,
1585          * success is returned.
1586          */
1587         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1588                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1589                                                orr_data->name,
1590                                                NRS_CTL_ORR_WR_OFF_TYPE, false,
1591                                                &physical_reg);
1592                 if ((rc < 0 && rc != -ENODEV) ||
1593                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1594                         return rc;
1595         }
1596
1597         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1598                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1599                                                 orr_data->name,
1600                                                 NRS_CTL_ORR_WR_OFF_TYPE, false,
1601                                                 &physical_hp);
1602                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1603                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1604                         return rc2;
1605         }
1606
1607         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1608 }
1609 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_offset_type);
1610
1611 #define NRS_LPROCFS_REQ_SUPP_NAME_REG           "reg_supported:"
1612 #define NRS_LPROCFS_REQ_SUPP_NAME_HP            "hp_supported:"
1613
1614 #define LPROCFS_NRS_SUPP_NAME_READS             "reads"
1615 #define LPROCFS_NRS_SUPP_NAME_WRITES            "writes"
1616 #define LPROCFS_NRS_SUPP_NAME_READWRITES        "reads_and_writes"
1617
1618 /**
1619  * Translates enum nrs_orr_supp values to a corresponding string.
1620  */
1621 static const char *nrs_orr_supp2str(enum nrs_orr_supp supp)
1622 {
1623         switch(supp) {
1624         default:
1625                 LBUG();
1626         case NOS_OST_READ:
1627                 return LPROCFS_NRS_SUPP_NAME_READS;
1628         case NOS_OST_WRITE:
1629                 return LPROCFS_NRS_SUPP_NAME_WRITES;
1630         case NOS_OST_RW:
1631                 return LPROCFS_NRS_SUPP_NAME_READWRITES;
1632         }
1633 }
1634
1635 /**
1636  * Translates strings to the corresponding enum nrs_orr_supp value
1637  */
1638 static enum nrs_orr_supp nrs_orr_str2supp(const char *val)
1639 {
1640         if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READWRITES,
1641                     sizeof(LPROCFS_NRS_SUPP_NAME_READWRITES) - 1) == 0)
1642                 return NOS_OST_RW;
1643         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READS,
1644                          sizeof(LPROCFS_NRS_SUPP_NAME_READS) - 1) == 0)
1645                 return NOS_OST_READ;
1646         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_WRITES,
1647                          sizeof(LPROCFS_NRS_SUPP_NAME_WRITES) - 1) == 0)
1648                 return NOS_OST_WRITE;
1649         else
1650                 return -EINVAL;
1651 }
1652
1653 /**
1654  * Retrieves the type of RPCs handled at the point of invocation by ORR/TRR
1655  * policy instances on both the regular and high-priority NRS head of a service,
1656  * as long as a policy instance is not in the
1657  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1658  * state are skipped later by nrs_orr_ctl().
1659  *
1660  * Supported RPC type information is a (reads|writes|reads_and_writes) string,
1661  * and output is in YAML format.
1662  *
1663  * For example:
1664  *
1665  *      reg_supported:reads
1666  *      hp_supported:reads_and_writes
1667  */
1668 static int
1669 ptlrpc_lprocfs_nrs_orr_supported_seq_show(struct seq_file *m, void *data)
1670 {
1671         struct nrs_lprocfs_orr_data *orr_data = m->private;
1672         struct ptlrpc_service       *svc = orr_data->svc;
1673         enum nrs_orr_supp            supported;
1674         int                          rc;
1675
1676         /**
1677          * Perform two separate calls to this as only one of the NRS heads'
1678          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1679          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1680          */
1681         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1682                                        orr_data->name,
1683                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1684                                        &supported);
1685
1686         if (rc == 0) {
1687                 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_REG"%s\n",
1688                            nrs_orr_supp2str(supported));
1689                 /**
1690                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1691                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1692                  */
1693         } else if (rc != -ENODEV) {
1694                 return rc;
1695         }
1696
1697         /**
1698          * We know the ost_io service which is the only one ORR/TRR policies are
1699          * compatible with, do have an HP NRS head, but it may be best to guard
1700          * against a possible change of this in the future.
1701          */
1702         if (!nrs_svc_has_hp(svc))
1703                 goto no_hp;
1704
1705         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1706                                        orr_data->name,
1707                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1708                                        &supported);
1709         if (rc == 0) {
1710                 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_HP"%s\n",
1711                            nrs_orr_supp2str(supported));
1712                 /**
1713                  * Ignore -ENODEV as the high priority NRS head's policy may be
1714                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1715                  */
1716         } else if (rc != -ENODEV) {
1717                 return rc;
1718         }
1719
1720 no_hp:
1721
1722         return rc;
1723 }
1724
1725 /**
1726  * Max valid command string is the size of the labels, plus "reads_and_writes"
1727  * twice, plus a separating ' '
1728  */
1729 #define LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD                                        \
1730         sizeof(NRS_LPROCFS_REQ_SUPP_NAME_REG LPROCFS_NRS_SUPP_NAME_READWRITES  \
1731                NRS_LPROCFS_REQ_SUPP_NAME_HP LPROCFS_NRS_SUPP_NAME_READWRITES   \
1732                " ")
1733
1734 /**
1735  * Sets the type of RPCs handled by ORR/TRR policy instances. The user can
1736  * modify this setting for the regular or high priority NRS heads separately, or
1737  * both together in a single invocation.
1738  *
1739  * For example:
1740  *
1741  * lctl set_param ost.OSS.ost_io.nrs_orr_supported=
1742  * "reg_supported:reads", to enable the ORR policy instance on the regular NRS
1743  * head of the ost_io service to handle OST_READ RPCs.
1744  *
1745  * lctl set_param ost.OSS.ost_io.nrs_trr_supported=reads_and_writes, to enable
1746  * the TRR policy instances on both the regular ang high priority NRS heads of
1747  * the ost_io service to use handle OST_READ and OST_WRITE RPCs.
1748  *
1749  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state are
1750  * are skipped later by nrs_orr_ctl().
1751  */
1752 static ssize_t
1753 ptlrpc_lprocfs_nrs_orr_supported_seq_write(struct file *file,
1754                                            const char *buffer, size_t count,
1755                                            loff_t *off)
1756 {
1757         struct seq_file             *m = file->private_data;
1758         struct nrs_lprocfs_orr_data *orr_data = m->private;
1759         struct ptlrpc_service       *svc = orr_data->svc;
1760         enum ptlrpc_nrs_queue_type   queue = 0;
1761         char                         kernbuf[LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD];
1762         char                        *val_reg;
1763         char                        *val_hp;
1764         enum nrs_orr_supp            supp_reg;
1765         enum nrs_orr_supp            supp_hp;
1766         size_t                       count_copy;
1767         int                          rc = 0;
1768         int                          rc2 = 0;
1769
1770         if (count > (sizeof(kernbuf) - 1))
1771                 return -EINVAL;
1772
1773         if (copy_from_user(kernbuf, buffer, count))
1774                 return -EFAULT;
1775
1776         kernbuf[count] = '\0';
1777
1778         count_copy = count;
1779
1780         /**
1781          * Check if the regular supported requests setting has been specified
1782          */
1783         val_reg = lprocfs_find_named_value(kernbuf,
1784                                            NRS_LPROCFS_REQ_SUPP_NAME_REG,
1785                                            &count_copy);
1786         if (val_reg != kernbuf)
1787                 queue |= PTLRPC_NRS_QUEUE_REG;
1788
1789         count_copy = count;
1790
1791         /**
1792          * Check if the high priority supported requests setting has been
1793          * specified
1794          */
1795         val_hp = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_REQ_SUPP_NAME_HP,
1796                                           &count_copy);
1797         if (val_hp != kernbuf) {
1798                 if (!nrs_svc_has_hp(svc))
1799                         return -ENODEV;
1800
1801                 queue |= PTLRPC_NRS_QUEUE_HP;
1802         }
1803
1804         /**
1805          * If none of the queues has been specified, there may be a valid
1806          * command string at the start of the buffer.
1807          */
1808         if (queue == 0) {
1809                 queue = PTLRPC_NRS_QUEUE_REG;
1810
1811                 if (nrs_svc_has_hp(svc))
1812                         queue |= PTLRPC_NRS_QUEUE_HP;
1813         }
1814
1815         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1816                 supp_reg = nrs_orr_str2supp(val_reg);
1817                 if (supp_reg == -EINVAL)
1818                         return -EINVAL;
1819         }
1820
1821         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1822                 supp_hp = nrs_orr_str2supp(val_hp);
1823                 if (supp_hp == -EINVAL)
1824                         return -EINVAL;
1825         }
1826
1827         /**
1828          * We change the values on regular and HP NRS heads separately, so that
1829          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1830          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1831          * started the policy on either the regular or HP NRS head; i.e. we are
1832          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1833          * only if the operation fails with -ENODEV on all heads that have been
1834          * specified by the command; if at least one operation succeeds,
1835          * success is returned.
1836          */
1837         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1838                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1839                                                orr_data->name,
1840                                                NRS_CTL_ORR_WR_SUPP_REQ, false,
1841                                                &supp_reg);
1842                 if ((rc < 0 && rc != -ENODEV) ||
1843                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1844                         return rc;
1845         }
1846
1847         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1848                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1849                                                 orr_data->name,
1850                                                 NRS_CTL_ORR_WR_SUPP_REQ, false,
1851                                                 &supp_hp);
1852                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1853                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1854                         return rc2;
1855         }
1856
1857         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1858 }
1859 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_supported);
1860
1861 static int nrs_orr_lprocfs_init(struct ptlrpc_service *svc)
1862 {
1863         int     i;
1864
1865         struct lprocfs_vars nrs_orr_lprocfs_vars[] = {
1866                 { .name         = "nrs_orr_quantum",
1867                   .fops         = &ptlrpc_lprocfs_nrs_orr_quantum_fops  },
1868                 { .name         = "nrs_orr_offset_type",
1869                   .fops         = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1870                 { .name         = "nrs_orr_supported",
1871                   .fops         = &ptlrpc_lprocfs_nrs_orr_supported_fops },
1872                 { NULL }
1873         };
1874
1875         if (svc->srv_procroot == NULL)
1876                 return 0;
1877
1878         lprocfs_orr_data.svc = svc;
1879
1880         for (i = 0; i < ARRAY_SIZE(nrs_orr_lprocfs_vars); i++)
1881                 nrs_orr_lprocfs_vars[i].data = &lprocfs_orr_data;
1882
1883         return lprocfs_add_vars(svc->srv_procroot, nrs_orr_lprocfs_vars, NULL);
1884 }
1885
1886 static void nrs_orr_lprocfs_fini(struct ptlrpc_service *svc)
1887 {
1888         if (svc->srv_procroot == NULL)
1889                 return;
1890
1891         lprocfs_remove_proc_entry("nrs_orr_quantum", svc->srv_procroot);
1892         lprocfs_remove_proc_entry("nrs_orr_offset_type", svc->srv_procroot);
1893         lprocfs_remove_proc_entry("nrs_orr_supported", svc->srv_procroot);
1894 }
1895
1896 #endif /* CONFIG_PROC_FS */
1897
1898 static const struct ptlrpc_nrs_pol_ops nrs_orr_ops = {
1899         .op_policy_init         = nrs_orr_init,
1900         .op_policy_start        = nrs_orr_start,
1901         .op_policy_stop         = nrs_orr_stop,
1902         .op_policy_ctl          = nrs_orr_ctl,
1903         .op_res_get             = nrs_orr_res_get,
1904         .op_res_put             = nrs_orr_res_put,
1905         .op_req_get             = nrs_orr_req_get,
1906         .op_req_enqueue         = nrs_orr_req_add,
1907         .op_req_dequeue         = nrs_orr_req_del,
1908         .op_req_stop            = nrs_orr_req_stop,
1909 #ifdef CONFIG_PROC_FS
1910         .op_lprocfs_init        = nrs_orr_lprocfs_init,
1911         .op_lprocfs_fini        = nrs_orr_lprocfs_fini,
1912 #endif
1913 };
1914
1915 struct ptlrpc_nrs_pol_conf nrs_conf_orr = {
1916         .nc_name                = NRS_POL_NAME_ORR,
1917         .nc_ops                 = &nrs_orr_ops,
1918         .nc_compat              = nrs_policy_compat_one,
1919         .nc_compat_svc_name     = "ost_io",
1920 };
1921
1922 /**
1923  * TRR, Target-based Round Robin policy
1924  *
1925  * TRR reuses much of the functions and data structures of ORR
1926  */
1927
1928 #ifdef CONFIG_PROC_FS
1929
1930 static int nrs_trr_lprocfs_init(struct ptlrpc_service *svc)
1931 {
1932         int     i;
1933
1934         struct lprocfs_vars nrs_trr_lprocfs_vars[] = {
1935                 { .name         = "nrs_trr_quantum",
1936                   .fops         = &ptlrpc_lprocfs_nrs_orr_quantum_fops },
1937                 { .name         = "nrs_trr_offset_type",
1938                   .fops         = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1939                 { .name         = "nrs_trr_supported",
1940                   .fops         = &ptlrpc_lprocfs_nrs_orr_supported_fops },
1941                 { NULL }
1942         };
1943
1944         if (svc->srv_procroot == NULL)
1945                 return 0;
1946
1947         lprocfs_trr_data.svc = svc;
1948
1949         for (i = 0; i < ARRAY_SIZE(nrs_trr_lprocfs_vars); i++)
1950                 nrs_trr_lprocfs_vars[i].data = &lprocfs_trr_data;
1951
1952         return lprocfs_add_vars(svc->srv_procroot, nrs_trr_lprocfs_vars, NULL);
1953 }
1954
1955 static void nrs_trr_lprocfs_fini(struct ptlrpc_service *svc)
1956 {
1957         if (svc->srv_procroot == NULL)
1958                 return;
1959
1960         lprocfs_remove_proc_entry("nrs_trr_quantum", svc->srv_procroot);
1961         lprocfs_remove_proc_entry("nrs_trr_offset_type", svc->srv_procroot);
1962         lprocfs_remove_proc_entry("nrs_trr_supported", svc->srv_procroot);
1963 }
1964
1965 #endif /* CONFIG_PROC_FS */
1966
1967 /**
1968  * Reuse much of the ORR functionality for TRR.
1969  */
1970 static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
1971         .op_policy_init         = nrs_orr_init,
1972         .op_policy_start        = nrs_orr_start,
1973         .op_policy_stop         = nrs_orr_stop,
1974         .op_policy_ctl          = nrs_orr_ctl,
1975         .op_res_get             = nrs_orr_res_get,
1976         .op_res_put             = nrs_orr_res_put,
1977         .op_req_get             = nrs_orr_req_get,
1978         .op_req_enqueue         = nrs_orr_req_add,
1979         .op_req_dequeue         = nrs_orr_req_del,
1980         .op_req_stop            = nrs_orr_req_stop,
1981 #ifdef CONFIG_PROC_FS
1982         .op_lprocfs_init        = nrs_trr_lprocfs_init,
1983         .op_lprocfs_fini        = nrs_trr_lprocfs_fini,
1984 #endif
1985 };
1986
1987 struct ptlrpc_nrs_pol_conf nrs_conf_trr = {
1988         .nc_name                = NRS_POL_NAME_TRR,
1989         .nc_ops                 = &nrs_trr_ops,
1990         .nc_compat              = nrs_policy_compat_one,
1991         .nc_compat_svc_name     = "ost_io",
1992 };
1993
1994 /** @} ORR/TRR policy */
1995
1996 /** @} nrs */
1997
1998 #endif /* HAVE_SERVER_SUPPORT */