Whamcloud - gitweb
LU-6426 lustre: remove EIOCBRETRY handling
[fs/lustre-release.git] / lustre / ptlrpc / nrs.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2011, 2014, Intel Corporation.
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs.c
29  *
30  * Network Request Scheduler (NRS)
31  *
32  * Allows to reorder the handling of RPCs at servers.
33  *
34  * Author: Liang Zhen <liang@whamcloud.com>
35  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
36  */
37 /**
38  * \addtogoup nrs
39  * @{
40  */
41
42 #define DEBUG_SUBSYSTEM S_RPC
43 #include <obd_support.h>
44 #include <obd_class.h>
45 #include <lustre_net.h>
46 #include <lprocfs_status.h>
47 #include <libcfs/libcfs.h>
48 #include "ptlrpc_internal.h"
49
50 /**
51  * NRS core object.
52  */
53 struct nrs_core nrs_core;
54
55 static int nrs_policy_init(struct ptlrpc_nrs_policy *policy)
56 {
57         return policy->pol_desc->pd_ops->op_policy_init != NULL ?
58                policy->pol_desc->pd_ops->op_policy_init(policy) : 0;
59 }
60
61 static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
62 {
63         LASSERT(policy->pol_ref == 0);
64         LASSERT(policy->pol_req_queued == 0);
65
66         if (policy->pol_desc->pd_ops->op_policy_fini != NULL)
67                 policy->pol_desc->pd_ops->op_policy_fini(policy);
68 }
69
70 static int nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy,
71                                  enum ptlrpc_nrs_ctl opc, void *arg)
72 {
73         /**
74          * The policy may be stopped, but the lprocfs files and
75          * ptlrpc_nrs_policy instances remain present until unregistration time.
76          * Do not perform the ctl operation if the policy is stopped, as
77          * policy->pol_private will be NULL in such a case.
78          */
79         if (policy->pol_state == NRS_POL_STATE_STOPPED)
80                 RETURN(-ENODEV);
81
82         RETURN(policy->pol_desc->pd_ops->op_policy_ctl != NULL ?
83                policy->pol_desc->pd_ops->op_policy_ctl(policy, opc, arg) :
84                -ENOSYS);
85 }
86
87 static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
88 {
89         struct ptlrpc_nrs *nrs = policy->pol_nrs;
90         ENTRY;
91
92         if (policy->pol_desc->pd_ops->op_policy_stop != NULL) {
93                 spin_unlock(&nrs->nrs_lock);
94
95                 policy->pol_desc->pd_ops->op_policy_stop(policy);
96
97                 spin_lock(&nrs->nrs_lock);
98         }
99
100         LASSERT(list_empty(&policy->pol_list_queued));
101         LASSERT(policy->pol_req_queued == 0 &&
102                 policy->pol_req_started == 0);
103
104         policy->pol_private = NULL;
105
106         policy->pol_state = NRS_POL_STATE_STOPPED;
107
108         if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
109                 module_put(policy->pol_desc->pd_owner);
110
111         EXIT;
112 }
113
114 static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
115 {
116         struct ptlrpc_nrs *nrs = policy->pol_nrs;
117         ENTRY;
118
119         if (nrs->nrs_policy_fallback == policy && !nrs->nrs_stopping)
120                 RETURN(-EPERM);
121
122         if (policy->pol_state == NRS_POL_STATE_STARTING)
123                 RETURN(-EAGAIN);
124
125         /* In progress or already stopped */
126         if (policy->pol_state != NRS_POL_STATE_STARTED)
127                 RETURN(0);
128
129         policy->pol_state = NRS_POL_STATE_STOPPING;
130
131         /* Immediately make it invisible */
132         if (nrs->nrs_policy_primary == policy) {
133                 nrs->nrs_policy_primary = NULL;
134
135         } else {
136                 LASSERT(nrs->nrs_policy_fallback == policy);
137                 nrs->nrs_policy_fallback = NULL;
138         }
139
140         /* I have the only refcount */
141         if (policy->pol_ref == 1)
142                 nrs_policy_stop0(policy);
143
144         RETURN(0);
145 }
146
147 /**
148  * Transitions the \a nrs NRS head's primary policy to
149  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING and if the policy has no
150  * pending usage references, to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED.
151  *
152  * \param[in] nrs the NRS head to carry out this operation on
153  */
154 static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
155 {
156         struct ptlrpc_nrs_policy *tmp = nrs->nrs_policy_primary;
157         ENTRY;
158
159         if (tmp == NULL) {
160                 /**
161                  * XXX: This should really be RETURN_EXIT, but the latter does
162                  * not currently print anything out, and possibly should be
163                  * fixed to do so.
164                  */
165                 EXIT;
166                 return;
167         }
168
169         nrs->nrs_policy_primary = NULL;
170
171         LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED);
172         tmp->pol_state = NRS_POL_STATE_STOPPING;
173
174         if (tmp->pol_ref == 0)
175                 nrs_policy_stop0(tmp);
176         EXIT;
177 }
178
179 /**
180  * Transitions a policy across the ptlrpc_nrs_pol_state range of values, in
181  * response to an lprocfs command to start a policy.
182  *
183  * If a primary policy different to the current one is specified, this function
184  * will transition the new policy to the
185  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTING and then to
186  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED, and will then transition
187  * the old primary policy (if there is one) to
188  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding
189  * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED.
190  *
191  * If the fallback policy is specified, this is taken to indicate an instruction
192  * to stop the current primary policy, without substituting it with another
193  * primary policy, so the primary policy (if any) is transitioned to
194  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding
195  * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. In
196  * this case, the fallback policy is only left active in the NRS head.
197  */
198 static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg)
199 {
200         struct ptlrpc_nrs      *nrs = policy->pol_nrs;
201         int                     rc = 0;
202         ENTRY;
203
204         /**
205          * Don't allow multiple starting which is too complex, and has no real
206          * benefit.
207          */
208         if (nrs->nrs_policy_starting)
209                 RETURN(-EAGAIN);
210
211         LASSERT(policy->pol_state != NRS_POL_STATE_STARTING);
212
213         if (policy->pol_state == NRS_POL_STATE_STOPPING)
214                 RETURN(-EAGAIN);
215
216         if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
217                 /**
218                  * This is for cases in which the user sets the policy to the
219                  * fallback policy (currently fifo for all services); i.e. the
220                  * user is resetting the policy to the default; so we stop the
221                  * primary policy, if any.
222                  */
223                 if (policy == nrs->nrs_policy_fallback) {
224                         nrs_policy_stop_primary(nrs);
225                         RETURN(0);
226                 }
227
228                 /**
229                  * If we reach here, we must be setting up the fallback policy
230                  * at service startup time, and only a single policy with the
231                  * nrs_policy_flags::PTLRPC_NRS_FL_FALLBACK flag set can
232                  * register with NRS core.
233                  */
234                 LASSERT(nrs->nrs_policy_fallback == NULL);
235         } else {
236                 /**
237                  * Shouldn't start primary policy if w/o fallback policy.
238                  */
239                 if (nrs->nrs_policy_fallback == NULL)
240                         RETURN(-EPERM);
241
242                 if (policy->pol_state == NRS_POL_STATE_STARTED)
243                         RETURN(0);
244         }
245
246         /**
247          * Increase the module usage count for policies registering from other
248          * modules.
249          */
250         if (atomic_inc_return(&policy->pol_desc->pd_refs) == 1 &&
251             !try_module_get(policy->pol_desc->pd_owner)) {
252                 atomic_dec(&policy->pol_desc->pd_refs);
253                 CERROR("NRS: cannot get module for policy %s; is it alive?\n",
254                        policy->pol_desc->pd_name);
255                 RETURN(-ENODEV);
256         }
257
258         /**
259          * Serialize policy starting across the NRS head
260          */
261         nrs->nrs_policy_starting = 1;
262
263         policy->pol_state = NRS_POL_STATE_STARTING;
264
265         if (policy->pol_desc->pd_ops->op_policy_start) {
266                 spin_unlock(&nrs->nrs_lock);
267
268                 rc = policy->pol_desc->pd_ops->op_policy_start(policy, arg);
269
270                 spin_lock(&nrs->nrs_lock);
271                 if (rc != 0) {
272                         if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
273                                 module_put(policy->pol_desc->pd_owner);
274
275                         policy->pol_state = NRS_POL_STATE_STOPPED;
276                         GOTO(out, rc);
277                 }
278         }
279
280         policy->pol_state = NRS_POL_STATE_STARTED;
281
282         if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
283                 /**
284                  * This path is only used at PTLRPC service setup time.
285                  */
286                 nrs->nrs_policy_fallback = policy;
287         } else {
288                 /*
289                  * Try to stop the current primary policy if there is one.
290                  */
291                 nrs_policy_stop_primary(nrs);
292
293                 /**
294                  * And set the newly-started policy as the primary one.
295                  */
296                 nrs->nrs_policy_primary = policy;
297         }
298
299 out:
300         nrs->nrs_policy_starting = 0;
301
302         RETURN(rc);
303 }
304
305 /**
306  * Increases the policy's usage reference count.
307  */
308 static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
309 {
310         policy->pol_ref++;
311 }
312
313 /**
314  * Decreases the policy's usage reference count, and stops the policy in case it
315  * was already stopping and have no more outstanding usage references (which
316  * indicates it has no more queued or started requests, and can be safely
317  * stopped).
318  */
319 static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
320 {
321         LASSERT(policy->pol_ref > 0);
322
323         policy->pol_ref--;
324         if (unlikely(policy->pol_ref == 0 &&
325             policy->pol_state == NRS_POL_STATE_STOPPING))
326                 nrs_policy_stop0(policy);
327 }
328
329 static void nrs_policy_put(struct ptlrpc_nrs_policy *policy)
330 {
331         spin_lock(&policy->pol_nrs->nrs_lock);
332         nrs_policy_put_locked(policy);
333         spin_unlock(&policy->pol_nrs->nrs_lock);
334 }
335
336 /**
337  * Find and return a policy by name.
338  */
339 static struct ptlrpc_nrs_policy * nrs_policy_find_locked(struct ptlrpc_nrs *nrs,
340                                                          char *name)
341 {
342         struct ptlrpc_nrs_policy *tmp;
343
344         list_for_each_entry(tmp, &nrs->nrs_policy_list, pol_list) {
345                 if (strncmp(tmp->pol_desc->pd_name, name,
346                             NRS_POL_NAME_MAX) == 0) {
347                         nrs_policy_get_locked(tmp);
348                         return tmp;
349                 }
350         }
351         return NULL;
352 }
353
354 /**
355  * Release references for the resource hierarchy moving upwards towards the
356  * policy instance resource.
357  */
358 static void nrs_resource_put(struct ptlrpc_nrs_resource *res)
359 {
360         struct ptlrpc_nrs_policy *policy = res->res_policy;
361
362         if (policy->pol_desc->pd_ops->op_res_put != NULL) {
363                 struct ptlrpc_nrs_resource *parent;
364
365                 for (; res != NULL; res = parent) {
366                         parent = res->res_parent;
367                         policy->pol_desc->pd_ops->op_res_put(policy, res);
368                 }
369         }
370 }
371
372 /**
373  * Obtains references for each resource in the resource hierarchy for request
374  * \a nrq if it is to be handled by \a policy.
375  *
376  * \param[in] policy      the policy
377  * \param[in] nrq         the request
378  * \param[in] moving_req  denotes whether this is a call to the function by
379  *                        ldlm_lock_reorder_req(), in order to move \a nrq to
380  *                        the high-priority NRS head; we should not sleep when
381  *                        set.
382  *
383  * \retval NULL           resource hierarchy references not obtained
384  * \retval valid-pointer  the bottom level of the resource hierarchy
385  *
386  * \see ptlrpc_nrs_pol_ops::op_res_get()
387  */
388 static
389 struct ptlrpc_nrs_resource * nrs_resource_get(struct ptlrpc_nrs_policy *policy,
390                                               struct ptlrpc_nrs_request *nrq,
391                                               bool moving_req)
392 {
393         /**
394          * Set to NULL to traverse the resource hierarchy from the top.
395          */
396         struct ptlrpc_nrs_resource *res = NULL;
397         struct ptlrpc_nrs_resource *tmp = NULL;
398         int                         rc;
399
400         while (1) {
401                 rc = policy->pol_desc->pd_ops->op_res_get(policy, nrq, res,
402                                                           &tmp, moving_req);
403                 if (rc < 0) {
404                         if (res != NULL)
405                                 nrs_resource_put(res);
406                         return NULL;
407                 }
408
409                 LASSERT(tmp != NULL);
410                 tmp->res_parent = res;
411                 tmp->res_policy = policy;
412                 res = tmp;
413                 tmp = NULL;
414                 /**
415                  * Return once we have obtained a reference to the bottom level
416                  * of the resource hierarchy.
417                  */
418                 if (rc > 0)
419                         return res;
420         }
421 }
422
423 /**
424  * Obtains resources for the resource hierarchies and policy references for
425  * the fallback and current primary policy (if any), that will later be used
426  * to handle request \a nrq.
427  *
428  * \param[in]  nrs  the NRS head instance that will be handling request \a nrq.
429  * \param[in]  nrq  the request that is being handled.
430  * \param[out] resp the array where references to the resource hierarchy are
431  *                  stored.
432  * \param[in]  moving_req  is set when obtaining resources while moving a
433  *                         request from a policy on the regular NRS head to a
434  *                         policy on the HP NRS head (via
435  *                         ldlm_lock_reorder_req()). It signifies that
436  *                         allocations to get resources should be atomic; for
437  *                         a full explanation, see comment in
438  *                         ptlrpc_nrs_pol_ops::op_res_get().
439  */
440 static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
441                                   struct ptlrpc_nrs_request *nrq,
442                                   struct ptlrpc_nrs_resource **resp,
443                                   bool moving_req)
444 {
445         struct ptlrpc_nrs_policy   *primary = NULL;
446         struct ptlrpc_nrs_policy   *fallback = NULL;
447
448         memset(resp, 0, sizeof(resp[0]) * NRS_RES_MAX);
449
450         /**
451          * Obtain policy references.
452          */
453         spin_lock(&nrs->nrs_lock);
454
455         fallback = nrs->nrs_policy_fallback;
456         nrs_policy_get_locked(fallback);
457
458         primary = nrs->nrs_policy_primary;
459         if (primary != NULL)
460                 nrs_policy_get_locked(primary);
461
462         spin_unlock(&nrs->nrs_lock);
463
464         /**
465          * Obtain resource hierarchy references.
466          */
467         resp[NRS_RES_FALLBACK] = nrs_resource_get(fallback, nrq, moving_req);
468         LASSERT(resp[NRS_RES_FALLBACK] != NULL);
469
470         if (primary != NULL) {
471                 resp[NRS_RES_PRIMARY] = nrs_resource_get(primary, nrq,
472                                                          moving_req);
473                 /**
474                  * A primary policy may exist which may not wish to serve a
475                  * particular request for different reasons; release the
476                  * reference on the policy as it will not be used for this
477                  * request.
478                  */
479                 if (resp[NRS_RES_PRIMARY] == NULL)
480                         nrs_policy_put(primary);
481         }
482 }
483
484 /**
485  * Releases references to resource hierarchies and policies, because they are no
486  * longer required; used when request handling has been completed, or the
487  * request is moving to the high priority NRS head.
488  *
489  * \param resp  the resource hierarchy that is being released
490  *
491  * \see ptlrpcnrs_req_hp_move()
492  * \see ptlrpc_nrs_req_finalize()
493  */
494 static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
495 {
496         struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
497         struct ptlrpc_nrs        *nrs = NULL;
498         int                       i;
499
500         for (i = 0; i < NRS_RES_MAX; i++) {
501                 if (resp[i] != NULL) {
502                         pols[i] = resp[i]->res_policy;
503                         nrs_resource_put(resp[i]);
504                         resp[i] = NULL;
505                 } else {
506                         pols[i] = NULL;
507                 }
508         }
509
510         for (i = 0; i < NRS_RES_MAX; i++) {
511                 if (pols[i] == NULL)
512                         continue;
513
514                 if (nrs == NULL) {
515                         nrs = pols[i]->pol_nrs;
516                         spin_lock(&nrs->nrs_lock);
517                 }
518                 nrs_policy_put_locked(pols[i]);
519         }
520
521         if (nrs != NULL)
522                 spin_unlock(&nrs->nrs_lock);
523 }
524
525 /**
526  * Obtains an NRS request from \a policy for handling or examination; the
527  * request should be removed in the 'handling' case.
528  *
529  * Calling into this function implies we already know the policy has a request
530  * waiting to be handled.
531  *
532  * \param[in] policy the policy from which a request
533  * \param[in] peek   when set, signifies that we just want to examine the
534  *                   request, and not handle it, so the request is not removed
535  *                   from the policy.
536  * \param[in] force  when set, it will force a policy to return a request if it
537  *                   has one pending
538  *
539  * \retval the NRS request to be handled
540  */
541 static inline
542 struct ptlrpc_nrs_request * nrs_request_get(struct ptlrpc_nrs_policy *policy,
543                                             bool peek, bool force)
544 {
545         struct ptlrpc_nrs_request *nrq;
546
547         LASSERT(policy->pol_req_queued > 0);
548
549         nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force);
550
551         LASSERT(ergo(nrq != NULL, nrs_request_policy(nrq) == policy));
552
553         return nrq;
554 }
555
556 /**
557  * Enqueues request \a nrq for later handling, via one one the policies for
558  * which resources where earlier obtained via nrs_resource_get_safe(). The
559  * function attempts to enqueue the request first on the primary policy
560  * (if any), since this is the preferred choice.
561  *
562  * \param nrq the request being enqueued
563  *
564  * \see nrs_resource_get_safe()
565  */
566 static inline void nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
567 {
568         struct ptlrpc_nrs_policy *policy;
569         int                       rc;
570         int                       i;
571
572         /**
573          * Try in descending order, because the primary policy (if any) is
574          * the preferred choice.
575          */
576         for (i = NRS_RES_MAX - 1; i >= 0; i--) {
577                 if (nrq->nr_res_ptrs[i] == NULL)
578                         continue;
579
580                 nrq->nr_res_idx = i;
581                 policy = nrq->nr_res_ptrs[i]->res_policy;
582
583                 rc = policy->pol_desc->pd_ops->op_req_enqueue(policy, nrq);
584                 if (rc == 0) {
585                         policy->pol_nrs->nrs_req_queued++;
586                         policy->pol_req_queued++;
587                         return;
588                 }
589         }
590         /**
591          * Should never get here, as at least the primary policy's
592          * ptlrpc_nrs_pol_ops::op_req_enqueue() implementation should always
593          * succeed.
594          */
595         LBUG();
596 }
597
598 /**
599  * Called when a request has been handled
600  *
601  * \param[in] nrs the request that has been handled; can be used for
602  *                job/resource control.
603  *
604  * \see ptlrpc_nrs_req_stop_nolock()
605  */
606 static inline void nrs_request_stop(struct ptlrpc_nrs_request *nrq)
607 {
608         struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
609
610         if (policy->pol_desc->pd_ops->op_req_stop)
611                 policy->pol_desc->pd_ops->op_req_stop(policy, nrq);
612
613         LASSERT(policy->pol_nrs->nrs_req_started > 0);
614         LASSERT(policy->pol_req_started > 0);
615
616         policy->pol_nrs->nrs_req_started--;
617         policy->pol_req_started--;
618 }
619
620 /**
621  * Handler for operations that can be carried out on policies.
622  *
623  * Handles opcodes that are common to all policy types within NRS core, and
624  * passes any unknown opcodes to the policy-specific control function.
625  *
626  * \param[in]     nrs  the NRS head this policy belongs to.
627  * \param[in]     name the human-readable policy name; should be the same as
628  *                     ptlrpc_nrs_pol_desc::pd_name.
629  * \param[in]     opc  the opcode of the operation being carried out.
630  * \param[in,out] arg  can be used to pass information in and out between when
631  *                     carrying an operation; usually data that is private to
632  *                     the policy at some level, or generic policy status
633  *                     information.
634  *
635  * \retval -ve error condition
636  * \retval   0 operation was carried out successfully
637  */
638 static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name,
639                           enum ptlrpc_nrs_ctl opc, void *arg)
640 {
641         struct ptlrpc_nrs_policy       *policy;
642         int                             rc = 0;
643         ENTRY;
644
645         spin_lock(&nrs->nrs_lock);
646
647         policy = nrs_policy_find_locked(nrs, name);
648         if (policy == NULL)
649                 GOTO(out, rc = -ENOENT);
650
651         switch (opc) {
652                 /**
653                  * Unknown opcode, pass it down to the policy-specific control
654                  * function for handling.
655                  */
656         default:
657                 rc = nrs_policy_ctl_locked(policy, opc, arg);
658                 break;
659
660                 /**
661                  * Start \e policy
662                  */
663         case PTLRPC_NRS_CTL_START:
664                 rc = nrs_policy_start_locked(policy, arg);
665                 break;
666         }
667 out:
668         if (policy != NULL)
669                 nrs_policy_put_locked(policy);
670
671         spin_unlock(&nrs->nrs_lock);
672
673         RETURN(rc);
674 }
675
676 /**
677  * Unregisters a policy by name.
678  *
679  * \param[in] nrs  the NRS head this policy belongs to.
680  * \param[in] name the human-readable policy name; should be the same as
681  *                 ptlrpc_nrs_pol_desc::pd_name
682  *
683  * \retval -ve error
684  * \retval   0 success
685  */
686 static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
687 {
688         struct ptlrpc_nrs_policy *policy = NULL;
689         ENTRY;
690
691         spin_lock(&nrs->nrs_lock);
692
693         policy = nrs_policy_find_locked(nrs, name);
694         if (policy == NULL) {
695                 spin_unlock(&nrs->nrs_lock);
696
697                 CERROR("Can't find NRS policy %s\n", name);
698                 RETURN(-ENOENT);
699         }
700
701         if (policy->pol_ref > 1) {
702                 CERROR("Policy %s is busy with %d references\n", name,
703                        (int)policy->pol_ref);
704                 nrs_policy_put_locked(policy);
705
706                 spin_unlock(&nrs->nrs_lock);
707                 RETURN(-EBUSY);
708         }
709
710         LASSERT(policy->pol_req_queued == 0);
711         LASSERT(policy->pol_req_started == 0);
712
713         if (policy->pol_state != NRS_POL_STATE_STOPPED) {
714                 nrs_policy_stop_locked(policy);
715                 LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED);
716         }
717
718         list_del(&policy->pol_list);
719         nrs->nrs_num_pols--;
720
721         nrs_policy_put_locked(policy);
722
723         spin_unlock(&nrs->nrs_lock);
724
725         nrs_policy_fini(policy);
726
727         LASSERT(policy->pol_private == NULL);
728         OBD_FREE_PTR(policy);
729
730         RETURN(0);
731 }
732
733 /**
734  * Register a policy from \policy descriptor \a desc with NRS head \a nrs.
735  *
736  * \param[in] nrs   the NRS head on which the policy will be registered.
737  * \param[in] desc  the policy descriptor from which the information will be
738  *                  obtained to register the policy.
739  *
740  * \retval -ve error
741  * \retval   0 success
742  */
743 static int nrs_policy_register(struct ptlrpc_nrs *nrs,
744                                struct ptlrpc_nrs_pol_desc *desc)
745 {
746         struct ptlrpc_nrs_policy       *policy;
747         struct ptlrpc_nrs_policy       *tmp;
748         struct ptlrpc_service_part     *svcpt = nrs->nrs_svcpt;
749         int                             rc;
750         ENTRY;
751
752         LASSERT(svcpt != NULL);
753         LASSERT(desc->pd_ops != NULL);
754         LASSERT(desc->pd_ops->op_res_get != NULL);
755         LASSERT(desc->pd_ops->op_req_get != NULL);
756         LASSERT(desc->pd_ops->op_req_enqueue != NULL);
757         LASSERT(desc->pd_ops->op_req_dequeue != NULL);
758         LASSERT(desc->pd_compat != NULL);
759
760         OBD_CPT_ALLOC_GFP(policy, svcpt->scp_service->srv_cptable,
761                           svcpt->scp_cpt, sizeof(*policy), GFP_NOFS);
762         if (policy == NULL)
763                 RETURN(-ENOMEM);
764
765         policy->pol_nrs     = nrs;
766         policy->pol_desc    = desc;
767         policy->pol_state   = NRS_POL_STATE_STOPPED;
768         policy->pol_flags   = desc->pd_flags;
769
770         INIT_LIST_HEAD(&policy->pol_list);
771         INIT_LIST_HEAD(&policy->pol_list_queued);
772
773         rc = nrs_policy_init(policy);
774         if (rc != 0) {
775                 OBD_FREE_PTR(policy);
776                 RETURN(rc);
777         }
778
779         spin_lock(&nrs->nrs_lock);
780
781         tmp = nrs_policy_find_locked(nrs, policy->pol_desc->pd_name);
782         if (tmp != NULL) {
783                 CERROR("NRS policy %s has been registered, can't register it "
784                        "for %s\n", policy->pol_desc->pd_name,
785                        svcpt->scp_service->srv_name);
786                 nrs_policy_put_locked(tmp);
787
788                 spin_unlock(&nrs->nrs_lock);
789                 nrs_policy_fini(policy);
790                 OBD_FREE_PTR(policy);
791
792                 RETURN(-EEXIST);
793         }
794
795         list_add_tail(&policy->pol_list, &nrs->nrs_policy_list);
796         nrs->nrs_num_pols++;
797
798         if (policy->pol_flags & PTLRPC_NRS_FL_REG_START)
799                 rc = nrs_policy_start_locked(policy, NULL);
800
801         spin_unlock(&nrs->nrs_lock);
802
803         if (rc != 0)
804                 (void) nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
805
806         RETURN(rc);
807 }
808
809 /**
810  * Enqueue request \a req using one of the policies its resources are referring
811  * to.
812  *
813  * \param[in] req the request to enqueue.
814  */
815 static void ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
816 {
817         struct ptlrpc_nrs_policy       *policy;
818
819         LASSERT(req->rq_nrq.nr_initialized);
820         LASSERT(!req->rq_nrq.nr_enqueued);
821
822         nrs_request_enqueue(&req->rq_nrq);
823         req->rq_nrq.nr_enqueued = 1;
824
825         policy = nrs_request_policy(&req->rq_nrq);
826         /**
827          * Add the policy to the NRS head's list of policies with enqueued
828          * requests, if it has not been added there.
829          */
830         if (unlikely(list_empty(&policy->pol_list_queued)))
831                 list_add_tail(&policy->pol_list_queued,
832                                   &policy->pol_nrs->nrs_policy_queued);
833 }
834
835 /**
836  * Enqueue a request on the high priority NRS head.
837  *
838  * \param req the request to enqueue.
839  */
840 static void ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
841 {
842         int     opc = lustre_msg_get_opc(req->rq_reqmsg);
843         ENTRY;
844
845         spin_lock(&req->rq_lock);
846         req->rq_hp = 1;
847         ptlrpc_nrs_req_add_nolock(req);
848         if (opc != OBD_PING)
849                 DEBUG_REQ(D_NET, req, "high priority req");
850         spin_unlock(&req->rq_lock);
851         EXIT;
852 }
853
854 /**
855  * Returns a boolean predicate indicating whether the policy described by
856  * \a desc is adequate for use with service \a svc.
857  *
858  * \param[in] svc  the service
859  * \param[in] desc the policy descriptor
860  *
861  * \retval false the policy is not compatible with the service
862  * \retval true  the policy is compatible with the service
863  */
864 static inline bool nrs_policy_compatible(const struct ptlrpc_service *svc,
865                                          const struct ptlrpc_nrs_pol_desc *desc)
866 {
867         return desc->pd_compat(svc, desc);
868 }
869
870 /**
871  * Registers all compatible policies in nrs_core.nrs_policies, for NRS head
872  * \a nrs.
873  *
874  * \param[in] nrs the NRS head
875  *
876  * \retval -ve error
877  * \retval   0 success
878  *
879  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
880  *
881  * \see ptlrpc_service_nrs_setup()
882  */
883 static int nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
884 {
885         struct ptlrpc_nrs_pol_desc *desc;
886         /* for convenience */
887         struct ptlrpc_service_part       *svcpt = nrs->nrs_svcpt;
888         struct ptlrpc_service            *svc = svcpt->scp_service;
889         int                               rc = -EINVAL;
890         ENTRY;
891
892         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
893
894         list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
895                 if (nrs_policy_compatible(svc, desc)) {
896                         rc = nrs_policy_register(nrs, desc);
897                         if (rc != 0) {
898                                 CERROR("Failed to register NRS policy %s for "
899                                        "partition %d of service %s: %d\n",
900                                        desc->pd_name, svcpt->scp_cpt,
901                                        svc->srv_name, rc);
902                                 /**
903                                  * Fail registration if any of the policies'
904                                  * registration fails.
905                                  */
906                                 break;
907                         }
908                 }
909         }
910
911         RETURN(rc);
912 }
913
914 /**
915  * Initializes NRS head \a nrs of service partition \a svcpt, and registers all
916  * compatible policies in NRS core, with the NRS head.
917  *
918  * \param[in] nrs   the NRS head
919  * \param[in] svcpt the PTLRPC service partition to setup
920  *
921  * \retval -ve error
922  * \retval   0 success
923  *
924  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
925  */
926 static int nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
927                                    struct ptlrpc_service_part *svcpt)
928 {
929         int                             rc;
930         enum ptlrpc_nrs_queue_type      queue;
931
932         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
933
934         if (nrs == &svcpt->scp_nrs_reg)
935                 queue = PTLRPC_NRS_QUEUE_REG;
936         else if (nrs == svcpt->scp_nrs_hp)
937                 queue = PTLRPC_NRS_QUEUE_HP;
938         else
939                 LBUG();
940
941         nrs->nrs_svcpt = svcpt;
942         nrs->nrs_queue_type = queue;
943         spin_lock_init(&nrs->nrs_lock);
944         INIT_LIST_HEAD(&nrs->nrs_policy_list);
945         INIT_LIST_HEAD(&nrs->nrs_policy_queued);
946         nrs->nrs_throttling = 0;
947
948         rc = nrs_register_policies_locked(nrs);
949
950         RETURN(rc);
951 }
952
953 /**
954  * Allocates a regular and optionally a high-priority NRS head (if the service
955  * handles high-priority RPCs), and then registers all available compatible
956  * policies on those NRS heads.
957  *
958  * \param[in,out] svcpt the PTLRPC service partition to setup
959  *
960  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
961  */
962 static int nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
963 {
964         struct ptlrpc_nrs              *nrs;
965         int                             rc;
966         ENTRY;
967
968         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
969
970         /**
971          * Initialize the regular NRS head.
972          */
973         nrs = nrs_svcpt2nrs(svcpt, false);
974         rc = nrs_svcpt_setup_locked0(nrs, svcpt);
975         if (rc < 0)
976                 GOTO(out, rc);
977
978         /**
979          * Optionally allocate a high-priority NRS head.
980          */
981         if (svcpt->scp_service->srv_ops.so_hpreq_handler == NULL)
982                 GOTO(out, rc);
983
984         OBD_CPT_ALLOC_PTR(svcpt->scp_nrs_hp,
985                           svcpt->scp_service->srv_cptable,
986                           svcpt->scp_cpt);
987         if (svcpt->scp_nrs_hp == NULL)
988                 GOTO(out, rc = -ENOMEM);
989
990         nrs = nrs_svcpt2nrs(svcpt, true);
991         rc = nrs_svcpt_setup_locked0(nrs, svcpt);
992
993 out:
994         RETURN(rc);
995 }
996
997 /**
998  * Unregisters all policies on all available NRS heads in a service partition;
999  * called at PTLRPC service unregistration time.
1000  *
1001  * \param[in] svcpt the PTLRPC service partition
1002  *
1003  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
1004  */
1005 static void nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
1006 {
1007         struct ptlrpc_nrs              *nrs;
1008         struct ptlrpc_nrs_policy       *policy;
1009         struct ptlrpc_nrs_policy       *tmp;
1010         int                             rc;
1011         bool                            hp = false;
1012         ENTRY;
1013
1014         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
1015
1016 again:
1017         /* scp_nrs_hp could be NULL due to short of memory. */
1018         nrs = hp ? svcpt->scp_nrs_hp : &svcpt->scp_nrs_reg;
1019         /* check the nrs_svcpt to see if nrs is initialized. */
1020         if (!nrs || !nrs->nrs_svcpt) {
1021                 EXIT;
1022                 return;
1023         }
1024         nrs->nrs_stopping = 1;
1025
1026         list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list,
1027                                      pol_list) {
1028                 rc = nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
1029                 LASSERT(rc == 0);
1030         }
1031
1032         /**
1033          * If the service partition has an HP NRS head, clean that up as well.
1034          */
1035         if (!hp && nrs_svcpt_has_hp(svcpt)) {
1036                 hp = true;
1037                 goto again;
1038         }
1039
1040         if (hp)
1041                 OBD_FREE_PTR(nrs);
1042
1043         EXIT;
1044 }
1045
1046 /**
1047  * Returns the descriptor for a policy as identified by by \a name.
1048  *
1049  * \param[in] name the policy name
1050  *
1051  * \retval the policy descriptor
1052  * \retval NULL
1053  */
1054 static struct ptlrpc_nrs_pol_desc *nrs_policy_find_desc_locked(const char *name)
1055 {
1056         struct ptlrpc_nrs_pol_desc     *tmp;
1057         ENTRY;
1058
1059         list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
1060                 if (strncmp(tmp->pd_name, name, NRS_POL_NAME_MAX) == 0)
1061                         RETURN(tmp);
1062         }
1063         RETURN(NULL);
1064 }
1065
1066 /**
1067  * Removes the policy from all supported NRS heads of all partitions of all
1068  * PTLRPC services.
1069  *
1070  * \param[in] desc the policy descriptor to unregister
1071  *
1072  * \retval -ve error
1073  * \retval  0  successfully unregistered policy on all supported NRS heads
1074  *
1075  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
1076  * \pre mutex_is_locked(&ptlrpc_all_services_mutex)
1077  */
1078 static int nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
1079 {
1080         struct ptlrpc_nrs              *nrs;
1081         struct ptlrpc_service          *svc;
1082         struct ptlrpc_service_part     *svcpt;
1083         int                             i;
1084         int                             rc = 0;
1085         ENTRY;
1086
1087         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
1088         LASSERT(mutex_is_locked(&ptlrpc_all_services_mutex));
1089
1090         list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
1091
1092                 if (!nrs_policy_compatible(svc, desc) ||
1093                     unlikely(svc->srv_is_stopping))
1094                         continue;
1095
1096                 ptlrpc_service_for_each_part(svcpt, i, svc) {
1097                         bool hp = false;
1098
1099 again:
1100                         nrs = nrs_svcpt2nrs(svcpt, hp);
1101                         rc = nrs_policy_unregister(nrs, desc->pd_name);
1102                         /**
1103                          * Ignore -ENOENT as the policy may not have registered
1104                          * successfully on all service partitions.
1105                          */
1106                         if (rc == -ENOENT) {
1107                                 rc = 0;
1108                         } else if (rc != 0) {
1109                                 CERROR("Failed to unregister NRS policy %s for "
1110                                        "partition %d of service %s: %d\n",
1111                                        desc->pd_name, svcpt->scp_cpt,
1112                                        svcpt->scp_service->srv_name, rc);
1113                                 RETURN(rc);
1114                         }
1115
1116                         if (!hp && nrs_svc_has_hp(svc)) {
1117                                 hp = true;
1118                                 goto again;
1119                         }
1120                 }
1121
1122                 if (desc->pd_ops->op_lprocfs_fini != NULL)
1123                         desc->pd_ops->op_lprocfs_fini(svc);
1124         }
1125
1126         RETURN(rc);
1127 }
1128
1129 /**
1130  * Registers a new policy with NRS core.
1131  *
1132  * The function will only succeed if policy registration with all compatible
1133  * service partitions (if any) is successful.
1134  *
1135  * N.B. This function should be called either at ptlrpc module initialization
1136  *      time when registering a policy that ships with NRS core, or in a
1137  *      module's init() function for policies registering from other modules.
1138  *
1139  * \param[in] conf configuration information for the new policy to register
1140  *
1141  * \retval -ve error
1142  * \retval   0 success
1143  */
1144 int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
1145 {
1146         struct ptlrpc_service          *svc;
1147         struct ptlrpc_nrs_pol_desc     *desc;
1148         int                             rc = 0;
1149         ENTRY;
1150
1151         LASSERT(conf != NULL);
1152         LASSERT(conf->nc_ops != NULL);
1153         LASSERT(conf->nc_compat != NULL);
1154         LASSERT(ergo(conf->nc_compat == nrs_policy_compat_one,
1155                 conf->nc_compat_svc_name != NULL));
1156         LASSERT(ergo((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0,
1157                      conf->nc_owner != NULL));
1158
1159         conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
1160
1161         /**
1162          * External policies are not allowed to start immediately upon
1163          * registration, as there is a relatively higher chance that their
1164          * registration might fail. In such a case, some policy instances may
1165          * already have requests queued wen unregistration needs to happen as
1166          * part o cleanup; since there is currently no way to drain requests
1167          * from a policy unless the service is unregistering, we just disallow
1168          * this.
1169          */
1170         if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) &&
1171             (conf->nc_flags & (PTLRPC_NRS_FL_FALLBACK |
1172                                PTLRPC_NRS_FL_REG_START))) {
1173                 CERROR("NRS: failing to register policy %s. Please check "
1174                        "policy flags; external policies cannot act as fallback "
1175                        "policies, or be started immediately upon registration "
1176                        "without interaction with lprocfs\n", conf->nc_name);
1177                 RETURN(-EINVAL);
1178         }
1179
1180         mutex_lock(&nrs_core.nrs_mutex);
1181
1182         if (nrs_policy_find_desc_locked(conf->nc_name) != NULL) {
1183                 CERROR("NRS: failing to register policy %s which has already "
1184                        "been registered with NRS core!\n",
1185                        conf->nc_name);
1186                 GOTO(fail, rc = -EEXIST);
1187         }
1188
1189         OBD_ALLOC_PTR(desc);
1190         if (desc == NULL)
1191                 GOTO(fail, rc = -ENOMEM);
1192
1193         if (strlcpy(desc->pd_name, conf->nc_name, sizeof(desc->pd_name)) >=
1194             sizeof(desc->pd_name)) {
1195                 OBD_FREE_PTR(desc);
1196                 GOTO(fail, rc = -E2BIG);
1197         }
1198         desc->pd_ops             = conf->nc_ops;
1199         desc->pd_compat          = conf->nc_compat;
1200         desc->pd_compat_svc_name = conf->nc_compat_svc_name;
1201         if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0)
1202                 desc->pd_owner   = conf->nc_owner;
1203         desc->pd_flags           = conf->nc_flags;
1204         atomic_set(&desc->pd_refs, 0);
1205
1206         /**
1207          * For policies that are held in the same module as NRS (currently
1208          * ptlrpc), do not register the policy with all compatible services,
1209          * as the services will not have started at this point, since we are
1210          * calling from ptlrpc module initialization code. In such cases each
1211          * service will register all compatible policies later, via
1212          * ptlrpc_service_nrs_setup().
1213          */
1214         if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) == 0)
1215                 goto internal;
1216
1217         /**
1218          * Register the new policy on all compatible services
1219          */
1220         mutex_lock(&ptlrpc_all_services_mutex);
1221
1222         list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
1223                 struct ptlrpc_service_part     *svcpt;
1224                 int                             i;
1225                 int                             rc2;
1226
1227                 if (!nrs_policy_compatible(svc, desc) ||
1228                     unlikely(svc->srv_is_stopping))
1229                         continue;
1230
1231                 ptlrpc_service_for_each_part(svcpt, i, svc) {
1232                         struct ptlrpc_nrs      *nrs;
1233                         bool                    hp = false;
1234 again:
1235                         nrs = nrs_svcpt2nrs(svcpt, hp);
1236                         rc = nrs_policy_register(nrs, desc);
1237                         if (rc != 0) {
1238                                 CERROR("Failed to register NRS policy %s for "
1239                                        "partition %d of service %s: %d\n",
1240                                        desc->pd_name, svcpt->scp_cpt,
1241                                        svcpt->scp_service->srv_name, rc);
1242
1243                                 rc2 = nrs_policy_unregister_locked(desc);
1244                                 /**
1245                                  * Should not fail at this point
1246                                  */
1247                                 LASSERT(rc2 == 0);
1248                                 mutex_unlock(&ptlrpc_all_services_mutex);
1249                                 OBD_FREE_PTR(desc);
1250                                 GOTO(fail, rc);
1251                         }
1252
1253                         if (!hp && nrs_svc_has_hp(svc)) {
1254                                 hp = true;
1255                                 goto again;
1256                         }
1257                 }
1258
1259                 /**
1260                  * No need to take a reference to other modules here, as we
1261                  * will be calling from the module's init() function.
1262                  */
1263                 if (desc->pd_ops->op_lprocfs_init != NULL) {
1264                         rc = desc->pd_ops->op_lprocfs_init(svc);
1265                         if (rc != 0) {
1266                                 rc2 = nrs_policy_unregister_locked(desc);
1267                                 /**
1268                                  * Should not fail at this point
1269                                  */
1270                                 LASSERT(rc2 == 0);
1271                                 mutex_unlock(&ptlrpc_all_services_mutex);
1272                                 OBD_FREE_PTR(desc);
1273                                 GOTO(fail, rc);
1274                         }
1275                 }
1276         }
1277
1278         mutex_unlock(&ptlrpc_all_services_mutex);
1279 internal:
1280         list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
1281 fail:
1282         mutex_unlock(&nrs_core.nrs_mutex);
1283
1284         RETURN(rc);
1285 }
1286 EXPORT_SYMBOL(ptlrpc_nrs_policy_register);
1287
1288 /**
1289  * Unregisters a previously registered policy with NRS core. All instances of
1290  * the policy on all NRS heads of all supported services are removed.
1291  *
1292  * N.B. This function should only be called from a module's exit() function.
1293  *      Although it can be used for policies that ship alongside NRS core, the
1294  *      function is primarily intended for policies that register externally,
1295  *      from other modules.
1296  *
1297  * \param[in] conf configuration information for the policy to unregister
1298  *
1299  * \retval -ve error
1300  * \retval   0 success
1301  */
1302 int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_conf *conf)
1303 {
1304         struct ptlrpc_nrs_pol_desc      *desc;
1305         int                              rc;
1306         ENTRY;
1307
1308         LASSERT(conf != NULL);
1309
1310         if (conf->nc_flags & PTLRPC_NRS_FL_FALLBACK) {
1311                 CERROR("Unable to unregister a fallback policy, unless the "
1312                        "PTLRPC service is stopping.\n");
1313                 RETURN(-EPERM);
1314         }
1315
1316         conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
1317
1318         mutex_lock(&nrs_core.nrs_mutex);
1319
1320         desc = nrs_policy_find_desc_locked(conf->nc_name);
1321         if (desc == NULL) {
1322                 CERROR("Failing to unregister NRS policy %s which has "
1323                        "not been registered with NRS core!\n",
1324                        conf->nc_name);
1325                 GOTO(not_exist, rc = -ENOENT);
1326         }
1327
1328         mutex_lock(&ptlrpc_all_services_mutex);
1329
1330         rc = nrs_policy_unregister_locked(desc);
1331         if (rc < 0) {
1332                 if (rc == -EBUSY)
1333                         CERROR("Please first stop policy %s on all service "
1334                                "partitions and then retry to unregister the "
1335                                "policy.\n", conf->nc_name);
1336                 GOTO(fail, rc);
1337         }
1338
1339         CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n",
1340                conf->nc_name);
1341
1342         list_del(&desc->pd_list);
1343         OBD_FREE_PTR(desc);
1344
1345 fail:
1346         mutex_unlock(&ptlrpc_all_services_mutex);
1347
1348 not_exist:
1349         mutex_unlock(&nrs_core.nrs_mutex);
1350
1351         RETURN(rc);
1352 }
1353 EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister);
1354
1355 /**
1356  * Setup NRS heads on all service partitions of service \a svc, and register
1357  * all compatible policies on those NRS heads.
1358  *
1359  * To be called from withing ptl
1360  * \param[in] svc the service to setup
1361  *
1362  * \retval -ve error, the calling logic should eventually call
1363  *                    ptlrpc_service_nrs_cleanup() to undo any work performed
1364  *                    by this function.
1365  *
1366  * \see ptlrpc_register_service()
1367  * \see ptlrpc_service_nrs_cleanup()
1368  */
1369 int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
1370 {
1371         struct ptlrpc_service_part             *svcpt;
1372         const struct ptlrpc_nrs_pol_desc       *desc;
1373         int                                     i;
1374         int                                     rc = 0;
1375
1376         mutex_lock(&nrs_core.nrs_mutex);
1377
1378         /**
1379          * Initialize NRS heads on all service CPTs.
1380          */
1381         ptlrpc_service_for_each_part(svcpt, i, svc) {
1382                 rc = nrs_svcpt_setup_locked(svcpt);
1383                 if (rc != 0)
1384                         GOTO(failed, rc);
1385         }
1386
1387         /**
1388          * Set up lprocfs interfaces for all supported policies for the
1389          * service.
1390          */
1391         list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
1392                 if (!nrs_policy_compatible(svc, desc))
1393                         continue;
1394
1395                 if (desc->pd_ops->op_lprocfs_init != NULL) {
1396                         rc = desc->pd_ops->op_lprocfs_init(svc);
1397                         if (rc != 0)
1398                                 GOTO(failed, rc);
1399                 }
1400         }
1401
1402 failed:
1403
1404         mutex_unlock(&nrs_core.nrs_mutex);
1405
1406         RETURN(rc);
1407 }
1408
1409 /**
1410  * Unregisters all policies on all service partitions of service \a svc.
1411  *
1412  * \param[in] svc the PTLRPC service to unregister
1413  */
1414 void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
1415 {
1416         struct ptlrpc_service_part           *svcpt;
1417         const struct ptlrpc_nrs_pol_desc     *desc;
1418         int                                   i;
1419
1420         mutex_lock(&nrs_core.nrs_mutex);
1421
1422         /**
1423          * Clean up NRS heads on all service partitions
1424          */
1425         ptlrpc_service_for_each_part(svcpt, i, svc)
1426                 nrs_svcpt_cleanup_locked(svcpt);
1427
1428         /**
1429          * Clean up lprocfs interfaces for all supported policies for the
1430          * service.
1431          */
1432         list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
1433                 if (!nrs_policy_compatible(svc, desc))
1434                         continue;
1435
1436                 if (desc->pd_ops->op_lprocfs_fini != NULL)
1437                         desc->pd_ops->op_lprocfs_fini(svc);
1438         }
1439
1440         mutex_unlock(&nrs_core.nrs_mutex);
1441 }
1442
1443 /**
1444  * Obtains NRS head resources for request \a req.
1445  *
1446  * These could be either on the regular or HP NRS head of \a svcpt; resources
1447  * taken on the regular head can later be swapped for HP head resources by
1448  * ldlm_lock_reorder_req().
1449  *
1450  * \param[in] svcpt the service partition
1451  * \param[in] req   the request
1452  * \param[in] hp    which NRS head of \a svcpt to use
1453  */
1454 void ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
1455                                struct ptlrpc_request *req, bool hp)
1456 {
1457         struct ptlrpc_nrs       *nrs = nrs_svcpt2nrs(svcpt, hp);
1458
1459         memset(&req->rq_nrq, 0, sizeof(req->rq_nrq));
1460         nrs_resource_get_safe(nrs, &req->rq_nrq, req->rq_nrq.nr_res_ptrs,
1461                               false);
1462
1463         /**
1464          * It is fine to access \e nr_initialized without locking as there is
1465          * no contention at this early stage.
1466          */
1467         req->rq_nrq.nr_initialized = 1;
1468 }
1469
1470 /**
1471  * Releases resources for a request; is called after the request has been
1472  * handled.
1473  *
1474  * \param[in] req the request
1475  *
1476  * \see ptlrpc_server_finish_request()
1477  */
1478 void ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
1479 {
1480         if (req->rq_nrq.nr_initialized) {
1481                 nrs_resource_put_safe(req->rq_nrq.nr_res_ptrs);
1482                 /* no protection on bit nr_initialized because no
1483                  * contention at this late stage */
1484                 req->rq_nrq.nr_finalized = 1;
1485         }
1486 }
1487
1488 void ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
1489 {
1490         if (req->rq_nrq.nr_started)
1491                 nrs_request_stop(&req->rq_nrq);
1492 }
1493
1494 /**
1495  * Enqueues request \a req on either the regular or high-priority NRS head
1496  * of service partition \a svcpt.
1497  *
1498  * \param[in] svcpt the service partition
1499  * \param[in] req   the request to be enqueued
1500  * \param[in] hp    whether to enqueue the request on the regular or
1501  *                  high-priority NRS head.
1502  */
1503 void ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
1504                         struct ptlrpc_request *req, bool hp)
1505 {
1506         spin_lock(&svcpt->scp_req_lock);
1507
1508         if (hp)
1509                 ptlrpc_nrs_hpreq_add_nolock(req);
1510         else
1511                 ptlrpc_nrs_req_add_nolock(req);
1512
1513         spin_unlock(&svcpt->scp_req_lock);
1514 }
1515
1516 static void nrs_request_removed(struct ptlrpc_nrs_policy *policy)
1517 {
1518         LASSERT(policy->pol_nrs->nrs_req_queued > 0);
1519         LASSERT(policy->pol_req_queued > 0);
1520
1521         policy->pol_nrs->nrs_req_queued--;
1522         policy->pol_req_queued--;
1523
1524         /**
1525          * If the policy has no more requests queued, remove it from
1526          * ptlrpc_nrs::nrs_policy_queued.
1527          */
1528         if (unlikely(policy->pol_req_queued == 0)) {
1529                 list_del_init(&policy->pol_list_queued);
1530
1531                 /**
1532                  * If there are other policies with queued requests, move the
1533                  * current policy to the end so that we can round robin over
1534                  * all policies and drain the requests.
1535                  */
1536         } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) {
1537                 LASSERT(policy->pol_req_queued <
1538                         policy->pol_nrs->nrs_req_queued);
1539
1540                 list_move_tail(&policy->pol_list_queued,
1541                                    &policy->pol_nrs->nrs_policy_queued);
1542         }
1543 }
1544
1545 /**
1546  * Obtains a request for handling from an NRS head of service partition
1547  * \a svcpt.
1548  *
1549  * \param[in] svcpt the service partition
1550  * \param[in] hp    whether to obtain a request from the regular or
1551  *                  high-priority NRS head.
1552  * \param[in] peek  when set, signifies that we just want to examine the
1553  *                  request, and not handle it, so the request is not removed
1554  *                  from the policy.
1555  * \param[in] force when set, it will force a policy to return a request if it
1556  *                  has one pending
1557  *
1558  * \retval the  request to be handled
1559  * \retval NULL the head has no requests to serve
1560  */
1561 struct ptlrpc_request *
1562 ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp,
1563                            bool peek, bool force)
1564 {
1565         struct ptlrpc_nrs         *nrs = nrs_svcpt2nrs(svcpt, hp);
1566         struct ptlrpc_nrs_policy  *policy;
1567         struct ptlrpc_nrs_request *nrq;
1568
1569         /**
1570          * Always try to drain requests from all NRS polices even if they are
1571          * inactive, because the user can change policy status at runtime.
1572          */
1573         list_for_each_entry(policy, &nrs->nrs_policy_queued,
1574                                 pol_list_queued) {
1575                 nrq = nrs_request_get(policy, peek, force);
1576                 if (nrq != NULL) {
1577                         if (likely(!peek)) {
1578                                 nrq->nr_started = 1;
1579
1580                                 policy->pol_req_started++;
1581                                 policy->pol_nrs->nrs_req_started++;
1582
1583                                 nrs_request_removed(policy);
1584                         }
1585
1586                         return container_of(nrq, struct ptlrpc_request, rq_nrq);
1587                 }
1588         }
1589
1590         return NULL;
1591 }
1592
1593 /**
1594  * Dequeues request \a req from the policy it has been enqueued on.
1595  *
1596  * \param[in] req the request
1597  */
1598 void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req)
1599 {
1600         struct ptlrpc_nrs_policy *policy = nrs_request_policy(&req->rq_nrq);
1601
1602         policy->pol_desc->pd_ops->op_req_dequeue(policy, &req->rq_nrq);
1603
1604         req->rq_nrq.nr_enqueued = 0;
1605
1606         nrs_request_removed(policy);
1607 }
1608
1609 /**
1610  * Returns whether there are any requests currently enqueued on any of the
1611  * policies of service partition's \a svcpt NRS head specified by \a hp. Should
1612  * be called while holding ptlrpc_service_part::scp_req_lock to get a reliable
1613  * result.
1614  *
1615  * \param[in] svcpt the service partition to enquire.
1616  * \param[in] hp    whether the regular or high-priority NRS head is to be
1617  *                  enquired.
1618  *
1619  * \retval false the indicated NRS head has no enqueued requests.
1620  * \retval true  the indicated NRS head has some enqueued requests.
1621  */
1622 bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
1623 {
1624         struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1625
1626         return nrs->nrs_req_queued > 0;
1627 };
1628
1629 /**
1630  * Returns whether NRS policy is throttling reqeust
1631  *
1632  * \param[in] svcpt the service partition to enquire.
1633  * \param[in] hp    whether the regular or high-priority NRS head is to be
1634  *                  enquired.
1635  *
1636  * \retval false the indicated NRS head has no enqueued requests.
1637  * \retval true  the indicated NRS head has some enqueued requests.
1638  */
1639 bool ptlrpc_nrs_req_throttling_nolock(struct ptlrpc_service_part *svcpt,
1640                                       bool hp)
1641 {
1642         struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1643
1644         return !!nrs->nrs_throttling;
1645 };
1646
1647 /**
1648  * Moves request \a req from the regular to the high-priority NRS head.
1649  *
1650  * \param[in] req the request to move
1651  */
1652 void ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req)
1653 {
1654         struct ptlrpc_service_part      *svcpt = req->rq_rqbd->rqbd_svcpt;
1655         struct ptlrpc_nrs_request       *nrq = &req->rq_nrq;
1656         struct ptlrpc_nrs_resource      *res1[NRS_RES_MAX];
1657         struct ptlrpc_nrs_resource      *res2[NRS_RES_MAX];
1658         ENTRY;
1659
1660         /**
1661          * Obtain the high-priority NRS head resources.
1662          */
1663         nrs_resource_get_safe(nrs_svcpt2nrs(svcpt, true), nrq, res1, true);
1664
1665         spin_lock(&svcpt->scp_req_lock);
1666
1667         if (!ptlrpc_nrs_req_can_move(req))
1668                 goto out;
1669
1670         ptlrpc_nrs_req_del_nolock(req);
1671
1672         memcpy(res2, nrq->nr_res_ptrs, NRS_RES_MAX * sizeof(res2[0]));
1673         memcpy(nrq->nr_res_ptrs, res1, NRS_RES_MAX * sizeof(res1[0]));
1674
1675         ptlrpc_nrs_hpreq_add_nolock(req);
1676
1677         memcpy(res1, res2, NRS_RES_MAX * sizeof(res1[0]));
1678 out:
1679         spin_unlock(&svcpt->scp_req_lock);
1680
1681         /**
1682          * Release either the regular NRS head resources if we moved the
1683          * request, or the high-priority NRS head resources if we took a
1684          * reference earlier in this function and ptlrpc_nrs_req_can_move()
1685          * returned false.
1686          */
1687         nrs_resource_put_safe(res1);
1688         EXIT;
1689 }
1690
1691 /**
1692  * Carries out a control operation \a opc on the policy identified by the
1693  * human-readable \a name, on either all partitions, or only on the first
1694  * partition of service \a svc.
1695  *
1696  * \param[in]     svc    the service the policy belongs to.
1697  * \param[in]     queue  whether to carry out the command on the policy which
1698  *                       belongs to the regular, high-priority, or both NRS
1699  *                       heads of service partitions of \a svc.
1700  * \param[in]     name   the policy to act upon, by human-readable name
1701  * \param[in]     opc    the opcode of the operation to carry out
1702  * \param[in]     single when set, the operation will only be carried out on the
1703  *                       NRS heads of the first service partition of \a svc.
1704  *                       This is useful for some policies which e.g. share
1705  *                       identical values on the same parameters of different
1706  *                       service partitions; when reading these parameters via
1707  *                       lprocfs, these policies may just want to obtain and
1708  *                       print out the values from the first service partition.
1709  *                       Storing these values centrally elsewhere then could be
1710  *                       another solution for this.
1711  * \param[in,out] arg    can be used as a generic in/out buffer between control
1712  *                       operations and the user environment.
1713  *
1714  *\retval -ve error condition
1715  *\retval   0 operation was carried out successfully
1716  */
1717 int ptlrpc_nrs_policy_control(const struct ptlrpc_service *svc,
1718                               enum ptlrpc_nrs_queue_type queue, char *name,
1719                               enum ptlrpc_nrs_ctl opc, bool single, void *arg)
1720 {
1721         struct ptlrpc_service_part     *svcpt;
1722         int                             i;
1723         int                             rc = 0;
1724         ENTRY;
1725
1726         LASSERT(opc != PTLRPC_NRS_CTL_INVALID);
1727
1728         if ((queue & PTLRPC_NRS_QUEUE_BOTH) == 0)
1729                 return -EINVAL;
1730
1731         ptlrpc_service_for_each_part(svcpt, i, svc) {
1732                 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1733                         rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, false), name,
1734                                             opc, arg);
1735                         if (rc != 0 || (queue == PTLRPC_NRS_QUEUE_REG &&
1736                                         single))
1737                                 GOTO(out, rc);
1738                 }
1739
1740                 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1741                         /**
1742                          * XXX: We could optionally check for
1743                          * nrs_svc_has_hp(svc) here, and return an error if it
1744                          * is false. Right now we rely on the policies' lprocfs
1745                          * handlers that call the present function to make this
1746                          * check; if they fail to do so, they might hit the
1747                          * assertion inside nrs_svcpt2nrs() below.
1748                          */
1749                         rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, true), name,
1750                                             opc, arg);
1751                         if (rc != 0 || single)
1752                                 GOTO(out, rc);
1753                 }
1754         }
1755 out:
1756         RETURN(rc);
1757 }
1758
1759 /**
1760  * Adds all policies that ship with the ptlrpc module, to NRS core's list of
1761  * policies \e nrs_core.nrs_policies.
1762  *
1763  * \retval 0 all policies have been registered successfully
1764  * \retval -ve error
1765  */
1766 int ptlrpc_nrs_init(void)
1767 {
1768         int     rc;
1769         ENTRY;
1770
1771         mutex_init(&nrs_core.nrs_mutex);
1772         INIT_LIST_HEAD(&nrs_core.nrs_policies);
1773
1774         rc = ptlrpc_nrs_policy_register(&nrs_conf_fifo);
1775         if (rc != 0)
1776                 GOTO(fail, rc);
1777
1778 #ifdef HAVE_SERVER_SUPPORT
1779         rc = ptlrpc_nrs_policy_register(&nrs_conf_crrn);
1780         if (rc != 0)
1781                 GOTO(fail, rc);
1782
1783         rc = ptlrpc_nrs_policy_register(&nrs_conf_orr);
1784         if (rc != 0)
1785                 GOTO(fail, rc);
1786
1787         rc = ptlrpc_nrs_policy_register(&nrs_conf_trr);
1788         if (rc != 0)
1789                 GOTO(fail, rc);
1790         rc = ptlrpc_nrs_policy_register(&nrs_conf_tbf);
1791         if (rc != 0)
1792                 GOTO(fail, rc);
1793 #endif /* HAVE_SERVER_SUPPORT */
1794
1795         RETURN(rc);
1796 fail:
1797         /**
1798          * Since no PTLRPC services have been started at this point, all we need
1799          * to do for cleanup is to free the descriptors.
1800          */
1801         ptlrpc_nrs_fini();
1802
1803         RETURN(rc);
1804 }
1805
1806 /**
1807  * Removes all policy descriptors from nrs_core::nrs_policies, and frees the
1808  * policy descriptors.
1809  *
1810  * Since all PTLRPC services are stopped at this point, there are no more
1811  * instances of any policies, because each service will have stopped its policy
1812  * instances in ptlrpc_service_nrs_cleanup(), so we just need to free the
1813  * descriptors here.
1814  */
1815 void ptlrpc_nrs_fini(void)
1816 {
1817         struct ptlrpc_nrs_pol_desc *desc;
1818         struct ptlrpc_nrs_pol_desc *tmp;
1819
1820         list_for_each_entry_safe(desc, tmp, &nrs_core.nrs_policies,
1821                                      pd_list) {
1822                 list_del_init(&desc->pd_list);
1823                 OBD_FREE_PTR(desc);
1824         }
1825 }
1826
1827 /** @} nrs */