Whamcloud - gitweb
LU-398 ptlrpc: Add the NRS ORR and TRR policies
[fs/lustre-release.git] / lustre / ptlrpc / nrs_orr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2011 Intel Corporation
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_orr.c
29  *
30  * Network Request Scheduler (NRS) ORR and TRR policies
31  *
32  * Request scheduling in a Round-Robin manner over backend-fs objects and OSTs
33  * respectively
34  *
35  * Author: Liang Zhen <liang@whamcloud.com>
36  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
37  */
38 #ifdef HAVE_SERVER_SUPPORT
39
40 /**
41  * \addtogroup nrs
42  * @{
43  */
44 #define DEBUG_SUBSYSTEM S_RPC
45 #include <obd_support.h>
46 #include <obd_class.h>
47 #include <lustre_net.h>
48 #include <lustre/lustre_idl.h>
49 #include <lustre_req_layout.h>
50 #include "ptlrpc_internal.h"
51
52 /**
53  * \name ORR/TRR policy
54  *
55  * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies
56  *
57  * ORR performs batched Round Robin scheduling of brw RPCs, based on the FID of
58  * the backend-fs object that the brw RPC pertains to; the TRR policy performs
59  * batched Round Robin scheduling of brw RPCs, based on the OST index that the
60  * RPC pertains to. Both policies also order RPCs in each batch in ascending
61  * offset order, which is lprocfs-tunable between logical file offsets, and
62  * physical disk offsets, as reported by fiemap.
63  *
64  * The TRR policy reuses much of the functionality of ORR. These two scheduling
65  * algorithms could alternatively be implemented under a single NRS policy, that
66  * uses an lprocfs tunable in order to switch between the two types of
67  * scheduling behaviour. The two algorithms have been implemented as separate
68  * policies for reasons of clarity to the user, and to avoid issues that would
69  * otherwise arise at the point of switching between behaviours in the case of
70  * having a single policy, such as resource cleanup for nrs_orr_object
71  * instances. It is possible that this may need to be re-examined in the future,
72  * along with potentially coalescing other policies that perform batched request
73  * scheduling in a Round-Robin manner, all into one policy.
74  *
75  * @{
76  */
77
/* Name strings used to tell ORR policy instances apart from TRR ones. */
#define NRS_POL_NAME_ORR "orr"
#define NRS_POL_NAME_TRR "trr"
80
81 /**
82  * Checks if the RPC type of \a nrq is currently handled by an ORR/TRR policy
83  *
84  * \param[in]  orrd   the ORR/TRR policy scheduler instance
85  * \param[in]  nrq    the request
86  * \param[out] opcode the opcode is saved here, just in order to avoid calling
87  *                    lustre_msg_get_opc() again later
88  *
89  * \retval true  request type is supported by the policy instance
90  * \retval false request type is not supported by the policy instance
91  */
92 static bool nrs_orr_req_supported(struct nrs_orr_data *orrd,
93                                   struct ptlrpc_nrs_request *nrq, __u32 *opcode)
94 {
95         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
96                                                    rq_nrq);
97         __u32                   opc = lustre_msg_get_opc(req->rq_reqmsg);
98         bool                    rc = false;
99
100         /**
101          * XXX: nrs_orr_data::od_supp accessed unlocked.
102          */
103         switch (opc) {
104         case OST_READ:
105                 rc = orrd->od_supp & NOS_OST_READ;
106                 break;
107         case OST_WRITE:
108                 rc = orrd->od_supp & NOS_OST_WRITE;
109                 break;
110         }
111
112         if (rc)
113                 *opcode = opc;
114
115         return rc;
116 }
117
118 /**
119  * Returns the ORR/TRR key fields for the request \a nrq in \a key.
120  *
121  * \param[in]  orrd the ORR/TRR policy scheduler instance
122  * \param[in]  nrq  the request
123  * \param[in]  opc  the request's opcode
124  * \param[in]  name the policy name
125  * \param[out] key  fields of the key are returned here.
126  *
127  * \retval 0   key filled successfully
128  * \retval < 0 error
129  */
130 static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
131                             struct ptlrpc_nrs_request *nrq, __u32 opc,
132                             char *name, struct nrs_orr_key *key)
133 {
134         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
135                                                    rq_nrq);
136         struct ost_body        *body;
137         __u32                   ost_idx;
138         bool                    is_orr = strncmp(name, NRS_POL_NAME_ORR,
139                                                  NRS_POL_NAME_MAX) == 0;
140
141         LASSERT(req != NULL);
142
143         /**
144          * This is an attempt to fill in the request key fields while
145          * moving a request from the regular to the high-priority NRS
146          * head (via ldlm_lock_reorder_req()), but the request key has
147          * been adequately filled when nrs_orr_res_get() was called through
148          * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR/TRR
149          * policy, so there is nothing to do.
150          */
151         if ((is_orr && nrq->nr_u.orr.or_orr_set) ||
152             (!is_orr && nrq->nr_u.orr.or_trr_set)) {
153                 *key = nrq->nr_u.orr.or_key;
154                 return 0;
155         }
156
157         if (nrq->nr_u.orr.or_orr_set || nrq->nr_u.orr.or_trr_set)
158                 memset(&nrq->nr_u.orr.or_key, 0, sizeof(nrq->nr_u.orr.or_key));
159
160         ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
161
162         if (is_orr) {
163                 int     rc;
164                 /**
165                  * The request pill for OST_READ and OST_WRITE requests is
166                  * initialized in the ost_io service's
167                  * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
168                  * so no need to redo it here.
169                  */
170                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
171                 if (body == NULL)
172                         RETURN(-EFAULT);
173
174                 rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
175                 if (rc < 0)
176                         return rc;
177
178                 nrq->nr_u.orr.or_orr_set = 1;
179         } else {
180                 key->ok_idx = ost_idx;
181                 nrq->nr_u.orr.or_trr_set = 1;
182         }
183
184         return 0;
185 }
186
187 /**
188  * Populates the range values in \a range with logical offsets obtained via
189  * \a nb.
190  *
191  * \param[in]  nb       niobuf_remote struct array for this request
192  * \param[in]  niocount count of niobuf_remote structs for this request
193  * \param[out] range    the offset range is returned here
194  */
195 static void nrs_orr_range_fill_logical(struct niobuf_remote *nb, int niocount,
196                                        struct nrs_orr_req_range *range)
197 {
198         /* Should we do this at page boundaries ? */
199         range->or_start = nb[0].offset & CFS_PAGE_MASK;
200         range->or_end = (nb[niocount - 1].offset +
201                          nb[niocount - 1].len - 1) | ~CFS_PAGE_MASK;
202 }
203
/**
 * Number of extents requested from fiemap per request. A request can only
 * occupy a single place in the binary heap, so information for a single
 * extent is all that is obtained.
 */
