Whamcloud - gitweb
Revert "LU-5275 lprocfs: remove last of non seq data structs and functions."
[fs/lustre-release.git] / lustre / ptlrpc / nrs_orr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_orr.c
29  *
30  * Network Request Scheduler (NRS) ORR and TRR policies
31  *
32  * Request scheduling in a Round-Robin manner over backend-fs objects and OSTs
33  * respectively
34  *
35  * Author: Liang Zhen <liang@whamcloud.com>
36  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
37  */
38 #ifdef HAVE_SERVER_SUPPORT
39
40 /**
41  * \addtogroup nrs
42  * @{
43  */
44 #define DEBUG_SUBSYSTEM S_RPC
45 #include <obd_support.h>
46 #include <obd_class.h>
47 #include <lustre_net.h>
48 #include <lustre/lustre_idl.h>
49 #include <lustre_req_layout.h>
50 #include "ptlrpc_internal.h"
51
52 /**
53  * \name ORR/TRR policy
54  *
55  * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies
56  *
57  * ORR performs batched Round Robin scheduling of brw RPCs, based on the FID of
58  * the backend-fs object that the brw RPC pertains to; the TRR policy performs
59  * batched Round Robin scheduling of brw RPCs, based on the OST index that the
60  * RPC pertains to. Both policies also order RPCs in each batch in ascending
61  * offset order, which is lprocfs-tunable between logical file offsets, and
62  * physical disk offsets, as reported by fiemap.
63  *
64  * The TRR policy reuses much of the functionality of ORR. These two scheduling
65  * algorithms could alternatively be implemented under a single NRS policy, that
66  * uses an lprocfs tunable in order to switch between the two types of
67  * scheduling behaviour. The two algorithms have been implemented as separate
68  * policies for reasons of clarity to the user, and to avoid issues that would
69  * otherwise arise at the point of switching between behaviours in the case of
70  * having a single policy, such as resource cleanup for nrs_orr_object
71  * instances. It is possible that this may need to be re-examined in the future,
72  * along with potentially coalescing other policies that perform batched request
73  * scheduling in a Round-Robin manner, all into one policy.
74  *
75  * @{
76  */
77
78 #define NRS_POL_NAME_ORR        "orr"
79 #define NRS_POL_NAME_TRR        "trr"
80
81 /**
82  * Checks if the RPC type of \a nrq is currently handled by an ORR/TRR policy
83  *
84  * \param[in]  orrd   the ORR/TRR policy scheduler instance
85  * \param[in]  nrq    the request
86  * \param[out] opcode the opcode is saved here, just in order to avoid calling
87  *                    lustre_msg_get_opc() again later
88  *
89  * \retval true  request type is supported by the policy instance
90  * \retval false request type is not supported by the policy instance
91  */
92 static bool nrs_orr_req_supported(struct nrs_orr_data *orrd,
93                                   struct ptlrpc_nrs_request *nrq, __u32 *opcode)
94 {
95         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
96                                                    rq_nrq);
97         __u32                   opc = lustre_msg_get_opc(req->rq_reqmsg);
98         bool                    rc = false;
99
100         /**
101          * XXX: nrs_orr_data::od_supp accessed unlocked.
102          */
103         switch (opc) {
104         case OST_READ:
105                 rc = orrd->od_supp & NOS_OST_READ;
106                 break;
107         case OST_WRITE:
108                 rc = orrd->od_supp & NOS_OST_WRITE;
109                 break;
110         }
111
112         if (rc)
113                 *opcode = opc;
114
115         return rc;
116 }
117
118 /**
119  * Returns the ORR/TRR key fields for the request \a nrq in \a key.
120  *
121  * \param[in]  orrd the ORR/TRR policy scheduler instance
122  * \param[in]  nrq  the request
123  * \param[in]  opc  the request's opcode
124  * \param[in]  name the policy name
125  * \param[out] key  fields of the key are returned here.
126  *
127  * \retval 0   key filled successfully
128  * \retval < 0 error
129  */
130 static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
131                             struct ptlrpc_nrs_request *nrq, __u32 opc,
132                             char *name, struct nrs_orr_key *key)
133 {
134         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
135                                                    rq_nrq);
136         struct ost_body        *body;
137         __u32                   ost_idx;
138         bool                    is_orr = strncmp(name, NRS_POL_NAME_ORR,
139                                                  NRS_POL_NAME_MAX) == 0;
140
141         LASSERT(req != NULL);
142
143         /**
144          * This is an attempt to fill in the request key fields while
145          * moving a request from the regular to the high-priority NRS
146          * head (via ldlm_lock_reorder_req()), but the request key has
147          * been adequately filled when nrs_orr_res_get() was called through
148          * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR/TRR
149          * policy, so there is nothing to do.
150          */
151         if ((is_orr && nrq->nr_u.orr.or_orr_set) ||
152             (!is_orr && nrq->nr_u.orr.or_trr_set)) {
153                 *key = nrq->nr_u.orr.or_key;
154                 return 0;
155         }
156
157         if (nrq->nr_u.orr.or_orr_set || nrq->nr_u.orr.or_trr_set)
158                 memset(&nrq->nr_u.orr.or_key, 0, sizeof(nrq->nr_u.orr.or_key));
159
160         ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
161
162         if (is_orr) {
163                 int     rc;
164                 /**
165                  * The request pill for OST_READ and OST_WRITE requests is
166                  * initialized in the ost_io service's
167                  * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
168                  * so no need to redo it here.
169                  */
170                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
171                 if (body == NULL)
172                         RETURN(-EFAULT);
173
174                 rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
175                 if (rc < 0)
176                         return rc;
177
178                 nrq->nr_u.orr.or_orr_set = 1;
179         } else {
180                 key->ok_idx = ost_idx;
181                 nrq->nr_u.orr.or_trr_set = 1;
182         }
183
184         return 0;
185 }
186
187 /**
188  * Populates the range values in \a range with logical offsets obtained via
189  * \a nb.
190  *
191  * \param[in]  nb       niobuf_remote struct array for this request
192  * \param[in]  niocount count of niobuf_remote structs for this request
193  * \param[out] range    the offset range is returned here
194  */
195 static void nrs_orr_range_fill_logical(struct niobuf_remote *nb, int niocount,
196                                        struct nrs_orr_req_range *range)
197 {
198         /* Should we do this at page boundaries ? */
199         range->or_start = nb[0].rnb_offset & CFS_PAGE_MASK;
200         range->or_end = (nb[niocount - 1].rnb_offset +
201                          nb[niocount - 1].rnb_len - 1) | ~CFS_PAGE_MASK;
202 }
203
204 /**
205  * We obtain information just for a single extent, as the request can only be in
206  * a single place in the binary heap anyway.
207  */
208 #define ORR_NUM_EXTENTS 1
209
210 /**
211  * Converts the logical file offset range in \a range, to a physical disk offset
212  * range in \a range, for a request. Uses obd_get_info() in order to carry out a
213  * fiemap call and obtain backend-fs extent information. The returned range is
214  * in physical block numbers.
215  *
216  * \param[in]     nrq   the request
217  * \param[in]     oa    obdo struct for this request
218  * \param[in,out] range the offset range in bytes; logical range in, physical
219  *                      range out
220  *
221  * \retval 0    physical offsets obtained successfully
222  * \retval < 0 error
223  */
224 static int nrs_orr_range_fill_physical(struct ptlrpc_nrs_request *nrq,
225                                        struct obdo *oa,
226                                        struct nrs_orr_req_range *range)
227 {
228         struct ptlrpc_request     *req = container_of(nrq,
229                                                       struct ptlrpc_request,
230                                                       rq_nrq);
231         char                       fiemap_buf[offsetof(struct ll_user_fiemap,
232                                                   fm_extents[ORR_NUM_EXTENTS])];
233         struct ll_user_fiemap     *fiemap = (struct ll_user_fiemap *)fiemap_buf;
234         struct ll_fiemap_info_key  key;
235         loff_t                     start;
236         loff_t                     end;
237         int                        rc;
238
239         key = (typeof(key)) {
240                 .name = KEY_FIEMAP,
241                 .oa = *oa,
242                 .fiemap = {
243                         .fm_start = range->or_start,
244                         .fm_length = range->or_end - range->or_start,
245                         .fm_extent_count = ORR_NUM_EXTENTS
246                 }
247         };
248
249         rc = obd_get_info(req->rq_svc_thread->t_env, req->rq_export,
250                           sizeof(key), &key, NULL, fiemap, NULL);
251         if (rc < 0)
252                 GOTO(out, rc);
253
254         if (fiemap->fm_mapped_extents == 0 ||
255             fiemap->fm_mapped_extents > ORR_NUM_EXTENTS)
256                 GOTO(out, rc = -EFAULT);
257
258         /**
259          * Calculate the physical offset ranges for the request from the extent
260          * information and the logical request offsets.
261          */
262         start = fiemap->fm_extents[0].fe_physical + range->or_start -
263                 fiemap->fm_extents[0].fe_logical;
264         end = start + range->or_end - range->or_start;
265
266         range->or_start = start;
267         range->or_end = end;
268
269         nrq->nr_u.orr.or_physical_set = 1;
270 out:
271         return rc;
272 }
273
274 /**
275  * Sets the offset range the request covers; either in logical file
276  * offsets or in physical disk offsets.
277  *
278  * \param[in] nrq        the request
279  * \param[in] orrd       the ORR/TRR policy scheduler instance
280  * \param[in] opc        the request's opcode
281  * \param[in] moving_req is the request in the process of moving onto the
282  *                       high-priority NRS head?
283  *
284  * \retval 0    range filled successfully
285  * \retval != 0 error
286  */
287 static int nrs_orr_range_fill(struct ptlrpc_nrs_request *nrq,
288                               struct nrs_orr_data *orrd, __u32 opc,
289                               bool moving_req)
290 {
291         struct ptlrpc_request       *req = container_of(nrq,
292                                                         struct ptlrpc_request,
293                                                         rq_nrq);
294         struct obd_ioobj            *ioo;
295         struct niobuf_remote        *nb;
296         struct ost_body             *body;
297         struct nrs_orr_req_range     range;
298         int                          niocount;
299         int                          rc = 0;
300
301         /**
302          * If we are scheduling using physical disk offsets, but we have filled
303          * the offset information in the request previously
304          * (i.e. ldlm_lock_reorder_req() is moving the request to the
305          * high-priority NRS head), there is no need to do anything, and we can
306          * exit. Moreover than the lack of need, we would be unable to perform
307          * the obd_get_info() call required in nrs_orr_range_fill_physical(),
308          * because ldlm_lock_reorder_lock() calls into here while holding a
309          * spinlock, and retrieving fiemap information via obd_get_info() is a
310          * potentially sleeping operation.
311          */
312         if (orrd->od_physical && nrq->nr_u.orr.or_physical_set)
313                 return 0;
314
315         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
316         if (ioo == NULL)
317                 GOTO(out, rc = -EFAULT);
318
319         niocount = ioo->ioo_bufcnt;
320
321         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
322         if (nb == NULL)
323                 GOTO(out, rc = -EFAULT);
324
325         /**
326          * Use logical information from niobuf_remote structures.
327          */
328         nrs_orr_range_fill_logical(nb, niocount, &range);
329
330         /**
331          * Obtain physical offsets if selected, and this is an OST_READ RPC
332          * RPC. We do not enter this block if moving_req is set which indicates
333          * that the request is being moved to the high-priority NRS head by
334          * ldlm_lock_reorder_req(), as that function calls in here while holding
335          * a spinlock, and nrs_orr_range_physical() can sleep, so we just use
336          * logical file offsets for the range values for such requests.
337          */
338         if (orrd->od_physical && opc == OST_READ && !moving_req) {
339                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
340                 if (body == NULL)
341                         GOTO(out, rc = -EFAULT);
342
343                 /**
344                  * Translate to physical block offsets from backend filesystem
345                  * extents.
346                  * Ignore return values; if obtaining the physical offsets
347                  * fails, use the logical offsets.
348                  */
349                 nrs_orr_range_fill_physical(nrq, &body->oa, &range);
350         }
351
352         nrq->nr_u.orr.or_range = range;
353 out:
354         return rc;
355 }
356
357 /**
358  * Generates a character string that can be used in order to register uniquely
359  * named libcfs_hash and slab objects for ORR/TRR policy instances. The
360  * character string is unique per policy instance, as it includes the policy's
361  * name, the CPT number, and a {reg|hp} token, and there is one policy instance
362  * per NRS head on each CPT, and the policy is only compatible with the ost_io
363  * service.
364  *
365  * \param[in] policy the policy instance
366  * \param[out] name  the character array that will hold the generated name
367  */
368 static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
369 {
370         snprintf(name, NRS_ORR_OBJ_NAME_MAX, "%s%s%s%d",
371                  "nrs_", policy->pol_desc->pd_name,
372                  policy->pol_nrs->nrs_queue_type == PTLRPC_NRS_QUEUE_REG ?
373                  "_reg_" : "_hp_", nrs_pol2cptid(policy));
374 }
375
376 /**
377  * ORR/TRR hash operations
378  */
379 #define NRS_ORR_BITS            24
380 #define NRS_ORR_BKT_BITS        12
381 #define NRS_ORR_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
382
383 #define NRS_TRR_BITS            4
384 #define NRS_TRR_BKT_BITS        2
385 #define NRS_TRR_HASH_FLAGS      CFS_HASH_SPIN_BKTLOCK
386
387 static unsigned nrs_orr_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask)
388 {
389         return cfs_hash_djb2_hash(key, sizeof(struct nrs_orr_key), mask);
390 }
391
392 static void *nrs_orr_hop_key(struct hlist_node *hnode)
393 {
394         struct nrs_orr_object *orro = hlist_entry(hnode,
395                                                       struct nrs_orr_object,
396                                                       oo_hnode);
397         return &orro->oo_key;
398 }
399
400 static int nrs_orr_hop_keycmp(const void *key, struct hlist_node *hnode)
401 {
402         struct nrs_orr_object *orro = hlist_entry(hnode,
403                                                       struct nrs_orr_object,
404                                                       oo_hnode);
405
406         return lu_fid_eq(&orro->oo_key.ok_fid,
407                          &((struct nrs_orr_key *)key)->ok_fid);
408 }
409
410 static void *nrs_orr_hop_object(struct hlist_node *hnode)
411 {
412         return hlist_entry(hnode, struct nrs_orr_object, oo_hnode);
413 }
414
415 static void nrs_orr_hop_get(cfs_hash_t *hs, struct hlist_node *hnode)
416 {
417         struct nrs_orr_object *orro = hlist_entry(hnode,
418                                                       struct nrs_orr_object,
419                                                       oo_hnode);
420         orro->oo_ref++;
421 }
422
423 /**
424  * Removes an nrs_orr_object from the hash and frees its memory, if the object has
425  * no active users.
426  */
427 static void nrs_orr_hop_put_free(cfs_hash_t *hs, struct hlist_node *hnode)
428 {
429         struct nrs_orr_object *orro = hlist_entry(hnode,
430                                                       struct nrs_orr_object,
431                                                       oo_hnode);
432         struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
433                                                    struct nrs_orr_data, od_res);
434         cfs_hash_bd_t          bd;
435
436         cfs_hash_bd_get_and_lock(hs, &orro->oo_key, &bd, 1);
437
438         if (--orro->oo_ref > 1) {
439                 cfs_hash_bd_unlock(hs, &bd, 1);
440
441                 return;
442         }
443         LASSERT(orro->oo_ref == 1);
444
445         cfs_hash_bd_del_locked(hs, &bd, hnode);
446         cfs_hash_bd_unlock(hs, &bd, 1);
447
448         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
449 }
450
451 static void nrs_orr_hop_put(cfs_hash_t *hs, struct hlist_node *hnode)
452 {
453         struct nrs_orr_object *orro = hlist_entry(hnode,
454                                                       struct nrs_orr_object,
455                                                       oo_hnode);
456         orro->oo_ref--;
457 }
458
459 static int nrs_trr_hop_keycmp(const void *key, struct hlist_node *hnode)
460 {
461         struct nrs_orr_object *orro = hlist_entry(hnode,
462                                                       struct nrs_orr_object,
463                                                       oo_hnode);
464
465         return orro->oo_key.ok_idx == ((struct nrs_orr_key *)key)->ok_idx;
466 }
467
468 static void nrs_trr_hop_exit(cfs_hash_t *hs, struct hlist_node *hnode)
469 {
470         struct nrs_orr_object *orro = hlist_entry(hnode,
471                                                       struct nrs_orr_object,
472                                                       oo_hnode);
473         struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
474                                                    struct nrs_orr_data, od_res);
475
476         LASSERTF(orro->oo_ref == 0,
477                  "Busy NRS TRR policy object for OST with index %u, with %ld "
478                  "refs\n", orro->oo_key.ok_idx, orro->oo_ref);
479
480         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
481 }
482
483 static cfs_hash_ops_t nrs_orr_hash_ops = {
484         .hs_hash        = nrs_orr_hop_hash,
485         .hs_key         = nrs_orr_hop_key,
486         .hs_keycmp      = nrs_orr_hop_keycmp,
487         .hs_object      = nrs_orr_hop_object,
488         .hs_get         = nrs_orr_hop_get,
489         .hs_put         = nrs_orr_hop_put_free,
490         .hs_put_locked  = nrs_orr_hop_put,
491 };
492
493 static cfs_hash_ops_t nrs_trr_hash_ops = {
494         .hs_hash        = nrs_orr_hop_hash,
495         .hs_key         = nrs_orr_hop_key,
496         .hs_keycmp      = nrs_trr_hop_keycmp,
497         .hs_object      = nrs_orr_hop_object,
498         .hs_get         = nrs_orr_hop_get,
499         .hs_put         = nrs_orr_hop_put,
500         .hs_put_locked  = nrs_orr_hop_put,
501         .hs_exit        = nrs_trr_hop_exit,
502 };
503
504 #define NRS_ORR_QUANTUM_DFLT    256
505
506 /**
507  * Binary heap predicate.
508  *
509  * Uses
510  * ptlrpc_nrs_request::nr_u::orr::or_round,
511  * ptlrpc_nrs_request::nr_u::orr::or_sequence, and
512  * ptlrpc_nrs_request::nr_u::orr::or_range to compare two binheap nodes and
513  * produce a binary predicate that indicates their relative priority, so that
514  * the binary heap can perform the necessary sorting operations.
515  *
516  * \param[in] e1 the first binheap node to compare
517  * \param[in] e2 the second binheap node to compare
518  *
519  * \retval 0 e1 > e2
520  * \retval 1 e1 < e2
521  */
522 static int orr_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2)
523 {
524         struct ptlrpc_nrs_request *nrq1;
525         struct ptlrpc_nrs_request *nrq2;
526
527         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
528         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
529
530         /**
531          * Requests have been scheduled against a different scheduling round.
532          */
533         if (nrq1->nr_u.orr.or_round < nrq2->nr_u.orr.or_round)
534                 return 1;
535         else if (nrq1->nr_u.orr.or_round > nrq2->nr_u.orr.or_round)
536                 return 0;
537
538         /**
539          * Requests have been scheduled against the same scheduling round, but
540          * belong to a different batch, i.e. they pertain to a different
541          * backend-fs object (for ORR policy instances) or OST (for TRR policy
542          * instances).
543          */
544         if (nrq1->nr_u.orr.or_sequence < nrq2->nr_u.orr.or_sequence)
545                 return 1;
546         else if (nrq1->nr_u.orr.or_sequence > nrq2->nr_u.orr.or_sequence)
547                 return 0;
548
549         /**
550          * If round numbers and sequence numbers are equal, the two requests
551          * have been scheduled on the same round, and belong to the same batch,
552          * which means they pertain to the same backend-fs object (if this is an
553          * ORR policy instance), or to the same OST (if this is a TRR policy
554          * instance), so these requests should be sorted by ascending offset
555          * order.
556          */
557         if (nrq1->nr_u.orr.or_range.or_start <
558             nrq2->nr_u.orr.or_range.or_start) {
559                 return 1;
560         } else if (nrq1->nr_u.orr.or_range.or_start >
561                  nrq2->nr_u.orr.or_range.or_start) {
562                 return 0;
563         } else {
564                 /**
565                  * Requests start from the same offset; Dispatch the shorter one
566                  * first; perhaps slightly more chances of hitting caches like
567                  * this.
568                  */
569                 return nrq1->nr_u.orr.or_range.or_end <
570                        nrq2->nr_u.orr.or_range.or_end;
571         }
572 }
573
574 /**
575  * ORR binary heap operations
576  */
577 static cfs_binheap_ops_t nrs_orr_heap_ops = {
578         .hop_enter      = NULL,
579         .hop_exit       = NULL,
580         .hop_compare    = orr_req_compare,
581 };
582
583 /**
584  * Prints a warning message if an ORR/TRR policy is started on a service with
585  * more than one CPT.  Not printed on the console for now, since we don't
586  * have any performance metrics in the first place, and it is annoying.
587  *
588  * \param[in] policy the policy instance
589  *
590  * \retval 0 success
591  */
592 static int nrs_orr_init(struct ptlrpc_nrs_policy *policy)
593 {
594         if (policy->pol_nrs->nrs_svcpt->scp_service->srv_ncpts > 1)
595                 CDEBUG(D_CONFIG, "%s: The %s NRS policy was registered on a "
596                       "service with multiple service partitions. This policy "
597                       "may perform better with a single partition.\n",
598                       policy->pol_nrs->nrs_svcpt->scp_service->srv_name,
599                       policy->pol_desc->pd_name);
600
601         return 0;
602 }
603
604 /**
605  * Called when an ORR/TRR policy instance is started.
606  *
607  * \param[in] policy the policy
608  *
609  * \retval -ENOMEM OOM error
610  * \retval 0       success
611  */
612 static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg)
613 {
614         struct nrs_orr_data    *orrd;
615         cfs_hash_ops_t         *ops;
616         unsigned                cur_bits;
617         unsigned                max_bits;
618         unsigned                bkt_bits;
619         unsigned                flags;
620         int                     rc = 0;
621         ENTRY;
622
623         OBD_CPT_ALLOC_PTR(orrd, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
624         if (orrd == NULL)
625                 RETURN(-ENOMEM);
626
627         /*
628          * Binary heap instance for sorted incoming requests.
629          */
630         orrd->od_binheap = cfs_binheap_create(&nrs_orr_heap_ops,
631                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
632                                               nrs_pol2cptab(policy),
633                                               nrs_pol2cptid(policy));
634         if (orrd->od_binheap == NULL)
635                 GOTO(failed, rc = -ENOMEM);
636
637         nrs_orr_genobjname(policy, orrd->od_objname);
638
639         /**
640          * Slab cache for NRS ORR/TRR objects.
641          */
642         orrd->od_cache = kmem_cache_create(orrd->od_objname,
643                                            sizeof(struct nrs_orr_object),
644                                            0, 0, NULL);
645         if (orrd->od_cache == NULL)
646                 GOTO(failed, rc = -ENOMEM);
647
648         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
649                     NRS_POL_NAME_MAX) == 0) {
650                 ops = &nrs_orr_hash_ops;
651                 cur_bits = NRS_ORR_BITS;
652                 max_bits = NRS_ORR_BITS;
653                 bkt_bits = NRS_ORR_BKT_BITS;
654                 flags = NRS_ORR_HASH_FLAGS;
655         } else {
656                 ops = &nrs_trr_hash_ops;
657                 cur_bits = NRS_TRR_BITS;
658                 max_bits = NRS_TRR_BITS;
659                 bkt_bits = NRS_TRR_BKT_BITS;
660                 flags = NRS_TRR_HASH_FLAGS;
661         }
662
663         /**
664          * Hash for finding objects by struct nrs_orr_key.
665          * XXX: For TRR, it might be better to avoid using libcfs_hash?
666          * All that needs to be resolved are OST indices, and they
667          * will stay relatively stable during an OSS node's lifetime.
668          */
669         orrd->od_obj_hash = cfs_hash_create(orrd->od_objname, cur_bits,
670                                             max_bits, bkt_bits, 0,
671                                             CFS_HASH_MIN_THETA,
672                                             CFS_HASH_MAX_THETA, ops, flags);
673         if (orrd->od_obj_hash == NULL)
674                 GOTO(failed, rc = -ENOMEM);
675
676         /* XXX: Fields accessed unlocked */
677         orrd->od_quantum = NRS_ORR_QUANTUM_DFLT;
678         orrd->od_supp = NOS_DFLT;
679         orrd->od_physical = true;
680         /**
681          * Set to 1 so that the test inside nrs_orr_req_add() can evaluate to
682          * true.
683          */
684         orrd->od_sequence = 1;
685
686         policy->pol_private = orrd;
687
688         RETURN(rc);
689
690 failed:
691         if (orrd->od_cache) {
692                 kmem_cache_destroy(orrd->od_cache);
693                 LASSERTF(rc == 0, "Could not destroy od_cache slab\n");
694         }
695         if (orrd->od_binheap != NULL)
696                 cfs_binheap_destroy(orrd->od_binheap);
697
698         OBD_FREE_PTR(orrd);
699
700         RETURN(rc);
701 }
702
703 /**
704  * Called when an ORR/TRR policy instance is stopped.
705  *
706  * Called when the policy has been instructed to transition to the
707  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more
708  * pending requests to serve.
709  *
710  * \param[in] policy the policy
711  */
712 static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy)
713 {
714         struct nrs_orr_data *orrd = policy->pol_private;
715         ENTRY;
716
717         LASSERT(orrd != NULL);
718         LASSERT(orrd->od_binheap != NULL);
719         LASSERT(orrd->od_obj_hash != NULL);
720         LASSERT(orrd->od_cache != NULL);
721         LASSERT(cfs_binheap_is_empty(orrd->od_binheap));
722
723         cfs_binheap_destroy(orrd->od_binheap);
724         cfs_hash_putref(orrd->od_obj_hash);
725         kmem_cache_destroy(orrd->od_cache);
726
727         OBD_FREE_PTR(orrd);
728 }
729
730 /**
731  * Performs a policy-specific ctl function on ORR/TRR policy instances; similar
732  * to ioctl.
733  *
734  * \param[in]     policy the policy instance
735  * \param[in]     opc    the opcode
736  * \param[in,out] arg    used for passing parameters and information
737  *
738  * \pre assert_spin_locked(&policy->pol_nrs->nrs_lock)
739  * \post assert_spin_locked(&policy->pol_nrs->nrs_lock)
740  *
741  * \retval 0   operation carried successfully
742  * \retval -ve error
743  */
744 int nrs_orr_ctl(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
745                 void *arg)
746 {
747         assert_spin_locked(&policy->pol_nrs->nrs_lock);
748
749         switch((enum nrs_ctl_orr)opc) {
750         default:
751                 RETURN(-EINVAL);
752
753         case NRS_CTL_ORR_RD_QUANTUM: {
754                 struct nrs_orr_data     *orrd = policy->pol_private;
755
756                 *(__u16 *)arg = orrd->od_quantum;
757                 }
758                 break;
759
760         case NRS_CTL_ORR_WR_QUANTUM: {
761                 struct nrs_orr_data     *orrd = policy->pol_private;
762
763                 orrd->od_quantum = *(__u16 *)arg;
764                 LASSERT(orrd->od_quantum != 0);
765                 }
766                 break;
767
768         case NRS_CTL_ORR_RD_OFF_TYPE: {
769                 struct nrs_orr_data     *orrd = policy->pol_private;
770
771                 *(bool *)arg = orrd->od_physical;
772                 }
773                 break;
774
775         case NRS_CTL_ORR_WR_OFF_TYPE: {
776                 struct nrs_orr_data     *orrd = policy->pol_private;
777
778                 orrd->od_physical = *(bool *)arg;
779                 }
780                 break;
781
782         case NRS_CTL_ORR_RD_SUPP_REQ: {
783                 struct nrs_orr_data     *orrd = policy->pol_private;
784
785                 *(enum nrs_orr_supp *)arg = orrd->od_supp;
786                 }
787                 break;
788
789         case NRS_CTL_ORR_WR_SUPP_REQ: {
790                 struct nrs_orr_data     *orrd = policy->pol_private;
791
792                 orrd->od_supp = *(enum nrs_orr_supp *)arg;
793                 LASSERT((orrd->od_supp & NOS_OST_RW) != 0);
794                 }
795                 break;
796         }
797         RETURN(0);
798 }
799
800 /**
801  * Obtains resources for ORR/TRR policy instances. The top-level resource lives
802  * inside \e nrs_orr_data and the second-level resource inside
803  * \e nrs_orr_object instances.
804  *
805  * \param[in]  policy     the policy for which resources are being taken for
806  *                        request \a nrq
807  * \param[in]  nrq        the request for which resources are being taken
808  * \param[in]  parent     parent resource, embedded in nrs_orr_data for the
809  *                        ORR/TRR policies
810  * \param[out] resp       used to return resource references
811  * \param[in]  moving_req signifies limited caller context; used to perform
812  *                        memory allocations in an atomic context in this
813  *                        policy
814  *
815  * \retval 0   we are returning a top-level, parent resource, one that is
816  *             embedded in an nrs_orr_data object
817  * \retval 1   we are returning a bottom-level resource, one that is embedded
818  *             in an nrs_orr_object object
819  *
820  * \see nrs_resource_get_safe()
821  */
822 int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
823                     struct ptlrpc_nrs_request *nrq,
824                     const struct ptlrpc_nrs_resource *parent,
825                     struct ptlrpc_nrs_resource **resp, bool moving_req)
826 {
827         struct nrs_orr_data            *orrd;
828         struct nrs_orr_object          *orro;
829         struct nrs_orr_object          *tmp;
830         struct nrs_orr_key              key = { { { 0 } } };
831         __u32                           opc;
832         int                             rc = 0;
833
834         /**
835          * struct nrs_orr_data is requested.
836          */
837         if (parent == NULL) {
838                 *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
839                 return 0;
840         }
841
842         orrd = container_of(parent, struct nrs_orr_data, od_res);
843
844         /**
845          * If the request type is not supported, fail the enqueuing; the RPC
846          * will be handled by the fallback NRS policy.
847          */
848         if (!nrs_orr_req_supported(orrd, nrq, &opc))
849                 return -1;
850
851         /**
852          * Fill in the key for the request; OST FID for ORR policy instances,
853          * and OST index for TRR policy instances.
854          */
855         rc = nrs_orr_key_fill(orrd, nrq, opc, policy->pol_desc->pd_name, &key);
856         if (rc < 0)
857                 RETURN(rc);
858
859         /**
860          * Set the offset range the request covers
861          */
862         rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
863         if (rc < 0)
864                 RETURN(rc);
865
866         orro = cfs_hash_lookup(orrd->od_obj_hash, &key);
867         if (orro != NULL)
868                 goto out;
869
870         OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
871                                    nrs_pol2cptab(policy), nrs_pol2cptid(policy),
872                                    moving_req ? GFP_ATOMIC : GFP_NOFS);
873         if (orro == NULL)
874                 RETURN(-ENOMEM);
875
876         orro->oo_key = key;
877         orro->oo_ref = 1;
878
879         tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key,
880                                       &orro->oo_hnode);
881         if (tmp != orro) {
882                 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
883                 orro = tmp;
884         }
885 out:
886         /**
887          * For debugging purposes
888          */
889         nrq->nr_u.orr.or_key = orro->oo_key;
890
891         *resp = &orro->oo_res;
892
893         return 1;
894 }
895
896 /**
897  * Called when releasing references to the resource hierachy obtained for a
898  * request for scheduling using ORR/TRR policy instances
899  *
900  * \param[in] policy   the policy the resource belongs to
901  * \param[in] res      the resource to be released
902  */
903 static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy,
904                             const struct ptlrpc_nrs_resource *res)
905 {
906         struct nrs_orr_data     *orrd;
907         struct nrs_orr_object   *orro;
908
909         /**
910          * Do nothing for freeing parent, nrs_orr_data resources.
911          */
912         if (res->res_parent == NULL)
913                 return;
914
915         orro = container_of(res, struct nrs_orr_object, oo_res);
916         orrd = container_of(res->res_parent, struct nrs_orr_data, od_res);
917
918         cfs_hash_put(orrd->od_obj_hash, &orro->oo_hnode);
919 }
920
921 /**
922  * Called when polling an ORR/TRR policy instance for a request so that it can
923  * be served. Returns the request that is at the root of the binary heap, as
924  * that is the lowest priority one (i.e. libcfs_heap is an implementation of a
925  * min-heap)
926  *
927  * \param[in] policy the policy instance being polled
928  * \param[in] peek   when set, signifies that we just want to examine the
929  *                   request, and not handle it, so the request is not removed
930  *                   from the policy.
931  * \param[in] force  force the policy to return a request; unused in this policy
932  *
933  * \retval the request to be handled
934  * \retval NULL no request available
935  *
936  * \see ptlrpc_nrs_req_get_nolock()
937  * \see nrs_request_get()
938  */
939 static
940 struct ptlrpc_nrs_request *nrs_orr_req_get(struct ptlrpc_nrs_policy *policy,
941                                            bool peek, bool force)
942 {
943         struct nrs_orr_data       *orrd = policy->pol_private;
944         cfs_binheap_node_t        *node = cfs_binheap_root(orrd->od_binheap);
945         struct ptlrpc_nrs_request *nrq;
946
947         nrq = unlikely(node == NULL) ? NULL :
948               container_of(node, struct ptlrpc_nrs_request, nr_node);
949
950         if (likely(!peek && nrq != NULL)) {
951                 struct nrs_orr_object *orro;
952
953                 orro = container_of(nrs_request_resource(nrq),
954                                     struct nrs_orr_object, oo_res);
955
956                 LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
957
958                 cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
959                 orro->oo_active--;
960
961                 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
962                                  NRS_POL_NAME_MAX) == 0)
963                         CDEBUG(D_RPCTRACE,
964                                "NRS: starting to handle %s request for object "
965                                "with FID "DFID", from OST with index %u, with "
966                                "round "LPU64"\n", NRS_POL_NAME_ORR,
967                                PFID(&orro->oo_key.ok_fid),
968                                nrq->nr_u.orr.or_key.ok_idx,
969                                nrq->nr_u.orr.or_round);
970                 else
971                         CDEBUG(D_RPCTRACE,
972                                "NRS: starting to handle %s request from OST "
973                                "with index %u, with round "LPU64"\n",
974                                NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
975                                nrq->nr_u.orr.or_round);
976
977                 /** Peek at the next request to be served */
978                 node = cfs_binheap_root(orrd->od_binheap);
979
980                 /** No more requests */
981                 if (unlikely(node == NULL)) {
982                         orrd->od_round++;
983                 } else {
984                         struct ptlrpc_nrs_request *next;
985
986                         next = container_of(node, struct ptlrpc_nrs_request,
987                                             nr_node);
988
989                         if (orrd->od_round < next->nr_u.orr.or_round)
990                                 orrd->od_round = next->nr_u.orr.or_round;
991                 }
992         }
993
994         return nrq;
995 }
996
997 /**
998  * Sort-adds request \a nrq to an ORR/TRR \a policy instance's set of queued
999  * requests in the policy's binary heap.
1000  *
1001  * A scheduling round is a stream of requests that have been sorted in batches
1002  * according to the backend-fs object (for ORR policy instances) or OST (for TRR
1003  * policy instances) that they pertain to (as identified by its IDIF FID or OST
1004  * index respectively); there can be only one batch for each object or OST in
1005  * each round. The batches are of maximum size nrs_orr_data:od_quantum. When a
1006  * new request arrives for scheduling for an object or OST that has exhausted
1007  * its quantum in its current round, the request will be scheduled on the next
1008  * scheduling round. Requests are allowed to be scheduled against a round until
1009  * all requests for the round are serviced, so an object or OST might miss a
1010  * round if requests are not scheduled for it for a long enough period of time.
1011  * Objects or OSTs that miss a round will continue with having their next
1012  * request scheduled, starting at the round that requests are being dispatched
1013  * for, at the time of arrival of this request.
1014  *
1015  * Requests are tagged with the round number and a sequence number; the sequence
1016  * number indicates the relative ordering amongst the batches of requests in a
1017  * round, and is identical for all requests in a batch, as is the round number.
1018  * The round and sequence numbers are used by orr_req_compare() in order to use
1019  * nrs_orr_data::od_binheap in order to maintain an ordered set of rounds, with
1020  * each round consisting of an ordered set of batches of requests, and each
1021  * batch consisting of an ordered set of requests according to their logical
1022  * file or physical disk offsets.
1023  *
1024  * \param[in] policy the policy
1025  * \param[in] nrq    the request to add
1026  *
1027  * \retval 0    request successfully added
1028  * \retval != 0 error
1029  */
1030 static int nrs_orr_req_add(struct ptlrpc_nrs_policy *policy,
1031                            struct ptlrpc_nrs_request *nrq)
1032 {
1033         struct nrs_orr_data     *orrd;
1034         struct nrs_orr_object   *orro;
1035         int                      rc;
1036
1037         orro = container_of(nrs_request_resource(nrq),
1038                             struct nrs_orr_object, oo_res);
1039         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1040                             struct nrs_orr_data, od_res);
1041
1042         if (orro->oo_quantum == 0 || orro->oo_round < orrd->od_round ||
1043             (orro->oo_active == 0 && orro->oo_quantum > 0)) {
1044
1045                 /**
1046                  * If there are no pending requests for the object/OST, but some
1047                  * of its quantum still remains unused, which implies we did not
1048                  * get a chance to schedule up to its maximum allowed batch size
1049                  * of requests in the previous round this object/OST
1050                  * participated in, schedule this next request on a new round;
1051                  * this avoids fragmentation of request batches caused by
1052                  * intermittent inactivity on the object/OST, at the expense of
1053                  * potentially slightly increased service time for the request
1054                  * batch this request will be a part of.
1055                  */
1056                 if (orro->oo_active == 0 && orro->oo_quantum > 0)
1057                         orro->oo_round++;
1058
1059                 /** A new scheduling round has commenced */
1060                 if (orro->oo_round < orrd->od_round)
1061                         orro->oo_round = orrd->od_round;
1062
1063                 /** I was not the last object/OST that scheduled a request */
1064                 if (orro->oo_sequence < orrd->od_sequence)
1065                         orro->oo_sequence = ++orrd->od_sequence;
1066                 /**
1067                  * Reset the quantum if we have reached the maximum quantum
1068                  * size for this batch, or even if we have not managed to
1069                  * complete a batch size up to its maximum allowed size.
1070                  * XXX: Accessed unlocked
1071                  */
1072                 orro->oo_quantum = orrd->od_quantum;
1073         }
1074
1075         nrq->nr_u.orr.or_round = orro->oo_round;
1076         nrq->nr_u.orr.or_sequence = orro->oo_sequence;
1077
1078         rc = cfs_binheap_insert(orrd->od_binheap, &nrq->nr_node);
1079         if (rc == 0) {
1080                 orro->oo_active++;
1081                 if (--orro->oo_quantum == 0)
1082                         orro->oo_round++;
1083         }
1084         return rc;
1085 }
1086
1087 /**
1088  * Removes request \a nrq from an ORR/TRR \a policy instance's set of queued
1089  * requests.
1090  *
1091  * \param[in] policy the policy
1092  * \param[in] nrq    the request to remove
1093  */
1094 static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy,
1095                             struct ptlrpc_nrs_request *nrq)
1096 {
1097         struct nrs_orr_data     *orrd;
1098         struct nrs_orr_object   *orro;
1099         bool                     is_root;
1100
1101         orro = container_of(nrs_request_resource(nrq),
1102                             struct nrs_orr_object, oo_res);
1103         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1104                             struct nrs_orr_data, od_res);
1105
1106         LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
1107
1108         is_root = &nrq->nr_node == cfs_binheap_root(orrd->od_binheap);
1109
1110         cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
1111         orro->oo_active--;
1112
1113         /**
1114          * If we just deleted the node at the root of the binheap, we may have
1115          * to adjust round numbers.
1116          */
1117         if (unlikely(is_root)) {
1118                 /** Peek at the next request to be served */
1119                 cfs_binheap_node_t *node = cfs_binheap_root(orrd->od_binheap);
1120
1121                 /** No more requests */
1122                 if (unlikely(node == NULL)) {
1123                         orrd->od_round++;
1124                 } else {
1125                         nrq = container_of(node, struct ptlrpc_nrs_request,
1126                                            nr_node);
1127
1128                         if (orrd->od_round < nrq->nr_u.orr.or_round)
1129                                 orrd->od_round = nrq->nr_u.orr.or_round;
1130                 }
1131         }
1132 }
1133
1134 /**
1135  * Called right after the request \a nrq finishes being handled by ORR policy
1136  * instance \a policy.
1137  *
1138  * \param[in] policy the policy that handled the request
1139  * \param[in] nrq    the request that was handled
1140  */
1141 static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy,
1142                              struct ptlrpc_nrs_request *nrq)
1143 {
1144         /** NB: resource control, credits etc can be added here */
1145         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
1146                     NRS_POL_NAME_MAX) == 0)
1147                 CDEBUG(D_RPCTRACE,
1148                        "NRS: finished handling %s request for object with FID "
1149                        DFID", from OST with index %u, with round "LPU64"\n",
1150                        NRS_POL_NAME_ORR, PFID(&nrq->nr_u.orr.or_key.ok_fid),
1151                        nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
1152         else
1153                 CDEBUG(D_RPCTRACE,
1154                        "NRS: finished handling %s request from OST with index %u,"
1155                        " with round "LPU64"\n",
1156                        NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
1157                        nrq->nr_u.orr.or_round);
1158 }
1159
1160 /**
1161  * lprocfs interface
1162  */
1163
1164 #ifdef LPROCFS
1165
1166 /**
1167  * This allows to bundle the policy name into the lprocfs_vars::data pointer
1168  * so that lprocfs read/write functions can be used by both the ORR and TRR
1169  * policies.
1170  */
1171 struct nrs_lprocfs_orr_data {
1172         struct ptlrpc_service   *svc;
1173         char                    *name;
1174 } lprocfs_orr_data = {
1175         .name = NRS_POL_NAME_ORR
1176 }, lprocfs_trr_data = {
1177         .name = NRS_POL_NAME_TRR
1178 };
1179
1180 /**
1181  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
1182  * for ORR/TRR policy instances on both the regular and high-priority NRS head
1183  * of a service, as long as a policy instance is not in the
1184  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1185  * state are skipped later by nrs_orr_ctl().
1186  *
1187  * Quantum values are in # of RPCs, and the output is in YAML format.
1188  *
1189  * For example:
1190  *
1191  *      reg_quantum:256
1192  *      hp_quantum:8
1193  *
1194  * XXX: the CRR-N version of this, ptlrpc_lprocfs_rd_nrs_crrn_quantum() is
1195  * almost identical; it can be reworked and then reused for ORR/TRR.
1196  */
1197 static int
1198 ptlrpc_lprocfs_nrs_orr_quantum_seq_show(struct seq_file *m, void *data)
1199 {
1200         struct nrs_lprocfs_orr_data *orr_data = m->private;
1201         struct ptlrpc_service       *svc = orr_data->svc;
1202         __u16                        quantum;
1203         int                          rc;
1204
1205         /**
1206          * Perform two separate calls to this as only one of the NRS heads'
1207          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
1208          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1209          */
1210         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1211                                        orr_data->name,
1212                                        NRS_CTL_ORR_RD_QUANTUM,
1213                                        true, &quantum);
1214         if (rc == 0) {
1215                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG "%-5d\n", quantum);
1216                 /**
1217                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1218                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1219                  */
1220         } else if (rc != -ENODEV) {
1221                 return rc;
1222         }
1223
1224         /**
1225          * We know the ost_io service which is the only one ORR/TRR policies are
1226          * compatible with, do have an HP NRS head, but it may be best to guard
1227          * against a possible change of this in the future.
1228          */
1229         if (!nrs_svc_has_hp(svc))
1230                 goto no_hp;
1231
1232         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1233                                        orr_data->name, NRS_CTL_ORR_RD_QUANTUM,
1234                                        true, &quantum);
1235         if (rc == 0) {
1236                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
1237                 /**
1238                  * Ignore -ENODEV as the high priority NRS head's policy may be
1239                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1240                  */
1241         } else if (rc != -ENODEV) {
1242                 return rc;
1243         }
1244
1245 no_hp:
1246
1247         return rc;
1248 }
1249
1250 /**
1251  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
1252  * for ORR/TRR policy instances of a service. The user can set the quantum size
1253  * for the regular and high priority NRS head separately by specifying each
1254  * value, or both together in a single invocation.
1255  *
1256  * For example:
1257  *
1258  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=reg_quantum:64, to set the
1259  * request quantum size of the ORR policy instance on the regular NRS head of
1260  * the ost_io service to 64
1261  *
1262  * lctl set_param ost.OSS.ost_io.nrs_trr_quantum=hp_quantum:8 to set the request
1263  * quantum size of the TRR policy instance on the high priority NRS head of the
1264  * ost_io service to 8
1265  *
1266  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=32, to set the request
1267  * quantum size of the ORR policy instances on both the regular and the high
1268  * priority NRS heads of the ost_io service to 32
1269  *
1270  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1271  * are skipped later by nrs_orr_ctl().
1272  *
1273  * XXX: the CRR-N version of this, ptlrpc_lprocfs_wr_nrs_crrn_quantum() is
1274  * almost identical; it can be reworked and then reused for ORR/TRR.
1275  */
1276 static ssize_t
1277 ptlrpc_lprocfs_nrs_orr_quantum_seq_write(struct file *file, const char *buffer,
1278                                          size_t count, loff_t *off)
1279 {
1280         struct seq_file             *m = file->private_data;
1281         struct nrs_lprocfs_orr_data *orr_data = m->private;
1282         struct ptlrpc_service       *svc = orr_data->svc;
1283         enum ptlrpc_nrs_queue_type   queue = 0;
1284         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
1285         char                        *val;
1286         long                         quantum_reg;
1287         long                         quantum_hp;
1288         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
1289         size_t                       count_copy;
1290         int                          rc = 0;
1291         int                          rc2 = 0;
1292
1293         if (count > (sizeof(kernbuf) - 1))
1294                 return -EINVAL;
1295
1296         if (copy_from_user(kernbuf, buffer, count))
1297                 return -EFAULT;
1298
1299         kernbuf[count] = '\0';
1300
1301         count_copy = count;
1302
1303         /**
1304          * Check if the regular quantum value has been specified
1305          */
1306         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
1307                                        &count_copy);
1308         if (val != kernbuf) {
1309                 quantum_reg = simple_strtol(val, NULL, 10);
1310
1311                 queue |= PTLRPC_NRS_QUEUE_REG;
1312         }
1313
1314         count_copy = count;
1315
1316         /**
1317          * Check if the high priority quantum value has been specified
1318          */
1319         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
1320                                        &count_copy);
1321         if (val != kernbuf) {
1322                 if (!nrs_svc_has_hp(svc))
1323                         return -ENODEV;
1324
1325                 quantum_hp = simple_strtol(val, NULL, 10);
1326
1327                 queue |= PTLRPC_NRS_QUEUE_HP;
1328         }
1329
1330         /**
1331          * If none of the queues has been specified, look for a valid numerical
1332          * value
1333          */
1334         if (queue == 0) {
1335                 if (!isdigit(kernbuf[0]))
1336                         return -EINVAL;
1337
1338                 quantum_reg = simple_strtol(kernbuf, NULL, 10);
1339
1340                 queue = PTLRPC_NRS_QUEUE_REG;
1341
1342                 if (nrs_svc_has_hp(svc)) {
1343                         queue |= PTLRPC_NRS_QUEUE_HP;
1344                         quantum_hp = quantum_reg;
1345                 }
1346         }
1347
1348         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
1349             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
1350             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
1351             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
1352                 return -EINVAL;
1353
1354         /**
1355          * We change the values on regular and HP NRS heads separately, so that
1356          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1357          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1358          * started the policy on either the regular or HP NRS head; i.e. we are
1359          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1360          * only if the operation fails with -ENODEV on all heads that have been
1361          * specified by the command; if at least one operation succeeds,
1362          * success is returned.
1363          */
1364         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1365                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1366                                                orr_data->name,
1367                                                NRS_CTL_ORR_WR_QUANTUM, false,
1368                                                &quantum_reg);
1369                 if ((rc < 0 && rc != -ENODEV) ||
1370                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1371                         return rc;
1372         }
1373
1374         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1375                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1376                                                 orr_data->name,
1377                                                 NRS_CTL_ORR_WR_QUANTUM, false,
1378                                                 &quantum_hp);
1379                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1380                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1381                         return rc2;
1382         }
1383
1384         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1385 }
1386 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_quantum);
1387
/** Labels that select the regular or the high-priority NRS head */
1388 #define LPROCFS_NRS_OFF_NAME_REG                "reg_offset_type:"
1389 #define LPROCFS_NRS_OFF_NAME_HP                 "hp_offset_type:"
1390
/** The two offset type names that are printed and accepted */
1391 #define LPROCFS_NRS_OFF_NAME_PHYSICAL           "physical"
1392 #define LPROCFS_NRS_OFF_NAME_LOGICAL            "logical"
1393
1394 /**
1395  * Retrieves the offset type used by ORR/TRR policy instances on both the
1396  * regular and high-priority NRS head of a service, as long as a policy
1397  * instance is not in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state;
1398  * policy instances in this state are skipped later by nrs_orr_ctl().
1399  *
1400  * Offset type information is a (physical|logical) string, and output is
1401  * in YAML format.
1402  *
1403  * For example:
1404  *
1405  *      reg_offset_type:physical
1406  *      hp_offset_type:logical
1407  */
1408 static int
1409 ptlrpc_lprocfs_nrs_orr_offset_type_seq_show(struct seq_file *m, void *data)
1410 {
1411         struct nrs_lprocfs_orr_data *orr_data = m->private;
1412         struct ptlrpc_service       *svc = orr_data->svc;
1413         bool                         physical;
1414         int                          rc;
1415
1416         /**
1417          * Perform two separate calls to this as only one of the NRS heads'
1418          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1419          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1420          */
1421         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1422                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1423                                        true, &physical);
1424         if (rc == 0) {
1425                 seq_printf(m, LPROCFS_NRS_OFF_NAME_REG"%s\n",
1426                            physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1427                            LPROCFS_NRS_OFF_NAME_LOGICAL);
1428                 /**
1429                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1430                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1431                  */
1432         } else if (rc != -ENODEV) {
1433                 return rc;
1434         }
1435
1436         /**
1437          * We know the ost_io service which is the only one ORR/TRR policies are
1438          * compatible with, do have an HP NRS head, but it may be best to guard
1439          * against a possible change of this in the future.
1440          */
1441         if (!nrs_svc_has_hp(svc))
1442                 goto no_hp;
1443
1444         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1445                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1446                                        true, &physical);
1447         if (rc == 0) {
1448                 seq_printf(m, LPROCFS_NRS_OFF_NAME_HP"%s\n",
1449                            physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1450                            LPROCFS_NRS_OFF_NAME_LOGICAL);
1451                 /**
1452                  * Ignore -ENODEV as the high priority NRS head's policy may be
1453                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1454                  */
1455         } else if (rc != -ENODEV) {
1456                 return rc;
1457         }
1458
1459 no_hp:
1460         return rc;
1461 }
1462
1463 /**
1464  * Max valid command string is the size of both labels, plus "physical" twice,
1465  * plus a separating ' '; sizeof() also counts the terminating NUL.
1466  */
1467 #define LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD                                        \
1468         sizeof(LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL " "      \
1469                LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL)
1470
1471 /**
1472  * Sets the type of offsets used to order RPCs in ORR/TRR policy instances. The
1473  * user can set offset type for the regular or high priority NRS head
1474  * separately by specifying each value, or both together in a single invocation.
1475  *
1476  * For example:
1477  *
1478  * lctl set_param ost.OSS.ost_io.nrs_orr_offset_type=
1479  * reg_offset_type:physical, to enable the ORR policy instance on the regular
1480  * NRS head of the ost_io service to use physical disk offset ordering.
1481  *
1482  * lctl set_param ost.OSS.ost_io.nrs_trr_offset_type=logical, to enable the TRR
1483  * policy instances on both the regular and high priority NRS heads of the
1484  * ost_io service to use logical file offset ordering.
1485  *
1486  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1487  * are skipped later by nrs_orr_ctl().
1488  */
1489 static ssize_t
1490 ptlrpc_lprocfs_nrs_orr_offset_type_seq_write(struct file *file,
1491                                              const char *buffer, size_t count,
1492                                              loff_t *off)
1493 {
1494         struct seq_file             *m = file->private_data;
1495         struct nrs_lprocfs_orr_data *orr_data = m->private;
1496         struct ptlrpc_service       *svc = orr_data->svc;
1497         enum ptlrpc_nrs_queue_type   queue = 0;
1498         char                         kernbuf[LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD];
1499         char                        *val_reg;
1500         char                        *val_hp;
1501         bool                         physical_reg;
1502         bool                         physical_hp;
1503         size_t                       count_copy;
1504         int                          rc = 0;
1505         int                          rc2 = 0;
1506
1507         if (count > (sizeof(kernbuf) - 1))
1508                 return -EINVAL;
1509
1510         if (copy_from_user(kernbuf, buffer, count))
1511                 return -EFAULT;
1512
1513         kernbuf[count] = '\0';
1514
1515         count_copy = count;
1516
1517         /**
1518          * Check if the regular offset type has been specified
1519          */
1520         val_reg = lprocfs_find_named_value(kernbuf,
1521                                            LPROCFS_NRS_OFF_NAME_REG,
1522                                            &count_copy);
1523         if (val_reg != kernbuf)
1524                 queue |= PTLRPC_NRS_QUEUE_REG;
1525
1526         count_copy = count;
1527
1528         /**
1529          * Check if the high priority offset type has been specified
1530          */
1531         val_hp = lprocfs_find_named_value(kernbuf, LPROCFS_NRS_OFF_NAME_HP,
1532                                           &count_copy);
1533         if (val_hp != kernbuf) {
1534                 if (!nrs_svc_has_hp(svc))
1535                         return -ENODEV;
1536
1537                 queue |= PTLRPC_NRS_QUEUE_HP;
1538         }
1539
1540         /**
1541          * If none of the queues has been specified, there may be a valid
1542          * command string at the start of the buffer.
1543          */
1544         if (queue == 0) {
1545                 queue = PTLRPC_NRS_QUEUE_REG;
1546
1547                 if (nrs_svc_has_hp(svc))
1548                         queue |= PTLRPC_NRS_QUEUE_HP;
1549         }
1550
1551         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1552                 if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1553                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1554                         physical_reg = true;
1555                 else if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_LOGICAL,
1556                          sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1557                         physical_reg = false;
1558                 else
1559                         return -EINVAL;
1560         }
1561
1562         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1563                 if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1564                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1565                         physical_hp = true;
1566                 else if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_LOGICAL,
1567                                  sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1568                         physical_hp = false;
1569                 else
1570                         return -EINVAL;
1571         }
1572
1573         /**
1574          * We change the values on regular and HP NRS heads separately, so that
1575          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1576          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1577          * started the policy on either the regular or HP NRS head; i.e. we are
1578          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1579          * only if the operation fails with -ENODEV on all heads that have been
1580          * specified by the command; if at least one operation succeeds,
1581          * success is returned.
1582          */
1583         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1584                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1585                                                orr_data->name,
1586                                                NRS_CTL_ORR_WR_OFF_TYPE, false,
1587                                                &physical_reg);
1588                 if ((rc < 0 && rc != -ENODEV) ||
1589                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1590                         return rc;
1591         }
1592
1593         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1594                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1595                                                 orr_data->name,
1596                                                 NRS_CTL_ORR_WR_OFF_TYPE, false,
1597                                                 &physical_hp);
1598                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1599                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1600                         return rc2;
1601         }
1602
1603         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1604 }
1605 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_offset_type);
1606
/**
 * Labels that introduce the supported RPC type setting of the regular and of
 * the high priority NRS head respectively.
 */
