Whamcloud - gitweb
f62728fd0381d225f4ec3aca3e1e151f6fededd0
[fs/lustre-release.git] / lustre / ptlrpc / nrs_crr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2017, Intel Corporation.
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_crr.c
29  *
30  * Network Request Scheduler (NRS) CRR-N policy
31  *
32  * Request ordering in a batched Round-Robin manner over client NIDs
33  *
34  * Author: Liang Zhen <liang@whamcloud.com>
35  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
36  */
37 /**
38  * \addtogoup nrs
39  * @{
40  */
41 #ifdef HAVE_SERVER_SUPPORT
42
43 #define DEBUG_SUBSYSTEM S_RPC
44 #include <obd_support.h>
45 #include <obd_class.h>
46 #include <lustre_net.h>
47 #include <lprocfs_status.h>
48 #include "ptlrpc_internal.h"
49
50 /**
51  * \name CRR-N policy
52  *
53  * Client Round-Robin scheduling over client NIDs
54  *
55  * @{
56  *
57  */
58
59 #define NRS_POL_NAME_CRRN       "crrn"
60
61 /**
62  * Binary heap predicate.
63  *
64  * Uses ptlrpc_nrs_request::nr_u::crr::cr_round and
65  * ptlrpc_nrs_request::nr_u::crr::cr_sequence to compare two binheap nodes and
66  * produce a binary predicate that shows their relative priority, so that the
67  * binary heap can perform the necessary sorting operations.
68  *
69  * \param[in] e1 the first binheap node to compare
70  * \param[in] e2 the second binheap node to compare
71  *
72  * \retval 0 e1 > e2
73  * \retval 1 e1 <= e2
74  */
75 static int
76 crrn_req_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
77 {
78         struct ptlrpc_nrs_request *nrq1;
79         struct ptlrpc_nrs_request *nrq2;
80
81         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
82         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
83
84         if (nrq1->nr_u.crr.cr_round < nrq2->nr_u.crr.cr_round)
85                 return 1;
86         else if (nrq1->nr_u.crr.cr_round > nrq2->nr_u.crr.cr_round)
87                 return 0;
88
89         return nrq1->nr_u.crr.cr_sequence < nrq2->nr_u.crr.cr_sequence;
90 }
91
92 static struct cfs_binheap_ops nrs_crrn_heap_ops = {
93         .hop_enter      = NULL,
94         .hop_exit       = NULL,
95         .hop_compare    = crrn_req_compare,
96 };
97
98 /**
99  * rhashtable operations for nrs_crrn_net::cn_cli_hash
100  *
101  * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
102  * nrs_crrn_client objects.
103  */
104 static u32 nrs_crrn_hashfn(const void *data, u32 len, u32 seed)
105 {
106         const lnet_nid_t *nid = data;
107
108         seed ^= cfs_hash_64((u64)nid, 32);
109         return seed;
110 }
111
112 static int nrs_crrn_cmpfn(struct rhashtable_compare_arg *arg, const void *obj)
113 {
114         const struct nrs_crrn_client *cli = obj;
115         const lnet_nid_t *nid = arg->key;
116
117         return *nid != cli->cc_nid;
118 }
119
120 static const struct rhashtable_params nrs_crrn_hash_params = {
121         .key_len        = sizeof(lnet_nid_t),
122         .key_offset     = offsetof(struct nrs_crrn_client, cc_nid),
123         .head_offset    = offsetof(struct nrs_crrn_client, cc_rhead),
124         .hashfn         = nrs_crrn_hashfn,
125         .obj_cmpfn      = nrs_crrn_cmpfn,
126 };
127
128 static void nrs_crrn_exit(void *vcli, void *data)
129 {
130         struct nrs_crrn_client *cli = vcli;
131
132         LASSERTF(atomic_read(&cli->cc_ref) == 0,
133                  "Busy CRR-N object from client with NID %s, with %d refs\n",
134                  libcfs_nid2str(cli->cc_nid), atomic_read(&cli->cc_ref));
135
136         OBD_FREE_PTR(cli);
137 }
138
139 /**
140  * Called when a CRR-N policy instance is started.
141  *
142  * \param[in] policy the policy
143  *
144  * \retval -ENOMEM OOM error
145  * \retval 0       success
146  */
147 static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy, char *arg)
148 {
149         struct nrs_crrn_net    *net;
150         int                     rc = 0;
151         ENTRY;
152
153         OBD_CPT_ALLOC_PTR(net, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
154         if (net == NULL)
155                 RETURN(-ENOMEM);
156
157         net->cn_binheap = cfs_binheap_create(&nrs_crrn_heap_ops,
158                                              CBH_FLAG_ATOMIC_GROW, 4096, NULL,
159                                              nrs_pol2cptab(policy),
160                                              nrs_pol2cptid(policy));
161         if (net->cn_binheap == NULL)
162                 GOTO(out_net, rc = -ENOMEM);
163
164         rc = rhashtable_init(&net->cn_cli_hash, &nrs_crrn_hash_params);
165         if (rc)
166                 GOTO(out_binheap, rc);
167
168         /**
169          * Set default quantum value to max_rpcs_in_flight for non-MDS OSCs;
170          * there may be more RPCs pending from each struct nrs_crrn_client even
171          * with the default max_rpcs_in_flight value, as we are scheduling over
172          * NIDs, and there may be more than one mount point per client.
173          */
174         net->cn_quantum = OBD_MAX_RIF_DEFAULT;
175         /**
176          * Set to 1 so that the test inside nrs_crrn_req_add() can evaluate to
177          * true.
178          */
179         net->cn_sequence = 1;
180
181         policy->pol_private = net;
182
183         RETURN(rc);
184
185 out_binheap:
186         cfs_binheap_destroy(net->cn_binheap);
187 out_net:
188         OBD_FREE_PTR(net);
189
190         RETURN(rc);
191 }
192
193 /**
194  * Called when a CRR-N policy instance is stopped.
195  *
196  * Called when the policy has been instructed to transition to the
197  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more pending
198  * requests to serve.
199  *
200  * \param[in] policy the policy
201  */
202 static void nrs_crrn_stop(struct ptlrpc_nrs_policy *policy)
203 {
204         struct nrs_crrn_net     *net = policy->pol_private;
205         ENTRY;
206
207         LASSERT(net != NULL);
208         LASSERT(net->cn_binheap != NULL);
209         LASSERT(cfs_binheap_is_empty(net->cn_binheap));
210
211         rhashtable_free_and_destroy(&net->cn_cli_hash, nrs_crrn_exit, NULL);
212         cfs_binheap_destroy(net->cn_binheap);
213
214         OBD_FREE_PTR(net);
215 }
216
217 /**
218  * Performs a policy-specific ctl function on CRR-N policy instances; similar
219  * to ioctl.
220  *
221  * \param[in]     policy the policy instance
222  * \param[in]     opc    the opcode
223  * \param[in,out] arg    used for passing parameters and information
224  *
225  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
226  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
227  *
228  * \retval 0   operation carried out successfully
229  * \retval -ve error
230  */
231 static int nrs_crrn_ctl(struct ptlrpc_nrs_policy *policy,
232                         enum ptlrpc_nrs_ctl opc,
233                         void *arg)
234 {
235         assert_spin_locked(&policy->pol_nrs->nrs_lock);
236
237         switch((enum nrs_ctl_crr)opc) {
238         default:
239                 RETURN(-EINVAL);
240
241         /**
242          * Read Round Robin quantum size of a policy instance.
243          */
244         case NRS_CTL_CRRN_RD_QUANTUM: {
245                 struct nrs_crrn_net     *net = policy->pol_private;
246
247                 *(__u16 *)arg = net->cn_quantum;
248                 }
249                 break;
250
251         /**
252          * Write Round Robin quantum size of a policy instance.
253          */
254         case NRS_CTL_CRRN_WR_QUANTUM: {
255                 struct nrs_crrn_net     *net = policy->pol_private;
256
257                 net->cn_quantum = *(__u16 *)arg;
258                 LASSERT(net->cn_quantum != 0);
259                 }
260                 break;
261         }
262
263         RETURN(0);
264 }
265
266 /**
267  * Obtains resources from CRR-N policy instances. The top-level resource lives
268  * inside \e nrs_crrn_net and the second-level resource inside
269  * \e nrs_crrn_client object instances.
270  *
271  * \param[in]  policy     the policy for which resources are being taken for
272  *                        request \a nrq
273  * \param[in]  nrq        the request for which resources are being taken
274  * \param[in]  parent     parent resource, embedded in nrs_crrn_net for the
275  *                        CRR-N policy
276  * \param[out] resp       resources references are placed in this array
277  * \param[in]  moving_req signifies limited caller context; used to perform
278  *                        memory allocations in an atomic context in this
279  *                        policy
280  *
281  * \retval 0   we are returning a top-level, parent resource, one that is
282  *             embedded in an nrs_crrn_net object
283  * \retval 1   we are returning a bottom-level resource, one that is embedded
284  *             in an nrs_crrn_client object
285  *
286  * \see nrs_resource_get_safe()
287  */
288 static int nrs_crrn_res_get(struct ptlrpc_nrs_policy *policy,
289                             struct ptlrpc_nrs_request *nrq,
290                             const struct ptlrpc_nrs_resource *parent,
291                             struct ptlrpc_nrs_resource **resp, bool moving_req)
292 {
293         struct nrs_crrn_net     *net;
294         struct nrs_crrn_client  *cli;
295         struct nrs_crrn_client  *tmp;
296         struct ptlrpc_request   *req;
297
298         if (parent == NULL) {
299                 *resp = &((struct nrs_crrn_net *)policy->pol_private)->cn_res;
300                 return 0;
301         }
302
303         net = container_of(parent, struct nrs_crrn_net, cn_res);
304         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
305
306         cli = rhashtable_lookup_fast(&net->cn_cli_hash, &req->rq_peer.nid,
307                                      nrs_crrn_hash_params);
308         if (cli)
309                 goto out;
310
311         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
312                           sizeof(*cli), moving_req ? GFP_ATOMIC : GFP_NOFS);
313         if (cli == NULL)
314                 return -ENOMEM;
315
316         cli->cc_nid = req->rq_peer.nid;
317
318         atomic_set(&cli->cc_ref, 0);
319
320         tmp = rhashtable_lookup_get_insert_fast(&net->cn_cli_hash,
321                                                 &cli->cc_rhead,
322                                                 nrs_crrn_hash_params);
323         if (tmp) {
324                 /* insertion failed */
325                 OBD_FREE_PTR(cli);
326                 if (IS_ERR(tmp))
327                         return PTR_ERR(tmp);
328                 cli = tmp;
329         }
330 out:
331         atomic_inc(&cli->cc_ref);
332         *resp = &cli->cc_res;
333
334         return 1;
335 }
336
337 /**
338  * Called when releasing references to the resource hierachy obtained for a
339  * request for scheduling using the CRR-N policy.
340  *
341  * \param[in] policy   the policy the resource belongs to
342  * \param[in] res      the resource to be released
343  */
344 static void nrs_crrn_res_put(struct ptlrpc_nrs_policy *policy,
345                              const struct ptlrpc_nrs_resource *res)
346 {
347         struct nrs_crrn_client *cli;
348
349         /**
350          * Do nothing for freeing parent, nrs_crrn_net resources
351          */
352         if (res->res_parent == NULL)
353                 return;
354
355         cli = container_of(res, struct nrs_crrn_client, cc_res);
356
357         atomic_dec(&cli->cc_ref);
358 }
359
360 /**
361  * Called when getting a request from the CRR-N policy for handlingso that it can be served
362  *
363  * \param[in] policy the policy being polled
364  * \param[in] peek   when set, signifies that we just want to examine the
365  *                   request, and not handle it, so the request is not removed
366  *                   from the policy.
367  * \param[in] force  force the policy to return a request; unused in this policy
368  *
369  * \retval the request to be handled
370  * \retval NULL no request available
371  *
372  * \see ptlrpc_nrs_req_get_nolock()
373  * \see nrs_request_get()
374  */
375 static
376 struct ptlrpc_nrs_request *nrs_crrn_req_get(struct ptlrpc_nrs_policy *policy,
377                                             bool peek, bool force)
378 {
379         struct nrs_crrn_net       *net = policy->pol_private;
380         struct cfs_binheap_node   *node = cfs_binheap_root(net->cn_binheap);
381         struct ptlrpc_nrs_request *nrq;
382
383         nrq = unlikely(node == NULL) ? NULL :
384               container_of(node, struct ptlrpc_nrs_request, nr_node);
385
386         if (likely(!peek && nrq != NULL)) {
387                 struct nrs_crrn_client *cli;
388                 struct ptlrpc_request *req = container_of(nrq,
389                                                           struct ptlrpc_request,
390                                                           rq_nrq);
391
392                 cli = container_of(nrs_request_resource(nrq),
393                                    struct nrs_crrn_client, cc_res);
394
395                 LASSERT(nrq->nr_u.crr.cr_round <= cli->cc_round);
396
397                 cfs_binheap_remove(net->cn_binheap, &nrq->nr_node);
398                 cli->cc_active--;
399
400                 CDEBUG(D_RPCTRACE,
401                        "NRS: starting to handle %s request from %s, with round "
402                        "%llu\n", NRS_POL_NAME_CRRN,
403                        libcfs_id2str(req->rq_peer), nrq->nr_u.crr.cr_round);
404
405                 /** Peek at the next request to be served */
406                 node = cfs_binheap_root(net->cn_binheap);
407
408                 /** No more requests */
409                 if (unlikely(node == NULL)) {
410                         net->cn_round++;
411                 } else {
412                         struct ptlrpc_nrs_request *next;
413
414                         next = container_of(node, struct ptlrpc_nrs_request,
415                                             nr_node);
416
417                         if (net->cn_round < next->nr_u.crr.cr_round)
418                                 net->cn_round = next->nr_u.crr.cr_round;
419                 }
420         }
421
422         return nrq;
423 }
424
425 /**
426  * Adds request \a nrq to a CRR-N \a policy instance's set of queued requests
427  *
428  * A scheduling round is a stream of requests that have been sorted in batches
429  * according to the client that they originate from (as identified by its NID);
430  * there can be only one batch for each client in each round. The batches are of
431  * maximum size nrs_crrn_net:cn_quantum. When a new request arrives for
432  * scheduling from a client that has exhausted its quantum in its current round,
433  * it will start scheduling requests on the next scheduling round. Clients are
434  * allowed to schedule requests against a round until all requests for the round
435  * are serviced, so a client might miss a round if it is not generating requests
436  * for a long enough period of time. Clients that miss a round will continue
437  * with scheduling the next request that they generate, starting at the round
438  * that requests are being dispatched for, at the time of arrival of this new
439  * request.
440  *
441  * Requests are tagged with the round number and a sequence number; the sequence
442  * number indicates the relative ordering amongst the batches of requests in a
443  * round, and is identical for all requests in a batch, as is the round number.
444  * The round and sequence numbers are used by crrn_req_compare() in order to
445  * maintain an ordered set of rounds, with each round consisting of an ordered
446  * set of batches of requests.
447  *
448  * \param[in] policy the policy
449  * \param[in] nrq    the request to add
450  *
451  * \retval 0    request successfully added
452  * \retval != 0 error
453  */
454 static int nrs_crrn_req_add(struct ptlrpc_nrs_policy *policy,
455                             struct ptlrpc_nrs_request *nrq)
456 {
457         struct nrs_crrn_net     *net;
458         struct nrs_crrn_client  *cli;
459         int                      rc;
460
461         cli = container_of(nrs_request_resource(nrq),
462                            struct nrs_crrn_client, cc_res);
463         net = container_of(nrs_request_resource(nrq)->res_parent,
464                            struct nrs_crrn_net, cn_res);
465
466         if (cli->cc_quantum == 0 || cli->cc_round < net->cn_round ||
467             (cli->cc_active == 0 && cli->cc_quantum > 0)) {
468
469                 /**
470                  * If the client has no pending requests, and still some of its
471                  * quantum remaining unused, which implies it has not had a
472                  * chance to schedule up to its maximum allowed batch size of
473                  * requests in the previous round it participated, schedule this
474                  * next request on a new round; this avoids fragmentation of
475                  * request batches caused by client inactivity, at the expense
476                  * of potentially slightly increased service time for the
477                  * request batch this request will be a part of.
478                  */
479                 if (cli->cc_active == 0 && cli->cc_quantum > 0)
480                         cli->cc_round++;
481
482                 /** A new scheduling round has commenced */
483                 if (cli->cc_round < net->cn_round)
484                         cli->cc_round = net->cn_round;
485
486                 /** I was not the last client through here */
487                 if (cli->cc_sequence < net->cn_sequence)
488                         cli->cc_sequence = ++net->cn_sequence;
489                 /**
490                  * Reset the quantum if we have reached the maximum quantum
491                  * size for this batch, or even if we have not managed to
492                  * complete a batch size up to its maximum allowed size.
493                  * XXX: Accessed unlocked
494                  */
495                 cli->cc_quantum = net->cn_quantum;
496         }
497
498         nrq->nr_u.crr.cr_round = cli->cc_round;
499         nrq->nr_u.crr.cr_sequence = cli->cc_sequence;
500
501         rc = cfs_binheap_insert(net->cn_binheap, &nrq->nr_node);
502         if (rc == 0) {
503                 cli->cc_active++;
504                 if (--cli->cc_quantum == 0)
505                         cli->cc_round++;
506         }
507         return rc;
508 }
509
510 /**
511  * Removes request \a nrq from a CRR-N \a policy instance's set of queued
512  * requests.
513  *
514  * \param[in] policy the policy
515  * \param[in] nrq    the request to remove
516  */
517 static void nrs_crrn_req_del(struct ptlrpc_nrs_policy *policy,
518                              struct ptlrpc_nrs_request *nrq)
519 {
520         struct nrs_crrn_net     *net;
521         struct nrs_crrn_client  *cli;
522         bool                     is_root;
523
524         cli = container_of(nrs_request_resource(nrq),
525                            struct nrs_crrn_client, cc_res);
526         net = container_of(nrs_request_resource(nrq)->res_parent,
527                            struct nrs_crrn_net, cn_res);
528
529         LASSERT(nrq->nr_u.crr.cr_round <= cli->cc_round);
530
531         is_root = &nrq->nr_node == cfs_binheap_root(net->cn_binheap);
532
533         cfs_binheap_remove(net->cn_binheap, &nrq->nr_node);
534         cli->cc_active--;
535
536         /**
537          * If we just deleted the node at the root of the binheap, we may have
538          * to adjust round numbers.
539          */
540         if (unlikely(is_root)) {
541                 /** Peek at the next request to be served */
542                 struct cfs_binheap_node *node = cfs_binheap_root(net->cn_binheap);
543
544                 /** No more requests */
545                 if (unlikely(node == NULL)) {
546                         net->cn_round++;
547                 } else {
548                         nrq = container_of(node, struct ptlrpc_nrs_request,
549                                            nr_node);
550
551                         if (net->cn_round < nrq->nr_u.crr.cr_round)
552                                 net->cn_round = nrq->nr_u.crr.cr_round;
553                 }
554         }
555 }
556
557 /**
558  * Called right after the request \a nrq finishes being handled by CRR-N policy
559  * instance \a policy.
560  *
561  * \param[in] policy the policy that handled the request
562  * \param[in] nrq    the request that was handled
563  */
564 static void nrs_crrn_req_stop(struct ptlrpc_nrs_policy *policy,
565                               struct ptlrpc_nrs_request *nrq)
566 {
567         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
568                                                   rq_nrq);
569
570         CDEBUG(D_RPCTRACE,
571                "NRS: finished handling %s request from %s, with round %llu"
572                "\n", NRS_POL_NAME_CRRN,
573                libcfs_id2str(req->rq_peer), nrq->nr_u.crr.cr_round);
574 }
575
576 /**
577  * debugfs interface
578  */
579
580 /**
581  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
582  * for CRR-N policy instances on both the regular and high-priority NRS head
583  * of a service, as long as a policy instance is not in the
584  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
585  * state are skipped later by nrs_crrn_ctl().
586  *
587  * Quantum values are in # of RPCs, and output is in YAML format.
588  *
589  * For example:
590  *
591  *      reg_quantum:8
592  *      hp_quantum:4
593  */
594 static int
595 ptlrpc_lprocfs_nrs_crrn_quantum_seq_show(struct seq_file *m, void *data)
596 {
597         struct ptlrpc_service   *svc = m->private;
598         __u16                   quantum;
599         int                     rc;
600
601         /**
602          * Perform two separate calls to this as only one of the NRS heads'
603          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
604          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
605          */
606         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
607                                        NRS_POL_NAME_CRRN,
608                                        NRS_CTL_CRRN_RD_QUANTUM,
609                                        true, &quantum);
610         if (rc == 0) {
611                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG
612                            "%-5d\n", quantum);
613                 /**
614                  * Ignore -ENODEV as the regular NRS head's policy may be in the
615                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
616                  */
617         } else if (rc != -ENODEV) {
618                 return rc;
619         }
620
621         if (!nrs_svc_has_hp(svc))
622                 goto no_hp;
623
624         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
625                                        NRS_POL_NAME_CRRN,
626                                        NRS_CTL_CRRN_RD_QUANTUM,
627                                        true, &quantum);
628         if (rc == 0) {
629                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
630                 /**
631                  * Ignore -ENODEV as the high priority NRS head's policy may be
632                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
633                  */
634         } else if (rc != -ENODEV) {
635                 return rc;
636         }
637
638 no_hp:
639         return rc;
640 }
641
642 /**
643  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
644  * for CRR-N policy instances of a service. The user can set the quantum size
645  * for the regular or high priority NRS head individually by specifying each
646  * value, or both together in a single invocation.
647  *
648  * For example:
649  *
650  * lctl set_param *.*.*.nrs_crrn_quantum=reg_quantum:32, to set the regular
651  * request quantum size on all PTLRPC services to 32
652  *
653  * lctl set_param *.*.*.nrs_crrn_quantum=hp_quantum:16, to set the high
654  * priority request quantum size on all PTLRPC services to 16, and
655  *
656  * lctl set_param *.*.ost_io.nrs_crrn_quantum=16, to set both the regular and
657  * high priority request quantum sizes of the ost_io service to 16.
658  *
659  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
660  * are skipped later by nrs_crrn_ctl().
661  */
662 static ssize_t
663 ptlrpc_lprocfs_nrs_crrn_quantum_seq_write(struct file *file,
664                                           const char __user *buffer,
665                                           size_t count,
666                                           loff_t *off)
667 {
668         struct ptlrpc_service       *svc = ((struct seq_file *)file->private_data)->private;
669         enum ptlrpc_nrs_queue_type   queue = 0;
670         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
671         char                        *val;
672         long                         quantum_reg;
673         long                         quantum_hp;
674         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
675         size_t                       count_copy;
676         int                          rc = 0;
677         int                          rc2 = 0;
678
679         if (count > (sizeof(kernbuf) - 1))
680                 return -EINVAL;
681
682         if (copy_from_user(kernbuf, buffer, count))
683                 return -EFAULT;
684
685         kernbuf[count] = '\0';
686
687         count_copy = count;
688
689         /**
690          * Check if the regular quantum value has been specified
691          */
692         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
693                                        &count_copy);
694         if (val != kernbuf) {
695                 rc = kstrtol(val, 10, &quantum_reg);
696                 if (rc)
697                         return rc;
698
699                 queue |= PTLRPC_NRS_QUEUE_REG;
700         }
701
702         count_copy = count;
703
704         /**
705          * Check if the high priority quantum value has been specified
706          */
707         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
708                                        &count_copy);
709         if (val != kernbuf) {
710                 if (!nrs_svc_has_hp(svc))
711                         return -ENODEV;
712
713                 rc = kstrtol(val, 10, &quantum_hp);
714                 if (rc)
715                         return rc;
716
717                 queue |= PTLRPC_NRS_QUEUE_HP;
718         }
719
720         /**
721          * If none of the queues has been specified, look for a valid numerical
722          * value
723          */
724         if (queue == 0) {
725                 rc = kstrtol(kernbuf, 10, &quantum_reg);
726                 if (rc)
727                         return rc;
728
729                 queue = PTLRPC_NRS_QUEUE_REG;
730
731                 if (nrs_svc_has_hp(svc)) {
732                         queue |= PTLRPC_NRS_QUEUE_HP;
733                         quantum_hp = quantum_reg;
734                 }
735         }
736
737         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
738             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
739             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
740             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
741                 return -EINVAL;
742
743         /**
744          * We change the values on regular and HP NRS heads separately, so that
745          * we do not exit early from ptlrpc_nrs_policy_control() with an error
746          * returned by nrs_policy_ctl_locked(), in cases where the user has not
747          * started the policy on either the regular or HP NRS head; i.e. we are
748          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
749          * only if the operation fails with -ENODEV on all heads that have been
750          * specified by the command; if at least one operation succeeds,
751          * success is returned.
752          */
753         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
754                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
755                                                NRS_POL_NAME_CRRN,
756                                                NRS_CTL_CRRN_WR_QUANTUM, false,
757                                                &quantum_reg);
758                 if ((rc < 0 && rc != -ENODEV) ||
759                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
760                         return rc;
761         }
762
763         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
764                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
765                                                 NRS_POL_NAME_CRRN,
766                                                 NRS_CTL_CRRN_WR_QUANTUM, false,
767                                                 &quantum_hp);
768                 if ((rc2 < 0 && rc2 != -ENODEV) ||
769                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
770                         return rc2;
771         }
772
773         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
774 }
775
776 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_crrn_quantum);
777
778 /**
779  * Initializes a CRR-N policy's lprocfs interface for service \a svc
780  *
781  * \param[in] svc the service
782  *
783  * \retval 0    success
784  * \retval != 0 error
785  */
786 static int nrs_crrn_lprocfs_init(struct ptlrpc_service *svc)
787 {
788         struct lprocfs_vars nrs_crrn_lprocfs_vars[] = {
789                 { .name         = "nrs_crrn_quantum",
790                   .fops         = &ptlrpc_lprocfs_nrs_crrn_quantum_fops,
791                   .data = svc },
792                 { NULL }
793         };
794
795         if (IS_ERR_OR_NULL(svc->srv_debugfs_entry))
796                 return 0;
797
798         return ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_crrn_lprocfs_vars, NULL);
799 }
800
801 /**
802  * CRR-N policy operations
803  */
804 static const struct ptlrpc_nrs_pol_ops nrs_crrn_ops = {
805         .op_policy_start        = nrs_crrn_start,
806         .op_policy_stop         = nrs_crrn_stop,
807         .op_policy_ctl          = nrs_crrn_ctl,
808         .op_res_get             = nrs_crrn_res_get,
809         .op_res_put             = nrs_crrn_res_put,
810         .op_req_get             = nrs_crrn_req_get,
811         .op_req_enqueue         = nrs_crrn_req_add,
812         .op_req_dequeue         = nrs_crrn_req_del,
813         .op_req_stop            = nrs_crrn_req_stop,
814         .op_lprocfs_init        = nrs_crrn_lprocfs_init,
815 };
816
817 /**
818  * CRR-N policy configuration
819  */
820 struct ptlrpc_nrs_pol_conf nrs_conf_crrn = {
821         .nc_name                = NRS_POL_NAME_CRRN,
822         .nc_ops                 = &nrs_crrn_ops,
823         .nc_compat              = nrs_policy_compat_all,
824 };
825
826 /** @} CRR-N policy */
827
828 /** @} nrs */
829
830 #endif /* HAVE_SERVER_SUPPORT */