#define ORR_NUM_EXTENTS         1
209
210 /**
211  * Converts the logical file offset range in \a range, to a physical disk offset
212  * range in \a range, for a request. Uses obd_get_info() in order to carry out a
213  * fiemap call and obtain backend-fs extent information. The returned range is
214  * in physical block numbers.
215  *
216  * \param[in]     nrq   the request
217  * \param[in]     oa    obdo struct for this request
218  * \param[in,out] range the offset range in bytes; logical range in, physical
219  *                      range out
220  *
221  * \retval 0    physical offsets obtained successfully
222  * \retval < 0  error
223  */
224 static int nrs_orr_range_fill_physical(struct ptlrpc_nrs_request *nrq,
225                                        struct obdo *oa,
226                                        struct nrs_orr_req_range *range)
227 {
228         struct ptlrpc_request     *req = container_of(nrq,
229                                                       struct ptlrpc_request,
230                                                       rq_nrq);
231         char                       fiemap_buf[offsetof(struct ll_user_fiemap,
232                                                   fm_extents[ORR_NUM_EXTENTS])];
233         struct ll_user_fiemap     *fiemap = (struct ll_user_fiemap *)fiemap_buf;
234         struct ll_fiemap_info_key  key;
235         loff_t                     start;
236         loff_t                     end;
237         int                        rc;
238
239         key = (typeof(key)) {
240                 .name = KEY_FIEMAP,
241                 .oa = *oa,
242                 .fiemap = {
243                         .fm_start = range->or_start,
244                         .fm_length = range->or_end - range->or_start,
245                         .fm_extent_count = ORR_NUM_EXTENTS
246                 }
247         };
248
249         rc = obd_get_info(req->rq_svc_thread->t_env, req->rq_export,
250                           sizeof(key), &key, NULL, fiemap, NULL);
251         if (rc < 0)
252                 GOTO(out, rc);
253
254         if (fiemap->fm_mapped_extents == 0 ||
255             fiemap->fm_mapped_extents > ORR_NUM_EXTENTS)
256                 GOTO(out, rc = -EFAULT);
257
258         /**
259          * Calculate the physical offset ranges for the request from the extent
260          * information and the logical request offsets.
261          */
262         start = fiemap->fm_extents[0].fe_physical + range->or_start -
263                 fiemap->fm_extents[0].fe_logical;
264         end = start + range->or_end - range->or_start;
265
266         range->or_start = start;
267         range->or_end = end;
268
269         nrq->nr_u.orr.or_physical_set = 1;
270 out:
271         return rc;
272 }
273
274 /**
275  * Sets the offset range the request covers; either in logical file
276  * offsets or in physical disk offsets.
277  *
278  * \param[in] nrq        the request
279  * \param[in] orrd       the ORR/TRR policy scheduler instance
280  * \param[in] opc        the request's opcode
281  * \param[in] moving_req is the request in the process of moving onto the
282  *                       high-priority NRS head?
283  *
284  * \retval 0    range filled successfully
285  * \retval != 0 error
286  */
287 static int nrs_orr_range_fill(struct ptlrpc_nrs_request *nrq,
288                               struct nrs_orr_data *orrd, __u32 opc,
289                               bool moving_req)
290 {
291         struct ptlrpc_request       *req = container_of(nrq,
292                                                         struct ptlrpc_request,
293                                                         rq_nrq);
294         struct obd_ioobj            *ioo;
295         struct niobuf_remote        *nb;
296         struct ost_body             *body;
297         struct nrs_orr_req_range     range;
298         int                          niocount;
299         int                          rc = 0;
300
301         /**
302          * If we are scheduling using physical disk offsets, but we have filled
303          * the offset information in the request previously
304          * (i.e. ldlm_lock_reorder_req() is moving the request to the
305          * high-priority NRS head), there is no need to do anything, and we can
306          * exit. Moreover than the lack of need, we would be unable to perform
307          * the obd_get_info() call required in nrs_orr_range_fill_physical(),
308          * because ldlm_lock_reorder_lock() calls into here while holding a
309          * spinlock, and retrieving fiemap information via obd_get_info() is a
310          * potentially sleeping operation.
311          */
312         if (orrd->od_physical && nrq->nr_u.orr.or_physical_set)
313                 return 0;
314
315         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
316         if (ioo == NULL)
317                 GOTO(out, rc = -EFAULT);
318
319         niocount = ioo->ioo_bufcnt;
320
321         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
322         if (nb == NULL)
323                 GOTO(out, rc = -EFAULT);
324
325         /**
326          * Use logical information from niobuf_remote structures.
327          */
328         nrs_orr_range_fill_logical(nb, niocount, &range);
329
330         /**
331          * Obtain physical offsets if selected, and this is an OST_READ RPC
332          * RPC. We do not enter this block if moving_req is set which indicates
333          * that the request is being moved to the high-priority NRS head by
334          * ldlm_lock_reorder_req(), as that function calls in here while holding
335          * a spinlock, and nrs_orr_range_physical() can sleep, so we just use
336          * logical file offsets for the range values for such requests.
337          */
338         if (orrd->od_physical && opc == OST_READ && !moving_req) {
339                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
340                 if (body == NULL)
341                         GOTO(out, rc = -EFAULT);
342
343                 /**
344                  * Translate to physical block offsets from backend filesystem
345                  * extents.
346                  * Ignore return values; if obtaining the physical offsets
347                  * fails, use the logical offsets.
348                  */
349                 nrs_orr_range_fill_physical(nrq, &body->oa, &range);
350         }
351
352         nrq->nr_u.orr.or_range = range;
353 out:
354         return rc;
355 }
356
357 /**
358  * Generates a character string that can be used in order to register uniquely
359  * named libcfs_hash and slab objects for ORR/TRR policy instances. The
360  * character string is unique per policy instance, as it includes the policy's
361  * name, the CPT number, and a {reg|hp} token, and there is one policy instance
362  * per NRS head on each CPT, and the policy is only compatible with the ost_io
363  * service.
364  *
365  * \param[in] policy the policy instance
366  * \param[out] name  the character array that will hold the generated name
367  */
368 static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
369 {
370         snprintf(name, NRS_ORR_OBJ_NAME_MAX, "%s%s%s%d",
371                  "nrs_", policy->pol_desc->pd_name,
372                  policy->pol_nrs->nrs_queue_type == PTLRPC_NRS_QUEUE_REG ?
373                  "_reg_" : "_hp_", nrs_pol2cptid(policy));
374 }
375
/**
 * ORR/TRR hash parameters
 *
 * Hash size bits, bucket bits and creation flags that nrs_orr_start() passes
 * to cfs_hash_create() for ORR and TRR policy instances respectively.
 */
#define NRS_ORR_BITS 24
#define NRS_ORR_BKT_BITS 12
#define NRS_ORR_HASH_FLAGS (CFS_HASH_RW_BKTLOCK | CFS_HASH_ASSERT_EMPTY)

