Whamcloud - gitweb
LU-5710 all: second batch of corrected typos and grammar errors
[fs/lustre-release.git] / lustre / ptlrpc / nrs_crr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2011, 2014, Intel Corporation.
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_crr.c
29  *
30  * Network Request Scheduler (NRS) CRR-N policy
31  *
32  * Request ordering in a batched Round-Robin manner over client NIDs
33  *
34  * Author: Liang Zhen <liang@whamcloud.com>
35  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
36  */
37 /**
38  * \addtogoup nrs
39  * @{
40  */
41 #ifdef HAVE_SERVER_SUPPORT
42
43 #define DEBUG_SUBSYSTEM S_RPC
44 #include <obd_support.h>
45 #include <obd_class.h>
46 #include <lustre_net.h>
47 #include <lprocfs_status.h>
48 #include "ptlrpc_internal.h"
49
50 /**
51  * \name CRR-N policy
52  *
53  * Client Round-Robin scheduling over client NIDs
54  *
55  * @{
56  *
57  */
58
59 #define NRS_POL_NAME_CRRN       "crrn"
60
61 /**
62  * Binary heap predicate.
63  *
64  * Uses ptlrpc_nrs_request::nr_u::crr::cr_round and
65  * ptlrpc_nrs_request::nr_u::crr::cr_sequence to compare two binheap nodes and
66  * produce a binary predicate that shows their relative priority, so that the
67  * binary heap can perform the necessary sorting operations.
68  *
69  * \param[in] e1 the first binheap node to compare
70  * \param[in] e2 the second binheap node to compare
71  *
72  * \retval 0 e1 > e2
73  * \retval 1 e1 <= e2
74  */
75 static int crrn_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2)
76 {
77         struct ptlrpc_nrs_request *nrq1;
78         struct ptlrpc_nrs_request *nrq2;
79
80         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
81         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
82
83         if (nrq1->nr_u.crr.cr_round < nrq2->nr_u.crr.cr_round)
84                 return 1;
85         else if (nrq1->nr_u.crr.cr_round > nrq2->nr_u.crr.cr_round)
86                 return 0;
87
88         return nrq1->nr_u.crr.cr_sequence < nrq2->nr_u.crr.cr_sequence;
89 }
90
91 static cfs_binheap_ops_t nrs_crrn_heap_ops = {
92         .hop_enter      = NULL,
93         .hop_exit       = NULL,
94         .hop_compare    = crrn_req_compare,
95 };
96
97 /**
98  * libcfs_hash operations for nrs_crrn_net::cn_cli_hash
99  *
100  * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
101  * nrs_crrn_client objects.
102  */
103 #define NRS_NID_BKT_BITS        8
104 #define NRS_NID_BITS            16
105
106 static unsigned nrs_crrn_hop_hash(cfs_hash_t *hs, const void *key,
107                                   unsigned mask)
108 {
109         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
110 }
111
112 static int nrs_crrn_hop_keycmp(const void *key, struct hlist_node *hnode)
113 {
114         lnet_nid_t              *nid = (lnet_nid_t *)key;
115         struct nrs_crrn_client  *cli = hlist_entry(hnode,
116                                                        struct nrs_crrn_client,
117                                                        cc_hnode);
118         return *nid == cli->cc_nid;
119 }
120
121 static void *nrs_crrn_hop_key(struct hlist_node *hnode)
122 {
123         struct nrs_crrn_client  *cli = hlist_entry(hnode,
124                                                        struct nrs_crrn_client,
125                                                        cc_hnode);
126         return &cli->cc_nid;
127 }
128
129 static void *nrs_crrn_hop_object(struct hlist_node *hnode)
130 {
131         return hlist_entry(hnode, struct nrs_crrn_client, cc_hnode);
132 }
133
134 static void nrs_crrn_hop_get(cfs_hash_t *hs, struct hlist_node *hnode)
135 {
136         struct nrs_crrn_client *cli = hlist_entry(hnode,
137                                                       struct nrs_crrn_client,
138                                                       cc_hnode);
139         atomic_inc(&cli->cc_ref);
140 }
141
142 static void nrs_crrn_hop_put(cfs_hash_t *hs, struct hlist_node *hnode)
143 {
144         struct nrs_crrn_client  *cli = hlist_entry(hnode,
145                                                        struct nrs_crrn_client,
146                                                        cc_hnode);
147         atomic_dec(&cli->cc_ref);
148 }
149
150 static void nrs_crrn_hop_exit(cfs_hash_t *hs, struct hlist_node *hnode)
151 {
152         struct nrs_crrn_client  *cli = hlist_entry(hnode,
153                                                        struct nrs_crrn_client,
154                                                        cc_hnode);
155         LASSERTF(atomic_read(&cli->cc_ref) == 0,
156                  "Busy CRR-N object from client with NID %s, with %d refs\n",
157                  libcfs_nid2str(cli->cc_nid), atomic_read(&cli->cc_ref));
158
159         OBD_FREE_PTR(cli);
160 }
161
162 static cfs_hash_ops_t nrs_crrn_hash_ops = {
163         .hs_hash        = nrs_crrn_hop_hash,
164         .hs_keycmp      = nrs_crrn_hop_keycmp,
165         .hs_key         = nrs_crrn_hop_key,
166         .hs_object      = nrs_crrn_hop_object,
167         .hs_get         = nrs_crrn_hop_get,
168         .hs_put         = nrs_crrn_hop_put,
169         .hs_put_locked  = nrs_crrn_hop_put,
170         .hs_exit        = nrs_crrn_hop_exit,
171 };
172
173 /**
174  * Called when a CRR-N policy instance is started.
175  *
176  * \param[in] policy the policy
177  *
178  * \retval -ENOMEM OOM error
179  * \retval 0       success
180  */
181 static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy, char *arg)
182 {
183         struct nrs_crrn_net    *net;
184         int                     rc = 0;
185         ENTRY;
186
187         OBD_CPT_ALLOC_PTR(net, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
188         if (net == NULL)
189                 RETURN(-ENOMEM);
190
191         net->cn_binheap = cfs_binheap_create(&nrs_crrn_heap_ops,
192                                              CBH_FLAG_ATOMIC_GROW, 4096, NULL,
193                                              nrs_pol2cptab(policy),
194                                              nrs_pol2cptid(policy));
195         if (net->cn_binheap == NULL)
196                 GOTO(failed, rc = -ENOMEM);
197
198         net->cn_cli_hash = cfs_hash_create("nrs_crrn_nid_hash",
199                                            NRS_NID_BITS, NRS_NID_BITS,
200                                            NRS_NID_BKT_BITS, 0,
201                                            CFS_HASH_MIN_THETA,
202                                            CFS_HASH_MAX_THETA,
203                                            &nrs_crrn_hash_ops,
204                                            CFS_HASH_RW_BKTLOCK);
205         if (net->cn_cli_hash == NULL)
206                 GOTO(failed, rc = -ENOMEM);
207
208         /**
209          * Set default quantum value to max_rpcs_in_flight for non-MDS OSCs;
210          * there may be more RPCs pending from each struct nrs_crrn_client even
211          * with the default max_rpcs_in_flight value, as we are scheduling over
212          * NIDs, and there may be more than one mount point per client.
213          */
214         net->cn_quantum = OBD_MAX_RIF_DEFAULT;
215         /**
216          * Set to 1 so that the test inside nrs_crrn_req_add() can evaluate to
217          * true.
218          */
219         net->cn_sequence = 1;
220
221         policy->pol_private = net;
222
223         RETURN(rc);
224
225 failed:
226         if (net->cn_binheap != NULL)
227                 cfs_binheap_destroy(net->cn_binheap);
228
229         OBD_FREE_PTR(net);
230
231         RETURN(rc);
232 }
233
234 /**
235  * Called when a CRR-N policy instance is stopped.
236  *
237  * Called when the policy has been instructed to transition to the
238  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more pending
239  * requests to serve.
240  *
241  * \param[in] policy the policy
242  */
243 static void nrs_crrn_stop(struct ptlrpc_nrs_policy *policy)
244 {
245         struct nrs_crrn_net     *net = policy->pol_private;
246         ENTRY;
247
248         LASSERT(net != NULL);
249         LASSERT(net->cn_binheap != NULL);
250         LASSERT(net->cn_cli_hash != NULL);
251         LASSERT(cfs_binheap_is_empty(net->cn_binheap));
252
253         cfs_binheap_destroy(net->cn_binheap);
254         cfs_hash_putref(net->cn_cli_hash);
255
256         OBD_FREE_PTR(net);
257 }
258
259 /**
260  * Performs a policy-specific ctl function on CRR-N policy instances; similar
261  * to ioctl.
262  *
263  * \param[in]     policy the policy instance
264  * \param[in]     opc    the opcode
265  * \param[in,out] arg    used for passing parameters and information
266  *
267  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
268  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
269  *
270  * \retval 0   operation carried out successfully
271  * \retval -ve error
272  */
273 static int nrs_crrn_ctl(struct ptlrpc_nrs_policy *policy,
274                         enum ptlrpc_nrs_ctl opc,
275                         void *arg)
276 {
277         assert_spin_locked(&policy->pol_nrs->nrs_lock);
278
279         switch((enum nrs_ctl_crr)opc) {
280         default:
281                 RETURN(-EINVAL);
282
283         /**
284          * Read Round Robin quantum size of a policy instance.
285          */
286         case NRS_CTL_CRRN_RD_QUANTUM: {
287                 struct nrs_crrn_net     *net = policy->pol_private;
288
289                 *(__u16 *)arg = net->cn_quantum;
290                 }
291                 break;
292
293         /**
294          * Write Round Robin quantum size of a policy instance.
295          */
296         case NRS_CTL_CRRN_WR_QUANTUM: {
297                 struct nrs_crrn_net     *net = policy->pol_private;
298
299                 net->cn_quantum = *(__u16 *)arg;
300                 LASSERT(net->cn_quantum != 0);
301                 }
302                 break;
303         }
304
305         RETURN(0);
306 }
307
308 /**
309  * Obtains resources from CRR-N policy instances. The top-level resource lives
310  * inside \e nrs_crrn_net and the second-level resource inside
311  * \e nrs_crrn_client object instances.
312  *
313  * \param[in]  policy     the policy for which resources are being taken for
314  *                        request \a nrq
315  * \param[in]  nrq        the request for which resources are being taken
316  * \param[in]  parent     parent resource, embedded in nrs_crrn_net for the
317  *                        CRR-N policy
318  * \param[out] resp       resources references are placed in this array
319  * \param[in]  moving_req signifies limited caller context; used to perform
320  *                        memory allocations in an atomic context in this
321  *                        policy
322  *
323  * \retval 0   we are returning a top-level, parent resource, one that is
324  *             embedded in an nrs_crrn_net object
325  * \retval 1   we are returning a bottom-level resource, one that is embedded
326  *             in an nrs_crrn_client object
327  *
328  * \see nrs_resource_get_safe()
329  */
330 static int nrs_crrn_res_get(struct ptlrpc_nrs_policy *policy,
331                             struct ptlrpc_nrs_request *nrq,
332                             const struct ptlrpc_nrs_resource *parent,
333                             struct ptlrpc_nrs_resource **resp, bool moving_req)
334 {
335         struct nrs_crrn_net     *net;
336         struct nrs_crrn_client  *cli;
337         struct nrs_crrn_client  *tmp;
338         struct ptlrpc_request   *req;
339
340         if (parent == NULL) {
341                 *resp = &((struct nrs_crrn_net *)policy->pol_private)->cn_res;
342                 return 0;
343         }
344
345         net = container_of(parent, struct nrs_crrn_net, cn_res);
346         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
347
348         cli = cfs_hash_lookup(net->cn_cli_hash, &req->rq_peer.nid);
349         if (cli != NULL)
350                 goto out;
351
352         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
353                           sizeof(*cli), moving_req ? GFP_ATOMIC : GFP_NOFS);
354         if (cli == NULL)
355                 return -ENOMEM;
356
357         cli->cc_nid = req->rq_peer.nid;
358
359         atomic_set(&cli->cc_ref, 1);
360         tmp = cfs_hash_findadd_unique(net->cn_cli_hash, &cli->cc_nid,
361                                       &cli->cc_hnode);
362         if (tmp != cli) {
363                 OBD_FREE_PTR(cli);
364                 cli = tmp;
365         }
366 out:
367         *resp = &cli->cc_res;
368
369         return 1;
370 }
371
372 /**
373  * Called when releasing references to the resource hierachy obtained for a
374  * request for scheduling using the CRR-N policy.
375  *
376  * \param[in] policy   the policy the resource belongs to
377  * \param[in] res      the resource to be released
378  */
379 static void nrs_crrn_res_put(struct ptlrpc_nrs_policy *policy,
380                              const struct ptlrpc_nrs_resource *res)
381 {
382         struct nrs_crrn_net     *net;
383         struct nrs_crrn_client  *cli;
384
385         /**
386          * Do nothing for freeing parent, nrs_crrn_net resources
387          */
388         if (res->res_parent == NULL)
389                 return;
390
391         cli = container_of(res, struct nrs_crrn_client, cc_res);
392         net = container_of(res->res_parent, struct nrs_crrn_net, cn_res);
393
394         cfs_hash_put(net->cn_cli_hash, &cli->cc_hnode);
395 }
396
397 /**
398  * Called when getting a request from the CRR-N policy for handlingso that it can be served
399  *
400  * \param[in] policy the policy being polled
401  * \param[in] peek   when set, signifies that we just want to examine the
402  *                   request, and not handle it, so the request is not removed
403  *                   from the policy.
404  * \param[in] force  force the policy to return a request; unused in this policy
405  *
406  * \retval the request to be handled
407  * \retval NULL no request available
408  *
409  * \see ptlrpc_nrs_req_get_nolock()
410  * \see nrs_request_get()
411  */
412 static
413 struct ptlrpc_nrs_request *nrs_crrn_req_get(struct ptlrpc_nrs_policy *policy,
414                                             bool peek, bool force)
415 {
416         struct nrs_crrn_net       *net = policy->pol_private;
417         cfs_binheap_node_t        *node = cfs_binheap_root(net->cn_binheap);
418         struct ptlrpc_nrs_request *nrq;
419
420         nrq = unlikely(node == NULL) ? NULL :
421               container_of(node, struct ptlrpc_nrs_request, nr_node);
422
423         if (likely(!peek && nrq != NULL)) {
424                 struct nrs_crrn_client *cli;
425                 struct ptlrpc_request *req = container_of(nrq,
426                                                           struct ptlrpc_request,
427                                                           rq_nrq);
428
429                 cli = container_of(nrs_request_resource(nrq),
430                                    struct nrs_crrn_client, cc_res);
431
432                 LASSERT(nrq->nr_u.crr.cr_round <= cli->cc_round);
433
434                 cfs_binheap_remove(net->cn_binheap, &nrq->nr_node);
435                 cli->cc_active--;
436
437                 CDEBUG(D_RPCTRACE,
438                        "NRS: starting to handle %s request from %s, with round "
439                        LPU64"\n", NRS_POL_NAME_CRRN,
440                        libcfs_id2str(req->rq_peer), nrq->nr_u.crr.cr_round);
441
442                 /** Peek at the next request to be served */
443                 node = cfs_binheap_root(net->cn_binheap);
444
445                 /** No more requests */
446                 if (unlikely(node == NULL)) {
447                         net->cn_round++;
448                 } else {
449                         struct ptlrpc_nrs_request *next;
450
451                         next = container_of(node, struct ptlrpc_nrs_request,
452                                             nr_node);
453
454                         if (net->cn_round < next->nr_u.crr.cr_round)
455                                 net->cn_round = next->nr_u.crr.cr_round;
456                 }
457         }
458
459         return nrq;
460 }
461
462 /**
463  * Adds request \a nrq to a CRR-N \a policy instance's set of queued requests
464  *
465  * A scheduling round is a stream of requests that have been sorted in batches
466  * according to the client that they originate from (as identified by its NID);
467  * there can be only one batch for each client in each round. The batches are of
468  * maximum size nrs_crrn_net:cn_quantum. When a new request arrives for
469  * scheduling from a client that has exhausted its quantum in its current round,
470  * it will start scheduling requests on the next scheduling round. Clients are
471  * allowed to schedule requests against a round until all requests for the round
472  * are serviced, so a client might miss a round if it is not generating requests
473  * for a long enough period of time. Clients that miss a round will continue
474  * with scheduling the next request that they generate, starting at the round
475  * that requests are being dispatched for, at the time of arrival of this new
476  * request.
477  *
478  * Requests are tagged with the round number and a sequence number; the sequence
479  * number indicates the relative ordering amongst the batches of requests in a
480  * round, and is identical for all requests in a batch, as is the round number.
481  * The round and sequence numbers are used by crrn_req_compare() in order to
482  * maintain an ordered set of rounds, with each round consisting of an ordered
483  * set of batches of requests.
484  *
485  * \param[in] policy the policy
486  * \param[in] nrq    the request to add
487  *
488  * \retval 0    request successfully added
489  * \retval != 0 error
490  */
491 static int nrs_crrn_req_add(struct ptlrpc_nrs_policy *policy,
492                             struct ptlrpc_nrs_request *nrq)
493 {
494         struct nrs_crrn_net     *net;
495         struct nrs_crrn_client  *cli;
496         int                      rc;
497
498         cli = container_of(nrs_request_resource(nrq),
499                            struct nrs_crrn_client, cc_res);
500         net = container_of(nrs_request_resource(nrq)->res_parent,
501                            struct nrs_crrn_net, cn_res);
502
503         if (cli->cc_quantum == 0 || cli->cc_round < net->cn_round ||
504             (cli->cc_active == 0 && cli->cc_quantum > 0)) {
505
506                 /**
507                  * If the client has no pending requests, and still some of its
508                  * quantum remaining unused, which implies it has not had a
509                  * chance to schedule up to its maximum allowed batch size of
510                  * requests in the previous round it participated, schedule this
511                  * next request on a new round; this avoids fragmentation of
512                  * request batches caused by client inactivity, at the expense
513                  * of potentially slightly increased service time for the
514                  * request batch this request will be a part of.
515                  */
516                 if (cli->cc_active == 0 && cli->cc_quantum > 0)
517                         cli->cc_round++;
518
519                 /** A new scheduling round has commenced */
520                 if (cli->cc_round < net->cn_round)
521                         cli->cc_round = net->cn_round;
522
523                 /** I was not the last client through here */
524                 if (cli->cc_sequence < net->cn_sequence)
525                         cli->cc_sequence = ++net->cn_sequence;
526                 /**
527                  * Reset the quantum if we have reached the maximum quantum
528                  * size for this batch, or even if we have not managed to
529                  * complete a batch size up to its maximum allowed size.
530                  * XXX: Accessed unlocked
531                  */
532                 cli->cc_quantum = net->cn_quantum;
533         }
534
535         nrq->nr_u.crr.cr_round = cli->cc_round;
536         nrq->nr_u.crr.cr_sequence = cli->cc_sequence;
537
538         rc = cfs_binheap_insert(net->cn_binheap, &nrq->nr_node);
539         if (rc == 0) {
540                 cli->cc_active++;
541                 if (--cli->cc_quantum == 0)
542                         cli->cc_round++;
543         }
544         return rc;
545 }
546
547 /**
548  * Removes request \a nrq from a CRR-N \a policy instance's set of queued
549  * requests.
550  *
551  * \param[in] policy the policy
552  * \param[in] nrq    the request to remove
553  */
554 static void nrs_crrn_req_del(struct ptlrpc_nrs_policy *policy,
555                              struct ptlrpc_nrs_request *nrq)
556 {
557         struct nrs_crrn_net     *net;
558         struct nrs_crrn_client  *cli;
559         bool                     is_root;
560
561         cli = container_of(nrs_request_resource(nrq),
562                            struct nrs_crrn_client, cc_res);
563         net = container_of(nrs_request_resource(nrq)->res_parent,
564                            struct nrs_crrn_net, cn_res);
565
566         LASSERT(nrq->nr_u.crr.cr_round <= cli->cc_round);
567
568         is_root = &nrq->nr_node == cfs_binheap_root(net->cn_binheap);
569
570         cfs_binheap_remove(net->cn_binheap, &nrq->nr_node);
571         cli->cc_active--;
572
573         /**
574          * If we just deleted the node at the root of the binheap, we may have
575          * to adjust round numbers.
576          */
577         if (unlikely(is_root)) {
578                 /** Peek at the next request to be served */
579                 cfs_binheap_node_t *node = cfs_binheap_root(net->cn_binheap);
580
581                 /** No more requests */
582                 if (unlikely(node == NULL)) {
583                         net->cn_round++;
584                 } else {
585                         nrq = container_of(node, struct ptlrpc_nrs_request,
586                                            nr_node);
587
588                         if (net->cn_round < nrq->nr_u.crr.cr_round)
589                                 net->cn_round = nrq->nr_u.crr.cr_round;
590                 }
591         }
592 }
593
594 /**
595  * Called right after the request \a nrq finishes being handled by CRR-N policy
596  * instance \a policy.
597  *
598  * \param[in] policy the policy that handled the request
599  * \param[in] nrq    the request that was handled
600  */
601 static void nrs_crrn_req_stop(struct ptlrpc_nrs_policy *policy,
602                               struct ptlrpc_nrs_request *nrq)
603 {
604         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
605                                                   rq_nrq);
606
607         CDEBUG(D_RPCTRACE,
608                "NRS: finished handling %s request from %s, with round "LPU64
609                "\n", NRS_POL_NAME_CRRN,
610                libcfs_id2str(req->rq_peer), nrq->nr_u.crr.cr_round);
611 }
612
613 #ifdef CONFIG_PROC_FS
614
615 /**
616  * lprocfs interface
617  */
618
619 /**
620  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
621  * for CRR-N policy instances on both the regular and high-priority NRS head
622  * of a service, as long as a policy instance is not in the
623  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
624  * state are skipped later by nrs_crrn_ctl().
625  *
626  * Quantum values are in # of RPCs, and output is in YAML format.
627  *
628  * For example:
629  *
630  *      reg_quantum:8
631  *      hp_quantum:4
632  */
633 static int
634 ptlrpc_lprocfs_nrs_crrn_quantum_seq_show(struct seq_file *m, void *data)
635 {
636         struct ptlrpc_service   *svc = m->private;
637         __u16                   quantum;
638         int                     rc;
639
640         /**
641          * Perform two separate calls to this as only one of the NRS heads'
642          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
643          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
644          */
645         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
646                                        NRS_POL_NAME_CRRN,
647                                        NRS_CTL_CRRN_RD_QUANTUM,
648                                        true, &quantum);
649         if (rc == 0) {
650                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG
651                            "%-5d\n", quantum);
652                 /**
653                  * Ignore -ENODEV as the regular NRS head's policy may be in the
654                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
655                  */
656         } else if (rc != -ENODEV) {
657                 return rc;
658         }
659
660         if (!nrs_svc_has_hp(svc))
661                 goto no_hp;
662
663         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
664                                        NRS_POL_NAME_CRRN,
665                                        NRS_CTL_CRRN_RD_QUANTUM,
666                                        true, &quantum);
667         if (rc == 0) {
668                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
669                 /**
670                  * Ignore -ENODEV as the high priority NRS head's policy may be
671                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
672                  */
673         } else if (rc != -ENODEV) {
674                 return rc;
675         }
676
677 no_hp:
678         return rc;
679 }
680
681 /**
682  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
683  * for CRR-N policy instances of a service. The user can set the quantum size
684  * for the regular or high priority NRS head individually by specifying each
685  * value, or both together in a single invocation.
686  *
687  * For example:
688  *
689  * lctl set_param *.*.*.nrs_crrn_quantum=reg_quantum:32, to set the regular
690  * request quantum size on all PTLRPC services to 32
691  *
692  * lctl set_param *.*.*.nrs_crrn_quantum=hp_quantum:16, to set the high
693  * priority request quantum size on all PTLRPC services to 16, and
694  *
695  * lctl set_param *.*.ost_io.nrs_crrn_quantum=16, to set both the regular and
696  * high priority request quantum sizes of the ost_io service to 16.
697  *
698  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
699  * are skipped later by nrs_crrn_ctl().
700  */
701 static ssize_t
702 ptlrpc_lprocfs_nrs_crrn_quantum_seq_write(struct file *file,
703                                           const char *buffer, size_t count,
704                                           loff_t *off)
705 {
706         struct ptlrpc_service       *svc = ((struct seq_file *)file->private_data)->private;
707         enum ptlrpc_nrs_queue_type   queue = 0;
708         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
709         char                        *val;
710         long                         quantum_reg;
711         long                         quantum_hp;
712         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
713         size_t                       count_copy;
714         int                          rc = 0;
715         int                          rc2 = 0;
716
717         if (count > (sizeof(kernbuf) - 1))
718                 return -EINVAL;
719
720         if (copy_from_user(kernbuf, buffer, count))
721                 return -EFAULT;
722
723         kernbuf[count] = '\0';
724
725         count_copy = count;
726
727         /**
728          * Check if the regular quantum value has been specified
729          */
730         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
731                                        &count_copy);
732         if (val != kernbuf) {
733                 quantum_reg = simple_strtol(val, NULL, 10);
734
735                 queue |= PTLRPC_NRS_QUEUE_REG;
736         }
737
738         count_copy = count;
739
740         /**
741          * Check if the high priority quantum value has been specified
742          */
743         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
744                                        &count_copy);
745         if (val != kernbuf) {
746                 if (!nrs_svc_has_hp(svc))
747                         return -ENODEV;
748
749                 quantum_hp = simple_strtol(val, NULL, 10);
750
751                 queue |= PTLRPC_NRS_QUEUE_HP;
752         }
753
754         /**
755          * If none of the queues has been specified, look for a valid numerical
756          * value
757          */
758         if (queue == 0) {
759                 if (!isdigit(kernbuf[0]))
760                         return -EINVAL;
761
762                 quantum_reg = simple_strtol(kernbuf, NULL, 10);
763
764                 queue = PTLRPC_NRS_QUEUE_REG;
765
766                 if (nrs_svc_has_hp(svc)) {
767                         queue |= PTLRPC_NRS_QUEUE_HP;
768                         quantum_hp = quantum_reg;
769                 }
770         }
771
772         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
773             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
774             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
775             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
776                 return -EINVAL;
777
778         /**
779          * We change the values on regular and HP NRS heads separately, so that
780          * we do not exit early from ptlrpc_nrs_policy_control() with an error
781          * returned by nrs_policy_ctl_locked(), in cases where the user has not
782          * started the policy on either the regular or HP NRS head; i.e. we are
783          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
784          * only if the operation fails with -ENODEV on all heads that have been
785          * specified by the command; if at least one operation succeeds,
786          * success is returned.
787          */
788         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
789                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
790                                                NRS_POL_NAME_CRRN,
791                                                NRS_CTL_CRRN_WR_QUANTUM, false,
792                                                &quantum_reg);
793                 if ((rc < 0 && rc != -ENODEV) ||
794                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
795                         return rc;
796         }
797
798         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
799                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
800                                                 NRS_POL_NAME_CRRN,
801                                                 NRS_CTL_CRRN_WR_QUANTUM, false,
802                                                 &quantum_hp);
803                 if ((rc2 < 0 && rc2 != -ENODEV) ||
804                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
805                         return rc2;
806         }
807
808         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
809 }
810 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_crrn_quantum);
811
812 /**
813  * Initializes a CRR-N policy's lprocfs interface for service \a svc
814  *
815  * \param[in] svc the service
816  *
817  * \retval 0    success
818  * \retval != 0 error
819  */
820 static int nrs_crrn_lprocfs_init(struct ptlrpc_service *svc)
821 {
822         struct lprocfs_vars nrs_crrn_lprocfs_vars[] = {
823                 { .name         = "nrs_crrn_quantum",
824                   .fops         = &ptlrpc_lprocfs_nrs_crrn_quantum_fops,
825                   .data = svc },
826                 { NULL }
827         };
828
829         if (svc->srv_procroot == NULL)
830                 return 0;
831
832         return lprocfs_add_vars(svc->srv_procroot, nrs_crrn_lprocfs_vars, NULL);
833 }
834
835 /**
836  * Cleans up a CRR-N policy's lprocfs interface for service \a svc
837  *
838  * \param[in] svc the service
839  */
840 static void nrs_crrn_lprocfs_fini(struct ptlrpc_service *svc)
841 {
842         if (svc->srv_procroot == NULL)
843                 return;
844
845         lprocfs_remove_proc_entry("nrs_crrn_quantum", svc->srv_procroot);
846 }
847
848 #endif /* CONFIG_PROC_FS */
849
850 /**
851  * CRR-N policy operations
852  */
853 static const struct ptlrpc_nrs_pol_ops nrs_crrn_ops = {
854         .op_policy_start        = nrs_crrn_start,
855         .op_policy_stop         = nrs_crrn_stop,
856         .op_policy_ctl          = nrs_crrn_ctl,
857         .op_res_get             = nrs_crrn_res_get,
858         .op_res_put             = nrs_crrn_res_put,
859         .op_req_get             = nrs_crrn_req_get,
860         .op_req_enqueue         = nrs_crrn_req_add,
861         .op_req_dequeue         = nrs_crrn_req_del,
862         .op_req_stop            = nrs_crrn_req_stop,
863 #ifdef CONFIG_PROC_FS
864         .op_lprocfs_init        = nrs_crrn_lprocfs_init,
865         .op_lprocfs_fini        = nrs_crrn_lprocfs_fini,
866 #endif
867 };
868
869 /**
870  * CRR-N policy configuration
871  */
872 struct ptlrpc_nrs_pol_conf nrs_conf_crrn = {
873         .nc_name                = NRS_POL_NAME_CRRN,
874         .nc_ops                 = &nrs_crrn_ops,
875         .nc_compat              = nrs_policy_compat_all,
876 };
877
878 /** @} CRR-N policy */
879
880 /** @} nrs */
881
882 #endif /* HAVE_SERVER_SUPPORT */