Whamcloud - gitweb
LU-2392 kerberos: GSS keyring is broken >=2.6.29
[fs/lustre-release.git] / lustre / ptlrpc / nrs_crr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2011 Intel Corporation
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_crr.c
29  *
30  * Network Request Scheduler (NRS) CRR-N policy
31  *
32  * Request ordering in a batched Round-Robin manner over client NIDs
33  *
34  * Author: Liang Zhen <liang@whamcloud.com>
35  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
36  */
37 /**
38  * \addtogoup nrs
39  * @{
40  */
41 #ifdef HAVE_SERVER_SUPPORT
42
43 #define DEBUG_SUBSYSTEM S_RPC
44 #ifndef __KERNEL__
45 #include <liblustre.h>
46 #endif
47 #include <obd_support.h>
48 #include <obd_class.h>
49 #include <lustre_net.h>
50 #include <lprocfs_status.h>
51 #include "ptlrpc_internal.h"
52
53 /**
54  * \name CRR-N policy
55  *
56  * Client Round-Robin scheduling over client NIDs
57  *
58  * @{
59  *
60  */
61
62 #define NRS_POL_NAME_CRRN       "crrn"
63
64 /**
65  * Binary heap predicate.
66  *
67  * Uses ptlrpc_nrs_request::nr_u::crr::cr_round and
68  * ptlrpc_nrs_request::nr_u::crr::cr_sequence to compare two binheap nodes and
69  * produce a binary predicate that shows their relative priority, so that the
70  * binary heap can perform the necessary sorting operations.
71  *
72  * \param[in] e1 the first binheap node to compare
73  * \param[in] e2 the second binheap node to compare
74  *
75  * \retval 0 e1 > e2
76  * \retval 1 e1 <= e2
77  */
78 static int crrn_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2)
79 {
80         struct ptlrpc_nrs_request *nrq1;
81         struct ptlrpc_nrs_request *nrq2;
82
83         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
84         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
85
86         if (nrq1->nr_u.crr.cr_round < nrq2->nr_u.crr.cr_round)
87                 return 1;
88         else if (nrq1->nr_u.crr.cr_round > nrq2->nr_u.crr.cr_round)
89                 return 0;
90
91         return nrq1->nr_u.crr.cr_sequence < nrq2->nr_u.crr.cr_sequence;
92 }
93
94 static cfs_binheap_ops_t nrs_crrn_heap_ops = {
95         .hop_enter      = NULL,
96         .hop_exit       = NULL,
97         .hop_compare    = crrn_req_compare,
98 };
99
100 /**
101  * libcfs_hash operations for nrs_crrn_net::cn_cli_hash
102  *
103  * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
104  * nrs_crrn_client objects.
105  */
106 #define NRS_NID_BKT_BITS        8
107 #define NRS_NID_BITS            16
108
109 static unsigned nrs_crrn_hop_hash(cfs_hash_t *hs, const void *key,
110                                   unsigned mask)
111 {
112         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
113 }
114
115 static int nrs_crrn_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
116 {
117         lnet_nid_t              *nid = (lnet_nid_t *)key;
118         struct nrs_crrn_client  *cli = cfs_hlist_entry(hnode,
119                                                        struct nrs_crrn_client,
120                                                        cc_hnode);
121         return *nid == cli->cc_nid;
122 }
123
124 static void *nrs_crrn_hop_key(cfs_hlist_node_t *hnode)
125 {
126         struct nrs_crrn_client  *cli = cfs_hlist_entry(hnode,
127                                                        struct nrs_crrn_client,
128                                                        cc_hnode);
129         return &cli->cc_nid;
130 }
131
132 static void *nrs_crrn_hop_object(cfs_hlist_node_t *hnode)
133 {
134         return cfs_hlist_entry(hnode, struct nrs_crrn_client, cc_hnode);
135 }
136
137 static void nrs_crrn_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
138 {
139         struct nrs_crrn_client *cli = cfs_hlist_entry(hnode,
140                                                       struct nrs_crrn_client,
141                                                       cc_hnode);
142         cfs_atomic_inc(&cli->cc_ref);
143 }
144
145 static void nrs_crrn_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
146 {
147         struct nrs_crrn_client  *cli = cfs_hlist_entry(hnode,
148                                                        struct nrs_crrn_client,
149                                                        cc_hnode);
150         cfs_atomic_dec(&cli->cc_ref);
151 }
152
153 static void nrs_crrn_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
154 {
155         struct nrs_crrn_client  *cli = cfs_hlist_entry(hnode,
156                                                        struct nrs_crrn_client,
157                                                        cc_hnode);
158         LASSERTF(cfs_atomic_read(&cli->cc_ref) == 0,
159                  "Busy CRR-N object from client with NID %s, with %d refs\n",
160                  libcfs_nid2str(cli->cc_nid), cfs_atomic_read(&cli->cc_ref));
161
162         OBD_FREE_PTR(cli);
163 }
164
165 static cfs_hash_ops_t nrs_crrn_hash_ops = {
166         .hs_hash        = nrs_crrn_hop_hash,
167         .hs_keycmp      = nrs_crrn_hop_keycmp,
168         .hs_key         = nrs_crrn_hop_key,
169         .hs_object      = nrs_crrn_hop_object,
170         .hs_get         = nrs_crrn_hop_get,
171         .hs_put         = nrs_crrn_hop_put,
172         .hs_put_locked  = nrs_crrn_hop_put,
173         .hs_exit        = nrs_crrn_hop_exit,
174 };
175
176 /**
177  * Called when a CRR-N policy instance is started.
178  *
179  * \param[in] policy the policy
180  *
181  * \retval -ENOMEM OOM error
182  * \retval 0       success
183  */
184 static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy)
185 {
186         struct nrs_crrn_net    *net;
187         int                     rc = 0;
188         ENTRY;
189
190         OBD_CPT_ALLOC_PTR(net, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
191         if (net == NULL)
192                 RETURN(-ENOMEM);
193
194         net->cn_binheap = cfs_binheap_create(&nrs_crrn_heap_ops,
195                                              CBH_FLAG_ATOMIC_GROW, 4096, NULL,
196                                              nrs_pol2cptab(policy),
197                                              nrs_pol2cptid(policy));
198         if (net->cn_binheap == NULL)
199                 GOTO(failed, rc = -ENOMEM);
200
201         net->cn_cli_hash = cfs_hash_create("nrs_crrn_nid_hash",
202                                            NRS_NID_BITS, NRS_NID_BITS,
203                                            NRS_NID_BKT_BITS, 0,
204                                            CFS_HASH_MIN_THETA,
205                                            CFS_HASH_MAX_THETA,
206                                            &nrs_crrn_hash_ops,
207                                            CFS_HASH_RW_BKTLOCK);
208         if (net->cn_cli_hash == NULL)
209                 GOTO(failed, rc = -ENOMEM);
210
211         /**
212          * Set default quantum value to max_rpcs_in_flight for non-MDS OSCs;
213          * there may be more RPCs pending from each struct nrs_crrn_client even
214          * with the default max_rpcs_in_flight value, as we are scheduling over
215          * NIDs, and there may be more than one mount point per client.
216          */
217         net->cn_quantum = OSC_MAX_RIF_DEFAULT;
218         /**
219          * Set to 1 so that the test inside nrs_crrn_req_add() can evaluate to
220          * true.
221          */
222         net->cn_sequence = 1;
223
224         policy->pol_private = net;
225
226         RETURN(rc);
227
228 failed:
229         if (net->cn_binheap != NULL)
230                 cfs_binheap_destroy(net->cn_binheap);
231
232         OBD_FREE_PTR(net);
233
234         RETURN(rc);
235 }
236
237 /**
238  * Called when a CRR-N policy instance is stopped.
239  *
240  * Called when the policy has been instructed to transition to the
241  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more pending
242  * requests to serve.
243  *
244  * \param[in] policy the policy
245  */
246 static void nrs_crrn_stop(struct ptlrpc_nrs_policy *policy)
247 {
248         struct nrs_crrn_net     *net = policy->pol_private;
249         ENTRY;
250
251         LASSERT(net != NULL);
252         LASSERT(net->cn_binheap != NULL);
253         LASSERT(net->cn_cli_hash != NULL);
254         LASSERT(cfs_binheap_is_empty(net->cn_binheap));
255
256         cfs_binheap_destroy(net->cn_binheap);
257         cfs_hash_putref(net->cn_cli_hash);
258
259         OBD_FREE_PTR(net);
260 }
261
262 /**
263  * Performs a policy-specific ctl function on CRR-N policy instances; similar
264  * to ioctl.
265  *
266  * \param[in]     policy the policy instance
267  * \param[in]     opc    the opcode
268  * \param[in,out] arg    used for passing parameters and information
269  *
270  * \pre spin_is_locked(&policy->pol_nrs->->nrs_lock)
271  * \post spin_is_locked(&policy->pol_nrs->->nrs_lock)
272  *
273  * \retval 0   operation carried out successfully
274  * \retval -ve error
275  */
276 int nrs_crrn_ctl(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
277                  void *arg)
278 {
279         LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock));
280
281         switch(opc) {
282         default:
283                 RETURN(-EINVAL);
284
285         /**
286          * Read Round Robin quantum size of a policy instance.
287          */
288         case NRS_CTL_CRRN_RD_QUANTUM: {
289                 struct nrs_crrn_net     *net = policy->pol_private;
290
291                 *(__u16 *)arg = net->cn_quantum;
292                 }
293                 break;
294
295         /**
296          * Write Round Robin quantum size of a policy instance.
297          */
298         case NRS_CTL_CRRN_WR_QUANTUM: {
299                 struct nrs_crrn_net     *net = policy->pol_private;
300
301                 net->cn_quantum = *(__u16 *)arg;
302                 LASSERT(net->cn_quantum != 0);
303                 }
304                 break;
305         }
306
307         RETURN(0);
308 }
309
310 /**
311  * Obtains resources from CRR-N policy instances. The top-level resource lives
312  * inside \e nrs_crrn_net and the second-level resource inside
313  * \e nrs_crrn_client object instances.
314  *
315  * \param[in]  policy     the policy for which resources are being taken for
316  *                        request \a nrq
317  * \param[in]  nrq        the request for which resources are being taken
318  * \param[in]  parent     parent resource, embedded in nrs_crrn_net for the
319  *                        CRR-N policy
320  * \param[out] resp       resources references are placed in this array
321  * \param[in]  moving_req signifies limited caller context; used to perform
322  *                        memory allocations in an atomic context in this
323  *                        policy
324  *
325  * \retval 0   we are returning a top-level, parent resource, one that is
326  *             embedded in an nrs_crrn_net object
327  * \retval 1   we are returning a bottom-level resource, one that is embedded
328  *             in an nrs_crrn_client object
329  *
330  * \see nrs_resource_get_safe()
331  */
332 int nrs_crrn_res_get(struct ptlrpc_nrs_policy *policy,
333                      struct ptlrpc_nrs_request *nrq,
334                      const struct ptlrpc_nrs_resource *parent,
335                      struct ptlrpc_nrs_resource **resp, bool moving_req)
336 {
337         struct nrs_crrn_net     *net;
338         struct nrs_crrn_client  *cli;
339         struct nrs_crrn_client  *tmp;
340         struct ptlrpc_request   *req;
341
342         if (parent == NULL) {
343                 *resp = &((struct nrs_crrn_net *)policy->pol_private)->cn_res;
344                 return 0;
345         }
346
347         net = container_of(parent, struct nrs_crrn_net, cn_res);
348         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
349
350         cli = cfs_hash_lookup(net->cn_cli_hash, &req->rq_peer.nid);
351         if (cli != NULL)
352                 goto out;
353
354         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
355                           sizeof(*cli), moving_req ? CFS_ALLOC_ATOMIC :
356                           CFS_ALLOC_IO);
357         if (cli == NULL)
358                 return -ENOMEM;
359
360         cli->cc_nid = req->rq_peer.nid;
361
362         cfs_atomic_set(&cli->cc_ref, 1);
363         tmp = cfs_hash_findadd_unique(net->cn_cli_hash, &cli->cc_nid,
364                                       &cli->cc_hnode);
365         if (tmp != cli) {
366                 OBD_FREE_PTR(cli);
367                 cli = tmp;
368         }
369 out:
370         *resp = &cli->cc_res;
371
372         return 1;
373 }
374
375 /**
376  * Called when releasing references to the resource hierachy obtained for a
377  * request for scheduling using the CRR-N policy.
378  *
379  * \param[in] policy   the policy the resource belongs to
380  * \param[in] res      the resource to be released
381  */
382 static void nrs_crrn_res_put(struct ptlrpc_nrs_policy *policy,
383                              const struct ptlrpc_nrs_resource *res)
384 {
385         struct nrs_crrn_net     *net;
386         struct nrs_crrn_client  *cli;
387
388         /**
389          * Do nothing for freeing parent, nrs_crrn_net resources
390          */
391         if (res->res_parent == NULL)
392                 return;
393
394         cli = container_of(res, struct nrs_crrn_client, cc_res);
395         net = container_of(res->res_parent, struct nrs_crrn_net, cn_res);
396
397         cfs_hash_put(net->cn_cli_hash, &cli->cc_hnode);
398 }
399
400 /**
401  * Called when getting a request from the CRR-N policy for handlingso that it can be served
402  *
403  * \param[in] policy the policy being polled
404  * \param[in] peek   when set, signifies that we just want to examine the
405  *                   request, and not handle it, so the request is not removed
406  *                   from the policy.
407  * \param[in] force  force the policy to return a request; unused in this policy
408  *
409  * \retval the request to be handled
410  * \retval NULL no request available
411  *
412  * \see ptlrpc_nrs_req_get_nolock()
413  * \see nrs_request_get()
414  */
415 static
416 struct ptlrpc_nrs_request *nrs_crrn_req_get(struct ptlrpc_nrs_policy *policy,
417                                             bool peek, bool force)
418 {
419         struct nrs_crrn_net       *net = policy->pol_private;
420         cfs_binheap_node_t        *node = cfs_binheap_root(net->cn_binheap);
421         struct ptlrpc_nrs_request *nrq;
422
423         nrq = unlikely(node == NULL) ? NULL :
424               container_of(node, struct ptlrpc_nrs_request, nr_node);
425
426         if (likely(!peek && nrq != NULL)) {
427                 struct nrs_crrn_client *cli;
428                 struct ptlrpc_request *req = container_of(nrq,
429                                                           struct ptlrpc_request,
430                                                           rq_nrq);
431
432                 cli = container_of(nrs_request_resource(nrq),
433                                    struct nrs_crrn_client, cc_res);
434
435                 LASSERT(nrq->nr_u.crr.cr_round <= cli->cc_round);
436
437                 cfs_binheap_remove(net->cn_binheap, &nrq->nr_node);
438                 cli->cc_active--;
439
440                 CDEBUG(D_RPCTRACE,
441                        "NRS: starting to handle %s request from %s, with round "
442                        LPU64"\n", NRS_POL_NAME_CRRN,
443                        libcfs_id2str(req->rq_peer), nrq->nr_u.crr.cr_round);
444
445                 /** Peek at the next request to be served */
446                 node = cfs_binheap_root(net->cn_binheap);
447
448                 /** No more requests */
449                 if (unlikely(node == NULL)) {
450                         net->cn_round++;
451                 } else {
452                         struct ptlrpc_nrs_request *next;
453
454                         next = container_of(node, struct ptlrpc_nrs_request,
455                                             nr_node);
456
457                         if (net->cn_round < next->nr_u.crr.cr_round)
458                                 net->cn_round = next->nr_u.crr.cr_round;
459                 }
460         }
461
462         return nrq;
463 }
464
465 /**
466  * Adds request \a nrq to a CRR-N \a policy instance's set of queued requests
467  *
468  * A scheduling round is a stream of requests that have been sorted in batches
469  * according to the client that they originate from (as identified by its NID);
470  * there can be only one batch for each client in each round. The batches are of
471  * maximum size nrs_crrn_net:cn_quantum. When a new request arrives for
472  * scheduling from a client that has exhausted its quantum in its current round,
473  * it will start scheduling requests on the next scheduling round. Clients are
474  * allowed to schedule requests against a round until all requests for the round
475  * are serviced, so a client might miss a round if it is not generating requests
476  * for a long enough period of time. Clients that miss a round will continue
477  * with scheduling the next request that they generate, starting at the round
478  * that requests are being dispatched for, at the time of arrival of this new
479  * request.
480  *
481  * Requests are tagged with the round number and a sequence number; the sequence
482  * number indicates the relative ordering amongst the batches of requests in a
483  * round, and is identical for all requests in a batch, as is the round number.
484  * The round and sequence numbers are used by crrn_req_compare() in order to
485  * maintain an ordered set of rounds, with each round consisting of an ordered
486  * set of batches of requests.
487  *
488  * \param[in] policy the policy
489  * \param[in] nrq    the request to add
490  *
491  * \retval 0    request successfully added
492  * \retval != 0 error
493  */
494 static int nrs_crrn_req_add(struct ptlrpc_nrs_policy *policy,
495                             struct ptlrpc_nrs_request *nrq)
496 {
497         struct nrs_crrn_net     *net;
498         struct nrs_crrn_client  *cli;
499         int                      rc;
500
501         cli = container_of(nrs_request_resource(nrq),
502                            struct nrs_crrn_client, cc_res);
503         net = container_of(nrs_request_resource(nrq)->res_parent,
504                            struct nrs_crrn_net, cn_res);
505
506         if (cli->cc_quantum == 0 || cli->cc_round < net->cn_round ||
507             (cli->cc_active == 0 && cli->cc_quantum > 0)) {
508
509                 /**
510                  * If the client has no pending requests, and still some of its
511                  * quantum remaining unused, which implies it has not had a
512                  * chance to schedule up to its maximum allowed batch size of
513                  * requests in the previous round it participated, schedule this
514                  * next request on a new round; this avoids fragmentation of
515                  * request batches caused by client inactivity, at the expense
516                  * of potentially slightly increased service time for the
517                  * request batch this request will be a part of.
518                  */
519                 if (cli->cc_active == 0 && cli->cc_quantum > 0)
520                         cli->cc_round++;
521
522                 /** A new scheduling round has commenced */
523                 if (cli->cc_round < net->cn_round)
524                         cli->cc_round = net->cn_round;
525
526                 /** I was not the last client through here */
527                 if (cli->cc_sequence < net->cn_sequence)
528                         cli->cc_sequence = ++net->cn_sequence;
529                 /**
530                  * Reset the quantum if we have reached the maximum quantum
531                  * size for this batch, or even if we have not managed to
532                  * complete a batch size up to its maximum allowed size.
533                  * XXX: Accessed unlocked
534                  */
535                 cli->cc_quantum = net->cn_quantum;
536         }
537
538         nrq->nr_u.crr.cr_round = cli->cc_round;
539         nrq->nr_u.crr.cr_sequence = cli->cc_sequence;
540
541         rc = cfs_binheap_insert(net->cn_binheap, &nrq->nr_node);
542         if (rc == 0) {
543                 cli->cc_active++;
544                 if (--cli->cc_quantum == 0)
545                         cli->cc_round++;
546         }
547         return rc;
548 }
549
550 /**
551  * Removes request \a nrq from a CRR-N \a policy instance's set of queued
552  * requests.
553  *
554  * \param[in] policy the policy
555  * \param[in] nrq    the request to remove
556  */
557 static void nrs_crrn_req_del(struct ptlrpc_nrs_policy *policy,
558                              struct ptlrpc_nrs_request *nrq)
559 {
560         struct nrs_crrn_net     *net;
561         struct nrs_crrn_client  *cli;
562         bool                     is_root;
563
564         cli = container_of(nrs_request_resource(nrq),
565                            struct nrs_crrn_client, cc_res);
566         net = container_of(nrs_request_resource(nrq)->res_parent,
567                            struct nrs_crrn_net, cn_res);
568
569         LASSERT(nrq->nr_u.crr.cr_round <= cli->cc_round);
570
571         is_root = &nrq->nr_node == cfs_binheap_root(net->cn_binheap);
572
573         cfs_binheap_remove(net->cn_binheap, &nrq->nr_node);
574         cli->cc_active--;
575
576         /**
577          * If we just deleted the node at the root of the binheap, we may have
578          * to adjust round numbers.
579          */
580         if (unlikely(is_root)) {
581                 /** Peek at the next request to be served */
582                 cfs_binheap_node_t *node = cfs_binheap_root(net->cn_binheap);
583
584                 /** No more requests */
585                 if (unlikely(node == NULL)) {
586                         net->cn_round++;
587                 } else {
588                         nrq = container_of(node, struct ptlrpc_nrs_request,
589                                            nr_node);
590
591                         if (net->cn_round < nrq->nr_u.crr.cr_round)
592                                 net->cn_round = nrq->nr_u.crr.cr_round;
593                 }
594         }
595 }
596
597 /**
598  * Called right after the request \a nrq finishes being handled by CRR-N policy
599  * instance \a policy.
600  *
601  * \param[in] policy the policy that handled the request
602  * \param[in] nrq    the request that was handled
603  */
604 static void nrs_crrn_req_stop(struct ptlrpc_nrs_policy *policy,
605                               struct ptlrpc_nrs_request *nrq)
606 {
607         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
608                                                   rq_nrq);
609
610         CDEBUG(D_RPCTRACE,
611                "NRS: finished handling %s request from %s, with round "LPU64
612                "\n", NRS_POL_NAME_CRRN,
613                libcfs_id2str(req->rq_peer), nrq->nr_u.crr.cr_round);
614 }
615
616 #ifdef LPROCFS
617
618 /**
619  * lprocfs interface
620  */
621
622 /**
623  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
624  * for CRR-N policy instances on both the regular and high-priority NRS head
625  * of a service, as long as a policy instance is not in the
626  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
627  * state are skipped later by nrs_crrn_ctl().
628  *
629  * Quantum values are in # of RPCs, and output is in YAML format.
630  *
631  * For example:
632  *
633  *      reg_quantum:8
634  *      hp_quantum:4
635  */
636 static int ptlrpc_lprocfs_rd_nrs_crrn_quantum(char *page, char **start,
637                                               off_t off, int count, int *eof,
638                                               void *data)
639 {
640         struct ptlrpc_service       *svc = data;
641         __u16                        quantum;
642         int                          rc;
643         int                          rc2 = 0;
644
645         /**
646          * Perform two separate calls to this as only one of the NRS heads'
647          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
648          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
649          */
650         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
651                                        NRS_POL_NAME_CRRN,
652                                        NRS_CTL_CRRN_RD_QUANTUM,
653                                        true, &quantum);
654         if (rc == 0) {
655                 *eof = 1;
656                 rc2 = snprintf(page, count, NRS_LPROCFS_QUANTUM_NAME_REG
657                                "%-5d\n", quantum);
658                 /**
659                  * Ignore -ENODEV as the regular NRS head's policy may be in the
660                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
661                  */
662         } else if (rc != -ENODEV) {
663                 return rc;
664         }
665
666         if (!nrs_svc_has_hp(svc))
667                 goto no_hp;
668
669         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
670                                        NRS_POL_NAME_CRRN,
671                                        NRS_CTL_CRRN_RD_QUANTUM,
672                                        true, &quantum);
673         if (rc == 0) {
674                 *eof = 1;
675                 rc2 += snprintf(page + rc2, count - rc2,
676                                 NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
677                 /**
678                  * Ignore -ENODEV as the high priority NRS head's policy may be
679                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
680                  */
681         } else if (rc != -ENODEV) {
682                 return rc;
683         }
684
685 no_hp:
686
687         return rc2 ? : rc;
688 }
689
690 /**
691  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
692  * for CRR-N policy instances of a service. The user can set the quantum size
693  * for the regular or high priority NRS head individually by specifying each
694  * value, or both together in a single invocation.
695  *
696  * For example:
697  *
698  * lctl set_param *.*.*.nrs_crrn_quantum=reg_quantum:32, to set the regular
699  * request quantum size on all PTLRPC services to 32
700  *
701  * lctl set_param *.*.*.nrs_crrn_quantum=hp_quantum:16, to set the high
702  * priority request quantum size on all PTLRPC services to 16, and
703  *
704  * lctl set_param *.*.ost_io.nrs_crrn_quantum=16, to set both the regular and
705  * high priority request quantum sizes of the ost_io service to 16.
706  *
707  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
708  * are skipped later by nrs_crrn_ctl().
709  */
710 static int ptlrpc_lprocfs_wr_nrs_crrn_quantum(struct file *file,
711                                               const char *buffer,
712                                               unsigned long count, void *data)
713 {
714         struct ptlrpc_service       *svc = data;
715         enum ptlrpc_nrs_queue_type   queue = 0;
716         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
717         char                        *val;
718         long                         quantum_reg;
719         long                         quantum_hp;
720         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
721         unsigned long                count_copy;
722         int                          rc = 0;
723         int                          rc2 = 0;
724
725         if (count > (sizeof(kernbuf) - 1))
726                 return -EINVAL;
727
728         if (cfs_copy_from_user(kernbuf, buffer, count))
729                 return -EFAULT;
730
731         kernbuf[count] = '\0';
732
733         count_copy = count;
734
735         /**
736          * Check if the regular quantum value has been specified
737          */
738         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
739                                        &count_copy);
740         if (val != kernbuf) {
741                 quantum_reg = simple_strtol(val, NULL, 10);
742
743                 queue |= PTLRPC_NRS_QUEUE_REG;
744         }
745
746         count_copy = count;
747
748         /**
749          * Check if the high priority quantum value has been specified
750          */
751         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
752                                        &count_copy);
753         if (val != kernbuf) {
754                 if (!nrs_svc_has_hp(svc))
755                         return -ENODEV;
756
757                 quantum_hp = simple_strtol(val, NULL, 10);
758
759                 queue |= PTLRPC_NRS_QUEUE_HP;
760         }
761
762         /**
763          * If none of the queues has been specified, look for a valid numerical
764          * value
765          */
766         if (queue == 0) {
767                 if (!isdigit(kernbuf[0]))
768                         return -EINVAL;
769
770                 quantum_reg = simple_strtol(kernbuf, NULL, 10);
771
772                 queue = PTLRPC_NRS_QUEUE_REG;
773
774                 if (nrs_svc_has_hp(svc)) {
775                         queue |= PTLRPC_NRS_QUEUE_HP;
776                         quantum_hp = quantum_reg;
777                 }
778         }
779
780         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
781             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
782             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
783             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
784                 return -EINVAL;
785
786         /**
787          * We change the values on regular and HP NRS heads separately, so that
788          * we do not exit early from ptlrpc_nrs_policy_control() with an error
789          * returned by nrs_policy_ctl_locked(), in cases where the user has not
790          * started the policy on either the regular or HP NRS head; i.e. we are
791          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
792          * only if the operation fails with -ENODEV on all heads that have been
793          * specified by the command; if at least one operation succeeds,
794          * success is returned.
795          */
796         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
797                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
798                                                NRS_POL_NAME_CRRN,
799                                                NRS_CTL_CRRN_WR_QUANTUM, false,
800                                                &quantum_reg);
801                 if ((rc < 0 && rc != -ENODEV) ||
802                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
803                         return rc;
804         }
805
806         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
807                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
808                                                 NRS_POL_NAME_CRRN,
809                                                 NRS_CTL_CRRN_WR_QUANTUM, false,
810                                                 &quantum_hp);
811                 if ((rc2 < 0 && rc2 != -ENODEV) ||
812                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
813                         return rc2;
814         }
815
816         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
817 }
818
819 /**
820  * Initializes a CRR-N policy's lprocfs interface for service \a svc
821  *
822  * \param[in] svc the service
823  *
824  * \retval 0    success
825  * \retval != 0 error
826  */
827 int nrs_crrn_lprocfs_init(struct ptlrpc_service *svc)
828 {
829         int     rc;
830
831         struct lprocfs_vars nrs_crrn_lprocfs_vars[] = {
832                 { .name         = "nrs_crrn_quantum",
833                   .read_fptr    = ptlrpc_lprocfs_rd_nrs_crrn_quantum,
834                   .write_fptr   = ptlrpc_lprocfs_wr_nrs_crrn_quantum,
835                   .data = svc },
836                 { NULL }
837         };
838
839         if (svc->srv_procroot == NULL)
840                 return 0;
841
842         rc = lprocfs_add_vars(svc->srv_procroot, nrs_crrn_lprocfs_vars, NULL);
843
844         return rc;
845 }
846
847 /**
848  * Cleans up a CRR-N policy's lprocfs interface for service \a svc
849  *
850  * \param[in] svc the service
851  */
852 void nrs_crrn_lprocfs_fini(struct ptlrpc_service *svc)
853 {
854         if (svc->srv_procroot == NULL)
855                 return;
856
857         lprocfs_remove_proc_entry("nrs_crrn_quantum", svc->srv_procroot);
858 }
859
860 #endif /* LPROCFS */
861
862 /**
863  * CRR-N policy operations
864  */
865 static const struct ptlrpc_nrs_pol_ops nrs_crrn_ops = {
866         .op_policy_start        = nrs_crrn_start,
867         .op_policy_stop         = nrs_crrn_stop,
868         .op_policy_ctl          = nrs_crrn_ctl,
869         .op_res_get             = nrs_crrn_res_get,
870         .op_res_put             = nrs_crrn_res_put,
871         .op_req_get             = nrs_crrn_req_get,
872         .op_req_enqueue         = nrs_crrn_req_add,
873         .op_req_dequeue         = nrs_crrn_req_del,
874         .op_req_stop            = nrs_crrn_req_stop,
875 #ifdef LPROCFS
876         .op_lprocfs_init        = nrs_crrn_lprocfs_init,
877         .op_lprocfs_fini        = nrs_crrn_lprocfs_fini,
878 #endif
879 };
880
881 /**
882  * CRR-N policy configuration
883  */
884 struct ptlrpc_nrs_pol_conf nrs_conf_crrn = {
885         .nc_name                = NRS_POL_NAME_CRRN,
886         .nc_ops                 = &nrs_crrn_ops,
887         .nc_compat              = nrs_policy_compat_all,
888 };
889
890 /** @} CRR-N policy */
891
892 /** @} nrs */
893
894 #endif /* HAVE_SERVER_SUPPORT */