#define NRS_TRR_BITS 4
#define NRS_TRR_BKT_BITS 2
#define NRS_TRR_HASH_FLAGS CFS_HASH_RW_BKTLOCK
386
387 static unsigned nrs_orr_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask)
388 {
389         return cfs_hash_djb2_hash(key, sizeof(struct nrs_orr_key), mask);
390 }
391
392 static void *nrs_orr_hop_key(cfs_hlist_node_t *hnode)
393 {
394         struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
395                                                       struct nrs_orr_object,
396                                                       oo_hnode);
397         return &orro->oo_key;
398 }
399
400 static int nrs_orr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
401 {
402         struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
403                                                       struct nrs_orr_object,
404                                                       oo_hnode);
405
406         return lu_fid_eq(&orro->oo_key.ok_fid,
407                          &((struct nrs_orr_key *)key)->ok_fid);
408 }
409
410 static void *nrs_orr_hop_object(cfs_hlist_node_t *hnode)
411 {
412         return cfs_hlist_entry(hnode, struct nrs_orr_object, oo_hnode);
413 }
414
415 static void nrs_orr_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
416 {
417         struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
418                                                       struct nrs_orr_object,
419                                                       oo_hnode);
420         cfs_atomic_inc(&orro->oo_ref);
421 }
422
423 /**
424  * Removes an nrs_orr_object from the hash and frees its memory, if the object
425  * has no active users.
426  */
427 static void nrs_orr_hop_put_free(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
428 {
429         struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
430                                                       struct nrs_orr_object,
431                                                       oo_hnode);
432         struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
433                                                    struct nrs_orr_data, od_res);
434         cfs_hash_bd_t          bds[2];
435
436         if (cfs_atomic_dec_return(&orro->oo_ref) > 1)
437                 return;
438
439         cfs_hash_lock(hs, 0);
440         cfs_hash_dual_bd_get_and_lock(hs, &orro->oo_key, bds, 1);
441
442         /**
443          * Another thread may have won the race and taken a reference on the
444          * nrs_orr_object.
445          */
446         if (cfs_atomic_read(&orro->oo_ref) > 1)
447                 goto lost_race;
448
449         if (bds[1].bd_bucket == NULL)
450                 cfs_hash_bd_del_locked(hs, &bds[0], hnode);
451         else
452                 hnode = cfs_hash_dual_bd_finddel_locked(hs, bds, &orro->oo_key,
453                                                         hnode);
454         LASSERT(hnode != NULL);
455
456         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
457
458 lost_race:
459
460         cfs_hash_dual_bd_unlock(hs, bds, 1);
461         cfs_hash_unlock(hs, 0);
462 }
463
464 static void nrs_orr_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
465 {
466         struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
467                                                       struct nrs_orr_object,
468                                                       oo_hnode);
469         cfs_atomic_dec(&orro->oo_ref);
470 }
471
472 static int nrs_trr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
473 {
474         struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
475                                                       struct nrs_orr_object,
476                                                       oo_hnode);
477
478         return orro->oo_key.ok_idx == ((struct nrs_orr_key *)key)->ok_idx;
479 }
480
481 static void nrs_trr_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
482 {
483         struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
484                                                       struct nrs_orr_object,
485                                                       oo_hnode);
486         struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
487                                                    struct nrs_orr_data, od_res);
488
489         LASSERTF(cfs_atomic_read(&orro->oo_ref) == 0,
490                  "Busy NRS TRR policy object for OST with index %u, with %d "
491                  "refs\n", orro->oo_key.ok_idx, cfs_atomic_read(&orro->oo_ref));
492
493         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
494 }
495
496 static cfs_hash_ops_t nrs_orr_hash_ops = {
497         .hs_hash        = nrs_orr_hop_hash,
498         .hs_key         = nrs_orr_hop_key,
499         .hs_keycmp      = nrs_orr_hop_keycmp,
500         .hs_object      = nrs_orr_hop_object,
501         .hs_get         = nrs_orr_hop_get,
502         .hs_put         = nrs_orr_hop_put_free,
503         .hs_put_locked  = nrs_orr_hop_put,
504 };
505
506 static cfs_hash_ops_t nrs_trr_hash_ops = {
507         .hs_hash        = nrs_orr_hop_hash,
508         .hs_key         = nrs_orr_hop_key,
509         .hs_keycmp      = nrs_trr_hop_keycmp,
510         .hs_object      = nrs_orr_hop_object,
511         .hs_get         = nrs_orr_hop_get,
512         .hs_put         = nrs_orr_hop_put,
513         .hs_put_locked  = nrs_orr_hop_put,
514         .hs_exit        = nrs_trr_hop_exit,
515 };
516
/* Quantum that nrs_orr_start() assigns to a newly started policy instance. */
#define NRS_ORR_QUANTUM_DFLT 256
518
519 /**
520  * Binary heap predicate.
521  *
522  * Uses
523  * ptlrpc_nrs_request::nr_u::orr::or_round,
524  * ptlrpc_nrs_request::nr_u::orr::or_sequence, and
525  * ptlrpc_nrs_request::nr_u::orr::or_range to compare two binheap nodes and
526  * produce a binary predicate that indicates their relative priority, so that
527  * the binary heap can perform the necessary sorting operations.
528  *
529  * \param[in] e1 the first binheap node to compare
530  * \param[in] e2 the second binheap node to compare
531  *
532  * \retval 0 e1 > e2
533  * \retval 1 e1 < e2
534  */
535 static int orr_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2)
536 {
537         struct ptlrpc_nrs_request *nrq1;
538         struct ptlrpc_nrs_request *nrq2;
539
540         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
541         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
542
543         /**
544          * Requests have been scheduled against a different scheduling round.
545          */
546         if (nrq1->nr_u.orr.or_round < nrq2->nr_u.orr.or_round)
547                 return 1;
548         else if (nrq1->nr_u.orr.or_round > nrq2->nr_u.orr.or_round)
549                 return 0;
550
551         /**
552          * Requests have been scheduled against the same scheduling round, but
553          * belong to a different batch, i.e. they pertain to a different
554          * backend-fs object (for ORR policy instances) or OST (for TRR policy
555          * instances).
556          */
557         if (nrq1->nr_u.orr.or_sequence < nrq2->nr_u.crr.cr_sequence)
558                 return 1;
559         else if (nrq1->nr_u.orr.or_sequence > nrq2->nr_u.crr.cr_sequence)
560                 return 0;
561
562         /**
563          * If round numbers and sequence numbers are equal, the two requests
564          * have been scheduled on the same round, and belong to the same batch,
565          * which means they pertain to the same backend-fs object (if this is an
566          * ORR policy instance), or to the same OST (if this is a TRR policy
567          * instance), so these requests should be sorted by ascending offset
568          * order.
569          */
570         if (nrq1->nr_u.orr.or_range.or_start <
571             nrq2->nr_u.orr.or_range.or_start) {
572                 return 1;
573         } else if (nrq1->nr_u.orr.or_range.or_start >
574                  nrq2->nr_u.orr.or_range.or_start) {
575                 return 0;
576         } else {
577                 /**
578                  * Requests start from the same offset; Dispatch the shorter one
579                  * first; perhaps slightly more chances of hitting caches like
580                  * this.
581                  */
582                 return nrq1->nr_u.orr.or_range.or_end <
583                        nrq2->nr_u.orr.or_range.or_end;
584         }
585 }
586
587 /**
588  * ORR binary heap operations
589  */
590 static cfs_binheap_ops_t nrs_orr_heap_ops = {
591         .hop_enter      = NULL,
592         .hop_exit       = NULL,
593         .hop_compare    = orr_req_compare,
594 };
595
596 /**
597  * Prints a warning message if an ORR/TRR policy is started on a service with
598  * more than one CPT.
599  *
600  * \param[in] policy the policy instance
601  *
602  * \retval 0 success
603  */
604 static int nrs_orr_init(struct ptlrpc_nrs_policy *policy)
605 {
606         if (policy->pol_nrs->nrs_svcpt->scp_service->srv_ncpts > 1) {
607                 bool is_orr = strncmp(policy->pol_desc->pd_name,
608                                       NRS_POL_NAME_ORR, NRS_POL_NAME_MAX) == 0;
609
610                 CWARN("A%s %s NRS policy has been registered on a PTLRPC "
611                       "service which has more than one service partition. "
612                       "Please be advised that this policy may perform better "
613                       "on services with only one partition.\n",
614                       is_orr ? "n" : "", policy->pol_desc->pd_name);
615         }
616         return 0;
617 }
618
619 /**
620  * Called when an ORR policy instance is started.
621  *
622  * \param[in] policy the policy
623  *
624  * \retval -ENOMEM OOM error
625  * \retval 0       success
626  */
627 static int nrs_orr_start(struct ptlrpc_nrs_policy *policy)
628 {
629         struct nrs_orr_data    *orrd;
630         cfs_hash_ops_t         *ops;
631         unsigned                cur_bits;
632         unsigned                max_bits;
633         unsigned                bkt_bits;
634         unsigned                flags;
635         int                     rc = 0;
636         ENTRY;
637
638         OBD_CPT_ALLOC_PTR(orrd, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
639         if (orrd == NULL)
640                 RETURN(-ENOMEM);
641
642         /*
643          * Binary heap instance for sorted incoming requests.
644          */
645         orrd->od_binheap = cfs_binheap_create(&nrs_orr_heap_ops,
646                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
647                                               nrs_pol2cptab(policy),
648                                               nrs_pol2cptid(policy));
649         if (orrd->od_binheap == NULL)
650                 GOTO(failed, rc = -ENOMEM);
651
652         nrs_orr_genobjname(policy, orrd->od_objname);
653
654         /**
655          * Slab cache for NRS ORR/TRR objects.
656          */
657         orrd->od_cache = cfs_mem_cache_create(orrd->od_objname,
658                                               sizeof(struct nrs_orr_object),
659                                               0, 0);
660         if (orrd->od_cache == NULL)
661                 GOTO(failed, rc = -ENOMEM);
662
663         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
664                     NRS_POL_NAME_MAX) == 0) {
665                 ops = &nrs_orr_hash_ops;
666                 cur_bits = NRS_ORR_BITS;
667                 max_bits = NRS_ORR_BITS;
668                 bkt_bits = NRS_ORR_BKT_BITS;
669                 flags = NRS_ORR_HASH_FLAGS;
670         } else {
671                 ops = &nrs_trr_hash_ops;
672                 cur_bits = NRS_TRR_BITS;
673                 max_bits = NRS_TRR_BITS;
674                 bkt_bits = NRS_TRR_BKT_BITS;
675                 flags = NRS_TRR_HASH_FLAGS;
676         }
677
678         /**
679          * Hash for finding objects by struct nrs_orr_key.
680          * XXX: For TRR, it might be better to avoid using libcfs_hash?
681          * All that needs to be resolved are OST indices, and they
682          * will stay relatively stable during an OSS node's lifetime.
683          */
684         orrd->od_obj_hash = cfs_hash_create(orrd->od_objname, cur_bits,
685                                             max_bits, bkt_bits, 0,
686                                             CFS_HASH_MIN_THETA,
687                                             CFS_HASH_MAX_THETA, ops, flags);
688         if (orrd->od_obj_hash == NULL)
689                 GOTO(failed, rc = -ENOMEM);
690
691         /* XXX: Fields accessed unlocked */
692         orrd->od_quantum = NRS_ORR_QUANTUM_DFLT;
693         orrd->od_supp = NOS_DFLT;
694         orrd->od_physical = true;
695         /**
696          * Set to 1 so that the test inside nrs_orr_req_add() can evaluate to
697          * true.
698          */
699         orrd->od_sequence = 1;
700
701         policy->pol_private = orrd;
702
703         RETURN(rc);
704
705 failed:
706         if (orrd->od_cache) {
707                 rc = cfs_mem_cache_destroy(orrd->od_cache);
708                 LASSERTF(rc == 0, "Could not destroy od_cache slab\n");
709         }
710         if (orrd->od_binheap != NULL)
711                 cfs_binheap_destroy(orrd->od_binheap);
712
713         OBD_FREE_PTR(orrd);
714
715         RETURN(rc);
716 }
717
718 /**
719  * Called when an ORR/TRR policy instance is stopped.
720  *
721  * Called when the policy has been instructed to transition to the
722  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more
723  * pending requests to serve.
724  *
725  * \param[in] policy the policy
726  */
727 static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy)
728 {
729         struct nrs_orr_data *orrd = policy->pol_private;
730         ENTRY;
731
732         LASSERT(orrd != NULL);
733         LASSERT(orrd->od_binheap != NULL);
734         LASSERT(orrd->od_obj_hash != NULL);
735         LASSERT(orrd->od_cache != NULL);
736         LASSERT(cfs_binheap_is_empty(orrd->od_binheap));
737
738         cfs_binheap_destroy(orrd->od_binheap);
739         cfs_hash_putref(orrd->od_obj_hash);
740         cfs_mem_cache_destroy(orrd->od_cache);
741
742         OBD_FREE_PTR(orrd);
743 }
744
745 /**
746  * Performs a policy-specific ctl function on ORR/TRR policy instances; similar
747  * to ioctl.
748  *
749  * \param[in]     policy the policy instance
750  * \param[in]     opc    the opcode
751  * \param[in,out] arg    used for passing parameters and information
752  *
753  * \pre spin_is_locked(&policy->pol_nrs->->nrs_lock)
754  * \post spin_is_locked(&policy->pol_nrs->->nrs_lock)
755  *
756  * \retval 0   operation carried successfully
757  * \retval -ve error
758  */
759 int nrs_orr_ctl(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
760                 void *arg)
761 {
762         LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock));
763
764         switch(opc) {
765         default:
766                 RETURN(-EINVAL);
767
768         case NRS_CTL_ORR_RD_QUANTUM: {
769                 struct nrs_orr_data     *orrd = policy->pol_private;
770
771                 *(__u16 *)arg = orrd->od_quantum;
772                 }
773                 break;
774
775         case NRS_CTL_ORR_WR_QUANTUM: {
776                 struct nrs_orr_data     *orrd = policy->pol_private;
777
778                 orrd->od_quantum = *(__u16 *)arg;
779                 LASSERT(orrd->od_quantum != 0);
780                 }
781                 break;
782
783         case NRS_CTL_ORR_RD_OFF_TYPE: {
784                 struct nrs_orr_data     *orrd = policy->pol_private;
785
786                 *(bool *)arg = orrd->od_physical;
787                 }
788                 break;
789
790         case NRS_CTL_ORR_WR_OFF_TYPE: {
791                 struct nrs_orr_data     *orrd = policy->pol_private;
792
793                 orrd->od_physical = *(bool *)arg;
794                 }
795                 break;
796
797         case NRS_CTL_ORR_RD_SUPP_REQ: {
798                 struct nrs_orr_data     *orrd = policy->pol_private;
799
800                 *(enum nrs_orr_supp *)arg = orrd->od_supp;
801                 }
802                 break;
803
804         case NRS_CTL_ORR_WR_SUPP_REQ: {
805                 struct nrs_orr_data     *orrd = policy->pol_private;
806
807                 orrd->od_supp = *(enum nrs_orr_supp *)arg;
808                 LASSERT((orrd->od_supp & NOS_OST_RW) != 0);
809                 }
810                 break;
811         }
812         RETURN(0);
813 }
814
815 /**
816  * Obtains resources for ORR/TRR policy instances. The top-level resource lives
817  * inside \e nrs_orr_data and the second-level resource inside
818  * \e nrs_orr_object instances.
819  *
820  * \param[in]  policy     the policy for which resources are being taken for
821  *                        request \a nrq
822  * \param[in]  nrq        the request for which resources are being taken
823  * \param[in]  parent     parent resource, embedded in nrs_orr_data for the
824  *                        ORR/TRR policies
825  * \param[out] resp       used to return resource references
826  * \param[in]  moving_req signifies limited caller context; used to perform
827  *                        memory allocations in an atomic context in this
828  *                        policy
829  *
830  * \retval 0   we are returning a top-level, parent resource, one that is
831  *             embedded in an nrs_orr_data object
832  * \retval 1   we are returning a bottom-level resource, one that is embedded
833  *             in an nrs_orr_object object
834  *
835  * \see nrs_resource_get_safe()
836  */
837 int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
838                     struct ptlrpc_nrs_request *nrq,
839                     const struct ptlrpc_nrs_resource *parent,
840                     struct ptlrpc_nrs_resource **resp, bool moving_req)
841 {
842         struct nrs_orr_data            *orrd;
843         struct nrs_orr_object          *orro;
844         struct nrs_orr_object          *tmp;
845         struct nrs_orr_key              key = { { { 0 } } };
846         __u32                           opc;
847         int                             rc = 0;
848
849         /**
850          * struct nrs_orr_data is requested.
851          */
852         if (parent == NULL) {
853                 *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
854                 return 0;
855         }
856
857         orrd = container_of(parent, struct nrs_orr_data, od_res);
858
859         /**
860          * If the request type is not supported, fail the enqueuing; the RPC
861          * will be handled by the fallback NRS policy.
862          */
863         if (!nrs_orr_req_supported(orrd, nrq, &opc))
864                 return -1;
865
866         /**
867          * Fill in the key for the request; OST FID for ORR policy instances,
868          * and OST index for TRR policy instances.
869          */
870         rc = nrs_orr_key_fill(orrd, nrq, opc, policy->pol_desc->pd_name, &key);
871         if (rc < 0)
872                 RETURN(rc);
873
874         /**
875          * Set the offset range the request covers
876          */
877         rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
878         if (rc < 0)
879                 RETURN(rc);
880
881         orro = cfs_hash_lookup(orrd->od_obj_hash, &key);
882         if (orro != NULL)
883                 goto out;
884
885         OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
886                                    nrs_pol2cptab(policy), nrs_pol2cptid(policy),
887                                    moving_req ? CFS_ALLOC_ATOMIC :
888                                    CFS_ALLOC_IO);
889         if (orro == NULL)
890                 RETURN(-ENOMEM);
891
892         orro->oo_key = key;
893         cfs_atomic_set(&orro->oo_ref, 1);
894
895         tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key,
896                                       &orro->oo_hnode);
897         if (tmp != orro) {
898                 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
899                 orro = tmp;
900         }
901 out:
902         /**
903          * For debugging purposes
904          */
905         nrq->nr_u.orr.or_key = orro->oo_key;
906
907         *resp = &orro->oo_res;
908
909         return 1;
910 }
911
912 /**
913  * Called when releasing references to the resource hierachy obtained for a
914  * request for scheduling using ORR/TRR policy instances
915  *
916  * \param[in] policy   the policy the resource belongs to
917  * \param[in] res      the resource to be released
918  */
919 static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy,
920                             const struct ptlrpc_nrs_resource *res)
921 {
922         struct nrs_orr_data     *orrd;
923         struct nrs_orr_object   *orro;
924
925         /**
926          * Do nothing for freeing parent, nrs_orr_data resources.
927          */
928         if (res->res_parent == NULL)
929                 return;
930
931         orro = container_of(res, struct nrs_orr_object, oo_res);
932         orrd = container_of(res->res_parent, struct nrs_orr_data, od_res);
933
934         cfs_hash_put(orrd->od_obj_hash, &orro->oo_hnode);
935 }
936
937 /**
938  * Called when polling an ORR/TRR policy instance for a request so that it can
939  * be served. Returns the request that is at the root of the binary heap, as
940  * that is the lowest priority one (i.e. libcfs_heap is an implementation of a
941  * min-heap)
942  *
943  * \param[in] policy the policy instance being polled
944  * \param[in] peek   when set, signifies that we just want to examine the
945  *                   request, and not handle it, so the request is not removed
946  *                   from the policy.
947  * \param[in] force  force the policy to return a request; unused in this policy
948  *
949  * \retval the request to be handled
950  * \retval NULL no request available
951  *
952  * \see ptlrpc_nrs_req_get_nolock()
953  * \see nrs_request_get()
954  */
955 static
956 struct ptlrpc_nrs_request *nrs_orr_req_get(struct ptlrpc_nrs_policy *policy,
957                                            bool peek, bool force)
958 {
959         struct nrs_orr_data       *orrd = policy->pol_private;
960         cfs_binheap_node_t        *node = cfs_binheap_root(orrd->od_binheap);
961         struct ptlrpc_nrs_request *nrq;
962
963         nrq = unlikely(node == NULL) ? NULL :
964               container_of(node, struct ptlrpc_nrs_request, nr_node);
965
966         if (likely(!peek && nrq != NULL)) {
967                 struct nrs_orr_object *orro;
968
969                 orro = container_of(nrs_request_resource(nrq),
970                                     struct nrs_orr_object, oo_res);
971
972                 LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
973
974                 cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
975                 orro->oo_active--;
976
977                 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
978                                  NRS_POL_NAME_MAX) == 0)
979                         CDEBUG(D_RPCTRACE,
980                                "NRS: starting to handle %s request for object "
981                                "with FID "DFID", from OST with index %u, with "
982                                "round "LPU64"\n", NRS_POL_NAME_ORR,
983                                PFID(&orro->oo_key.ok_fid),
984                                nrq->nr_u.orr.or_key.ok_idx,
985                                nrq->nr_u.orr.or_round);
986                 else
987                         CDEBUG(D_RPCTRACE,
988                                "NRS: starting to handle %s request from OST "
989                                "with index %u, with round "LPU64"\n",
990                                NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
991                                nrq->nr_u.orr.or_round);
992
993                 /** Peek at the next request to be served */
994                 node = cfs_binheap_root(orrd->od_binheap);
995
996                 /** No more requests */
997                 if (unlikely(node == NULL)) {
998                         orrd->od_round++;
999                 } else {
1000                         struct ptlrpc_nrs_request *next;
1001
1002                         next = container_of(node, struct ptlrpc_nrs_request,
1003                                             nr_node);
1004
1005                         if (orrd->od_round < next->nr_u.orr.or_round)
1006                                 orrd->od_round = next->nr_u.orr.or_round;
1007                 }
1008         }
1009
1010         return nrq;
1011 }
1012
1013 /**
1014  * Sort-adds request \a nrq to an ORR/TRR \a policy instance's set of queued
1015  * requests in the policy's binary heap.
1016  *
1017  * A scheduling round is a stream of requests that have been sorted in batches
1018  * according to the backend-fs object (for ORR policy instances) or OST (for TRR
1019  * policy instances) that they pertain to (as identified by its IDIF FID or OST
1020  * index respectively); there can be only one batch for each object or OST in
1021  * each round. The batches are of maximum size nrs_orr_data:od_quantum. When a
1022  * new request arrives for scheduling for an object or OST that has exhausted
1023  * its quantum in its current round, the request will be scheduled on the next
1024  * scheduling round. Requests are allowed to be scheduled against a round until
1025  * all requests for the round are serviced, so an object or OST might miss a
1026  * round if requests are not scheduled for it for a long enough period of time.
1027  * Objects or OSTs that miss a round will continue with having their next
1028  * request scheduled, starting at the round that requests are being dispatched
1029  * for, at the time of arrival of this request.
1030  *
1031  * Requests are tagged with the round number and a sequence number; the sequence
1032  * number indicates the relative ordering amongst the batches of requests in a
1033  * round, and is identical for all requests in a batch, as is the round number.
1034  * The round and sequence numbers are used by orr_req_compare() in order to use
1035  * nrs_orr_data::od_binheap in order to maintain an ordered set of rounds, with
1036  * each round consisting of an ordered set of batches of requests, and each
1037  * batch consisting of an ordered set of requests according to their logical
1038  * file or physical disk offsets.
1039  *
1040  * \param[in] policy the policy
1041  * \param[in] nrq    the request to add
1042  *
1043  * \retval 0    request successfully added
1044  * \retval != 0 error
1045  */
1046 static int nrs_orr_req_add(struct ptlrpc_nrs_policy *policy,
1047                            struct ptlrpc_nrs_request *nrq)
1048 {
1049         struct nrs_orr_data     *orrd;
1050         struct nrs_orr_object   *orro;
1051         int                      rc;
1052
1053         orro = container_of(nrs_request_resource(nrq),
1054                             struct nrs_orr_object, oo_res);
1055         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1056                             struct nrs_orr_data, od_res);
1057
1058         if (orro->oo_quantum == 0 || orro->oo_round < orrd->od_round ||
1059             (orro->oo_active == 0 && orro->oo_quantum > 0)) {
1060
1061                 /**
1062                  * If there are no pending requests for the object/OST, but some
1063                  * of its quantum still remains unused, which implies we did not
1064                  * get a chance to schedule up to its maximum allowed batch size
1065                  * of requests in the previous round this object/OST
1066                  * participated in, schedule this next request on a new round;
1067                  * this avoids fragmentation of request batches caused by
1068                  * intermittent inactivity on the object/OST, at the expense of
1069                  * potentially slightly increased service time for the request
1070                  * batch this request will be a part of.
1071                  */
1072                 if (orro->oo_active == 0 && orro->oo_quantum > 0)
1073                         orro->oo_round++;
1074
1075                 /** A new scheduling round has commenced */
1076                 if (orro->oo_round < orrd->od_round)
1077                         orro->oo_round = orrd->od_round;
1078
1079                 /** I was not the last object/OST that scheduled a request */
1080                 if (orro->oo_sequence < orrd->od_sequence)
1081                         orro->oo_sequence = ++orrd->od_sequence;
1082                 /**
1083                  * Reset the quantum if we have reached the maximum quantum
1084                  * size for this batch, or even if we have not managed to
1085                  * complete a batch size up to its maximum allowed size.
1086                  * XXX: Accessed unlocked
1087                  */
1088                 orro->oo_quantum = orrd->od_quantum;
1089         }
1090
1091         nrq->nr_u.crr.cr_round = orro->oo_round;
1092         nrq->nr_u.crr.cr_sequence = orro->oo_sequence;
1093
1094         rc = cfs_binheap_insert(orrd->od_binheap, &nrq->nr_node);
1095         if (rc == 0) {
1096                 orro->oo_active++;
1097                 if (--orro->oo_quantum == 0)
1098                         orro->oo_round++;
1099         }
1100         return rc;
1101 }
1102
1103 /**
1104  * Removes request \a nrq from an ORR/TRR \a policy instance's set of queued
1105  * requests.
1106  *
1107  * \param[in] policy the policy
1108  * \param[in] nrq    the request to remove
1109  */
1110 static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy,
1111                             struct ptlrpc_nrs_request *nrq)
1112 {
1113         struct nrs_orr_data     *orrd;
1114         struct nrs_orr_object   *orro;
1115         bool                     is_root;
1116
1117         orro = container_of(nrs_request_resource(nrq),
1118                             struct nrs_orr_object, oo_res);
1119         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1120                             struct nrs_orr_data, od_res);
1121
1122         LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
1123
1124         is_root = &nrq->nr_node == cfs_binheap_root(orrd->od_binheap);
1125
1126         cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
1127         orro->oo_active--;
1128
1129         /**
1130          * If we just deleted the node at the root of the binheap, we may have
1131          * to adjust round numbers.
1132          */
1133         if (unlikely(is_root)) {
1134                 /** Peek at the next request to be served */
1135                 cfs_binheap_node_t *node = cfs_binheap_root(orrd->od_binheap);
1136
1137                 /** No more requests */
1138                 if (unlikely(node == NULL)) {
1139                         orrd->od_round++;
1140                 } else {
1141                         nrq = container_of(node, struct ptlrpc_nrs_request,
1142                                            nr_node);
1143
1144                         if (orrd->od_round < nrq->nr_u.orr.or_round)
1145                                 orrd->od_round = nrq->nr_u.orr.or_round;
1146                 }
1147         }
1148 }
1149
1150 /**
1151  * Called right after the request \a nrq finishes being handled by ORR policy
1152  * instance \a policy.
1153  *
1154  * \param[in] policy the policy that handled the request
1155  * \param[in] nrq    the request that was handled
1156  */
1157 static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy,
1158                              struct ptlrpc_nrs_request *nrq)
1159 {
1160         /** NB: resource control, credits etc can be added here */
1161         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
1162                     NRS_POL_NAME_MAX) == 0)
1163                 CDEBUG(D_RPCTRACE,
1164                        "NRS: finished handling %s request for object with FID "
1165                        DFID", from OST with index %u, with round "LPU64"\n",
1166                        NRS_POL_NAME_ORR, PFID(&nrq->nr_u.orr.or_key.ok_fid),
1167                        nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
1168         else
1169                 CDEBUG(D_RPCTRACE,
1170                        "NRS: finished handling %s request from OST with index %u,"
1171                        " with round "LPU64"\n",
1172                        NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
1173                        nrq->nr_u.orr.or_round);
1174 }
1175
1176 /**
1177  * lprocfs interface
1178  */
1179
1180 #ifdef LPROCFS
1181
1182 /**
1183  * This allows to bundle the policy name into the lprocfs_vars::data pointer
1184  * so that lprocfs read/write functions can be used by both the ORR and TRR
1185  * policies.
1186  */
1187 struct nrs_lprocfs_orr_data {
1188         struct ptlrpc_service   *svc;
1189         char                    *name;
1190 } lprocfs_orr_data = {
1191         .name = NRS_POL_NAME_ORR
1192 }, lprocfs_trr_data = {
1193         .name = NRS_POL_NAME_TRR
1194 };
1195
1196 /**
1197  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
1198  * for ORR/TRR policy instances on both the regular and high-priority NRS head
1199  * of a service, as long as a policy instance is not in the
1200  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1201  * state are skipped later by nrs_orr_ctl().
1202  *
1203  * Quantum values are in # of RPCs, and the output is in YAML format.
1204  *
1205  * For example:
1206  *
1207  *      reg_quantum:256
1208  *      hp_quantum:8
1209  *
1210  * XXX: the CRR-N version of this, ptlrpc_lprocfs_rd_nrs_crrn_quantum() is
1211  * almost identical; it can be reworked and then reused for ORR/TRR.
1212  */
1213 static int ptlrpc_lprocfs_rd_nrs_orr_quantum(char *page, char **start,
1214                                              off_t off, int count, int *eof,
1215                                              void *data)
1216 {
1217         struct nrs_lprocfs_orr_data *orr_data = data;
1218         struct ptlrpc_service       *svc = orr_data->svc;
1219         __u16                        quantum;
1220         int                          rc;
1221         int                          rc2 = 0;
1222
1223         /**
1224          * Perform two separate calls to this as only one of the NRS heads'
1225          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
1226          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1227          */
1228         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1229                                        orr_data->name,
1230                                        NRS_CTL_ORR_RD_QUANTUM,
1231                                        true, &quantum);
1232         if (rc == 0) {
1233                 *eof = 1;
1234                 rc2 = snprintf(page, count, NRS_LPROCFS_QUANTUM_NAME_REG
1235                                "%-5d\n", quantum);
1236                 /**
1237                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1238                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1239                  */
1240         } else if (rc != -ENODEV) {
1241                 return rc;
1242         }
1243
1244         /**
1245          * We know the ost_io service which is the only one ORR/TRR policies are
1246          * compatible with, do have an HP NRS head, but it may be best to guard
1247          * against a possible change of this in the future.
1248          */
1249         if (!nrs_svc_has_hp(svc))
1250                 goto no_hp;
1251
1252         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1253                                        orr_data->name, NRS_CTL_ORR_RD_QUANTUM,
1254                                        true, &quantum);
1255         if (rc == 0) {
1256                 *eof = 1;
1257                 rc2 += snprintf(page + rc2, count - rc2,
1258                                 NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
1259                 /**
1260                  * Ignore -ENODEV as the high priority NRS head's policy may be
1261                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1262                  */
1263         } else if (rc != -ENODEV) {
1264                 return rc;
1265         }
1266
1267 no_hp:
1268
1269         return rc2 ? : rc;
1270 }
1271
1272 /**
1273  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
1274  * for ORR/TRR policy instances of a service. The user can set the quantum size
1275  * for the regular and high priority NRS head separately by specifying each
1276  * value, or both together in a single invocation.
1277  *
1278  * For example:
1279  *
1280  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=req_quantum:64, to set the
1281  * request quantum size of the ORR policy instance on the regular NRS head of
1282  * the ost_io service to 64
1283  *
1284  * lctl set_param ost.OSS.ost_io.nrs_trr_quantum=hp_quantum:8 to set the request
1285  * quantum size of the TRR policy instance on the high priority NRS head of the
1286  * ost_io service to 8
1287  *
1288  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=32, to set both the request
1289  * quantum size of the ORR policy instance on both the regular and the high
1290  * priority NRS head of the ost_io service to 32
1291  *
1292  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1293  * are skipped later by nrs_orr_ctl().
1294  *
1295  * XXX: the CRR-N version of this, ptlrpc_lprocfs_wr_nrs_crrn_quantum() is
1296  * almost identical; it can be reworked and then reused for ORR/TRR.
1297  */
1298 static int ptlrpc_lprocfs_wr_nrs_orr_quantum(struct file *file,
1299                                              const char *buffer,
1300                                              unsigned long count, void *data)
1301 {
1302         struct nrs_lprocfs_orr_data *orr_data = data;
1303         struct ptlrpc_service       *svc = orr_data->svc;
1304         enum ptlrpc_nrs_queue_type   queue = 0;
1305         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
1306         char                        *val;
1307         long                         quantum_reg;
1308         long                         quantum_hp;
1309         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
1310         unsigned long                count_copy;
1311         int                          rc = 0;
1312         int                          rc2 = 0;
1313
1314         if (count > (sizeof(kernbuf) - 1))
1315                 return -EINVAL;
1316
1317         if (cfs_copy_from_user(kernbuf, buffer, count))
1318                 return -EFAULT;
1319
1320         kernbuf[count] = '\0';
1321
1322         count_copy = count;
1323
1324         /**
1325          * Check if the regular quantum value has been specified
1326          */
1327         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
1328                                        &count_copy);
1329         if (val != kernbuf) {
1330                 quantum_reg = simple_strtol(val, NULL, 10);
1331
1332                 queue |= PTLRPC_NRS_QUEUE_REG;
1333         }
1334
1335         count_copy = count;
1336
1337         /**
1338          * Check if the high priority quantum value has been specified
1339          */
1340         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
1341                                        &count_copy);
1342         if (val != kernbuf) {
1343                 if (!nrs_svc_has_hp(svc))
1344                         return -ENODEV;
1345
1346                 quantum_hp = simple_strtol(val, NULL, 10);
1347
1348                 queue |= PTLRPC_NRS_QUEUE_HP;
1349         }
1350
1351         /**
1352          * If none of the queues has been specified, look for a valid numerical
1353          * value
1354          */
1355         if (queue == 0) {
1356                 if (!isdigit(kernbuf[0]))
1357                         return -EINVAL;
1358
1359                 quantum_reg = simple_strtol(kernbuf, NULL, 10);
1360
1361                 queue = PTLRPC_NRS_QUEUE_REG;
1362
1363                 if (nrs_svc_has_hp(svc)) {
1364                         queue |= PTLRPC_NRS_QUEUE_HP;
1365                         quantum_hp = quantum_reg;
1366                 }
1367         }
1368
1369         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
1370             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
1371             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
1372             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
1373                 return -EINVAL;
1374
1375         /**
1376          * We change the values on regular and HP NRS heads separately, so that
1377          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1378          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1379          * started the policy on either the regular or HP NRS head; i.e. we are
1380          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1381          * only if the operation fails with -ENODEV on all heads that have been
1382          * specified by the command; if at least one operation succeeds,
1383          * success is returned.
1384          */
1385         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1386                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1387                                                orr_data->name,
1388                                                NRS_CTL_ORR_WR_QUANTUM, false,
1389                                                &quantum_reg);
1390                 if ((rc < 0 && rc != -ENODEV) ||
1391                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1392                         return rc;
1393         }
1394
1395         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1396                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1397                                                 orr_data->name,
1398                                                 NRS_CTL_ORR_WR_QUANTUM, false,
1399                                                 &quantum_hp);
1400                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1401                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1402                         return rc2;
1403         }
1404
1405         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1406 }
1407
/**
 * Labels that prefix the offset type of the regular and of the high priority
 * NRS head respectively, in the offset type lprocfs file.
 */