#define NRS_LPROCFS_REQ_SUPP_NAME_REG    "reg_supported:"
#define NRS_LPROCFS_REQ_SUPP_NAME_HP     "hp_supported:"

/**
 * Textual names of the supported RPC type settings.
 */
#define LPROCFS_NRS_SUPP_NAME_READS      "reads"
#define LPROCFS_NRS_SUPP_NAME_WRITES     "writes"
#define LPROCFS_NRS_SUPP_NAME_READWRITES "reads_and_writes"
1613
1614 /**
1615  * Translates enum nrs_orr_supp values to a corresponding string.
1616  */
1617 static const char *nrs_orr_supp2str(enum nrs_orr_supp supp)
1618 {
1619         switch(supp) {
1620         default:
1621                 LBUG();
1622         case NOS_OST_READ:
1623                 return LPROCFS_NRS_SUPP_NAME_READS;
1624         case NOS_OST_WRITE:
1625                 return LPROCFS_NRS_SUPP_NAME_WRITES;
1626         case NOS_OST_RW:
1627                 return LPROCFS_NRS_SUPP_NAME_READWRITES;
1628         }
1629 }
1630
1631 /**
1632  * Translates strings to the corresponding enum nrs_orr_supp value
1633  */
1634 static enum nrs_orr_supp nrs_orr_str2supp(const char *val)
1635 {
1636         if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READWRITES,
1637                     sizeof(LPROCFS_NRS_SUPP_NAME_READWRITES) - 1) == 0)
1638                 return NOS_OST_RW;
1639         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READS,
1640                          sizeof(LPROCFS_NRS_SUPP_NAME_READS) - 1) == 0)
1641                 return NOS_OST_READ;
1642         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_WRITES,
1643                          sizeof(LPROCFS_NRS_SUPP_NAME_WRITES) - 1) == 0)
1644                 return NOS_OST_WRITE;
1645         else
1646                 return -EINVAL;
1647 }
1648
1649 /**
1650  * Retrieves the type of RPCs handled at the point of invocation by ORR/TRR
1651  * policy instances on both the regular and high-priority NRS head of a service,
1652  * as long as a policy instance is not in the
1653  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1654  * state are skipped later by nrs_orr_ctl().
1655  *
1656  * Supported RPC type information is a (reads|writes|reads_and_writes) string,
1657  * and output is in YAML format.
1658  *
1659  * For example:
1660  *
1661  *      reg_supported:reads
1662  *      hp_supported:reads_and_writes
1663  */
1664 static int
1665 ptlrpc_lprocfs_nrs_orr_supported_seq_show(struct seq_file *m, void *data)
1666 {
1667         struct nrs_lprocfs_orr_data *orr_data = m->private;
1668         struct ptlrpc_service       *svc = orr_data->svc;
1669         enum nrs_orr_supp            supported;
1670         int                          rc;
1671
1672         /**
1673          * Perform two separate calls to this as only one of the NRS heads'
1674          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1675          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1676          */
1677         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1678                                        orr_data->name,
1679                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1680                                        &supported);
1681
1682         if (rc == 0) {
1683                 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_REG"%s\n",
1684                            nrs_orr_supp2str(supported));
1685                 /**
1686                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1687                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1688                  */
1689         } else if (rc != -ENODEV) {
1690                 return rc;
1691         }
1692
1693         /**
1694          * We know the ost_io service which is the only one ORR/TRR policies are
1695          * compatible with, do have an HP NRS head, but it may be best to guard
1696          * against a possible change of this in the future.
1697          */
1698         if (!nrs_svc_has_hp(svc))
1699                 goto no_hp;
1700
1701         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1702                                        orr_data->name,
1703                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1704                                        &supported);
1705         if (rc == 0) {
1706                 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_HP"%s\n",
1707                            nrs_orr_supp2str(supported));
1708                 /**
1709                  * Ignore -ENODEV as the high priority NRS head's policy may be
1710                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1711                  */
1712         } else if (rc != -ENODEV) {
1713                 return rc;
1714         }
1715
1716 no_hp:
1717
1718         return rc;
1719 }
1720
/**
 * Size of the longest valid command string, terminating NUL included: both
 * labels, each followed by "reads_and_writes", with a single ' ' separating
 * the two settings.
 */
