Whamcloud - gitweb
73606e49520535b36c5a199a120a34b51bc784ba
[fs/lustre-release.git] / lustre / ptlrpc / nrs_crr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2017, Intel Corporation.
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_crr.c
29  *
30  * Network Request Scheduler (NRS) CRR-N policy
31  *
32  * Request ordering in a batched Round-Robin manner over client NIDs
33  *
34  * Author: Liang Zhen <liang@whamcloud.com>
35  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
36  */
37 /**
38  * \addtogoup nrs
39  * @{
40  */
41
42 #define DEBUG_SUBSYSTEM S_RPC
43 #include <obd_support.h>
44 #include <obd_class.h>
45 #include <lustre_net.h>
46 #include <lprocfs_status.h>
47 #include "ptlrpc_internal.h"
48
49 /**
50  * \name CRR-N policy
51  *
52  * Client Round-Robin scheduling over client NIDs
53  *
54  * @{
55  *
56  */
57
58 #define NRS_POL_NAME_CRRN       "crrn"
59
60 /**
61  * Binary heap predicate.
62  *
63  * Uses ptlrpc_nrs_request::nr_u::crr::cr_round and
64  * ptlrpc_nrs_request::nr_u::crr::cr_sequence to compare two binheap nodes and
65  * produce a binary predicate that shows their relative priority, so that the
66  * binary heap can perform the necessary sorting operations.
67  *
68  * \param[in] e1 the first binheap node to compare
69  * \param[in] e2 the second binheap node to compare
70  *
71  * \retval 0 e1 > e2
72  * \retval 1 e1 <= e2
73  */
74 static int
75 crrn_req_compare(struct binheap_node *e1, struct binheap_node *e2)
76 {
77         struct ptlrpc_nrs_request *nrq1;
78         struct ptlrpc_nrs_request *nrq2;
79
80         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
81         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
82
83         if (nrq1->nr_u.crr.cr_round < nrq2->nr_u.crr.cr_round)
84                 return 1;
85         else if (nrq1->nr_u.crr.cr_round > nrq2->nr_u.crr.cr_round)
86                 return 0;
87
88         return nrq1->nr_u.crr.cr_sequence < nrq2->nr_u.crr.cr_sequence;
89 }
90
91 static struct binheap_ops nrs_crrn_heap_ops = {
92         .hop_enter      = NULL,
93         .hop_exit       = NULL,
94         .hop_compare    = crrn_req_compare,
95 };
96
97 /**
98  * rhashtable operations for nrs_crrn_net::cn_cli_hash
99  *
100  * This uses ptlrpc_request::rq_peer.nid (as nid4) as its key, in order to hash
101  * nrs_crrn_client objects.
102  */
103 static u32 nrs_crrn_hashfn(const void *data, u32 len, u32 seed)
104 {
105         const struct lnet_nid *nid = data;
106
107         return cfs_hash_32(nidhash(nid) ^ seed, 32);
108 }
109
110 static int nrs_crrn_cmpfn(struct rhashtable_compare_arg *arg, const void *obj)
111 {
112         const struct nrs_crrn_client *cli = obj;
113         const struct lnet_nid *nid = arg->key;
114
115         return nid_same(nid, &cli->cc_nid) ? 0 : -ESRCH;
116 }
117
118 static const struct rhashtable_params nrs_crrn_hash_params = {
119         .key_len        = sizeof(struct lnet_nid),
120         .key_offset     = offsetof(struct nrs_crrn_client, cc_nid),
121         .head_offset    = offsetof(struct nrs_crrn_client, cc_rhead),
122         .hashfn         = nrs_crrn_hashfn,
123         .obj_cmpfn      = nrs_crrn_cmpfn,
124 };
125
126 static void nrs_crrn_exit(void *vcli, void *data)
127 {
128         struct nrs_crrn_client *cli = vcli;
129
130         LASSERTF(atomic_read(&cli->cc_ref) == 0,
131                  "Busy CRR-N object from client with NID %s, with %d refs\n",
132                  libcfs_nidstr(&cli->cc_nid), atomic_read(&cli->cc_ref));
133
134         OBD_FREE_PTR(cli);
135 }
136
137 /**
138  * Called when a CRR-N policy instance is started.
139  *
140  * \param[in] policy the policy
141  *
142  * \retval -ENOMEM OOM error
143  * \retval 0       success
144  */
145 static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy, char *arg)
146 {
147         struct nrs_crrn_net    *net;
148         int                     rc = 0;
149         ENTRY;
150
151         OBD_CPT_ALLOC_PTR(net, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
152         if (net == NULL)
153                 RETURN(-ENOMEM);
154
155         net->cn_binheap = binheap_create(&nrs_crrn_heap_ops,
156                                              CBH_FLAG_ATOMIC_GROW, 4096, NULL,
157                                              nrs_pol2cptab(policy),
158                                              nrs_pol2cptid(policy));
159         if (net->cn_binheap == NULL)
160                 GOTO(out_net, rc = -ENOMEM);
161
162         rc = rhashtable_init(&net->cn_cli_hash, &nrs_crrn_hash_params);
163         if (rc)
164                 GOTO(out_binheap, rc);
165
166         /**
167          * Set default quantum value to max_rpcs_in_flight for non-MDS OSCs;
168          * there may be more RPCs pending from each struct nrs_crrn_client even
169          * with the default max_rpcs_in_flight value, as we are scheduling over
170          * NIDs, and there may be more than one mount point per client.
171          */
172         net->cn_quantum = OBD_MAX_RIF_DEFAULT;
173         /**
174          * Set to 1 so that the test inside nrs_crrn_req_add() can evaluate to
175          * true.
176          */
177         net->cn_sequence = 1;
178
179         policy->pol_private = net;
180
181         RETURN(rc);
182
183 out_binheap:
184         binheap_destroy(net->cn_binheap);
185 out_net:
186         OBD_FREE_PTR(net);
187
188         RETURN(rc);
189 }
190
191 /**
192  * Called when a CRR-N policy instance is stopped.
193  *
194  * Called when the policy has been instructed to transition to the
195  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more pending
196  * requests to serve.
197  *
198  * \param[in] policy the policy
199  */
200 static void nrs_crrn_stop(struct ptlrpc_nrs_policy *policy)
201 {
202         struct nrs_crrn_net     *net = policy->pol_private;
203         ENTRY;
204
205         LASSERT(net != NULL);
206         LASSERT(net->cn_binheap != NULL);
207         LASSERT(binheap_is_empty(net->cn_binheap));
208
209         rhashtable_free_and_destroy(&net->cn_cli_hash, nrs_crrn_exit, NULL);
210         binheap_destroy(net->cn_binheap);
211
212         OBD_FREE_PTR(net);
213 }
214
215 /**
216  * Performs a policy-specific ctl function on CRR-N policy instances; similar
217  * to ioctl.
218  *
219  * \param[in]     policy the policy instance
220  * \param[in]     opc    the opcode
221  * \param[in,out] arg    used for passing parameters and information
222  *
223  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
224  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
225  *
226  * \retval 0   operation carried out successfully
227  * \retval -ve error
228  */
229 static int nrs_crrn_ctl(struct ptlrpc_nrs_policy *policy,
230                         enum ptlrpc_nrs_ctl opc,
231                         void *arg)
232 {
233         assert_spin_locked(&policy->pol_nrs->nrs_lock);
234
235         switch (opc) {
236         default:
237                 RETURN(-EINVAL);
238
239         /**
240          * Read Round Robin quantum size of a policy instance.
241          */
242         case NRS_CTL_CRRN_RD_QUANTUM: {
243                 struct nrs_crrn_net     *net = policy->pol_private;
244
245                 *(__u16 *)arg = net->cn_quantum;
246                 }
247                 break;
248
249         /**
250          * Write Round Robin quantum size of a policy instance.
251          */
252         case NRS_CTL_CRRN_WR_QUANTUM: {
253                 struct nrs_crrn_net     *net = policy->pol_private;
254
255                 net->cn_quantum = *(__u16 *)arg;
256                 LASSERT(net->cn_quantum != 0);
257                 }
258                 break;
259         }
260
261         RETURN(0);
262 }
263
264 /**
265  * Obtains resources from CRR-N policy instances. The top-level resource lives
266  * inside \e nrs_crrn_net and the second-level resource inside
267  * \e nrs_crrn_client object instances.
268  *
269  * \param[in]  policy     the policy for which resources are being taken for
270  *                        request \a nrq
271  * \param[in]  nrq        the request for which resources are being taken
272  * \param[in]  parent     parent resource, embedded in nrs_crrn_net for the
273  *                        CRR-N policy
274  * \param[out] resp       resources references are placed in this array
275  * \param[in]  moving_req signifies limited caller context; used to perform
276  *                        memory allocations in an atomic context in this
277  *                        policy
278  *
279  * \retval 0   we are returning a top-level, parent resource, one that is
280  *             embedded in an nrs_crrn_net object
281  * \retval 1   we are returning a bottom-level resource, one that is embedded
282  *             in an nrs_crrn_client object
283  *
284  * \see nrs_resource_get_safe()
285  */
286 static int nrs_crrn_res_get(struct ptlrpc_nrs_policy *policy,
287                             struct ptlrpc_nrs_request *nrq,
288                             const struct ptlrpc_nrs_resource *parent,
289                             struct ptlrpc_nrs_resource **resp, bool moving_req)
290 {
291         struct nrs_crrn_net     *net;
292         struct nrs_crrn_client  *cli;
293         struct nrs_crrn_client  *tmp;
294         struct ptlrpc_request   *req;
295
296         if (parent == NULL) {
297                 *resp = &((struct nrs_crrn_net *)policy->pol_private)->cn_res;
298                 return 0;
299         }
300
301         net = container_of(parent, struct nrs_crrn_net, cn_res);
302         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
303         cli = rhashtable_lookup_fast(&net->cn_cli_hash, &req->rq_peer.nid,
304                                      nrs_crrn_hash_params);
305         if (cli)
306                 goto out;
307
308         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
309                           sizeof(*cli), moving_req ? GFP_ATOMIC : GFP_NOFS);
310         if (cli == NULL)
311                 return -ENOMEM;
312
313         cli->cc_nid = req->rq_peer.nid;
314
315         atomic_set(&cli->cc_ref, 0);
316
317         tmp = rhashtable_lookup_get_insert_fast(&net->cn_cli_hash,
318                                                 &cli->cc_rhead,
319                                                 nrs_crrn_hash_params);
320         if (tmp) {
321                 /* insertion failed */
322                 OBD_FREE_PTR(cli);
323                 if (IS_ERR(tmp))
324                         return PTR_ERR(tmp);
325                 cli = tmp;
326         }
327 out:
328         atomic_inc(&cli->cc_ref);
329         *resp = &cli->cc_res;
330
331         return 1;
332 }
333
334 /**
335  * Called when releasing references to the resource hierachy obtained for a
336  * request for scheduling using the CRR-N policy.
337  *
338  * \param[in] policy   the policy the resource belongs to
339  * \param[in] res      the resource to be released
340  */
341 static void nrs_crrn_res_put(struct ptlrpc_nrs_policy *policy,
342                              const struct ptlrpc_nrs_resource *res)
343 {
344         struct nrs_crrn_client *cli;
345
346         /**
347          * Do nothing for freeing parent, nrs_crrn_net resources
348          */
349         if (res->res_parent == NULL)
350                 return;
351
352         cli = container_of(res, struct nrs_crrn_client, cc_res);
353
354         atomic_dec(&cli->cc_ref);
355 }
356
357 /**
358  * Called when getting a request from the CRR-N policy for handlingso that it can be served
359  *
360  * \param[in] policy the policy being polled
361  * \param[in] peek   when set, signifies that we just want to examine the
362  *                   request, and not handle it, so the request is not removed
363  *                   from the policy.
364  * \param[in] force  force the policy to return a request; unused in this policy
365  *
366  * \retval the request to be handled
367  * \retval NULL no request available
368  *
369  * \see ptlrpc_nrs_req_get_nolock()
370  * \see nrs_request_get()
371  */
372 static
373 struct ptlrpc_nrs_request *nrs_crrn_req_get(struct ptlrpc_nrs_policy *policy,
374                                             bool peek, bool force)
375 {
376         struct nrs_crrn_net       *net = policy->pol_private;
377         struct binheap_node       *node = binheap_root(net->cn_binheap);
378         struct ptlrpc_nrs_request *nrq;
379
380         nrq = unlikely(node == NULL) ? NULL :
381               container_of(node, struct ptlrpc_nrs_request, nr_node);
382
383         if (likely(!peek && nrq != NULL)) {
384                 struct nrs_crrn_client *cli;
385                 struct ptlrpc_request *req = container_of(nrq,
386                                                           struct ptlrpc_request,
387                                                           rq_nrq);
388
389                 cli = container_of(nrs_request_resource(nrq),
390                                    struct nrs_crrn_client, cc_res);
391
392                 LASSERT(nrq->nr_u.crr.cr_round <= cli->cc_round);
393
394                 binheap_remove(net->cn_binheap, &nrq->nr_node);
395                 cli->cc_active--;
396
397                 CDEBUG(D_RPCTRACE,
398                        "NRS: starting to handle %s request from %s, with round "
399                        "%llu\n", NRS_POL_NAME_CRRN,
400                        libcfs_idstr(&req->rq_peer), nrq->nr_u.crr.cr_round);
401
402                 /** Peek at the next request to be served */
403                 node = binheap_root(net->cn_binheap);
404
405                 /** No more requests */
406                 if (unlikely(node == NULL)) {
407                         net->cn_round++;
408                 } else {
409                         struct ptlrpc_nrs_request *next;
410
411                         next = container_of(node, struct ptlrpc_nrs_request,
412                                             nr_node);
413
414                         if (net->cn_round < next->nr_u.crr.cr_round)
415                                 net->cn_round = next->nr_u.crr.cr_round;
416                 }
417         }
418
419         return nrq;
420 }
421
422 /**
423  * Adds request \a nrq to a CRR-N \a policy instance's set of queued requests
424  *
425  * A scheduling round is a stream of requests that have been sorted in batches
426  * according to the client that they originate from (as identified by its NID);
427  * there can be only one batch for each client in each round. The batches are of
428  * maximum size nrs_crrn_net:cn_quantum. When a new request arrives for
429  * scheduling from a client that has exhausted its quantum in its current round,
430  * it will start scheduling requests on the next scheduling round. Clients are
431  * allowed to schedule requests against a round until all requests for the round
432  * are serviced, so a client might miss a round if it is not generating requests
433  * for a long enough period of time. Clients that miss a round will continue
434  * with scheduling the next request that they generate, starting at the round
435  * that requests are being dispatched for, at the time of arrival of this new
436  * request.
437  *
438  * Requests are tagged with the round number and a sequence number; the sequence
439  * number indicates the relative ordering amongst the batches of requests in a
440  * round, and is identical for all requests in a batch, as is the round number.
441  * The round and sequence numbers are used by crrn_req_compare() in order to
442  * maintain an ordered set of rounds, with each round consisting of an ordered
443  * set of batches of requests.
444  *
445  * \param[in] policy the policy
446  * \param[in] nrq    the request to add
447  *
448  * \retval 0    request successfully added
449  * \retval != 0 error
450  */
451 static int nrs_crrn_req_add(struct ptlrpc_nrs_policy *policy,
452                             struct ptlrpc_nrs_request *nrq)
453 {
454         struct nrs_crrn_net     *net;
455         struct nrs_crrn_client  *cli;
456         int                      rc;
457
458         cli = container_of(nrs_request_resource(nrq),
459                            struct nrs_crrn_client, cc_res);
460         net = container_of(nrs_request_resource(nrq)->res_parent,
461                            struct nrs_crrn_net, cn_res);
462
463         if (cli->cc_quantum == 0 || cli->cc_round < net->cn_round ||
464             (cli->cc_active == 0 && cli->cc_quantum > 0)) {
465
466                 /**
467                  * If the client has no pending requests, and still some of its
468                  * quantum remaining unused, which implies it has not had a
469                  * chance to schedule up to its maximum allowed batch size of
470                  * requests in the previous round it participated, schedule this
471                  * next request on a new round; this avoids fragmentation of
472                  * request batches caused by client inactivity, at the expense
473                  * of potentially slightly increased service time for the
474                  * request batch this request will be a part of.
475                  */
476                 if (cli->cc_active == 0 && cli->cc_quantum > 0)
477                         cli->cc_round++;
478
479                 /** A new scheduling round has commenced */
480                 if (cli->cc_round < net->cn_round)
481                         cli->cc_round = net->cn_round;
482
483                 /** I was not the last client through here */
484                 if (cli->cc_sequence < net->cn_sequence)
485                         cli->cc_sequence = ++net->cn_sequence;
486                 /**
487                  * Reset the quantum if we have reached the maximum quantum
488                  * size for this batch, or even if we have not managed to
489                  * complete a batch size up to its maximum allowed size.
490                  * XXX: Accessed unlocked
491                  */
492                 cli->cc_quantum = net->cn_quantum;
493         }
494
495         nrq->nr_u.crr.cr_round = cli->cc_round;
496         nrq->nr_u.crr.cr_sequence = cli->cc_sequence;
497
498         rc = binheap_insert(net->cn_binheap, &nrq->nr_node);
499         if (rc == 0) {
500                 cli->cc_active++;
501                 if (--cli->cc_quantum == 0)
502                         cli->cc_round++;
503         }
504         return rc;
505 }
506
507 /**
508  * Removes request \a nrq from a CRR-N \a policy instance's set of queued
509  * requests.
510  *
511  * \param[in] policy the policy
512  * \param[in] nrq    the request to remove
513  */
514 static void nrs_crrn_req_del(struct ptlrpc_nrs_policy *policy,
515                              struct ptlrpc_nrs_request *nrq)
516 {
517         struct nrs_crrn_net     *net;
518         struct nrs_crrn_client  *cli;
519         bool                     is_root;
520
521         cli = container_of(nrs_request_resource(nrq),
522                            struct nrs_crrn_client, cc_res);
523         net = container_of(nrs_request_resource(nrq)->res_parent,
524                            struct nrs_crrn_net, cn_res);
525
526         LASSERT(nrq->nr_u.crr.cr_round <= cli->cc_round);
527
528         is_root = &nrq->nr_node == binheap_root(net->cn_binheap);
529
530         binheap_remove(net->cn_binheap, &nrq->nr_node);
531         cli->cc_active--;
532
533         /**
534          * If we just deleted the node at the root of the binheap, we may have
535          * to adjust round numbers.
536          */
537         if (unlikely(is_root)) {
538                 /** Peek at the next request to be served */
539                 struct binheap_node *node = binheap_root(net->cn_binheap);
540
541                 /** No more requests */
542                 if (unlikely(node == NULL)) {
543                         net->cn_round++;
544                 } else {
545                         nrq = container_of(node, struct ptlrpc_nrs_request,
546                                            nr_node);
547
548                         if (net->cn_round < nrq->nr_u.crr.cr_round)
549                                 net->cn_round = nrq->nr_u.crr.cr_round;
550                 }
551         }
552 }
553
554 /**
555  * Called right after the request \a nrq finishes being handled by CRR-N policy
556  * instance \a policy.
557  *
558  * \param[in] policy the policy that handled the request
559  * \param[in] nrq    the request that was handled
560  */
561 static void nrs_crrn_req_stop(struct ptlrpc_nrs_policy *policy,
562                               struct ptlrpc_nrs_request *nrq)
563 {
564         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
565                                                   rq_nrq);
566
567         CDEBUG(D_RPCTRACE,
568                "NRS: finished handling %s request from %s, with round %llu"
569                "\n", NRS_POL_NAME_CRRN,
570                libcfs_idstr(&req->rq_peer), nrq->nr_u.crr.cr_round);
571 }
572
573 /**
574  * debugfs interface
575  */
576
577 /**
578  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
579  * for CRR-N policy instances on both the regular and high-priority NRS head
580  * of a service, as long as a policy instance is not in the
581  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
582  * state are skipped later by nrs_crrn_ctl().
583  *
584  * Quantum values are in # of RPCs, and output is in YAML format.
585  *
586  * For example:
587  *
588  *      reg_quantum:8
589  *      hp_quantum:4
590  */
591 static int
592 ptlrpc_lprocfs_nrs_crrn_quantum_seq_show(struct seq_file *m, void *data)
593 {
594         struct ptlrpc_service   *svc = m->private;
595         __u16                   quantum;
596         int                     rc;
597
598         /**
599          * Perform two separate calls to this as only one of the NRS heads'
600          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
601          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
602          */
603         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
604                                        NRS_POL_NAME_CRRN,
605                                        NRS_CTL_CRRN_RD_QUANTUM,
606                                        true, &quantum);
607         if (rc == 0) {
608                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG
609                            "%-5d\n", quantum);
610                 /**
611                  * Ignore -ENODEV as the regular NRS head's policy may be in the
612                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
613                  */
614         } else if (rc != -ENODEV) {
615                 return rc;
616         }
617
618         if (!nrs_svc_has_hp(svc))
619                 goto no_hp;
620
621         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
622                                        NRS_POL_NAME_CRRN,
623                                        NRS_CTL_CRRN_RD_QUANTUM,
624                                        true, &quantum);
625         if (rc == 0) {
626                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
627                 /**
628                  * Ignore -ENODEV as the high priority NRS head's policy may be
629                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
630                  */
631         } else if (rc != -ENODEV) {
632                 return rc;
633         }
634
635 no_hp:
636         return rc;
637 }
638
639 /**
640  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
641  * for CRR-N policy instances of a service. The user can set the quantum size
642  * for the regular or high priority NRS head individually by specifying each
643  * value, or both together in a single invocation.
644  *
645  * For example:
646  *
647  * lctl set_param *.*.*.nrs_crrn_quantum=reg_quantum:32, to set the regular
648  * request quantum size on all PTLRPC services to 32
649  *
650  * lctl set_param *.*.*.nrs_crrn_quantum=hp_quantum:16, to set the high
651  * priority request quantum size on all PTLRPC services to 16, and
652  *
653  * lctl set_param *.*.ost_io.nrs_crrn_quantum=16, to set both the regular and
654  * high priority request quantum sizes of the ost_io service to 16.
655  *
656  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
657  * are skipped later by nrs_crrn_ctl().
658  */
659 static ssize_t
660 ptlrpc_lprocfs_nrs_crrn_quantum_seq_write(struct file *file,
661                                           const char __user *buffer,
662                                           size_t count,
663                                           loff_t *off)
664 {
665         struct seq_file             *m = file->private_data;
666         struct ptlrpc_service       *svc = m->private;
667         enum ptlrpc_nrs_queue_type   queue = 0;
668         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
669         char                        *val;
670         long                         quantum_reg;
671         long                         quantum_hp;
672         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
673         size_t                       count_copy;
674         int                          rc = 0;
675         int                          rc2 = 0;
676
677         if (count > (sizeof(kernbuf) - 1))
678                 return -EINVAL;
679
680         if (copy_from_user(kernbuf, buffer, count))
681                 return -EFAULT;
682
683         kernbuf[count] = '\0';
684
685         count_copy = count;
686
687         /**
688          * Check if the regular quantum value has been specified
689          */
690         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
691                                        &count_copy);
692         if (val != kernbuf) {
693                 rc = kstrtol(val, 10, &quantum_reg);
694                 if (rc)
695                         return rc;
696
697                 queue |= PTLRPC_NRS_QUEUE_REG;
698         }
699
700         count_copy = count;
701
702         /**
703          * Check if the high priority quantum value has been specified
704          */
705         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
706                                        &count_copy);
707         if (val != kernbuf) {
708                 if (!nrs_svc_has_hp(svc))
709                         return -ENODEV;
710
711                 rc = kstrtol(val, 10, &quantum_hp);
712                 if (rc)
713                         return rc;
714
715                 queue |= PTLRPC_NRS_QUEUE_HP;
716         }
717
718         /**
719          * If none of the queues has been specified, look for a valid numerical
720          * value
721          */
722         if (queue == 0) {
723                 rc = kstrtol(kernbuf, 10, &quantum_reg);
724                 if (rc)
725                         return rc;
726
727                 queue = PTLRPC_NRS_QUEUE_REG;
728
729                 if (nrs_svc_has_hp(svc)) {
730                         queue |= PTLRPC_NRS_QUEUE_HP;
731                         quantum_hp = quantum_reg;
732                 }
733         }
734
735         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
736             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
737             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
738             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
739                 return -EINVAL;
740
741         /**
742          * We change the values on regular and HP NRS heads separately, so that
743          * we do not exit early from ptlrpc_nrs_policy_control() with an error
744          * returned by nrs_policy_ctl_locked(), in cases where the user has not
745          * started the policy on either the regular or HP NRS head; i.e. we are
746          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
747          * only if the operation fails with -ENODEV on all heads that have been
748          * specified by the command; if at least one operation succeeds,
749          * success is returned.
750          */
751         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
752                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
753                                                NRS_POL_NAME_CRRN,
754                                                NRS_CTL_CRRN_WR_QUANTUM, false,
755                                                &quantum_reg);
756                 if ((rc < 0 && rc != -ENODEV) ||
757                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
758                         return rc;
759         }
760
761         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
762                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
763                                                 NRS_POL_NAME_CRRN,
764                                                 NRS_CTL_CRRN_WR_QUANTUM, false,
765                                                 &quantum_hp);
766                 if ((rc2 < 0 && rc2 != -ENODEV) ||
767                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
768                         return rc2;
769         }
770
771         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
772 }
773
774 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_crrn_quantum);
775
776 /**
777  * Initializes a CRR-N policy's lprocfs interface for service \a svc
778  *
779  * \param[in] svc the service
780  *
781  * \retval 0    success
782  * \retval != 0 error
783  */
784 static int nrs_crrn_lprocfs_init(struct ptlrpc_service *svc)
785 {
786         struct ldebugfs_vars nrs_crrn_lprocfs_vars[] = {
787                 { .name         = "nrs_crrn_quantum",
788                   .fops         = &ptlrpc_lprocfs_nrs_crrn_quantum_fops,
789                   .data = svc },
790                 { NULL }
791         };
792
793         if (!svc->srv_debugfs_entry)
794                 return 0;
795
796         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_crrn_lprocfs_vars, NULL);
797
798         return 0;
799 }
800
801 /**
802  * CRR-N policy operations
803  */
804 static const struct ptlrpc_nrs_pol_ops nrs_crrn_ops = {
805         .op_policy_start        = nrs_crrn_start,
806         .op_policy_stop         = nrs_crrn_stop,
807         .op_policy_ctl          = nrs_crrn_ctl,
808         .op_res_get             = nrs_crrn_res_get,
809         .op_res_put             = nrs_crrn_res_put,
810         .op_req_get             = nrs_crrn_req_get,
811         .op_req_enqueue         = nrs_crrn_req_add,
812         .op_req_dequeue         = nrs_crrn_req_del,
813         .op_req_stop            = nrs_crrn_req_stop,
814         .op_lprocfs_init        = nrs_crrn_lprocfs_init,
815 };
816
817 /**
818  * CRR-N policy configuration
819  */
820 struct ptlrpc_nrs_pol_conf nrs_conf_crrn = {
821         .nc_name                = NRS_POL_NAME_CRRN,
822         .nc_ops                 = &nrs_crrn_ops,
823         .nc_compat              = nrs_policy_compat_all,
824 };
825
826 /** @} CRR-N policy */
827
828 /** @} nrs */