#define LPROCFS_NRS_OFF_NAME_REG                "reg_offset_type:"
#define LPROCFS_NRS_OFF_NAME_HP                 "hp_offset_type:"

/**
 * Strings that name the two available offset types.
 */
#define LPROCFS_NRS_OFF_NAME_LOGICAL            "logical"
#define LPROCFS_NRS_OFF_NAME_PHYSICAL           "physical"
1413
1414 /**
1415  * Retrieves the offset type used by ORR/TRR policy instances on both the
1416  * regular and high-priority NRS head of a service, as long as a policy
1417  * instance is not in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state;
1418  * policy instances in this state are skipped later by nrs_orr_ctl().
1419  *
1420  * Offset type information is a (physical|logical) string, and output is
1421  * in YAML format.
1422  *
1423  * For example:
1424  *
1425  *      reg_offset_type:physical
1426  *      hp_offset_type:logical
1427  */
1428 static int ptlrpc_lprocfs_rd_nrs_orr_offset_type(char *page, char **start,
1429                                                  off_t off, int count, int *eof,
1430                                                  void *data)
1431 {
1432         struct nrs_lprocfs_orr_data *orr_data = data;
1433         struct ptlrpc_service       *svc = orr_data->svc;
1434         bool                         physical;
1435         int                          rc;
1436         int                          rc2 = 0;
1437
1438         /**
1439          * Perform two separate calls to this as only one of the NRS heads'
1440          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1441          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1442          */
1443         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1444                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1445                                        true, &physical);
1446         if (rc == 0) {
1447                 *eof = 1;
1448                 rc2 = snprintf(page, count,
1449                                LPROCFS_NRS_OFF_NAME_REG"%s\n",
1450                                physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1451                                LPROCFS_NRS_OFF_NAME_LOGICAL);
1452                 /**
1453                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1454                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1455                  */
1456         } else if (rc != -ENODEV) {
1457                 return rc;
1458         }
1459
1460         /**
1461          * We know the ost_io service which is the only one ORR/TRR policies are
1462          * compatible with, do have an HP NRS head, but it may be best to guard
1463          * against a possible change of this in the future.
1464          */
1465         if (!nrs_svc_has_hp(svc))
1466                 goto no_hp;
1467
1468         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1469                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1470                                        true, &physical);
1471         if (rc == 0) {
1472                 *eof = 1;
1473                 rc2 += snprintf(page + rc2, count - rc2,
1474                                 LPROCFS_NRS_OFF_NAME_HP"%s\n",
1475                                 physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1476                                 LPROCFS_NRS_OFF_NAME_LOGICAL);
1477                 /**
1478                  * Ignore -ENODEV as the high priority NRS head's policy may be
1479                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1480                  */
1481         } else if (rc != -ENODEV) {
1482                 return rc;
1483         }
1484
1485 no_hp:
1486
1487         return rc2 ? : rc;
1488 }
1489
/**
 * Size of the buffer for an offset type command: the longest valid command
 * string consists of both labels, each followed by "physical", with a
 * separating ' ' in between; sizeof also accounts for the terminating NUL.
 */
