Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / ptlrpc / nrs_orr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2017, Intel Corporation.
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_orr.c
29  *
30  * Network Request Scheduler (NRS) ORR and TRR policies
31  *
32  * Request scheduling in a Round-Robin manner over backend-fs objects and OSTs
33  * respectively
34  *
35  * Author: Liang Zhen <liang@whamcloud.com>
36  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
37  */
38
39 /**
40  * \addtogoup nrs
41  * @{
42  */
43 #define DEBUG_SUBSYSTEM S_RPC
44 #include <linux/delay.h>
45
46 #include <obd_support.h>
47 #include <obd_class.h>
48 #include <lustre_net.h>
49 #include <lustre_req_layout.h>
50 #include <lustre_compat.h>
51 #include "ptlrpc_internal.h"
52
53 /**
54  * \name ORR/TRR policy
55  *
56  * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies
57  *
58  * ORR performs batched Round Robin shceduling of brw RPCs, based on the FID of
59  * the backend-fs object that the brw RPC pertains to; the TRR policy performs
60  * batched Round Robin scheduling of brw RPCs, based on the OST index that the
61  * RPC pertains to. Both policies also order RPCs in each batch in ascending
62  * offset order, which is lprocfs-tunable between logical file offsets, and
63  * physical disk offsets, as reported by fiemap.
64  *
65  * The TRR policy reuses much of the functionality of ORR. These two scheduling
66  * algorithms could alternatively be implemented under a single NRS policy, that
67  * uses an lprocfs tunable in order to switch between the two types of
68  * scheduling behaviour. The two algorithms have been implemented as separate
69  * policies for reasons of clarity to the user, and to avoid issues that would
70  * otherwise arise at the point of switching between behaviours in the case of
71  * having a single policy, such as resource cleanup for nrs_orr_object
72  * instances. It is possible that this may need to be re-examined in the future,
73  * along with potentially coalescing other policies that perform batched request
74  * scheduling in a Round-Robin manner, all into one policy.
75  *
76  * @{
77  */
78
79 #define NRS_POL_NAME_ORR        "orr"
80 #define NRS_POL_NAME_TRR        "trr"
81
82 /**
83  * Checks if the RPC type of \a nrq is currently handled by an ORR/TRR policy
84  *
85  * \param[in]  orrd   the ORR/TRR policy scheduler instance
86  * \param[in]  nrq    the request
87  * \param[out] opcode the opcode is saved here, just in order to avoid calling
88  *                    lustre_msg_get_opc() again later
89  *
90  * \retval true  request type is supported by the policy instance
91  * \retval false request type is not supported by the policy instance
92  */
93 static bool nrs_orr_req_supported(struct nrs_orr_data *orrd,
94                                   struct ptlrpc_nrs_request *nrq, __u32 *opcode)
95 {
96         struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
97                                                    rq_nrq);
98         __u32                   opc = lustre_msg_get_opc(req->rq_reqmsg);
99         bool                    rc = false;
100
101         /**
102          * XXX: nrs_orr_data::od_supp accessed unlocked.
103          */
104         switch (opc) {
105         case OST_READ:
106                 rc = orrd->od_supp & NOS_OST_READ;
107                 break;
108         case OST_WRITE:
109                 rc = orrd->od_supp & NOS_OST_WRITE;
110                 break;
111         }
112
113         if (rc)
114                 *opcode = opc;
115
116         return rc;
117 }
118
119 /**
120  * Returns the ORR/TRR key fields for the request \a nrq in \a key.
121  *
122  * \param[in]  orrd the ORR/TRR policy scheduler instance
123  * \param[in]  nrq  the request
124  * \param[in]  opc  the request's opcode
125  * \param[in]  name the policy name
126  * \param[out] key  fields of the key are returned here.
127  *
128  * \retval 0   key filled successfully
129  * \retval < 0 error
130  */
131 static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
132                             struct ptlrpc_nrs_request *nrq, __u32 opc,
133                             char *name, struct nrs_orr_key *key)
134 {
135         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
136                                                   rq_nrq);
137         struct ost_body *body;
138         u32 ost_idx;
139         int rc;
140
141         LASSERT(req != NULL);
142
143         /**
144          * This is an attempt to fill in the request key fields while
145          * moving a request from the regular to the high-priority NRS
146          * head (via ldlm_lock_reorder_req()), but the request key has
147          * been adequately filled when nrs_orr_res_get() was called through
148          * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR
149          * policy, so there is nothing to do.
150          */
151         if (nrq->nr_u.orr.or_orr_set) {
152                 *key = nrq->nr_u.orr.or_key;
153                 return 0;
154         }
155
156         /* Bounce unconnected requests to the default policy. */
157         if (req->rq_export == NULL)
158                 return -ENOTCONN;
159
160         ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
161
162         /**
163          * The request pill for OST_READ and OST_WRITE requests is
164          * initialized in the ost_io service's
165          * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
166          * so no need to redo it here.
167          */
168         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
169         if (!body)
170                 RETURN(-EFAULT);
171
172         rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
173         if (rc < 0)
174                 return rc;
175
176         nrq->nr_u.orr.or_orr_set = 1;
177
178         return 0;
179 }
180
181 /**
182  * Populates the range values in \a range with logical offsets obtained via
183  * \a nb.
184  *
185  * \param[in]  nb       niobuf_remote struct array for this request
186  * \param[in]  niocount count of niobuf_remote structs for this request
187  * \param[out] range    the offset range is returned here
188  */
189 static void nrs_orr_range_fill_logical(struct niobuf_remote *nb, int niocount,
190                                        struct nrs_orr_req_range *range)
191 {
192         /* Should we do this at page boundaries ? */
193         range->or_start = nb[0].rnb_offset & PAGE_MASK;
194         range->or_end = (nb[niocount - 1].rnb_offset +
195                          nb[niocount - 1].rnb_len - 1) | ~PAGE_MASK;
196 }
197
198 /**
199  * We obtain information just for a single extent, as the request can only be in
200  * a single place in the binary heap anyway.
201  */
202 #define ORR_NUM_EXTENTS 1
203
204 /**
205  * Converts the logical file offset range in \a range, to a physical disk offset
206  * range in \a range, for a request. Uses obd_get_info() in order to carry out a
207  * fiemap call and obtain backend-fs extent information. The returned range is
208  * in physical block numbers.
209  *
210  * \param[in]     nrq   the request
211  * \param[in]     oa    obdo struct for this request
212  * \param[in,out] range the offset range in bytes; logical range in, physical
213  *                      range out
214  *
215  * \retval 0    physical offsets obtained successfully
216  * \retvall < 0 error
217  */
218 static int nrs_orr_range_fill_physical(struct ptlrpc_nrs_request *nrq,
219                                        struct obdo *oa,
220                                        struct nrs_orr_req_range *range)
221 {
222         struct ptlrpc_request     *req = container_of(nrq,
223                                                       struct ptlrpc_request,
224                                                       rq_nrq);
225         char                       fiemap_buf[offsetof(struct fiemap,
226                                                   fm_extents[ORR_NUM_EXTENTS])];
227         struct fiemap              *fiemap = (struct fiemap *)fiemap_buf;
228         struct ll_fiemap_info_key  key;
229         loff_t                     start;
230         loff_t                     end;
231         int                        rc;
232
233         key = (typeof(key)) {
234                 .lfik_name = KEY_FIEMAP,
235                 .lfik_oa = *oa,
236                 .lfik_fiemap = {
237                         .fm_start = range->or_start,
238                         .fm_length = range->or_end - range->or_start,
239                         .fm_extent_count = ORR_NUM_EXTENTS
240                 }
241         };
242
243         rc = obd_get_info(req->rq_svc_thread->t_env, req->rq_export,
244                           sizeof(key), &key, NULL, fiemap);
245         if (rc < 0)
246                 GOTO(out, rc);
247
248         if (fiemap->fm_mapped_extents == 0 ||
249             fiemap->fm_mapped_extents > ORR_NUM_EXTENTS)
250                 GOTO(out, rc = -EFAULT);
251
252         /**
253          * Calculate the physical offset ranges for the request from the extent
254          * information and the logical request offsets.
255          */
256         start = fiemap->fm_extents[0].fe_physical + range->or_start -
257                 fiemap->fm_extents[0].fe_logical;
258         end = start + range->or_end - range->or_start;
259
260         range->or_start = start;
261         range->or_end = end;
262
263         nrq->nr_u.orr.or_physical_set = 1;
264 out:
265         return rc;
266 }
267
268 /**
269  * Sets the offset range the request covers; either in logical file
270  * offsets or in physical disk offsets.
271  *
272  * \param[in] nrq        the request
273  * \param[in] orrd       the ORR/TRR policy scheduler instance
274  * \param[in] opc        the request's opcode
275  * \param[in] moving_req is the request in the process of moving onto the
276  *                       high-priority NRS head?
277  *
278  * \retval 0    range filled successfully
279  * \retval != 0 error
280  */
281 static int nrs_orr_range_fill(struct ptlrpc_nrs_request *nrq,
282                               struct nrs_orr_data *orrd, __u32 opc,
283                               bool moving_req)
284 {
285         struct ptlrpc_request       *req = container_of(nrq,
286                                                         struct ptlrpc_request,
287                                                         rq_nrq);
288         struct obd_ioobj            *ioo;
289         struct niobuf_remote        *nb;
290         struct ost_body             *body;
291         struct nrs_orr_req_range     range;
292         int                          niocount;
293         int                          rc = 0;
294
295         /**
296          * If we are scheduling using physical disk offsets, but we have filled
297          * the offset information in the request previously
298          * (i.e. ldlm_lock_reorder_req() is moving the request to the
299          * high-priority NRS head), there is no need to do anything, and we can
300          * exit. Moreover than the lack of need, we would be unable to perform
301          * the obd_get_info() call required in nrs_orr_range_fill_physical(),
302          * because ldlm_lock_reorder_lock() calls into here while holding a
303          * spinlock, and retrieving fiemap information via obd_get_info() is a
304          * potentially sleeping operation.
305          */
306         if (orrd->od_physical && nrq->nr_u.orr.or_physical_set)
307                 return 0;
308
309         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
310         if (ioo == NULL)
311                 GOTO(out, rc = -EFAULT);
312
313         niocount = ioo->ioo_bufcnt;
314
315         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
316         if (nb == NULL)
317                 GOTO(out, rc = -EFAULT);
318
319         /**
320          * Use logical information from niobuf_remote structures.
321          */
322         nrs_orr_range_fill_logical(nb, niocount, &range);
323
324         /**
325          * Obtain physical offsets if selected, and this is an OST_READ RPC
326          * RPC. We do not enter this block if moving_req is set which indicates
327          * that the request is being moved to the high-priority NRS head by
328          * ldlm_lock_reorder_req(), as that function calls in here while holding
329          * a spinlock, and nrs_orr_range_physical() can sleep, so we just use
330          * logical file offsets for the range values for such requests.
331          */
332         if (orrd->od_physical && opc == OST_READ && !moving_req) {
333                 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
334                 if (body == NULL)
335                         GOTO(out, rc = -EFAULT);
336
337                 /**
338                  * Translate to physical block offsets from backend filesystem
339                  * extents.
340                  * Ignore return values; if obtaining the physical offsets
341                  * fails, use the logical offsets.
342                  */
343                 nrs_orr_range_fill_physical(nrq, &body->oa, &range);
344         }
345
346         nrq->nr_u.orr.or_range = range;
347 out:
348         return rc;
349 }
350
351 /**
352  * Generates a character string that can be used in order to register uniquely
353  * named slab objects for ORR/TRR policy instances. The character string is
354  * unique per policy instance, as it includes the policy's name, the CPT number,
355  * and a {reg|hp} token, and there is one policy instance per NRS head on each
356  * CPT, and the policy is only compatible with the ost_io service.
357  *
358  * \param[in] policy the policy instance
359  * \param[out] name  the character array that will hold the generated name
360  */
361 static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
362 {
363         snprintf(name, NRS_ORR_OBJ_NAME_MAX, "%s%s%s%d",
364                  "nrs_", policy->pol_desc->pd_name,
365                  policy->pol_nrs->nrs_queue_type == PTLRPC_NRS_QUEUE_REG ?
366                  "_reg_" : "_hp_", nrs_pol2cptid(policy));
367 }
368
369 /**
370  * ORR/TRR hash operations
371  */
372 static u32 nrs_orr_hashfn(const void *data, u32 len, u32 seed)
373 {
374         const struct nrs_orr_key *key = data;
375
376         seed = cfs_hash_32(seed ^ key->ok_fid.f_oid, 32);
377         seed ^= cfs_hash_64(key->ok_fid.f_seq, 32);
378         return seed;
379 }
380
381 static int nrs_orr_cmpfn(struct rhashtable_compare_arg *arg, const void *obj)
382 {
383         const struct nrs_orr_object *orro = obj;
384         const struct nrs_orr_key *key = arg->key;
385
386         return lu_fid_eq(&orro->oo_key.ok_fid, &key->ok_fid) ? 0 : -ESRCH;
387 }
388
389 static void nrs_orr_hash_exit(void *vobj, void *data)
390 {
391         struct nrs_orr_object *orro = vobj;
392         struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent,
393                                                  struct nrs_orr_data, od_res);
394
395         /* We shouldn't reach here but just in case. nrs_xxx_res_put
396          * should of have freed orro.
397          */
398         LASSERTF(refcount_read(&orro->oo_ref) == 0,
399                  "Busy NRS ORR policy object for OST with index %u, with %d refs\n",
400                  orro->oo_key.ok_idx, refcount_read(&orro->oo_ref));
401
402         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
403 }
404
405 static const struct rhashtable_params nrs_orr_hash_params = {
406         .key_len        = sizeof(struct lu_fid),
407         .key_offset     = offsetof(struct nrs_orr_object, oo_key),
408         .head_offset    = offsetof(struct nrs_orr_object, oo_rhead),
409         .hashfn         = nrs_orr_hashfn,
410         .obj_cmpfn      = nrs_orr_cmpfn,
411 };
412
413 #define NRS_ORR_QUANTUM_DFLT    256
414
415 /**
416  * Binary heap predicate.
417  *
418  * Uses
419  * ptlrpc_nrs_request::nr_u::orr::or_round,
420  * ptlrpc_nrs_request::nr_u::orr::or_sequence, and
421  * ptlrpc_nrs_request::nr_u::orr::or_range to compare two binheap nodes and
422  * produce a binary predicate that indicates their relative priority, so that
423  * the binary heap can perform the necessary sorting operations.
424  *
425  * \param[in] e1 the first binheap node to compare
426  * \param[in] e2 the second binheap node to compare
427  *
428  * \retval 0 e1 > e2
429  * \retval 1 e1 < e2
430  */
431 static int
432 orr_req_compare(struct binheap_node *e1, struct binheap_node *e2)
433 {
434         struct ptlrpc_nrs_request *nrq1;
435         struct ptlrpc_nrs_request *nrq2;
436
437         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
438         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
439
440         /**
441          * Requests have been scheduled against a different scheduling round.
442          */
443         if (nrq1->nr_u.orr.or_round < nrq2->nr_u.orr.or_round)
444                 return 1;
445         else if (nrq1->nr_u.orr.or_round > nrq2->nr_u.orr.or_round)
446                 return 0;
447
448         /**
449          * Requests have been scheduled against the same scheduling round, but
450          * belong to a different batch, i.e. they pertain to a different
451          * backend-fs object (for ORR policy instances) or OST (for TRR policy
452          * instances).
453          */
454         if (nrq1->nr_u.orr.or_sequence < nrq2->nr_u.orr.or_sequence)
455                 return 1;
456         else if (nrq1->nr_u.orr.or_sequence > nrq2->nr_u.orr.or_sequence)
457                 return 0;
458
459         /**
460          * If round numbers and sequence numbers are equal, the two requests
461          * have been scheduled on the same round, and belong to the same batch,
462          * which means they pertain to the same backend-fs object (if this is an
463          * ORR policy instance), or to the same OST (if this is a TRR policy
464          * instance), so these requests should be sorted by ascending offset
465          * order.
466          */
467         if (nrq1->nr_u.orr.or_range.or_start <
468             nrq2->nr_u.orr.or_range.or_start) {
469                 return 1;
470         } else if (nrq1->nr_u.orr.or_range.or_start >
471                  nrq2->nr_u.orr.or_range.or_start) {
472                 return 0;
473         } else {
474                 /**
475                  * Requests start from the same offset; Dispatch the shorter one
476                  * first; perhaps slightly more chances of hitting caches like
477                  * this.
478                  */
479                 return nrq1->nr_u.orr.or_range.or_end <
480                        nrq2->nr_u.orr.or_range.or_end;
481         }
482 }
483
484 /**
485  * ORR binary heap operations
486  */
487 static struct binheap_ops nrs_orr_heap_ops = {
488         .hop_enter      = NULL,
489         .hop_exit       = NULL,
490         .hop_compare    = orr_req_compare,
491 };
492
493 /**
494  * Prints a warning message if an ORR/TRR policy is started on a service with
495  * more than one CPT.  Not printed on the console for now, since we don't
496  * have any performance metrics in the first place, and it is annoying.
497  *
498  * \param[in] policy the policy instance
499  *
500  * \retval 0 success
501  */
502 static int nrs_orr_init(struct ptlrpc_nrs_policy *policy)
503 {
504         if (policy->pol_nrs->nrs_svcpt->scp_service->srv_ncpts > 1)
505                 CDEBUG(D_CONFIG, "%s: The %s NRS policy was registered on a "
506                       "service with multiple service partitions. This policy "
507                       "may perform better with a single partition.\n",
508                       policy->pol_nrs->nrs_svcpt->scp_service->srv_name,
509                       policy->pol_desc->pd_name);
510
511         return 0;
512 }
513
514 /**
515  * Called when an ORR policy instance is started.
516  *
517  * \param[in] policy the policy
518  *
519  * \retval -ENOMEM OOM error
520  * \retval 0       success
521  */
522 static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg)
523 {
524         struct nrs_orr_data    *orrd;
525         int                     rc = 0;
526         ENTRY;
527
528         OBD_CPT_ALLOC_PTR(orrd, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
529         if (orrd == NULL)
530                 RETURN(-ENOMEM);
531
532         /*
533          * Binary heap instance for sorted incoming requests.
534          */
535         orrd->od_binheap = binheap_create(&nrs_orr_heap_ops,
536                                           CBH_FLAG_ATOMIC_GROW, 4096, NULL,
537                                           nrs_pol2cptab(policy),
538                                           nrs_pol2cptid(policy));
539         if (orrd->od_binheap == NULL)
540                 GOTO(out_orrd, rc = -ENOMEM);
541
542         nrs_orr_genobjname(policy, orrd->od_objname);
543
544         /**
545          * Slab cache for NRS ORR/TRR objects.
546          */
547         orrd->od_cache = kmem_cache_create(orrd->od_objname,
548                                            sizeof(struct nrs_orr_object),
549                                            0, 0, NULL);
550         if (orrd->od_cache == NULL)
551                 GOTO(out_binheap, rc = -ENOMEM);
552
553         /**
554          * Use a hash for finding objects by struct nrs_orr_key.
555          * For TRR we use Xarray instead since items are resolved
556          * using the OST indices, and they will stay relatively
557          * stable during an OSS node's lifetime.
558          */
559         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
560                     NRS_POL_NAME_MAX) == 0) {
561                 rc = rhashtable_init(&orrd->od_obj_hash, &nrs_orr_hash_params);
562                 if (rc < 0)
563                         GOTO(out_cache, rc);
564         } else {
565                 xa_init(&orrd->od_trr_objs);
566         }
567
568         /* XXX: Fields accessed unlocked */
569         orrd->od_quantum = NRS_ORR_QUANTUM_DFLT;
570         orrd->od_supp = NOS_DFLT;
571         orrd->od_physical = true;
572         /**
573          * Set to 1 so that the test inside nrs_orr_req_add() can evaluate to
574          * true.
575          */
576         orrd->od_sequence = 1;
577
578         policy->pol_private = orrd;
579
580         RETURN(rc);
581
582 out_cache:
583         kmem_cache_destroy(orrd->od_cache);
584 out_binheap:
585         binheap_destroy(orrd->od_binheap);
586 out_orrd:
587         OBD_FREE_PTR(orrd);
588
589         RETURN(rc);
590 }
591
592 /**
593  * Called when an ORR/TRR policy instance is stopped.
594  *
595  * Called when the policy has been instructed to transition to the
596  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more
597  * pending requests to serve.
598  *
599  * \param[in] policy the policy
600  */
601 static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy)
602 {
603         struct nrs_orr_data *orrd = policy->pol_private;
604         ENTRY;
605
606         LASSERT(orrd != NULL);
607         LASSERT(orrd->od_binheap != NULL);
608         LASSERT(orrd->od_cache != NULL);
609         LASSERT(binheap_is_empty(orrd->od_binheap));
610
611         binheap_destroy(orrd->od_binheap);
612         if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_TRR,
613                     NRS_POL_NAME_MAX) == 0) {
614                 struct nrs_orr_object *orro;
615                 unsigned long i;
616
617                 xa_for_each(&orrd->od_trr_objs, i, orro) {
618                         xa_erase(&orrd->od_trr_objs, i);
619                         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
620                 }
621                 xa_destroy(&orrd->od_trr_objs);
622         } else {
623                 rhashtable_free_and_destroy(&orrd->od_obj_hash,
624                                             nrs_orr_hash_exit, NULL);
625         }
626         rcu_barrier();
627         kmem_cache_destroy(orrd->od_cache);
628
629         OBD_FREE_PTR(orrd);
630 }
631
632 /**
633  * Performs a policy-specific ctl function on ORR/TRR policy instances; similar
634  * to ioctl.
635  *
636  * \param[in]     policy the policy instance
637  * \param[in]     opc    the opcode
638  * \param[in,out] arg    used for passing parameters and information
639  *
640  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
641  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
642  *
643  * \retval 0   operation carried successfully
644  * \retval -ve error
645  */
646 static int nrs_orr_ctl(struct ptlrpc_nrs_policy *policy,
647                        enum ptlrpc_nrs_ctl opc, void *arg)
648 {
649         assert_spin_locked(&policy->pol_nrs->nrs_lock);
650
651         switch (opc) {
652         default:
653                 RETURN(-EINVAL);
654
655         case NRS_CTL_ORR_RD_QUANTUM: {
656                 struct nrs_orr_data     *orrd = policy->pol_private;
657
658                 *(__u16 *)arg = orrd->od_quantum;
659                 }
660                 break;
661
662         case NRS_CTL_ORR_WR_QUANTUM: {
663                 struct nrs_orr_data     *orrd = policy->pol_private;
664
665                 orrd->od_quantum = *(__u16 *)arg;
666                 LASSERT(orrd->od_quantum != 0);
667                 }
668                 break;
669
670         case NRS_CTL_ORR_RD_OFF_TYPE: {
671                 struct nrs_orr_data     *orrd = policy->pol_private;
672
673                 *(bool *)arg = orrd->od_physical;
674                 }
675                 break;
676
677         case NRS_CTL_ORR_WR_OFF_TYPE: {
678                 struct nrs_orr_data     *orrd = policy->pol_private;
679
680                 orrd->od_physical = *(bool *)arg;
681                 }
682                 break;
683
684         case NRS_CTL_ORR_RD_SUPP_REQ: {
685                 struct nrs_orr_data     *orrd = policy->pol_private;
686
687                 *(enum nrs_orr_supp *)arg = orrd->od_supp;
688                 }
689                 break;
690
691         case NRS_CTL_ORR_WR_SUPP_REQ: {
692                 struct nrs_orr_data     *orrd = policy->pol_private;
693
694                 orrd->od_supp = *(enum nrs_orr_supp *)arg;
695                 LASSERT((orrd->od_supp & NOS_OST_RW) != 0);
696                 }
697                 break;
698         }
699         RETURN(0);
700 }
701
702 /**
703  * Obtains resources for ORR/TRR policy instances. The top-level resource lives
704  * inside \e nrs_orr_data and the second-level resource inside
705  * \e nrs_orr_object instances.
706  *
707  * \param[in]  policy     the policy for which resources are being taken for
708  *                        request \a nrq
709  * \param[in]  nrq        the request for which resources are being taken
710  * \param[in]  parent     parent resource, embedded in nrs_orr_data for the
711  *                        ORR/TRR policies
712  * \param[out] resp       used to return resource references
713  * \param[in]  moving_req signifies limited caller context; used to perform
714  *                        memory allocations in an atomic context in this
715  *                        policy
716  *
717  * \retval 0   we are returning a top-level, parent resource, one that is
718  *             embedded in an nrs_orr_data object
719  * \retval 1   we are returning a bottom-level resource, one that is embedded
720  *             in an nrs_orr_object object
721  *
722  * \see nrs_resource_get_safe()
723  */
724 static int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
725                            struct ptlrpc_nrs_request *nrq,
726                            const struct ptlrpc_nrs_resource *parent,
727                            struct ptlrpc_nrs_resource **resp, bool moving_req)
728 {
729         struct nrs_orr_data *orrd;
730         struct nrs_orr_object *orro, *new_orro;
731         struct nrs_orr_key key = { { { 0 } } };
732         u32 opc;
733         int rc = 0;
734
735         /**
736          * struct nrs_orr_data is requested.
737          */
738         if (parent == NULL) {
739                 *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
740                 return 0;
741         }
742
743         orrd = container_of(parent, struct nrs_orr_data, od_res);
744
745         /**
746          * If the request type is not supported, fail the enqueuing; the RPC
747          * will be handled by the fallback NRS policy.
748          */
749         if (!nrs_orr_req_supported(orrd, nrq, &opc))
750                 return -1;
751
752         /**
753          * Fill in the key for the request; OST FID for ORR policy instances,
754          * and OST index for TRR policy instances.
755          */
756         rc = nrs_orr_key_fill(orrd, nrq, opc, policy->pol_desc->pd_name, &key);
757         if (rc < 0)
758                 RETURN(rc);
759
760         /**
761          * Set the offset range the request covers
762          */
763         rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
764         if (rc < 0)
765                 RETURN(rc);
766
767         /* Handle the ORR case which involves looking up the orro in the
768          * hashtable. If not found then insert it. Unlike TRR the orro can
769          * be deleted in parallel during the life cycle of the object.
770          */
771         rcu_read_lock();
772         orro = rhashtable_lookup_fast(&orrd->od_obj_hash, &key,
773                                       nrs_orr_hash_params);
774         if (orro && refcount_inc_not_zero(&orro->oo_ref))
775                 goto found_orro;
776
777         rcu_read_unlock();
778
779         OBD_SLAB_CPT_ALLOC_PTR_GFP(new_orro, orrd->od_cache,
780                                    nrs_pol2cptab(policy), nrs_pol2cptid(policy),
781                                    moving_req ? GFP_ATOMIC : GFP_NOFS);
782         if (new_orro == NULL)
783                 RETURN(-ENOMEM);
784
785         new_orro->oo_key = key;
786         refcount_set(&new_orro->oo_ref, 1);
787 try_again:
788         rcu_read_lock();
789         orro = rhashtable_lookup_get_insert_fast(&orrd->od_obj_hash,
790                                                   &new_orro->oo_rhead,
791                                                   nrs_orr_hash_params);
792         /* insertion sucessfull */
793         if (likely(orro == NULL)) {
794                 orro = new_orro;
795                 goto found_orro;
796         }
797
798         /* A returned non-error orro means it already exist */
799         rc = IS_ERR(orro) ? PTR_ERR(orro) : 0;
800         if (!rc && refcount_inc_not_zero(&orro->oo_ref)) {
801                 OBD_SLAB_FREE_PTR(new_orro, orrd->od_cache);
802                 goto found_orro;
803         }
804         rcu_read_unlock();
805
806         /* oo_ref == 0, orro will be freed */
807         if (!rc)
808                 goto try_again;
809
810         /* hash table could be resizing. */
811         if (rc == -ENOMEM || rc == -EBUSY) {
812                 mdelay(20);
813                 goto try_again;
814         }
815         OBD_SLAB_FREE_PTR(new_orro, orrd->od_cache);
816
817         RETURN(rc);
818
819 found_orro:
820         rcu_read_unlock();
821         /**
822          * For debugging purposes
823          */
824         nrq->nr_u.orr.or_key = orro->oo_key;
825
826         *resp = &orro->oo_res;
827
828         return 1;
829 }
830
831 static void nrs_orr_object_free(struct rcu_head *head)
832 {
833         struct nrs_orr_object *orro = container_of(head, struct nrs_orr_object,
834                                                    oo_rcu_head);
835         struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent,
836                                                  struct nrs_orr_data, od_res);
837
838         OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
839 }
840
841 /**
842  * Called when releasing references to the resource hierachy obtained for a
843  * request for scheduling using ORR/TRR policy instances
844  *
845  * \param[in] policy   the policy the resource belongs to
846  * \param[in] res      the resource to be released
847  */
848 static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy,
849                             const struct ptlrpc_nrs_resource *res)
850 {
851         struct nrs_orr_data     *orrd;
852         struct nrs_orr_object   *orro;
853
854         /**
855          * Do nothing for freeing parent, nrs_orr_data resources.
856          */
857         if (res->res_parent == NULL)
858                 return;
859
860         orro = container_of(res, struct nrs_orr_object, oo_res);
861         if (!refcount_dec_and_test(&orro->oo_ref))
862                 return;
863
864         orrd = container_of(orro->oo_res.res_parent, struct nrs_orr_data, od_res);
865         rhashtable_remove_fast(&orrd->od_obj_hash, &orro->oo_rhead,
866                                nrs_orr_hash_params);
867         call_rcu(&orro->oo_rcu_head, nrs_orr_object_free);
868 }
869
870 /**
871  * Obtains resources for TRR policy instances. The top-level resource lives
872  * inside \e nrs_orr_data and the second-level resource inside
873  * \e nrs_orr_object instances.
874  *
875  * @policy      the policy for which resources are being taken for
876  *              request @nrq
877  * @nrq         the request for which resources are being taken
878  * @parent      parent resource, embedded in nrs_orr_data for the
879  *              TRR policies
880  * @resp        used to return resource references
881  * @moving_req  signifies limited caller context; used to perform
882  *              memory allocations in an atomic context in this
883  *              policy
884  *
885  * RETURN       0 we are returning a top-level, parent resource, one that is
886  *                embedded in an nrs_orr_data object
887  *              1 we are returning a bottom-level resource, one that is embedded
888  *                in an nrs_orr_object object
889  *
890  * \see nrs_resource_get_safe()
891  */
892 static int nrs_trr_res_get(struct ptlrpc_nrs_policy *policy,
893                            struct ptlrpc_nrs_request *nrq,
894                            const struct ptlrpc_nrs_resource *parent,
895                            struct ptlrpc_nrs_resource **resp, bool moving_req)
896 {
897         struct nrs_orr_key key = { { { 0 } } };
898         struct nrs_orr_object *orro;
899         struct nrs_orr_data *orrd;
900         int rc = 0;
901         u32 opc;
902
903         /**
904          * struct nrs_orr_data is requested.
905          */
906         if (!parent) {
907                 *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
908                 return 0;
909         }
910
911         orrd = container_of(parent, struct nrs_orr_data, od_res);
912
913         /**
914          * If the request type is not supported, fail the enqueuing; the RPC
915          * will be handled by the fallback NRS policy.
916          */
917         if (!nrs_orr_req_supported(orrd, nrq, &opc))
918                 return -1;
919
920         /**
921          * This is an attempt to fill in the request key fields while
922          * moving a request from the regular to the high-priority NRS
923          * head (via ldlm_lock_reorder_req()), but the request key has
924          * been adequately filled when nrs_trr_res_get() was called through
925          * ptlrpc_nrs_req_initialize() for the regular NRS head's TRR
926          * policy, so there is nothing to do.
927          */
928         if (!nrq->nr_u.orr.or_trr_set) {
929                 struct ptlrpc_request *req;
930
931                 /* Bounce unconnected requests to the default policy. */
932                 req = container_of(nrq, struct ptlrpc_request, rq_nrq);
933                 if (!req->rq_export)
934                         return -ENOTCONN;
935
936                 key.ok_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
937                 nrq->nr_u.orr.or_trr_set = 1;
938         } else {
939                 key = nrq->nr_u.orr.or_key;
940         }
941
942         /**
943          * Set the offset range the request covers
944          */
945         rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
946         if (rc < 0)
947                 RETURN(rc);
948
949         /* For TRR we just attempt to find the orro via the ok_idx.
950          * If not found we insert it into the Xarray.
951          */
952 try_again:
953         orro = xa_load(&orrd->od_trr_objs, key.ok_idx);
954         if (orro)
955                 goto found_orro;
956
957         OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
958                                    nrs_pol2cptab(policy), nrs_pol2cptid(policy),
959                                    moving_req ? GFP_ATOMIC : GFP_NOFS);
960         if (!orro)
961                 RETURN(-ENOMEM);
962
963         orro->oo_key = key;
964         rc = ll_xa_insert(&orrd->od_trr_objs, key.ok_idx, orro,
965                        moving_req ? GFP_ATOMIC : GFP_NOFS);
966         if (rc < 0) {
967                 OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
968                 if (rc == -EBUSY)
969                         goto try_again;
970
971                 RETURN(rc);
972         }
973
974 found_orro:
975         /**
976          * For debugging purposes
977          */
978         nrq->nr_u.orr.or_key = orro->oo_key;
979
980         *resp = &orro->oo_res;
981
982         return 1;
983 }
984
985 /**
986  * Called when polling an ORR/TRR policy instance for a request so that it can
987  * be served. Returns the request that is at the root of the binary heap, as
988  * that is the lowest priority one (i.e. binheap is an implementation of a
989  * min-heap)
990  *
991  * \param[in] policy the policy instance being polled
992  * \param[in] peek   when set, signifies that we just want to examine the
993  *                   request, and not handle it, so the request is not removed
994  *                   from the policy.
995  * \param[in] force  force the policy to return a request; unused in this policy
996  *
997  * \retval the request to be handled
998  * \retval NULL no request available
999  *
1000  * \see ptlrpc_nrs_req_get_nolock()
1001  * \see nrs_request_get()
1002  */
1003 static
1004 struct ptlrpc_nrs_request *nrs_orr_req_get(struct ptlrpc_nrs_policy *policy,
1005                                            bool peek, bool force)
1006 {
1007         struct nrs_orr_data       *orrd = policy->pol_private;
1008         struct binheap_node       *node = binheap_root(orrd->od_binheap);
1009         struct ptlrpc_nrs_request *nrq;
1010
1011         nrq = unlikely(node == NULL) ? NULL :
1012               container_of(node, struct ptlrpc_nrs_request, nr_node);
1013
1014         if (likely(!peek && nrq != NULL)) {
1015                 struct nrs_orr_object *orro;
1016
1017                 orro = container_of(nrs_request_resource(nrq),
1018                                     struct nrs_orr_object, oo_res);
1019
1020                 LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
1021
1022                 binheap_remove(orrd->od_binheap, &nrq->nr_node);
1023                 orro->oo_active--;
1024
1025                 if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
1026                                  NRS_POL_NAME_MAX) == 0)
1027                         CDEBUG(D_RPCTRACE,
1028                                "NRS: starting to handle %s request for object "
1029                                "with FID "DFID", from OST with index %u, with "
1030                                "round %llu\n", NRS_POL_NAME_ORR,
1031                                PFID(&orro->oo_key.ok_fid),
1032                                nrq->nr_u.orr.or_key.ok_idx,
1033                                nrq->nr_u.orr.or_round);
1034                 else
1035                         CDEBUG(D_RPCTRACE,
1036                                "NRS: starting to handle %s request from OST "
1037                                "with index %u, with round %llu\n",
1038                                NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
1039                                nrq->nr_u.orr.or_round);
1040
1041                 /** Peek at the next request to be served */
1042                 node = binheap_root(orrd->od_binheap);
1043
1044                 /** No more requests */
1045                 if (unlikely(node == NULL)) {
1046                         orrd->od_round++;
1047                 } else {
1048                         struct ptlrpc_nrs_request *next;
1049
1050                         next = container_of(node, struct ptlrpc_nrs_request,
1051                                             nr_node);
1052
1053                         if (orrd->od_round < next->nr_u.orr.or_round)
1054                                 orrd->od_round = next->nr_u.orr.or_round;
1055                 }
1056         }
1057
1058         return nrq;
1059 }
1060
1061 /**
1062  * Sort-adds request \a nrq to an ORR/TRR \a policy instance's set of queued
1063  * requests in the policy's binary heap.
1064  *
1065  * A scheduling round is a stream of requests that have been sorted in batches
1066  * according to the backend-fs object (for ORR policy instances) or OST (for TRR
1067  * policy instances) that they pertain to (as identified by its IDIF FID or OST
1068  * index respectively); there can be only one batch for each object or OST in
1069  * each round. The batches are of maximum size nrs_orr_data:od_quantum. When a
1070  * new request arrives for scheduling for an object or OST that has exhausted
1071  * its quantum in its current round, the request will be scheduled on the next
1072  * scheduling round. Requests are allowed to be scheduled against a round until
1073  * all requests for the round are serviced, so an object or OST might miss a
1074  * round if requests are not scheduled for it for a long enough period of time.
1075  * Objects or OSTs that miss a round will continue with having their next
1076  * request scheduled, starting at the round that requests are being dispatched
1077  * for, at the time of arrival of this request.
1078  *
1079  * Requests are tagged with the round number and a sequence number; the sequence
1080  * number indicates the relative ordering amongst the batches of requests in a
1081  * round, and is identical for all requests in a batch, as is the round number.
1082  * The round and sequence numbers are used by orr_req_compare() in order to use
1083  * nrs_orr_data::od_binheap in order to maintain an ordered set of rounds, with
1084  * each round consisting of an ordered set of batches of requests, and each
1085  * batch consisting of an ordered set of requests according to their logical
1086  * file or physical disk offsets.
1087  *
1088  * \param[in] policy the policy
1089  * \param[in] nrq    the request to add
1090  *
1091  * \retval 0    request successfully added
1092  * \retval != 0 error
1093  */
1094 static int nrs_orr_req_add(struct ptlrpc_nrs_policy *policy,
1095                            struct ptlrpc_nrs_request *nrq)
1096 {
1097         struct nrs_orr_data     *orrd;
1098         struct nrs_orr_object   *orro;
1099         int                      rc;
1100
1101         orro = container_of(nrs_request_resource(nrq),
1102                             struct nrs_orr_object, oo_res);
1103         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1104                             struct nrs_orr_data, od_res);
1105
1106         if (orro->oo_quantum == 0 || orro->oo_round < orrd->od_round ||
1107             (orro->oo_active == 0 && orro->oo_quantum > 0)) {
1108
1109                 /**
1110                  * If there are no pending requests for the object/OST, but some
1111                  * of its quantum still remains unused, which implies we did not
1112                  * get a chance to schedule up to its maximum allowed batch size
1113                  * of requests in the previous round this object/OST
1114                  * participated in, schedule this next request on a new round;
1115                  * this avoids fragmentation of request batches caused by
1116                  * intermittent inactivity on the object/OST, at the expense of
1117                  * potentially slightly increased service time for the request
1118                  * batch this request will be a part of.
1119                  */
1120                 if (orro->oo_active == 0 && orro->oo_quantum > 0)
1121                         orro->oo_round++;
1122
1123                 /** A new scheduling round has commenced */
1124                 if (orro->oo_round < orrd->od_round)
1125                         orro->oo_round = orrd->od_round;
1126
1127                 /** I was not the last object/OST that scheduled a request */
1128                 if (orro->oo_sequence < orrd->od_sequence)
1129                         orro->oo_sequence = ++orrd->od_sequence;
1130                 /**
1131                  * Reset the quantum if we have reached the maximum quantum
1132                  * size for this batch, or even if we have not managed to
1133                  * complete a batch size up to its maximum allowed size.
1134                  * XXX: Accessed unlocked
1135                  */
1136                 orro->oo_quantum = orrd->od_quantum;
1137         }
1138
1139         nrq->nr_u.orr.or_round = orro->oo_round;
1140         nrq->nr_u.orr.or_sequence = orro->oo_sequence;
1141
1142         rc = binheap_insert(orrd->od_binheap, &nrq->nr_node);
1143         if (rc == 0) {
1144                 orro->oo_active++;
1145                 if (--orro->oo_quantum == 0)
1146                         orro->oo_round++;
1147         }
1148         return rc;
1149 }
1150
1151 /**
1152  * Removes request \a nrq from an ORR/TRR \a policy instance's set of queued
1153  * requests.
1154  *
1155  * \param[in] policy the policy
1156  * \param[in] nrq    the request to remove
1157  */
1158 static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy,
1159                             struct ptlrpc_nrs_request *nrq)
1160 {
1161         struct nrs_orr_data     *orrd;
1162         struct nrs_orr_object   *orro;
1163         bool                     is_root;
1164
1165         orro = container_of(nrs_request_resource(nrq),
1166                             struct nrs_orr_object, oo_res);
1167         orrd = container_of(nrs_request_resource(nrq)->res_parent,
1168                             struct nrs_orr_data, od_res);
1169
1170         LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
1171
1172         is_root = &nrq->nr_node == binheap_root(orrd->od_binheap);
1173
1174         binheap_remove(orrd->od_binheap, &nrq->nr_node);
1175         orro->oo_active--;
1176
1177         /**
1178          * If we just deleted the node at the root of the binheap, we may have
1179          * to adjust round numbers.
1180          */
1181         if (unlikely(is_root)) {
1182                 /** Peek at the next request to be served */
1183                 struct binheap_node *node = binheap_root(orrd->od_binheap);
1184
1185                 /** No more requests */
1186                 if (unlikely(node == NULL)) {
1187                         orrd->od_round++;
1188                 } else {
1189                         nrq = container_of(node, struct ptlrpc_nrs_request,
1190                                            nr_node);
1191
1192                         if (orrd->od_round < nrq->nr_u.orr.or_round)
1193                                 orrd->od_round = nrq->nr_u.orr.or_round;
1194                 }
1195         }
1196 }
1197
1198 /**
1199  * Called right after the request @nrq finishes being handled by ORR policy
1200  * instance \a policy.
1201  *
1202  * @policy      the policy that handled the request
1203  * @nrq         the request that was handled
1204  */
1205 static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy,
1206                              struct ptlrpc_nrs_request *nrq)
1207 {
1208         /** NB: resource control, credits etc can be added here */
1209         CDEBUG(D_RPCTRACE,
1210                "NRS: finished handling ORR request for object with FID "DFID", from OST with index %u, with round %llu\n",
1211                PFID(&nrq->nr_u.orr.or_key.ok_fid), nrq->nr_u.orr.or_key.ok_idx,
1212                nrq->nr_u.orr.or_round);
1213 }
1214
1215 /**
1216  * Called right after the request @nrq finishes being handled by TRR policy
1217  * instance @policy.
1218  *
1219  * @policy      the policy that handled the request
1220  * @nrq         the request that was handled
1221  */
1222 static void nrs_trr_req_stop(struct ptlrpc_nrs_policy *policy,
1223                              struct ptlrpc_nrs_request *nrq)
1224 {
1225         /** NB: resource control, credits etc can be added here */
1226         CDEBUG(D_RPCTRACE,
1227                "NRS: finished handling TRR request from OST with index %u, with round %llu\n",
1228                nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
1229 }
1230
1231 /**
1232  * debugfs interface
1233  */
1234
1235 /**
1236  * This allows to bundle the policy name into the lprocfs_vars::data pointer
1237  * so that lprocfs read/write functions can be used by both the ORR and TRR
1238  * policies.
1239  */
1240 static struct nrs_lprocfs_orr_data {
1241         struct ptlrpc_service   *svc;
1242         char                    *name;
1243 } lprocfs_orr_data = {
1244         .name = NRS_POL_NAME_ORR
1245 }, lprocfs_trr_data = {
1246         .name = NRS_POL_NAME_TRR
1247 };
1248
1249 /**
1250  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
1251  * for ORR/TRR policy instances on both the regular and high-priority NRS head
1252  * of a service, as long as a policy instance is not in the
1253  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1254  * state are skipped later by nrs_orr_ctl().
1255  *
1256  * Quantum values are in # of RPCs, and the output is in YAML format.
1257  *
1258  * For example:
1259  *
1260  *      reg_quantum:256
1261  *      hp_quantum:8
1262  *
1263  * XXX: the CRR-N version of this, ptlrpc_lprocfs_rd_nrs_crrn_quantum() is
1264  * almost identical; it can be reworked and then reused for ORR/TRR.
1265  */
1266 static int
1267 ptlrpc_lprocfs_nrs_orr_quantum_seq_show(struct seq_file *m, void *data)
1268 {
1269         struct nrs_lprocfs_orr_data *orr_data = m->private;
1270         struct ptlrpc_service       *svc = orr_data->svc;
1271         __u16                        quantum;
1272         int                          rc;
1273
1274         /**
1275          * Perform two separate calls to this as only one of the NRS heads'
1276          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
1277          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1278          */
1279         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1280                                        orr_data->name,
1281                                        NRS_CTL_ORR_RD_QUANTUM,
1282                                        true, &quantum);
1283         if (rc == 0) {
1284                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG "%-5d\n", quantum);
1285                 /**
1286                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1287                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1288                  */
1289         } else if (rc != -ENODEV) {
1290                 return rc;
1291         }
1292
1293         /**
1294          * We know the ost_io service which is the only one ORR/TRR policies are
1295          * compatible with, do have an HP NRS head, but it may be best to guard
1296          * against a possible change of this in the future.
1297          */
1298         if (!nrs_svc_has_hp(svc))
1299                 goto no_hp;
1300
1301         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1302                                        orr_data->name, NRS_CTL_ORR_RD_QUANTUM,
1303                                        true, &quantum);
1304         if (rc == 0) {
1305                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
1306                 /**
1307                  * Ignore -ENODEV as the high priority NRS head's policy may be
1308                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1309                  */
1310         } else if (rc != -ENODEV) {
1311                 return rc;
1312         }
1313
1314 no_hp:
1315
1316         return rc;
1317 }
1318
1319 /**
1320  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
1321  * for ORR/TRR policy instances of a service. The user can set the quantum size
1322  * for the regular and high priority NRS head separately by specifying each
1323  * value, or both together in a single invocation.
1324  *
1325  * For example:
1326  *
1327  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=req_quantum:64, to set the
1328  * request quantum size of the ORR policy instance on the regular NRS head of
1329  * the ost_io service to 64
1330  *
1331  * lctl set_param ost.OSS.ost_io.nrs_trr_quantum=hp_quantum:8 to set the request
1332  * quantum size of the TRR policy instance on the high priority NRS head of the
1333  * ost_io service to 8
1334  *
1335  * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=32, to set both the request
1336  * quantum size of the ORR policy instance on both the regular and the high
1337  * priority NRS head of the ost_io service to 32
1338  *
1339  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
1340  * are skipped later by nrs_orr_ctl().
1341  *
1342  * XXX: the CRR-N version of this, ptlrpc_lprocfs_wr_nrs_crrn_quantum() is
1343  * almost identical; it can be reworked and then reused for ORR/TRR.
1344  */
1345 static ssize_t
1346 ptlrpc_lprocfs_nrs_orr_quantum_seq_write(struct file *file,
1347                                          const char __user *buffer,
1348                                          size_t count, loff_t *off)
1349 {
1350         struct seq_file             *m = file->private_data;
1351         struct nrs_lprocfs_orr_data *orr_data = m->private;
1352         struct ptlrpc_service       *svc = orr_data->svc;
1353         enum ptlrpc_nrs_queue_type   queue = 0;
1354         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
1355         char                        *val;
1356         long                         quantum_reg;
1357         long                         quantum_hp;
1358         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
1359         size_t                       count_copy;
1360         int                          rc = 0;
1361         int                          rc2 = 0;
1362
1363         if (count > (sizeof(kernbuf) - 1))
1364                 return -EINVAL;
1365
1366         if (copy_from_user(kernbuf, buffer, count))
1367                 return -EFAULT;
1368
1369         kernbuf[count] = '\0';
1370
1371         count_copy = count;
1372
1373         /**
1374          * Check if the regular quantum value has been specified
1375          */
1376         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
1377                                        &count_copy);
1378         if (val != kernbuf) {
1379                 rc = kstrtol(val, 10, &quantum_reg);
1380                 if (rc)
1381                         return rc;
1382                 queue |= PTLRPC_NRS_QUEUE_REG;
1383         }
1384
1385         count_copy = count;
1386
1387         /**
1388          * Check if the high priority quantum value has been specified
1389          */
1390         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
1391                                        &count_copy);
1392         if (val != kernbuf) {
1393                 if (!nrs_svc_has_hp(svc))
1394                         return -ENODEV;
1395
1396                 rc = kstrtol(val, 10, &quantum_hp);
1397                 if (rc)
1398                         return rc;
1399
1400                 queue |= PTLRPC_NRS_QUEUE_HP;
1401         }
1402
1403         /**
1404          * If none of the queues has been specified, look for a valid numerical
1405          * value
1406          */
1407         if (queue == 0) {
1408                 rc = kstrtol(kernbuf, 10, &quantum_reg);
1409                 if (rc)
1410                         return rc;
1411
1412                 queue = PTLRPC_NRS_QUEUE_REG;
1413
1414                 if (nrs_svc_has_hp(svc)) {
1415                         queue |= PTLRPC_NRS_QUEUE_HP;
1416                         quantum_hp = quantum_reg;
1417                 }
1418         }
1419
1420         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
1421             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
1422             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
1423             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
1424                 return -EINVAL;
1425
1426         /**
1427          * We change the values on regular and HP NRS heads separately, so that
1428          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1429          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1430          * started the policy on either the regular or HP NRS head; i.e. we are
1431          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1432          * only if the operation fails with -ENODEV on all heads that have been
1433          * specified by the command; if at least one operation succeeds,
1434          * success is returned.
1435          */
1436         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1437                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1438                                                orr_data->name,
1439                                                NRS_CTL_ORR_WR_QUANTUM, false,
1440                                                &quantum_reg);
1441                 if ((rc < 0 && rc != -ENODEV) ||
1442                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1443                         return rc;
1444         }
1445
1446         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1447                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1448                                                 orr_data->name,
1449                                                 NRS_CTL_ORR_WR_QUANTUM, false,
1450                                                 &quantum_hp);
1451                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1452                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1453                         return rc2;
1454         }
1455
1456         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1457 }
1458
1459 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_quantum);
1460
1461 #define LPROCFS_NRS_OFF_NAME_REG                "reg_offset_type:"
1462 #define LPROCFS_NRS_OFF_NAME_HP                 "hp_offset_type:"
1463
1464 #define LPROCFS_NRS_OFF_NAME_PHYSICAL           "physical"
1465 #define LPROCFS_NRS_OFF_NAME_LOGICAL            "logical"
1466
1467 /**
1468  * Retrieves the offset type used by ORR/TRR policy instances on both the
1469  * regular and high-priority NRS head of a service, as long as a policy
1470  * instance is not in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state;
1471  * policy instances in this state are skipped later by nrs_orr_ctl().
1472  *
1473  * Offset type information is a (physical|logical) string, and output is
1474  * in YAML format.
1475  *
1476  * For example:
1477  *
1478  *      reg_offset_type:physical
1479  *      hp_offset_type:logical
1480  */
1481 static int
1482 ptlrpc_lprocfs_nrs_orr_offset_type_seq_show(struct seq_file *m, void *data)
1483 {
1484         struct nrs_lprocfs_orr_data *orr_data = m->private;
1485         struct ptlrpc_service       *svc = orr_data->svc;
1486         bool                         physical;
1487         int                          rc;
1488
1489         /**
1490          * Perform two separate calls to this as only one of the NRS heads'
1491          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1492          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1493          */
1494         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1495                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1496                                        true, &physical);
1497         if (rc == 0) {
1498                 seq_printf(m, LPROCFS_NRS_OFF_NAME_REG"%s\n",
1499                            physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1500                            LPROCFS_NRS_OFF_NAME_LOGICAL);
1501                 /**
1502                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1503                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1504                  */
1505         } else if (rc != -ENODEV) {
1506                 return rc;
1507         }
1508
1509         /**
1510          * We know the ost_io service which is the only one ORR/TRR policies are
1511          * compatible with, do have an HP NRS head, but it may be best to guard
1512          * against a possible change of this in the future.
1513          */
1514         if (!nrs_svc_has_hp(svc))
1515                 goto no_hp;
1516
1517         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1518                                        orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
1519                                        true, &physical);
1520         if (rc == 0) {
1521                 seq_printf(m, LPROCFS_NRS_OFF_NAME_HP"%s\n",
1522                            physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
1523                            LPROCFS_NRS_OFF_NAME_LOGICAL);
1524                 /**
1525                  * Ignore -ENODEV as the high priority NRS head's policy may be
1526                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1527                  */
1528         } else if (rc != -ENODEV) {
1529                 return rc;
1530         }
1531
1532 no_hp:
1533         return rc;
1534 }
1535
1536 /**
1537  * Max valid command string is the size of the labels, plus "physical" twice.
1538  * plus a separating ' '
1539  */
1540 #define LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD                                        \
1541         sizeof(LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL " "      \
1542                LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL)
1543
1544 /**
1545  * Sets the type of offsets used to order RPCs in ORR/TRR policy instances. The
1546  * user can set offset type for the regular or high priority NRS head
1547  * separately by specifying each value, or both together in a single invocation.
1548  *
1549  * For example:
1550  *
1551  * lctl set_param ost.OSS.ost_io.nrs_orr_offset_type=
1552  * reg_offset_type:physical, to enable the ORR policy instance on the regular
1553  * NRS head of the ost_io service to use physical disk offset ordering.
1554  *
1555  * lctl set_param ost.OSS.ost_io.nrs_trr_offset_type=logical, to enable the TRR
1556  * policy instances on both the regular ang high priority NRS heads of the
1557  * ost_io service to use logical file offset ordering.
1558  *
1559  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state are
1560  * are skipped later by nrs_orr_ctl().
1561  */
1562 static ssize_t
1563 ptlrpc_lprocfs_nrs_orr_offset_type_seq_write(struct file *file,
1564                                              const char __user *buffer,
1565                                               size_t count,
1566                                              loff_t *off)
1567 {
1568         struct seq_file             *m = file->private_data;
1569         struct nrs_lprocfs_orr_data *orr_data = m->private;
1570         struct ptlrpc_service       *svc = orr_data->svc;
1571         enum ptlrpc_nrs_queue_type   queue = 0;
1572         char                         kernbuf[LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD];
1573         char                        *val_reg;
1574         char                        *val_hp;
1575         bool                         physical_reg;
1576         bool                         physical_hp;
1577         size_t                       count_copy;
1578         int                          rc = 0;
1579         int                          rc2 = 0;
1580
1581         if (count > (sizeof(kernbuf) - 1))
1582                 return -EINVAL;
1583
1584         if (copy_from_user(kernbuf, buffer, count))
1585                 return -EFAULT;
1586
1587         kernbuf[count] = '\0';
1588
1589         count_copy = count;
1590
1591         /**
1592          * Check if the regular offset type has been specified
1593          */
1594         val_reg = lprocfs_find_named_value(kernbuf,
1595                                            LPROCFS_NRS_OFF_NAME_REG,
1596                                            &count_copy);
1597         if (val_reg != kernbuf)
1598                 queue |= PTLRPC_NRS_QUEUE_REG;
1599
1600         count_copy = count;
1601
1602         /**
1603          * Check if the high priority offset type has been specified
1604          */
1605         val_hp = lprocfs_find_named_value(kernbuf, LPROCFS_NRS_OFF_NAME_HP,
1606                                           &count_copy);
1607         if (val_hp != kernbuf) {
1608                 if (!nrs_svc_has_hp(svc))
1609                         return -ENODEV;
1610
1611                 queue |= PTLRPC_NRS_QUEUE_HP;
1612         }
1613
1614         /**
1615          * If none of the queues has been specified, there may be a valid
1616          * command string at the start of the buffer.
1617          */
1618         if (queue == 0) {
1619                 queue = PTLRPC_NRS_QUEUE_REG;
1620
1621                 if (nrs_svc_has_hp(svc))
1622                         queue |= PTLRPC_NRS_QUEUE_HP;
1623         }
1624
1625         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1626                 if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1627                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1628                         physical_reg = true;
1629                 else if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_LOGICAL,
1630                          sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1631                         physical_reg = false;
1632                 else
1633                         return -EINVAL;
1634         }
1635
1636         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1637                 if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_PHYSICAL,
1638                             sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
1639                         physical_hp = true;
1640                 else if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_LOGICAL,
1641                                  sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
1642                         physical_hp = false;
1643                 else
1644                         return -EINVAL;
1645         }
1646
1647         /**
1648          * We change the values on regular and HP NRS heads separately, so that
1649          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1650          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1651          * started the policy on either the regular or HP NRS head; i.e. we are
1652          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1653          * only if the operation fails with -ENODEV on all heads that have been
1654          * specified by the command; if at least one operation succeeds,
1655          * success is returned.
1656          */
1657         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1658                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1659                                                orr_data->name,
1660                                                NRS_CTL_ORR_WR_OFF_TYPE, false,
1661                                                &physical_reg);
1662                 if ((rc < 0 && rc != -ENODEV) ||
1663                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1664                         return rc;
1665         }
1666
1667         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1668                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1669                                                 orr_data->name,
1670                                                 NRS_CTL_ORR_WR_OFF_TYPE, false,
1671                                                 &physical_hp);
1672                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1673                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1674                         return rc2;
1675         }
1676
1677         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1678 }
1679
1680 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_offset_type);
1681
1682 #define NRS_LPROCFS_REQ_SUPP_NAME_REG           "reg_supported:"
1683 #define NRS_LPROCFS_REQ_SUPP_NAME_HP            "hp_supported:"
1684
1685 #define LPROCFS_NRS_SUPP_NAME_READS             "reads"
1686 #define LPROCFS_NRS_SUPP_NAME_WRITES            "writes"
1687 #define LPROCFS_NRS_SUPP_NAME_READWRITES        "reads_and_writes"
1688
1689 /**
1690  * Translates enum nrs_orr_supp values to a corresponding string.
1691  */
1692 static const char *nrs_orr_supp2str(enum nrs_orr_supp supp)
1693 {
1694         switch(supp) {
1695         default:
1696                 LBUG();
1697         case NOS_OST_READ:
1698                 return LPROCFS_NRS_SUPP_NAME_READS;
1699         case NOS_OST_WRITE:
1700                 return LPROCFS_NRS_SUPP_NAME_WRITES;
1701         case NOS_OST_RW:
1702                 return LPROCFS_NRS_SUPP_NAME_READWRITES;
1703         }
1704 }
1705
1706 /**
1707  * Translates strings to the corresponding enum nrs_orr_supp value
1708  */
1709 static enum nrs_orr_supp nrs_orr_str2supp(const char *val)
1710 {
1711         if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READWRITES,
1712                     sizeof(LPROCFS_NRS_SUPP_NAME_READWRITES) - 1) == 0)
1713                 return NOS_OST_RW;
1714         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READS,
1715                          sizeof(LPROCFS_NRS_SUPP_NAME_READS) - 1) == 0)
1716                 return NOS_OST_READ;
1717         else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_WRITES,
1718                          sizeof(LPROCFS_NRS_SUPP_NAME_WRITES) - 1) == 0)
1719                 return NOS_OST_WRITE;
1720         else
1721                 return -EINVAL;
1722 }
1723
1724 /**
1725  * Retrieves the type of RPCs handled at the point of invocation by ORR/TRR
1726  * policy instances on both the regular and high-priority NRS head of a service,
1727  * as long as a policy instance is not in the
1728  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
1729  * state are skipped later by nrs_orr_ctl().
1730  *
1731  * Supported RPC type information is a (reads|writes|reads_and_writes) string,
1732  * and output is in YAML format.
1733  *
1734  * For example:
1735  *
1736  *      reg_supported:reads
1737  *      hp_supported:reads_and_writes
1738  */
1739 static int
1740 ptlrpc_lprocfs_nrs_orr_supported_seq_show(struct seq_file *m, void *data)
1741 {
1742         struct nrs_lprocfs_orr_data *orr_data = m->private;
1743         struct ptlrpc_service       *svc = orr_data->svc;
1744         enum nrs_orr_supp            supported;
1745         int                          rc;
1746
1747         /**
1748          * Perform two separate calls to this as only one of the NRS heads'
1749          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
1750          * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1751          */
1752         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1753                                        orr_data->name,
1754                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1755                                        &supported);
1756
1757         if (rc == 0) {
1758                 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_REG"%s\n",
1759                            nrs_orr_supp2str(supported));
1760                 /**
1761                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1762                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1763                  */
1764         } else if (rc != -ENODEV) {
1765                 return rc;
1766         }
1767
1768         /**
1769          * We know the ost_io service which is the only one ORR/TRR policies are
1770          * compatible with, do have an HP NRS head, but it may be best to guard
1771          * against a possible change of this in the future.
1772          */
1773         if (!nrs_svc_has_hp(svc))
1774                 goto no_hp;
1775
1776         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1777                                        orr_data->name,
1778                                        NRS_CTL_ORR_RD_SUPP_REQ, true,
1779                                        &supported);
1780         if (rc == 0) {
1781                 seq_printf(m, NRS_LPROCFS_REQ_SUPP_NAME_HP"%s\n",
1782                            nrs_orr_supp2str(supported));
1783                 /**
1784                  * Ignore -ENODEV as the high priority NRS head's policy may be
1785                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1786                  */
1787         } else if (rc != -ENODEV) {
1788                 return rc;
1789         }
1790
1791 no_hp:
1792
1793         return rc;
1794 }
1795
1796 /**
1797  * Max valid command string is the size of the labels, plus "reads_and_writes"
1798  * twice, plus a separating ' '
1799  */
1800 #define LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD                                        \
1801         sizeof(NRS_LPROCFS_REQ_SUPP_NAME_REG LPROCFS_NRS_SUPP_NAME_READWRITES  \
1802                NRS_LPROCFS_REQ_SUPP_NAME_HP LPROCFS_NRS_SUPP_NAME_READWRITES   \
1803                " ")
1804
1805 /**
1806  * Sets the type of RPCs handled by ORR/TRR policy instances. The user can
1807  * modify this setting for the regular or high priority NRS heads separately, or
1808  * both together in a single invocation.
1809  *
1810  * For example:
1811  *
1812  * lctl set_param ost.OSS.ost_io.nrs_orr_supported=
1813  * "reg_supported:reads", to enable the ORR policy instance on the regular NRS
1814  * head of the ost_io service to handle OST_READ RPCs.
1815  *
1816  * lctl set_param ost.OSS.ost_io.nrs_trr_supported=reads_and_writes, to enable
1817  * the TRR policy instances on both the regular ang high priority NRS heads of
1818  * the ost_io service to use handle OST_READ and OST_WRITE RPCs.
1819  *
1820  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state are
1821  * are skipped later by nrs_orr_ctl().
1822  */
1823 static ssize_t
1824 ptlrpc_lprocfs_nrs_orr_supported_seq_write(struct file *file,
1825                                            const char __user *buffer,
1826                                            size_t count,
1827                                            loff_t *off)
1828 {
1829         struct seq_file             *m = file->private_data;
1830         struct nrs_lprocfs_orr_data *orr_data = m->private;
1831         struct ptlrpc_service       *svc = orr_data->svc;
1832         enum ptlrpc_nrs_queue_type   queue = 0;
1833         char                         kernbuf[LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD];
1834         char                        *val_reg;
1835         char                        *val_hp;
1836         enum nrs_orr_supp            supp_reg;
1837         enum nrs_orr_supp            supp_hp;
1838         size_t                       count_copy;
1839         int                          rc = 0;
1840         int                          rc2 = 0;
1841
1842         if (count > (sizeof(kernbuf) - 1))
1843                 return -EINVAL;
1844
1845         if (copy_from_user(kernbuf, buffer, count))
1846                 return -EFAULT;
1847
1848         kernbuf[count] = '\0';
1849
1850         count_copy = count;
1851
1852         /**
1853          * Check if the regular supported requests setting has been specified
1854          */
1855         val_reg = lprocfs_find_named_value(kernbuf,
1856                                            NRS_LPROCFS_REQ_SUPP_NAME_REG,
1857                                            &count_copy);
1858         if (val_reg != kernbuf)
1859                 queue |= PTLRPC_NRS_QUEUE_REG;
1860
1861         count_copy = count;
1862
1863         /**
1864          * Check if the high priority supported requests setting has been
1865          * specified
1866          */
1867         val_hp = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_REQ_SUPP_NAME_HP,
1868                                           &count_copy);
1869         if (val_hp != kernbuf) {
1870                 if (!nrs_svc_has_hp(svc))
1871                         return -ENODEV;
1872
1873                 queue |= PTLRPC_NRS_QUEUE_HP;
1874         }
1875
1876         /**
1877          * If none of the queues has been specified, there may be a valid
1878          * command string at the start of the buffer.
1879          */
1880         if (queue == 0) {
1881                 queue = PTLRPC_NRS_QUEUE_REG;
1882
1883                 if (nrs_svc_has_hp(svc))
1884                         queue |= PTLRPC_NRS_QUEUE_HP;
1885         }
1886
1887         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1888                 supp_reg = nrs_orr_str2supp(val_reg);
1889                 if (supp_reg == -EINVAL)
1890                         return -EINVAL;
1891         }
1892
1893         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1894                 supp_hp = nrs_orr_str2supp(val_hp);
1895                 if (supp_hp == -EINVAL)
1896                         return -EINVAL;
1897         }
1898
1899         /**
1900          * We change the values on regular and HP NRS heads separately, so that
1901          * we do not exit early from ptlrpc_nrs_policy_control() with an error
1902          * returned by nrs_policy_ctl_locked(), in cases where the user has not
1903          * started the policy on either the regular or HP NRS head; i.e. we are
1904          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
1905          * only if the operation fails with -ENODEV on all heads that have been
1906          * specified by the command; if at least one operation succeeds,
1907          * success is returned.
1908          */
1909         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1910                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1911                                                orr_data->name,
1912                                                NRS_CTL_ORR_WR_SUPP_REQ, false,
1913                                                &supp_reg);
1914                 if ((rc < 0 && rc != -ENODEV) ||
1915                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
1916                         return rc;
1917         }
1918
1919         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1920                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1921                                                 orr_data->name,
1922                                                 NRS_CTL_ORR_WR_SUPP_REQ, false,
1923                                                 &supp_hp);
1924                 if ((rc2 < 0 && rc2 != -ENODEV) ||
1925                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
1926                         return rc2;
1927         }
1928
1929         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
1930 }
1931
1932 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_orr_supported);
1933
1934 static int nrs_orr_lprocfs_init(struct ptlrpc_service *svc)
1935 {
1936         int     i;
1937
1938         struct ldebugfs_vars nrs_orr_lprocfs_vars[] = {
1939                 { .name         = "nrs_orr_quantum",
1940                   .fops         = &ptlrpc_lprocfs_nrs_orr_quantum_fops  },
1941                 { .name         = "nrs_orr_offset_type",
1942                   .fops         = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1943                 { .name         = "nrs_orr_supported",
1944                   .fops         = &ptlrpc_lprocfs_nrs_orr_supported_fops },
1945                 { NULL }
1946         };
1947
1948         if (!svc->srv_debugfs_entry)
1949                 return 0;
1950
1951         lprocfs_orr_data.svc = svc;
1952
1953         for (i = 0; i < ARRAY_SIZE(nrs_orr_lprocfs_vars); i++)
1954                 nrs_orr_lprocfs_vars[i].data = &lprocfs_orr_data;
1955
1956         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_orr_lprocfs_vars, NULL);
1957
1958         return 0;
1959 }
1960
1961 static const struct ptlrpc_nrs_pol_ops nrs_orr_ops = {
1962         .op_policy_init         = nrs_orr_init,
1963         .op_policy_start        = nrs_orr_start,
1964         .op_policy_stop         = nrs_orr_stop,
1965         .op_policy_ctl          = nrs_orr_ctl,
1966         .op_res_get             = nrs_orr_res_get,
1967         .op_res_put             = nrs_orr_res_put,
1968         .op_req_get             = nrs_orr_req_get,
1969         .op_req_enqueue         = nrs_orr_req_add,
1970         .op_req_dequeue         = nrs_orr_req_del,
1971         .op_req_stop            = nrs_orr_req_stop,
1972         .op_lprocfs_init        = nrs_orr_lprocfs_init,
1973 };
1974
1975 struct ptlrpc_nrs_pol_conf nrs_conf_orr = {
1976         .nc_name                = NRS_POL_NAME_ORR,
1977         .nc_ops                 = &nrs_orr_ops,
1978         .nc_compat              = nrs_policy_compat_one,
1979         .nc_compat_svc_name     = "ost_io",
1980 };
1981
1982 /**
1983  * TRR, Target-based Round Robin policy
1984  *
1985  * TRR reuses much of the functions and data structures of ORR
1986  */
1987 static int nrs_trr_lprocfs_init(struct ptlrpc_service *svc)
1988 {
1989         int     i;
1990
1991         struct ldebugfs_vars nrs_trr_lprocfs_vars[] = {
1992                 { .name         = "nrs_trr_quantum",
1993                   .fops         = &ptlrpc_lprocfs_nrs_orr_quantum_fops },
1994                 { .name         = "nrs_trr_offset_type",
1995                   .fops         = &ptlrpc_lprocfs_nrs_orr_offset_type_fops },
1996                 { .name         = "nrs_trr_supported",
1997                   .fops         = &ptlrpc_lprocfs_nrs_orr_supported_fops },
1998                 { NULL }
1999         };
2000
2001         if (!svc->srv_debugfs_entry)
2002                 return 0;
2003
2004         lprocfs_trr_data.svc = svc;
2005
2006         for (i = 0; i < ARRAY_SIZE(nrs_trr_lprocfs_vars); i++)
2007                 nrs_trr_lprocfs_vars[i].data = &lprocfs_trr_data;
2008
2009         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_trr_lprocfs_vars, NULL);
2010
2011         return 0;
2012 }
2013
2014 /**
2015  * Reuse much of the ORR functionality for TRR.
2016  */
2017 static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
2018         .op_policy_init         = nrs_orr_init,
2019         .op_policy_start        = nrs_orr_start,
2020         .op_policy_stop         = nrs_orr_stop,
2021         .op_policy_ctl          = nrs_orr_ctl,
2022         .op_res_get             = nrs_trr_res_get,
2023         .op_req_get             = nrs_orr_req_get,
2024         .op_req_enqueue         = nrs_orr_req_add,
2025         .op_req_dequeue         = nrs_orr_req_del,
2026         .op_req_stop            = nrs_trr_req_stop,
2027         .op_lprocfs_init        = nrs_trr_lprocfs_init,
2028 };
2029
2030 struct ptlrpc_nrs_pol_conf nrs_conf_trr = {
2031         .nc_name                = NRS_POL_NAME_TRR,
2032         .nc_ops                 = &nrs_trr_ops,
2033         .nc_compat              = nrs_policy_compat_one,
2034         .nc_compat_svc_name     = "ost_io",
2035 };
2036
2037 /** @} ORR/TRR policy */
2038
2039 /** @} nrs */