Whamcloud - gitweb
LU-8901 misc: update Intel copyright messages for 2016
[fs/lustre-release.git] / lustre / ptlrpc / nrs_crr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2016, Intel Corporation.
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs_crr.c
29  *
30  * Network Request Scheduler (NRS) CRR-N policy
31  *
32  * Request ordering in a batched Round-Robin manner over client NIDs
33  *
34  * Author: Liang Zhen <liang@whamcloud.com>
35  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
36  */
37 /**
38  * \addtogoup nrs
39  * @{
40  */
41 #ifdef HAVE_SERVER_SUPPORT
42
43 #define DEBUG_SUBSYSTEM S_RPC
44 #include <obd_support.h>
45 #include <obd_class.h>
46 #include <lustre_net.h>
47 #include <lprocfs_status.h>
48 #include "ptlrpc_internal.h"
49
50 /**
51  * \name CRR-N policy
52  *
53  * Client Round-Robin scheduling over client NIDs
54  *
55  * @{
56  *
57  */
58
59 #define NRS_POL_NAME_CRRN       "crrn"
60
61 /**
62  * Binary heap predicate.
63  *
64  * Uses ptlrpc_nrs_request::nr_u::crr::cr_round and
65  * ptlrpc_nrs_request::nr_u::crr::cr_sequence to compare two binheap nodes and
66  * produce a binary predicate that shows their relative priority, so that the
67  * binary heap can perform the necessary sorting operations.
68  *
69  * \param[in] e1 the first binheap node to compare
70  * \param[in] e2 the second binheap node to compare
71  *
72  * \retval 0 e1 > e2
73  * \retval 1 e1 <= e2
74  */
75 static int
76 crrn_req_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
77 {
78         struct ptlrpc_nrs_request *nrq1;
79         struct ptlrpc_nrs_request *nrq2;
80
81         nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
82         nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
83
84         if (nrq1->nr_u.crr.cr_round < nrq2->nr_u.crr.cr_round)
85                 return 1;
86         else if (nrq1->nr_u.crr.cr_round > nrq2->nr_u.crr.cr_round)
87                 return 0;
88
89         return nrq1->nr_u.crr.cr_sequence < nrq2->nr_u.crr.cr_sequence;
90 }
91
92 static struct cfs_binheap_ops nrs_crrn_heap_ops = {
93         .hop_enter      = NULL,
94         .hop_exit       = NULL,
95         .hop_compare    = crrn_req_compare,
96 };
97
98 /**
99  * libcfs_hash operations for nrs_crrn_net::cn_cli_hash
100  *
101  * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
102  * nrs_crrn_client objects.
103  */
104 #define NRS_NID_BKT_BITS        8
105 #define NRS_NID_BITS            16
106
107 static unsigned nrs_crrn_hop_hash(struct cfs_hash *hs, const void *key,
108                                   unsigned mask)
109 {
110         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
111 }
112
113 static int nrs_crrn_hop_keycmp(const void *key, struct hlist_node *hnode)
114 {
115         lnet_nid_t              *nid = (lnet_nid_t *)key;
116         struct nrs_crrn_client  *cli = hlist_entry(hnode,
117                                                        struct nrs_crrn_client,
118                                                        cc_hnode);
119         return *nid == cli->cc_nid;
120 }
121
122 static void *nrs_crrn_hop_key(struct hlist_node *hnode)
123 {
124         struct nrs_crrn_client  *cli = hlist_entry(hnode,
125                                                        struct nrs_crrn_client,
126                                                        cc_hnode);
127         return &cli->cc_nid;
128 }
129
130 static void *nrs_crrn_hop_object(struct hlist_node *hnode)
131 {
132         return hlist_entry(hnode, struct nrs_crrn_client, cc_hnode);
133 }
134
135 static void nrs_crrn_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
136 {
137         struct nrs_crrn_client *cli = hlist_entry(hnode,
138                                                       struct nrs_crrn_client,
139                                                       cc_hnode);
140         atomic_inc(&cli->cc_ref);
141 }
142
143 static void nrs_crrn_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
144 {
145         struct nrs_crrn_client  *cli = hlist_entry(hnode,
146                                                        struct nrs_crrn_client,
147                                                        cc_hnode);
148         atomic_dec(&cli->cc_ref);
149 }
150
151 static void nrs_crrn_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
152 {
153         struct nrs_crrn_client  *cli = hlist_entry(hnode,
154                                                        struct nrs_crrn_client,
155                                                        cc_hnode);
156         LASSERTF(atomic_read(&cli->cc_ref) == 0,
157                  "Busy CRR-N object from client with NID %s, with %d refs\n",
158                  libcfs_nid2str(cli->cc_nid), atomic_read(&cli->cc_ref));
159
160         OBD_FREE_PTR(cli);
161 }
162
163 static struct cfs_hash_ops nrs_crrn_hash_ops = {
164         .hs_hash        = nrs_crrn_hop_hash,
165         .hs_keycmp      = nrs_crrn_hop_keycmp,
166         .hs_key         = nrs_crrn_hop_key,
167         .hs_object      = nrs_crrn_hop_object,
168         .hs_get         = nrs_crrn_hop_get,
169         .hs_put         = nrs_crrn_hop_put,
170         .hs_put_locked  = nrs_crrn_hop_put,
171         .hs_exit        = nrs_crrn_hop_exit,
172 };
173
174 /**
175  * Called when a CRR-N policy instance is started.
176  *
177  * \param[in] policy the policy
178  *
179  * \retval -ENOMEM OOM error
180  * \retval 0       success
181  */
182 static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy, char *arg)
183 {
184         struct nrs_crrn_net    *net;
185         int                     rc = 0;
186         ENTRY;
187
188         OBD_CPT_ALLOC_PTR(net, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
189         if (net == NULL)
190                 RETURN(-ENOMEM);
191
192         net->cn_binheap = cfs_binheap_create(&nrs_crrn_heap_ops,
193                                              CBH_FLAG_ATOMIC_GROW, 4096, NULL,
194                                              nrs_pol2cptab(policy),
195                                              nrs_pol2cptid(policy));
196         if (net->cn_binheap == NULL)
197                 GOTO(failed, rc = -ENOMEM);
198
199         net->cn_cli_hash = cfs_hash_create("nrs_crrn_nid_hash",
200                                            NRS_NID_BITS, NRS_NID_BITS,
201                                            NRS_NID_BKT_BITS, 0,
202                                            CFS_HASH_MIN_THETA,
203                                            CFS_HASH_MAX_THETA,
204                                            &nrs_crrn_hash_ops,
205                                            CFS_HASH_RW_BKTLOCK);
206         if (net->cn_cli_hash == NULL)
207                 GOTO(failed, rc = -ENOMEM);
208
209         /**
210          * Set default quantum value to max_rpcs_in_flight for non-MDS OSCs;
211          * there may be more RPCs pending from each struct nrs_crrn_client even
212          * with the default max_rpcs_in_flight value, as we are scheduling over
213          * NIDs, and there may be more than one mount point per client.
214          */
215         net->cn_quantum = OBD_MAX_RIF_DEFAULT;
216         /**
217          * Set to 1 so that the test inside nrs_crrn_req_add() can evaluate to
218          * true.
219          */
220         net->cn_sequence = 1;
221
222         policy->pol_private = net;
223
224         RETURN(rc);
225
226 failed:
227         if (net->cn_binheap != NULL)
228                 cfs_binheap_destroy(net->cn_binheap);
229
230         OBD_FREE_PTR(net);
231
232         RETURN(rc);
233 }
234
235 /**
236  * Called when a CRR-N policy instance is stopped.
237  *
238  * Called when the policy has been instructed to transition to the
239  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more pending
240  * requests to serve.
241  *
242  * \param[in] policy the policy
243  */
244 static void nrs_crrn_stop(struct ptlrpc_nrs_policy *policy)
245 {
246         struct nrs_crrn_net     *net = policy->pol_private;
247         ENTRY;
248
249         LASSERT(net != NULL);
250         LASSERT(net->cn_binheap != NULL);
251         LASSERT(net->cn_cli_hash != NULL);
252         LASSERT(cfs_binheap_is_empty(net->cn_binheap));
253
254         cfs_binheap_destroy(net->cn_binheap);
255         cfs_hash_putref(net->cn_cli_hash);
256
257         OBD_FREE_PTR(net);
258 }
259
260 /**
261  * Performs a policy-specific ctl function on CRR-N policy instances; similar
262  * to ioctl.
263  *
264  * \param[in]     policy the policy instance
265  * \param[in]     opc    the opcode
266  * \param[in,out] arg    used for passing parameters and information
267  *
268  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
269  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
270  *
271  * \retval 0   operation carried out successfully
272  * \retval -ve error
273  */
274 static int nrs_crrn_ctl(struct ptlrpc_nrs_policy *policy,
275                         enum ptlrpc_nrs_ctl opc,
276                         void *arg)
277 {
278         assert_spin_locked(&policy->pol_nrs->nrs_lock);
279
280         switch((enum nrs_ctl_crr)opc) {
281         default:
282                 RETURN(-EINVAL);
283
284         /**
285          * Read Round Robin quantum size of a policy instance.
286          */
287         case NRS_CTL_CRRN_RD_QUANTUM: {
288                 struct nrs_crrn_net     *net = policy->pol_private;
289
290                 *(__u16 *)arg = net->cn_quantum;
291                 }
292                 break;
293
294         /**
295          * Write Round Robin quantum size of a policy instance.
296          */
297         case NRS_CTL_CRRN_WR_QUANTUM: {
298                 struct nrs_crrn_net     *net = policy->pol_private;
299
300                 net->cn_quantum = *(__u16 *)arg;
301                 LASSERT(net->cn_quantum != 0);
302                 }
303                 break;
304         }
305
306         RETURN(0);
307 }
308
309 /**
310  * Obtains resources from CRR-N policy instances. The top-level resource lives
311  * inside \e nrs_crrn_net and the second-level resource inside
312  * \e nrs_crrn_client object instances.
313  *
314  * \param[in]  policy     the policy for which resources are being taken for
315  *                        request \a nrq
316  * \param[in]  nrq        the request for which resources are being taken
317  * \param[in]  parent     parent resource, embedded in nrs_crrn_net for the
318  *                        CRR-N policy
319  * \param[out] resp       resources references are placed in this array
320  * \param[in]  moving_req signifies limited caller context; used to perform
321  *                        memory allocations in an atomic context in this
322  *                        policy
323  *
324  * \retval 0   we are returning a top-level, parent resource, one that is
325  *             embedded in an nrs_crrn_net object
326  * \retval 1   we are returning a bottom-level resource, one that is embedded
327  *             in an nrs_crrn_client object
328  *
329  * \see nrs_resource_get_safe()
330  */
331 static int nrs_crrn_res_get(struct ptlrpc_nrs_policy *policy,
332                             struct ptlrpc_nrs_request *nrq,
333                             const struct ptlrpc_nrs_resource *parent,
334                             struct ptlrpc_nrs_resource **resp, bool moving_req)
335 {
336         struct nrs_crrn_net     *net;
337         struct nrs_crrn_client  *cli;
338         struct nrs_crrn_client  *tmp;
339         struct ptlrpc_request   *req;
340
341         if (parent == NULL) {
342                 *resp = &((struct nrs_crrn_net *)policy->pol_private)->cn_res;
343                 return 0;
344         }
345
346         net = container_of(parent, struct nrs_crrn_net, cn_res);
347         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
348
349         cli = cfs_hash_lookup(net->cn_cli_hash, &req->rq_peer.nid);
350         if (cli != NULL)
351                 goto out;
352
353         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
354                           sizeof(*cli), moving_req ? GFP_ATOMIC : GFP_NOFS);
355         if (cli == NULL)
356                 return -ENOMEM;
357
358         cli->cc_nid = req->rq_peer.nid;
359
360         atomic_set(&cli->cc_ref, 1);
361         tmp = cfs_hash_findadd_unique(net->cn_cli_hash, &cli->cc_nid,
362                                       &cli->cc_hnode);
363         if (tmp != cli) {
364                 OBD_FREE_PTR(cli);
365                 cli = tmp;
366         }
367 out:
368         *resp = &cli->cc_res;
369
370         return 1;
371 }
372
373 /**
374  * Called when releasing references to the resource hierachy obtained for a
375  * request for scheduling using the CRR-N policy.
376  *
377  * \param[in] policy   the policy the resource belongs to
378  * \param[in] res      the resource to be released
379  */
380 static void nrs_crrn_res_put(struct ptlrpc_nrs_policy *policy,
381                              const struct ptlrpc_nrs_resource *res)
382 {
383         struct nrs_crrn_net     *net;
384         struct nrs_crrn_client  *cli;
385
386         /**
387          * Do nothing for freeing parent, nrs_crrn_net resources
388          */
389         if (res->res_parent == NULL)
390                 return;
391
392         cli = container_of(res, struct nrs_crrn_client, cc_res);
393         net = container_of(res->res_parent, struct nrs_crrn_net, cn_res);
394
395         cfs_hash_put(net->cn_cli_hash, &cli->cc_hnode);
396 }
397
398 /**
399  * Called when getting a request from the CRR-N policy for handlingso that it can be served
400  *
401  * \param[in] policy the policy being polled
402  * \param[in] peek   when set, signifies that we just want to examine the
403  *                   request, and not handle it, so the request is not removed
404  *                   from the policy.
405  * \param[in] force  force the policy to return a request; unused in this policy
406  *
407  * \retval the request to be handled
408  * \retval NULL no request available
409  *
410  * \see ptlrpc_nrs_req_get_nolock()
411  * \see nrs_request_get()
412  */
413 static
414 struct ptlrpc_nrs_request *nrs_crrn_req_get(struct ptlrpc_nrs_policy *policy,
415                                             bool peek, bool force)
416 {
417         struct nrs_crrn_net       *net = policy->pol_private;
418         struct cfs_binheap_node   *node = cfs_binheap_root(net->cn_binheap);
419         struct ptlrpc_nrs_request *nrq;
420
421         nrq = unlikely(node == NULL) ? NULL :
422               container_of(node, struct ptlrpc_nrs_request, nr_node);
423
424         if (likely(!peek && nrq != NULL)) {
425                 struct nrs_crrn_client *cli;
426                 struct ptlrpc_request *req = container_of(nrq,
427                                                           struct ptlrpc_request,
428                                                           rq_nrq);
429
430                 cli = container_of(nrs_request_resource(nrq),
431                                    struct nrs_crrn_client, cc_res);
432
433                 LASSERT(nrq->nr_u.crr.cr_round <= cli->cc_round);
434
435                 cfs_binheap_remove(net->cn_binheap, &nrq->nr_node);
436                 cli->cc_active--;
437
438                 CDEBUG(D_RPCTRACE,
439                        "NRS: starting to handle %s request from %s, with round "
440                        "%llu\n", NRS_POL_NAME_CRRN,
441                        libcfs_id2str(req->rq_peer), nrq->nr_u.crr.cr_round);
442
443                 /** Peek at the next request to be served */
444                 node = cfs_binheap_root(net->cn_binheap);
445
446                 /** No more requests */
447                 if (unlikely(node == NULL)) {
448                         net->cn_round++;
449                 } else {
450                         struct ptlrpc_nrs_request *next;
451
452                         next = container_of(node, struct ptlrpc_nrs_request,
453                                             nr_node);
454
455                         if (net->cn_round < next->nr_u.crr.cr_round)
456                                 net->cn_round = next->nr_u.crr.cr_round;
457                 }
458         }
459
460         return nrq;
461 }
462
463 /**
464  * Adds request \a nrq to a CRR-N \a policy instance's set of queued requests
465  *
466  * A scheduling round is a stream of requests that have been sorted in batches
467  * according to the client that they originate from (as identified by its NID);
468  * there can be only one batch for each client in each round. The batches are of
469  * maximum size nrs_crrn_net:cn_quantum. When a new request arrives for
470  * scheduling from a client that has exhausted its quantum in its current round,
471  * it will start scheduling requests on the next scheduling round. Clients are
472  * allowed to schedule requests against a round until all requests for the round
473  * are serviced, so a client might miss a round if it is not generating requests
474  * for a long enough period of time. Clients that miss a round will continue
475  * with scheduling the next request that they generate, starting at the round
476  * that requests are being dispatched for, at the time of arrival of this new
477  * request.
478  *
479  * Requests are tagged with the round number and a sequence number; the sequence
480  * number indicates the relative ordering amongst the batches of requests in a
481  * round, and is identical for all requests in a batch, as is the round number.
482  * The round and sequence numbers are used by crrn_req_compare() in order to
483  * maintain an ordered set of rounds, with each round consisting of an ordered
484  * set of batches of requests.
485  *
486  * \param[in] policy the policy
487  * \param[in] nrq    the request to add
488  *
489  * \retval 0    request successfully added
490  * \retval != 0 error
491  */
492 static int nrs_crrn_req_add(struct ptlrpc_nrs_policy *policy,
493                             struct ptlrpc_nrs_request *nrq)
494 {
495         struct nrs_crrn_net     *net;
496         struct nrs_crrn_client  *cli;
497         int                      rc;
498
499         cli = container_of(nrs_request_resource(nrq),
500                            struct nrs_crrn_client, cc_res);
501         net = container_of(nrs_request_resource(nrq)->res_parent,
502                            struct nrs_crrn_net, cn_res);
503
504         if (cli->cc_quantum == 0 || cli->cc_round < net->cn_round ||
505             (cli->cc_active == 0 && cli->cc_quantum > 0)) {
506
507                 /**
508                  * If the client has no pending requests, and still some of its
509                  * quantum remaining unused, which implies it has not had a
510                  * chance to schedule up to its maximum allowed batch size of
511                  * requests in the previous round it participated, schedule this
512                  * next request on a new round; this avoids fragmentation of
513                  * request batches caused by client inactivity, at the expense
514                  * of potentially slightly increased service time for the
515                  * request batch this request will be a part of.
516                  */
517                 if (cli->cc_active == 0 && cli->cc_quantum > 0)
518                         cli->cc_round++;
519
520                 /** A new scheduling round has commenced */
521                 if (cli->cc_round < net->cn_round)
522                         cli->cc_round = net->cn_round;
523
524                 /** I was not the last client through here */
525                 if (cli->cc_sequence < net->cn_sequence)
526                         cli->cc_sequence = ++net->cn_sequence;
527                 /**
528                  * Reset the quantum if we have reached the maximum quantum
529                  * size for this batch, or even if we have not managed to
530                  * complete a batch size up to its maximum allowed size.
531                  * XXX: Accessed unlocked
532                  */
533                 cli->cc_quantum = net->cn_quantum;
534         }
535
536         nrq->nr_u.crr.cr_round = cli->cc_round;
537         nrq->nr_u.crr.cr_sequence = cli->cc_sequence;
538
539         rc = cfs_binheap_insert(net->cn_binheap, &nrq->nr_node);
540         if (rc == 0) {
541                 cli->cc_active++;
542                 if (--cli->cc_quantum == 0)
543                         cli->cc_round++;
544         }
545         return rc;
546 }
547
548 /**
549  * Removes request \a nrq from a CRR-N \a policy instance's set of queued
550  * requests.
551  *
552  * \param[in] policy the policy
553  * \param[in] nrq    the request to remove
554  */
555 static void nrs_crrn_req_del(struct ptlrpc_nrs_policy *policy,
556                              struct ptlrpc_nrs_request *nrq)
557 {
558         struct nrs_crrn_net     *net;
559         struct nrs_crrn_client  *cli;
560         bool                     is_root;
561
562         cli = container_of(nrs_request_resource(nrq),
563                            struct nrs_crrn_client, cc_res);
564         net = container_of(nrs_request_resource(nrq)->res_parent,
565                            struct nrs_crrn_net, cn_res);
566
567         LASSERT(nrq->nr_u.crr.cr_round <= cli->cc_round);
568
569         is_root = &nrq->nr_node == cfs_binheap_root(net->cn_binheap);
570
571         cfs_binheap_remove(net->cn_binheap, &nrq->nr_node);
572         cli->cc_active--;
573
574         /**
575          * If we just deleted the node at the root of the binheap, we may have
576          * to adjust round numbers.
577          */
578         if (unlikely(is_root)) {
579                 /** Peek at the next request to be served */
580                 struct cfs_binheap_node *node = cfs_binheap_root(net->cn_binheap);
581
582                 /** No more requests */
583                 if (unlikely(node == NULL)) {
584                         net->cn_round++;
585                 } else {
586                         nrq = container_of(node, struct ptlrpc_nrs_request,
587                                            nr_node);
588
589                         if (net->cn_round < nrq->nr_u.crr.cr_round)
590                                 net->cn_round = nrq->nr_u.crr.cr_round;
591                 }
592         }
593 }
594
595 /**
596  * Called right after the request \a nrq finishes being handled by CRR-N policy
597  * instance \a policy.
598  *
599  * \param[in] policy the policy that handled the request
600  * \param[in] nrq    the request that was handled
601  */
602 static void nrs_crrn_req_stop(struct ptlrpc_nrs_policy *policy,
603                               struct ptlrpc_nrs_request *nrq)
604 {
605         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
606                                                   rq_nrq);
607
608         CDEBUG(D_RPCTRACE,
609                "NRS: finished handling %s request from %s, with round %llu"
610                "\n", NRS_POL_NAME_CRRN,
611                libcfs_id2str(req->rq_peer), nrq->nr_u.crr.cr_round);
612 }
613
614 #ifdef CONFIG_PROC_FS
615
616 /**
617  * lprocfs interface
618  */
619
620 /**
621  * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
622  * for CRR-N policy instances on both the regular and high-priority NRS head
623  * of a service, as long as a policy instance is not in the
624  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
625  * state are skipped later by nrs_crrn_ctl().
626  *
627  * Quantum values are in # of RPCs, and output is in YAML format.
628  *
629  * For example:
630  *
631  *      reg_quantum:8
632  *      hp_quantum:4
633  */
634 static int
635 ptlrpc_lprocfs_nrs_crrn_quantum_seq_show(struct seq_file *m, void *data)
636 {
637         struct ptlrpc_service   *svc = m->private;
638         __u16                   quantum;
639         int                     rc;
640
641         /**
642          * Perform two separate calls to this as only one of the NRS heads'
643          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
644          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
645          */
646         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
647                                        NRS_POL_NAME_CRRN,
648                                        NRS_CTL_CRRN_RD_QUANTUM,
649                                        true, &quantum);
650         if (rc == 0) {
651                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG
652                            "%-5d\n", quantum);
653                 /**
654                  * Ignore -ENODEV as the regular NRS head's policy may be in the
655                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
656                  */
657         } else if (rc != -ENODEV) {
658                 return rc;
659         }
660
661         if (!nrs_svc_has_hp(svc))
662                 goto no_hp;
663
664         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
665                                        NRS_POL_NAME_CRRN,
666                                        NRS_CTL_CRRN_RD_QUANTUM,
667                                        true, &quantum);
668         if (rc == 0) {
669                 seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
670                 /**
671                  * Ignore -ENODEV as the high priority NRS head's policy may be
672                  * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
673                  */
674         } else if (rc != -ENODEV) {
675                 return rc;
676         }
677
678 no_hp:
679         return rc;
680 }
681
682 /**
683  * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
684  * for CRR-N policy instances of a service. The user can set the quantum size
685  * for the regular or high priority NRS head individually by specifying each
686  * value, or both together in a single invocation.
687  *
688  * For example:
689  *
690  * lctl set_param *.*.*.nrs_crrn_quantum=reg_quantum:32, to set the regular
691  * request quantum size on all PTLRPC services to 32
692  *
693  * lctl set_param *.*.*.nrs_crrn_quantum=hp_quantum:16, to set the high
694  * priority request quantum size on all PTLRPC services to 16, and
695  *
696  * lctl set_param *.*.ost_io.nrs_crrn_quantum=16, to set both the regular and
697  * high priority request quantum sizes of the ost_io service to 16.
698  *
699  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
700  * are skipped later by nrs_crrn_ctl().
701  */
702 static ssize_t
703 ptlrpc_lprocfs_nrs_crrn_quantum_seq_write(struct file *file,
704                                           const char __user *buffer,
705                                           size_t count,
706                                           loff_t *off)
707 {
708         struct ptlrpc_service       *svc = ((struct seq_file *)file->private_data)->private;
709         enum ptlrpc_nrs_queue_type   queue = 0;
710         char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
711         char                        *val;
712         long                         quantum_reg;
713         long                         quantum_hp;
714         /** lprocfs_find_named_value() modifies its argument, so keep a copy */
715         size_t                       count_copy;
716         int                          rc = 0;
717         int                          rc2 = 0;
718
719         if (count > (sizeof(kernbuf) - 1))
720                 return -EINVAL;
721
722         if (copy_from_user(kernbuf, buffer, count))
723                 return -EFAULT;
724
725         kernbuf[count] = '\0';
726
727         count_copy = count;
728
729         /**
730          * Check if the regular quantum value has been specified
731          */
732         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
733                                        &count_copy);
734         if (val != kernbuf) {
735                 quantum_reg = simple_strtol(val, NULL, 10);
736
737                 queue |= PTLRPC_NRS_QUEUE_REG;
738         }
739
740         count_copy = count;
741
742         /**
743          * Check if the high priority quantum value has been specified
744          */
745         val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
746                                        &count_copy);
747         if (val != kernbuf) {
748                 if (!nrs_svc_has_hp(svc))
749                         return -ENODEV;
750
751                 quantum_hp = simple_strtol(val, NULL, 10);
752
753                 queue |= PTLRPC_NRS_QUEUE_HP;
754         }
755
756         /**
757          * If none of the queues has been specified, look for a valid numerical
758          * value
759          */
760         if (queue == 0) {
761                 if (!isdigit(kernbuf[0]))
762                         return -EINVAL;
763
764                 quantum_reg = simple_strtol(kernbuf, NULL, 10);
765
766                 queue = PTLRPC_NRS_QUEUE_REG;
767
768                 if (nrs_svc_has_hp(svc)) {
769                         queue |= PTLRPC_NRS_QUEUE_HP;
770                         quantum_hp = quantum_reg;
771                 }
772         }
773
774         if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
775             ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
776             (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
777             ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
778                 return -EINVAL;
779
780         /**
781          * We change the values on regular and HP NRS heads separately, so that
782          * we do not exit early from ptlrpc_nrs_policy_control() with an error
783          * returned by nrs_policy_ctl_locked(), in cases where the user has not
784          * started the policy on either the regular or HP NRS head; i.e. we are
785          * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
786          * only if the operation fails with -ENODEV on all heads that have been
787          * specified by the command; if at least one operation succeeds,
788          * success is returned.
789          */
790         if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
791                 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
792                                                NRS_POL_NAME_CRRN,
793                                                NRS_CTL_CRRN_WR_QUANTUM, false,
794                                                &quantum_reg);
795                 if ((rc < 0 && rc != -ENODEV) ||
796                     (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
797                         return rc;
798         }
799
800         if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
801                 rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
802                                                 NRS_POL_NAME_CRRN,
803                                                 NRS_CTL_CRRN_WR_QUANTUM, false,
804                                                 &quantum_hp);
805                 if ((rc2 < 0 && rc2 != -ENODEV) ||
806                     (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
807                         return rc2;
808         }
809
810         return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
811 }
812 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_crrn_quantum);
813
814 /**
815  * Initializes a CRR-N policy's lprocfs interface for service \a svc
816  *
817  * \param[in] svc the service
818  *
819  * \retval 0    success
820  * \retval != 0 error
821  */
822 static int nrs_crrn_lprocfs_init(struct ptlrpc_service *svc)
823 {
824         struct lprocfs_vars nrs_crrn_lprocfs_vars[] = {
825                 { .name         = "nrs_crrn_quantum",
826                   .fops         = &ptlrpc_lprocfs_nrs_crrn_quantum_fops,
827                   .data = svc },
828                 { NULL }
829         };
830
831         if (svc->srv_procroot == NULL)
832                 return 0;
833
834         return lprocfs_add_vars(svc->srv_procroot, nrs_crrn_lprocfs_vars, NULL);
835 }
836
837 /**
838  * Cleans up a CRR-N policy's lprocfs interface for service \a svc
839  *
840  * \param[in] svc the service
841  */
842 static void nrs_crrn_lprocfs_fini(struct ptlrpc_service *svc)
843 {
844         if (svc->srv_procroot == NULL)
845                 return;
846
847         lprocfs_remove_proc_entry("nrs_crrn_quantum", svc->srv_procroot);
848 }
849
850 #endif /* CONFIG_PROC_FS */
851
852 /**
853  * CRR-N policy operations
854  */
855 static const struct ptlrpc_nrs_pol_ops nrs_crrn_ops = {
856         .op_policy_start        = nrs_crrn_start,
857         .op_policy_stop         = nrs_crrn_stop,
858         .op_policy_ctl          = nrs_crrn_ctl,
859         .op_res_get             = nrs_crrn_res_get,
860         .op_res_put             = nrs_crrn_res_put,
861         .op_req_get             = nrs_crrn_req_get,
862         .op_req_enqueue         = nrs_crrn_req_add,
863         .op_req_dequeue         = nrs_crrn_req_del,
864         .op_req_stop            = nrs_crrn_req_stop,
865 #ifdef CONFIG_PROC_FS
866         .op_lprocfs_init        = nrs_crrn_lprocfs_init,
867         .op_lprocfs_fini        = nrs_crrn_lprocfs_fini,
868 #endif
869 };
870
871 /**
872  * CRR-N policy configuration
873  */
874 struct ptlrpc_nrs_pol_conf nrs_conf_crrn = {
875         .nc_name                = NRS_POL_NAME_CRRN,
876         .nc_ops                 = &nrs_crrn_ops,
877         .nc_compat              = nrs_policy_compat_all,
878 };
879
880 /** @} CRR-N policy */
881
882 /** @} nrs */
883
884 #endif /* HAVE_SERVER_SUPPORT */