#define LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD                                        \
        (sizeof(LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL        \
                " "                                                            \
                LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL))
1497
1498 /**
1499  * Sets the type of offsets used to order RPCs in ORR/TRR policy instances. The
1500  * user can set offset type for the regular or high priority NRS head
1501  * separately by specifying each value, or both together in a single invocation.
1502  *
1503  * For example:
1504  *
1505  * lctl set_param ost.OSS.ost_io.nrs_orr_offset_type=
1506  * reg_offset_type:physical, to enable the ORR policy instance on the regular
1507  * NRS head of the ost_io service to use physical disk offset ordering.
1508  *
1509  * lctl set_param ost.OSS.ost_io.nrs_trr_offset_type=logical, to enable the TRR
1510  * policy instances on both the regular and high priority NRS heads of the
1511  * ost_io service to use logical file offset ordering.
1512  *
1513  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state are
1514  * skipped later by nrs_orr_ctl().
1515  */
1516 static int ptlrpc_lprocfs_wr_nrs_orr_offset_type(struct file *file,
1517                                                  const char *buffer,
1518                                                  unsigned long count,
1519                                                  void *data)
1520 {
1521         struct nrs_lprocfs_orr_data *orr_data = data;
1522         struct ptlrpc_service       *svc = orr_data->svc;
1523         enum ptlrpc_nrs_queue_type   queue = 0;
1524         char                         kernbuf[LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD];
1525         char                        *val_reg;
1526         char                        *val_hp;
1527         bool                         physical_reg;
1528         bool                         physical_hp;
1529         unsigned long                count_copy;
1530         int                          rc = 0;
1531         int                          rc2 = 0;
1532
1533         if (count > (sizeof(kernbuf) - 1))
1534                 return -EINVAL;
1535
1536         if (cfs_copy_from_user(kernbuf, buffer, count))
1537                 return -EFAULT;
1538
1539         kernbuf[count] = '\0';
1540
1541         count_copy = count;
1542
1543         /**
1544          * Check if the regular offset type has been specified
1545          */
1546         val_reg = lprocfs_find_named_value(kernbuf,
1547                                            LPROCFS_NRS_OFF_NAME_REG,
1548                                            &count_copy);
1549         if (val_reg != kernbuf)
1550                 queue |= PTLRPC_NRS_QUEUE_REG;
1551
1552         count_copy = count;
1553
1554         /**
1555          * Check if the high priority offset type has been specified
1556          */
1557         val_hp = lprocfs_find_named_value(kernbuf, LPROCFS_NRS_OFF_NAME_HP,
1558                                           &count_copy);
1559         if (val_hp != kernbuf) {
1560                 if (!nrs_svc_has_hp(svc))
1561                         return -ENODEV;
1562
1563                 queue |= PTLRPC_NRS_QUEUE_HP;
1564         }
1565
1566         /**
1567          * If none of the queues has been specified, there may be a valid
1568          * command string at the start of the buffer.
1569          */
1570         if (queue == 0) {
1571                 queue = PTLRPC_NRS_QUEUE_REG;
1572
1573                 if (nrs_svc_has_hp(svc))
1574                         queue |= PTLRPC_NRS_QUEUE_HP;
1575         }
1576
1577         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1578                 if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1579                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1580                         physical_reg = true;
1581                 else if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_LOGICAL,
1582                          sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1583                         physical_reg = false;
1584                 else
1585                         return -EINVAL;
1586         }
1587
1588         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1589                 if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1590                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1591                         physical_hp = true;
1592                 else if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_LOGICAL,
1593                                  sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1594                         physical_hp = false;
1595                 else
1596                         return -EINVAL;
1597         }
1598
1599         /**
1600          * We change the values on regular and HP NRS heads separately, so that
1601          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1602          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1603          * started the policy on either the regular or HP NRS head; i.e. we are
1604          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1605          * only if the operation fails with -ENODEV on all heads that have been
1606          * specified by the command; if at least one operation succeeds,
1607          * success is returned.
1608          */
1609         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1610                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1611                                                orr_data->name,
1612                                                NRS_CTL_ORR_WR_OFF_TYPE, false,
1613                                                &physical_reg);
1614                 if ((rc < 0 && rc != -ENODEV) ||
1615                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1616                         return rc;
1617         }
1618
1619         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1620                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1621                                                 orr_data->name,
1622                                                 NRS_CTL_ORR_WR_OFF_TYPE, false,
1623                                                 &physical_hp);
1624                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1625                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1626                         return rc2;
1627         }
1628
1629         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1630 }
1631
/**
 * Labels that introduce the supported RPC type setting of the regular and of
 * the high priority NRS head respectively.
 */