#define LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD                                        \
        sizeof(NRS_LPROCFS_REQ_SUPP_NAME_REG                                   \
               LPROCFS_NRS_SUPP_NAME_READWRITES                                \
               " "                                                             \
               NRS_LPROCFS_REQ_SUPP_NAME_HP                                    \
               LPROCFS_NRS_SUPP_NAME_READWRITES)
1729
1730 /**
1731  * Sets the type of RPCs handled by ORR/TRR policy instances. The user can
1732  * modify this setting for the regular or high priority NRS heads separately, or
1733  * both together in a single invocation.
1734  *
1735  * For example:
1736  *
1737  * lctl set_param ost.OSS.ost_io.nrs_orr_supported=
1738  * "reg_supported:reads", to enable the ORR policy instance on the regular NRS
1739  * head of the ost_io service to handle OST_READ RPCs.
1740  *
1741  * lctl set_param ost.OSS.ost_io.nrs_trr_supported=reads_and_writes, to enable
1742  * the TRR policy instances on both the regular ang high priority NRS heads of
1743  * the ost_io service to use handle OST_READ and OST_WRITE RPCs.
1744  *
1745  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state are
1746  * are skipped later by nrs_orr_ctl().
1747  */
1748 static ssize_t
1749 ptlrpc_lprocfs_nrs_orr_supported_seq_write(struct file *file,
1750                                            const char *buffer, size_t count,
1751                                            loff_t *off)
1752 {
1753         struct seq_file             *m = file->private_data;
1754         struct nrs_lprocfs_orr_data *orr_data = m->private;
1755         struct ptlrpc_service       *svc = orr_data->svc;
1756         enum ptlrpc_nrs_queue_type   queue = 0;
1757         char                         kernbuf[LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD];
1758         char                        *val_reg;
1759         char                        *val_hp;
1760         enum nrs_orr_supp            supp_reg;
1761         enum nrs_orr_supp            supp_hp;
1762         size_t                       count_copy;
1763         int                          rc = 0;
1764         int                          rc2 = 0;
1765
1766         if (count > (sizeof(kernbuf) - 1))
1767                 return -EINVAL;
1768
1769         if (copy_from_user(kernbuf, buffer, count))
1770                 return -EFAULT;
1771
1772         kernbuf[count] = '\0';
1773
1774         count_copy = count;
1775
1776         /**
1777          * Check if the regular supported requests setting has been specified
1778          */
1779         val_reg = lprocfs_find_named_value(kernbuf,
1780                                            NRS_LPROCFS_REQ_SUPP_NAME_REG,
1781                                            &count_copy);
1782         if (val_reg != kernbuf)
1783                 queue |= PTLRPC_NRS_QUEUE_REG;
1784
1785         count_copy = count;
1786
1787         /**
1788          * Check if the high priority supported requests setting has been
1789          * specified
1790          */
1791         val_hp = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_REQ_SUPP_NAME_HP,
1792                                           &count_copy);
1793         if (val_hp != kernbuf) {
1794                 if (!nrs_svc_has_hp(svc))
1795                         return -ENODEV;
1796
1797                 queue |= PTLRPC_NRS_QUEUE_HP;
1798         }
1799
1800         /**
1801          * If none of the queues has been specified, there may be a valid
1802          * command string at the start of the buffer.
1803          */
1804         if (queue == 0) {
1805                 queue = PTLRPC_NRS_QUEUE_REG;
1806
1807                 if (nrs_svc_has_hp(svc))
1808                         queue |= PTLRPC_NRS_QUEUE_HP;
1809         }
1810
1811         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1812                 supp_reg = nrs_orr_str2supp(val_reg);
1813                 if (supp_reg == -EINVAL)
1814                         return -EINVAL;
1815         }
1816
1817         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1818                 supp_hp = nrs_orr_str2supp(val_hp);
1819                 if (supp_hp == -EINVAL)
1820                         return -EINVAL;
1821         }
1822
1823         /**
1824          * We change the values on regular and HP NRS heads separately, so that
1825          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1826          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1827          * started the policy on either the regular or HP NRS head; i.e. we are
1828          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1829          * only if the operation fails with -ENODEV on all heads that have been
1830          * specified by the command; if at least one operation succeeds,
1831          * success is returned.
1832          */
1833         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1834                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1835                                                orr_data->name,
1836                                                NRS_CTL_ORR_WR_SUPP_REQ, false,
1837                                                &supp_reg);
1838                 if ((rc < 0 && rc != -ENODEV) ||
1839                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1840                         return rc;
1841         }
1842
1843         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1844                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1845                                                 orr_data->name,
1846                                                 NRS_CTL_ORR_WR_SUPP_REQ, false,
1847                                                 &supp_hp);
1848                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1849                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1850                         return rc2;
1851         }
1852
1853         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1854 }
1855 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_supported);
1856
1857 int nrs_orr_lprocfs_init(struct ptlrpc_service *svc)
1858 {
1859         int     i;
1860
1861         struct lprocfs_seq_vars nrs_orr_lprocfs_vars[] = {
1862                 { .name         = "nrs_orr_quantum",
1863                   .fops         = &ptlrpc_lprocfs_nrs_orr_quantum_fops  },
1864                 { .name         = "nrs_orr_offset_type",
1865                   .fops         = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1866                 { .name         = "nrs_orr_supported",
1867                   .fops         = &ptlrpc_lprocfs_nrs_orr_supported_fops },
1868                 { NULL }
1869         };
1870
1871         if (svc->srv_procroot == NULL)
1872                 return 0;
1873
1874         lprocfs_orr_data.svc = svc;
1875
1876         for (i = 0; i < ARRAY_SIZE(nrs_orr_lprocfs_vars); i++)
1877                 nrs_orr_lprocfs_vars[i].data = &lprocfs_orr_data;
1878
1879         return lprocfs_seq_add_vars(svc->srv_procroot, nrs_orr_lprocfs_vars, NULL);
1880 }
1881
1882 void nrs_orr_lprocfs_fini(struct ptlrpc_service *svc)
1883 {
1884         if (svc->srv_procroot == NULL)
1885                 return;
1886
1887         lprocfs_remove_proc_entry("nrs_orr_quantum", svc->srv_procroot);
1888         lprocfs_remove_proc_entry("nrs_orr_offset_type", svc->srv_procroot);
1889         lprocfs_remove_proc_entry("nrs_orr_supported", svc->srv_procroot);
1890 }
1891
1892 #endif /* LPROCFS */
1893
1894 static const struct ptlrpc_nrs_pol_ops nrs_orr_ops = {
1895         .op_policy_init         = nrs_orr_init,
1896         .op_policy_start        = nrs_orr_start,
1897         .op_policy_stop         = nrs_orr_stop,
1898         .op_policy_ctl          = nrs_orr_ctl,
1899         .op_res_get             = nrs_orr_res_get,
1900         .op_res_put             = nrs_orr_res_put,
1901         .op_req_get             = nrs_orr_req_get,
1902         .op_req_enqueue         = nrs_orr_req_add,
1903         .op_req_dequeue         = nrs_orr_req_del,
1904         .op_req_stop            = nrs_orr_req_stop,
1905 #ifdef LPROCFS
1906         .op_lprocfs_init        = nrs_orr_lprocfs_init,
1907         .op_lprocfs_fini        = nrs_orr_lprocfs_fini,
1908 #endif
1909 };
1910
1911 struct ptlrpc_nrs_pol_conf nrs_conf_orr = {
1912         .nc_name                = NRS_POL_NAME_ORR,
1913         .nc_ops                 = &nrs_orr_ops,
1914         .nc_compat              = nrs_policy_compat_one,
1915         .nc_compat_svc_name     = "ost_io",
1916 };
1917
1918 /**
1919  * TRR, Target-based Round Robin policy
1920  *
1921  * TRR reuses many of the functions and data structures of ORR
1922  */
1923
1924 #ifdef LPROCFS
1925
1926 int nrs_trr_lprocfs_init(struct ptlrpc_service *svc)
1927 {
1928         int     rc;
1929         int     i;
1930
1931         struct lprocfs_seq_vars nrs_trr_lprocfs_vars[] = {
1932                 { .name         = "nrs_trr_quantum",
1933                   .fops         = &ptlrpc_lprocfs_nrs_orr_quantum_fops },
1934                 { .name         = "nrs_trr_offset_type",
1935                   .fops         = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1936                 { .name         = "nrs_trr_supported",
1937                   .fops         = &ptlrpc_lprocfs_nrs_orr_supported_fops },
1938                 { NULL }
1939         };
1940
1941         if (svc->srv_procroot == NULL)
1942                 return 0;
1943
1944         lprocfs_trr_data.svc = svc;
1945
1946         for (i = 0; i < ARRAY_SIZE(nrs_trr_lprocfs_vars); i++)
1947                 nrs_trr_lprocfs_vars[i].data = &lprocfs_trr_data;
1948
1949         rc = lprocfs_seq_add_vars(svc->srv_procroot, nrs_trr_lprocfs_vars, NULL);
1950
1951         return rc;
1952 }
1953
1954 void nrs_trr_lprocfs_fini(struct ptlrpc_service *svc)
1955 {
1956         if (svc->srv_procroot == NULL)
1957                 return;
1958
1959         lprocfs_remove_proc_entry("nrs_trr_quantum", svc->srv_procroot);
1960         lprocfs_remove_proc_entry("nrs_trr_offset_type", svc->srv_procroot);
1961         lprocfs_remove_proc_entry("nrs_trr_supported", svc->srv_procroot);
1962 }
1963
1964 #endif /* LPROCFS */
1965
1966 /**
1967  * Reuse much of the ORR functionality for TRR.
1968  */
1969 static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
1970         .op_policy_init         = nrs_orr_init,
1971         .op_policy_start        = nrs_orr_start,
1972         .op_policy_stop         = nrs_orr_stop,
1973         .op_policy_ctl          = nrs_orr_ctl,
1974         .op_res_get             = nrs_orr_res_get,
1975         .op_res_put             = nrs_orr_res_put,
1976         .op_req_get             = nrs_orr_req_get,
1977         .op_req_enqueue         = nrs_orr_req_add,
1978         .op_req_dequeue         = nrs_orr_req_del,
1979         .op_req_stop            = nrs_orr_req_stop,
1980 #ifdef LPROCFS
1981         .op_lprocfs_init        = nrs_trr_lprocfs_init,
1982         .op_lprocfs_fini        = nrs_trr_lprocfs_fini,
1983 #endif
1984 };
1985
1986 struct ptlrpc_nrs_pol_conf nrs_conf_trr = {
1987         .nc_name                = NRS_POL_NAME_TRR,
1988         .nc_ops                 = &nrs_trr_ops,
1989         .nc_compat              = nrs_policy_compat_one,
1990         .nc_compat_svc_name     = "ost_io",
1991 };
1992
1993 /** @} ORR/TRR policy */
1994
1995 /** @} nrs */
1996
1997 #endif /* HAVE_SERVER_SUPPORT */