#define NRS_LPROCFS_REQ_SUPP_NAME_REG           "reg_supported:"
#define NRS_LPROCFS_REQ_SUPP_NAME_HP            "hp_supported:"

/**
 * Keywords for each of the supported RPC type settings. Note that "reads" is
 * a prefix of "reads_and_writes".
 */
#define LPROCFS_NRS_SUPP_NAME_READS             "reads"
#define LPROCFS_NRS_SUPP_NAME_WRITES            "writes"
#define LPROCFS_NRS_SUPP_NAME_READWRITES        "reads_and_writes"
1638
1639 /**
1640  * Translates enum nrs_orr_supp values to a corresponding string.
1641  */
1642 static const char *nrs_orr_supp2str(enum nrs_orr_supp supp)
1643 {
1644         switch(supp) {
1645         default:
1646                 LBUG();
1647         case NOS_OST_READ:
1648                 return LPROCFS_NRS_SUPP_NAME_READS;
1649         case NOS_OST_WRITE:
1650                 return LPROCFS_NRS_SUPP_NAME_WRITES;
1651         case NOS_OST_RW:
1652                 return LPROCFS_NRS_SUPP_NAME_READWRITES;
1653         }
1654 }
1655
1656 /**
1657  * Translates strings to the corresponding enum nrs_orr_supp value
1658  */
1659 static enum nrs_orr_supp nrs_orr_str2supp(const char *val)
1660 {
1661         if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READWRITES,
1662                     sizeof(LPROCFS_NRS_SUPP_NAME_READWRITES) - 1) == 0)
1663                 return NOS_OST_RW;
1664         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READS,
1665                          sizeof(LPROCFS_NRS_SUPP_NAME_READS) - 1) == 0)
1666                 return NOS_OST_READ;
1667         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_WRITES,
1668                          sizeof(LPROCFS_NRS_SUPP_NAME_WRITES) - 1) == 0)
1669                 return NOS_OST_WRITE;
1670         else
1671                 return -EINVAL;
1672 }
1673
1674 /**
1675  * Retrieves the type of RPCs handled at the point of invocation by ORR/TRR
1676  * policy instances on both the regular and high-priority NRS head of a service,
1677  * as long as a policy instance is not in the
1678  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1679  * state are skipped later by nrs_orr_ctl().
1680  *
1681  * Supported RPC type information is a (reads|writes|reads_and_writes) string,
1682  * and output is in YAML format.
1683  *
1684  * For example:
1685  *
1686  *      reg_supported:reads
1687  *      hp_supported:reads_and_writes
1688  */
1689 static int ptlrpc_lprocfs_rd_nrs_orr_supported(char *page, char **start,
1690                                                off_t off, int count, int *eof,
1691                                                void *data)
1692 {
1693         struct nrs_lprocfs_orr_data *orr_data = data;
1694         struct ptlrpc_service       *svc = orr_data->svc;
1695         enum nrs_orr_supp            supported;
1696         int                          rc;
1697         int                          rc2 = 0;
1698
1699         /**
1700          * Perform two separate calls to this as only one of the NRS heads'
1701          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1702          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1703          */
1704         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1705                                        orr_data->name,
1706                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1707                                        &supported);
1708
1709         if (rc == 0) {
1710                 *eof = 1;
1711                 rc2 = snprintf(page, count,
1712                                NRS_LPROCFS_REQ_SUPP_NAME_REG"%s\n",
1713                                nrs_orr_supp2str(supported));
1714                 /**
1715                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1716                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1717                  */
1718         } else if (rc != -ENODEV) {
1719                 return rc;
1720         }
1721
1722         /**
1723          * We know the ost_io service which is the only one ORR/TRR policies are
1724          * compatible with, do have an HP NRS head, but it may be best to guard
1725          * against a possible change of this in the future.
1726          */
1727         if (!nrs_svc_has_hp(svc))
1728                 goto no_hp;
1729
1730         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1731                                        orr_data->name,
1732                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1733                                        &supported);
1734         if (rc == 0) {
1735                 *eof = 1;
1736                 rc2 += snprintf(page + rc2, count - rc2,
1737                                NRS_LPROCFS_REQ_SUPP_NAME_HP"%s\n",
1738                                nrs_orr_supp2str(supported));
1739                 /**
1740                  * Ignore -ENODEV as the high priority NRS head's policy may be
1741                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1742                  */
1743         } else if (rc != -ENODEV) {
1744                 return rc;
1745         }
1746
1747 no_hp:
1748
1749         return rc2 ? : rc;
1750 }
1751
/**
 * Size of the buffer needed to hold the longest valid supported RPC type
 * command: both labels, each followed by "reads_and_writes", plus a separating
 * ' '. As this is the sizeof() of the concatenated string literals, the
 * terminating NUL is accounted for as well.
 */
#define LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD                                        \
        sizeof(NRS_LPROCFS_REQ_SUPP_NAME_REG LPROCFS_NRS_SUPP_NAME_READWRITES  \
               " "                                                             \
               NRS_LPROCFS_REQ_SUPP_NAME_HP LPROCFS_NRS_SUPP_NAME_READWRITES)
1760
1761 /**
1762  * Sets the type of RPCs handled by ORR/TRR policy instances. The user can
1763  * modify this setting for the regular or high priority NRS heads separately, or
1764  * both together in a single invocation.
1765  *
1766  * For example:
1767  *
1768  * lctl set_param ost.OSS.ost_io.nrs_orr_supported=
1769  * "reg_supported:reads", to enable the ORR policy instance on the regular NRS
1770  * head of the ost_io service to handle OST_READ RPCs.
1771  *
1772  * lctl set_param ost.OSS.ost_io.nrs_trr_supported=reads_and_writes, to enable
1773  * the TRR policy instances on both the regular ang high priority NRS heads of
1774  * the ost_io service to use handle OST_READ and OST_WRITE RPCs.
1775  *
1776  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state are
1777  * are skipped later by nrs_orr_ctl().
1778  */
1779 static int ptlrpc_lprocfs_wr_nrs_orr_supported(struct file *file,
1780                                                const char *buffer,
1781                                                unsigned long count, void *data)
1782 {
1783         struct nrs_lprocfs_orr_data *orr_data = data;
1784         struct ptlrpc_service       *svc = orr_data->svc;
1785         enum ptlrpc_nrs_queue_type   queue = 0;
1786         char                         kernbuf[LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD];
1787         char                        *val_reg;
1788         char                        *val_hp;
1789         enum nrs_orr_supp            supp_reg;
1790         enum nrs_orr_supp            supp_hp;
1791         unsigned long                count_copy;
1792         int                          rc = 0;
1793         int                          rc2 = 0;
1794
1795         if (count > (sizeof(kernbuf) - 1))
1796                 return -EINVAL;
1797
1798         if (cfs_copy_from_user(kernbuf, buffer, count))
1799                 return -EFAULT;
1800
1801         kernbuf[count] = '\0';
1802
1803         count_copy = count;
1804
1805         /**
1806          * Check if the regular supported requests setting has been specified
1807          */
1808         val_reg = lprocfs_find_named_value(kernbuf,
1809                                            NRS_LPROCFS_REQ_SUPP_NAME_REG,
1810                                            &count_copy);
1811         if (val_reg != kernbuf)
1812                 queue |= PTLRPC_NRS_QUEUE_REG;
1813
1814         count_copy = count;
1815
1816         /**
1817          * Check if the high priority supported requests setting has been
1818          * specified
1819          */
1820         val_hp = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_REQ_SUPP_NAME_HP,
1821                                           &count_copy);
1822         if (val_hp != kernbuf) {
1823                 if (!nrs_svc_has_hp(svc))
1824                         return -ENODEV;
1825
1826                 queue |= PTLRPC_NRS_QUEUE_HP;
1827         }
1828
1829         /**
1830          * If none of the queues has been specified, there may be a valid
1831          * command string at the start of the buffer.
1832          */
1833         if (queue == 0) {
1834                 queue = PTLRPC_NRS_QUEUE_REG;
1835
1836                 if (nrs_svc_has_hp(svc))
1837                         queue |= PTLRPC_NRS_QUEUE_HP;
1838         }
1839
1840         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1841                 supp_reg = nrs_orr_str2supp(val_reg);
1842                 if (supp_reg == -EINVAL)
1843                         return -EINVAL;
1844         }
1845
1846         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1847                 supp_hp = nrs_orr_str2supp(val_hp);
1848                 if (supp_hp == -EINVAL)
1849                         return -EINVAL;
1850         }
1851
1852         /**
1853          * We change the values on regular and HP NRS heads separately, so that
1854          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1855          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1856          * started the policy on either the regular or HP NRS head; i.e. we are
1857          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1858          * only if the operation fails with -ENODEV on all heads that have been
1859          * specified by the command; if at least one operation succeeds,
1860          * success is returned.
1861          */
1862         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1863                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1864                                                orr_data->name,
1865                                                NRS_CTL_ORR_WR_SUPP_REQ, false,
1866                                                &supp_reg);
1867                 if ((rc < 0 && rc != -ENODEV) ||
1868                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1869                         return rc;
1870         }
1871
1872         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1873                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1874                                                 orr_data->name,
1875                                                 NRS_CTL_ORR_WR_SUPP_REQ, false,
1876                                                 &supp_hp);
1877                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1878                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1879                         return rc2;
1880         }
1881
1882         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1883 }
1884
1885 int nrs_orr_lprocfs_init(struct ptlrpc_service *svc)
1886 {
1887         int     rc;
1888         int     i;
1889
1890         struct lprocfs_vars nrs_orr_lprocfs_vars[] = {
1891                 { .name         = "nrs_orr_quantum",
1892                   .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_quantum,
1893                   .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_quantum },
1894                 { .name         = "nrs_orr_offset_type",
1895                   .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_offset_type,
1896                   .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_offset_type },
1897                 { .name         = "nrs_orr_supported",
1898                   .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_supported,
1899                   .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_supported },
1900                 { NULL }
1901         };
1902
1903         if (svc->srv_procroot == NULL)
1904                 return 0;
1905
1906         lprocfs_orr_data.svc = svc;
1907
1908         for (i = 0; i < ARRAY_SIZE(nrs_orr_lprocfs_vars); i++)
1909                 nrs_orr_lprocfs_vars[i].data = &lprocfs_orr_data;
1910
1911         rc = lprocfs_add_vars(svc->srv_procroot, nrs_orr_lprocfs_vars, NULL);
1912
1913         return rc;
1914 }
1915
1916 void nrs_orr_lprocfs_fini(struct ptlrpc_service *svc)
1917 {
1918         if (svc->srv_procroot == NULL)
1919                 return;
1920
1921         lprocfs_remove_proc_entry("nrs_orr_quantum", svc->srv_procroot);
1922         lprocfs_remove_proc_entry("nrs_orr_offset_type", svc->srv_procroot);
1923         lprocfs_remove_proc_entry("nrs_orr_supported", svc->srv_procroot);
1924 }
1925
1926 #endif /* LPROCFS */
1927
/**
 * ORR policy operations. The lprocfs init and fini operations are only set
 * when LPROCFS is defined.
 */
static const struct ptlrpc_nrs_pol_ops nrs_orr_ops = {
        .op_policy_init         = nrs_orr_init,
        .op_policy_start        = nrs_orr_start,
        .op_policy_stop         = nrs_orr_stop,
        .op_policy_ctl          = nrs_orr_ctl,
        .op_res_get             = nrs_orr_res_get,
        .op_res_put             = nrs_orr_res_put,
        .op_req_get             = nrs_orr_req_get,
        .op_req_enqueue         = nrs_orr_req_add,
        .op_req_dequeue         = nrs_orr_req_del,
        .op_req_stop            = nrs_orr_req_stop,
#ifdef LPROCFS
        .op_lprocfs_init        = nrs_orr_lprocfs_init,
        .op_lprocfs_fini        = nrs_orr_lprocfs_fini,
#endif
};
1944
/**
 * ORR policy configuration: the policy is named NRS_POL_NAME_ORR, uses the
 * nrs_orr_ops operations, and names "ost_io" as the service it is compatible
 * with.
 */
struct ptlrpc_nrs_pol_conf nrs_conf_orr = {
        .nc_name                = NRS_POL_NAME_ORR,
        .nc_ops                 = &nrs_orr_ops,
        .nc_compat              = nrs_policy_compat_one,
        .nc_compat_svc_name     = "ost_io",
};
1951
/**
 * TRR, Target-based Round Robin policy
 *
 * TRR reuses many of the functions and data structures of ORR
 */
1957
1958 #ifdef LPROCFS
1959
1960 int nrs_trr_lprocfs_init(struct ptlrpc_service *svc)
1961 {
1962         int     rc;
1963         int     i;
1964
1965         struct lprocfs_vars nrs_trr_lprocfs_vars[] = {
1966                 { .name         = "nrs_trr_quantum",
1967                   .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_quantum,
1968                   .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_quantum },
1969                 { .name         = "nrs_trr_offset_type",
1970                   .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_offset_type,
1971                   .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_offset_type },
1972                 { .name         = "nrs_trr_supported",
1973                   .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_supported,
1974                   .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_supported },
1975                 { NULL }
1976         };
1977
1978         if (svc->srv_procroot == NULL)
1979                 return 0;
1980
1981         lprocfs_trr_data.svc = svc;
1982
1983         for (i = 0; i < ARRAY_SIZE(nrs_trr_lprocfs_vars); i++)
1984                 nrs_trr_lprocfs_vars[i].data = &lprocfs_trr_data;
1985
1986         rc = lprocfs_add_vars(svc->srv_procroot, nrs_trr_lprocfs_vars, NULL);
1987
1988         return rc;
1989 }
1990
1991 void nrs_trr_lprocfs_fini(struct ptlrpc_service *svc)
1992 {
1993         if (svc->srv_procroot == NULL)
1994                 return;
1995
1996         lprocfs_remove_proc_entry("nrs_trr_quantum", svc->srv_procroot);
1997         lprocfs_remove_proc_entry("nrs_trr_offset_type", svc->srv_procroot);
1998         lprocfs_remove_proc_entry("nrs_trr_supported", svc->srv_procroot);
1999 }
2000
2001 #endif /* LPROCFS */
2002
/**
 * TRR policy operations. These are the ORR policy's functions, with the
 * exception of the lprocfs init and fini operations, which are TRR-specific
 * and only set when LPROCFS is defined.
 */
static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
        .op_policy_init         = nrs_orr_init,
        .op_policy_start        = nrs_orr_start,
        .op_policy_stop         = nrs_orr_stop,
        .op_policy_ctl          = nrs_orr_ctl,
        .op_res_get             = nrs_orr_res_get,
        .op_res_put             = nrs_orr_res_put,
        .op_req_get             = nrs_orr_req_get,
        .op_req_enqueue         = nrs_orr_req_add,
        .op_req_dequeue         = nrs_orr_req_del,
        .op_req_stop            = nrs_orr_req_stop,
#ifdef LPROCFS
        .op_lprocfs_init        = nrs_trr_lprocfs_init,
        .op_lprocfs_fini        = nrs_trr_lprocfs_fini,
#endif
};
2022
/**
 * TRR policy configuration: the policy is named NRS_POL_NAME_TRR, uses the
 * nrs_trr_ops operations, and names "ost_io" as the service it is compatible
 * with.
 */
struct ptlrpc_nrs_pol_conf nrs_conf_trr = {
        .nc_name                = NRS_POL_NAME_TRR,
        .nc_ops                 = &nrs_trr_ops,
        .nc_compat              = nrs_policy_compat_one,
        .nc_compat_svc_name     = "ost_io",
};
2029
2030 /** @} ORR/TRR policy */
2031
2032 /** @} nrs */
2033
2034 #endif /* HAVE_SERVER_SUPPORT */