Whamcloud - gitweb
LU-1866 misc: fix some issues found during LFSCK
[fs/lustre-release.git] / lustre / ptlrpc / nrs.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2011 Intel Corporation
24  *
25  * Copyright 2012 Xyratex Technology Limited
26  */
27 /*
28  * lustre/ptlrpc/nrs.c
29  *
30  * Network Request Scheduler (NRS)
31  *
32  * Allows to reorder the handling of RPCs at servers.
33  *
34  * Author: Liang Zhen <liang@whamcloud.com>
35  * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
36  */
37 /**
38  * \addtogoup nrs
39  * @{
40  */
41
42 #define DEBUG_SUBSYSTEM S_RPC
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_support.h>
47 #include <obd_class.h>
48 #include <lustre_net.h>
49 #include <lprocfs_status.h>
50 #include <libcfs/libcfs.h>
51 #include "ptlrpc_internal.h"
52
53 /* XXX: This is just for liblustre. Remove the #if defined directive when the
54  * "cfs_" prefix is dropped from cfs_list_head. */
55 #if defined (__linux__) && defined(__KERNEL__)
56 extern struct list_head ptlrpc_all_services;
57 #else
58 extern struct cfs_list_head ptlrpc_all_services;
59 #endif
60
61 /**
62  * NRS core object.
63  */
64 struct nrs_core nrs_core;
65
66 static int
67 nrs_policy_init(struct ptlrpc_nrs_policy *policy)
68 {
69         return policy->pol_ops->op_policy_init != NULL ?
70                policy->pol_ops->op_policy_init(policy) : 0;
71 }
72
73 static void
74 nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
75 {
76         LASSERT(policy->pol_ref == 0);
77         LASSERT(policy->pol_req_queued == 0);
78
79         if (policy->pol_ops->op_policy_fini != NULL)
80                 policy->pol_ops->op_policy_fini(policy);
81 }
82
83 static int
84 nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
85                       void *arg)
86 {
87         return policy->pol_ops->op_policy_ctl != NULL ?
88                policy->pol_ops->op_policy_ctl(policy, opc, arg) : -ENOSYS;
89 }
90
91 static void
92 nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
93 {
94         struct ptlrpc_nrs *nrs = policy->pol_nrs;
95         ENTRY;
96
97         if (policy->pol_ops->op_policy_stop != NULL) {
98                 spin_unlock(&nrs->nrs_lock);
99
100                 policy->pol_ops->op_policy_stop(policy);
101
102                 spin_lock(&nrs->nrs_lock);
103         }
104
105         LASSERT(cfs_list_empty(&policy->pol_list_queued));
106         LASSERT(policy->pol_req_queued == 0 &&
107                 policy->pol_req_started == 0);
108
109         policy->pol_private = NULL;
110
111         policy->pol_state = NRS_POL_STATE_STOPPED;
112         EXIT;
113 }
114
115 static int
116 nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
117 {
118         struct ptlrpc_nrs *nrs = policy->pol_nrs;
119         ENTRY;
120
121         if (nrs->nrs_policy_fallback == policy && !nrs->nrs_stopping)
122                 RETURN(-EPERM);
123
124         if (policy->pol_state == NRS_POL_STATE_STARTING)
125                 RETURN(-EAGAIN);
126
127         /* In progress or already stopped */
128         if (policy->pol_state != NRS_POL_STATE_STARTED)
129                 RETURN(0);
130
131         policy->pol_state = NRS_POL_STATE_STOPPING;
132
133         /* Immediately make it invisible */
134         if (nrs->nrs_policy_primary == policy) {
135                 nrs->nrs_policy_primary = NULL;
136
137         } else {
138                 LASSERT(nrs->nrs_policy_fallback == policy);
139                 nrs->nrs_policy_fallback = NULL;
140         }
141
142         /* I have the only refcount */
143         if (policy->pol_ref == 1)
144                 nrs_policy_stop0(policy);
145
146         RETURN(0);
147 }
148
149 /**
150  * Transitions the \a nrs NRS head's primary policy to
151  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING and if the policy has no
152  * pending usage references, to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED.
153  *
154  * \param[in] nrs The NRS head to carry out this operation on
155  */
156 static void
157 nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
158 {
159         struct ptlrpc_nrs_policy *tmp = nrs->nrs_policy_primary;
160         ENTRY;
161
162         if (tmp == NULL) {
163                 /**
164                  * XXX: This should really be RETURN_EXIT, but the latter does
165                  * not currently print anything out, and possibly should be
166                  * fixed to do so.
167                  */
168                 EXIT;
169                 return;
170         }
171
172         nrs->nrs_policy_primary = NULL;
173
174         LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED);
175         tmp->pol_state = NRS_POL_STATE_STOPPING;
176
177         if (tmp->pol_ref == 0)
178                 nrs_policy_stop0(tmp);
179         EXIT;
180 }
181
182 /**
183  * Transitions a policy across the ptlrpc_nrs_pol_state range of values, in
184  * response to an lprocfs command to start a policy.
185  *
186  * If a primary policy different to the current one is specified, this function
187  * will transition the new policy to the
188  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTING and then to
189  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED, and will then transition
190  * the old primary policy (if there is one) to
191  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding
192  * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED.
193  *
194  * If the fallback policy is specified, this is taken to indicate an instruction
195  * to stop the current primary policy, without substituting it with another
196  * primary policy, so the primary policy (if any) is transitioned to
197  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding
198  * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. In
199  * this case, the fallback policy is only left active in the NRS head.
200  */
201 static int
202 nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
203 {
204         struct ptlrpc_nrs      *nrs = policy->pol_nrs;
205         int                     rc = 0;
206         ENTRY;
207
208         /**
209          * Don't allow multiple starting which is too complex, and has no real
210          * benefit.
211          */
212         if (nrs->nrs_policy_starting)
213                 RETURN(-EAGAIN);
214
215         LASSERT(policy->pol_state != NRS_POL_STATE_STARTING);
216
217         if (policy->pol_state == NRS_POL_STATE_STOPPING ||
218             policy->pol_state == NRS_POL_STATE_UNAVAIL)
219                 RETURN(-EAGAIN);
220
221         if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
222                 /**
223                  * This is for cases in which the user sets the policy to the
224                  * fallback policy (currently fifo for all services); i.e. the
225                  * user is resetting the policy to the default; so we stop the
226                  * primary policy, if any.
227                  */
228                 if (policy == nrs->nrs_policy_fallback) {
229                         nrs_policy_stop_primary(nrs);
230
231                         RETURN(0);
232                 }
233
234                 /**
235                  * If we reach here, we must be setting up the fallback policy
236                  * at service startup time, and only a single policy with the
237                  * nrs_policy_flags::PTLRPC_NRS_FL_FALLBACK flag set can
238                  * register with NRS core.
239                  */
240                 LASSERT(nrs->nrs_policy_fallback == NULL);
241         } else {
242                 /**
243                  * Shouldn't start primary policy if w/o fallback policy.
244                  */
245                 if (nrs->nrs_policy_fallback == NULL)
246                         RETURN(-EPERM);
247
248                 if (policy->pol_state == NRS_POL_STATE_STARTED)
249                         RETURN(0);
250         }
251
252         /**
253          * Serialize policy starting.across the NRS head
254          */
255         nrs->nrs_policy_starting = 1;
256
257         policy->pol_state = NRS_POL_STATE_STARTING;
258
259         if (policy->pol_ops->op_policy_start) {
260                 spin_unlock(&nrs->nrs_lock);
261
262                 rc = policy->pol_ops->op_policy_start(policy);
263
264                 spin_lock(&nrs->nrs_lock);
265                 if (rc != 0) {
266                         policy->pol_state = NRS_POL_STATE_STOPPED;
267                         GOTO(out, rc);
268                 }
269         }
270
271         policy->pol_state = NRS_POL_STATE_STARTED;
272
273         if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
274                 /**
275                  * This path is only used at PTLRPC service setup time.
276                  */
277                 nrs->nrs_policy_fallback = policy;
278         } else {
279                 /*
280                  * Try to stop the current primary policy if there is one.
281                  */
282                 nrs_policy_stop_primary(nrs);
283
284                 /**
285                  * And set the newly-started policy as the primary one.
286                  */
287                 nrs->nrs_policy_primary = policy;
288         }
289
290 out:
291         nrs->nrs_policy_starting = 0;
292
293         RETURN(rc);
294 }
295
296 /**
297  * Increases the policy's usage reference count.
298  */
299 static void
300 nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
301 {
302         policy->pol_ref++;
303 }
304
305 /**
306  * Decreases the policy's usage reference count, and stops the policy in case it
307  * was already stopping and have no more outstanding usage references (which
308  * indicates it has no more queued or started requests, and can be safely
309  * stopped).
310  */
311 static void
312 nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
313 {
314         LASSERT(policy->pol_ref > 0);
315
316         policy->pol_ref--;
317         if (unlikely(policy->pol_ref == 0 &&
318             policy->pol_state == NRS_POL_STATE_STOPPING))
319                 nrs_policy_stop0(policy);
320 }
321
322 static void
323 nrs_policy_put(struct ptlrpc_nrs_policy *policy)
324 {
325         spin_lock(&policy->pol_nrs->nrs_lock);
326         nrs_policy_put_locked(policy);
327         spin_unlock(&policy->pol_nrs->nrs_lock);
328 }
329
330 /**
331  * Find and return a policy by name.
332  */
333 static struct ptlrpc_nrs_policy *
334 nrs_policy_find_locked(struct ptlrpc_nrs *nrs, char *name)
335 {
336         struct ptlrpc_nrs_policy *tmp;
337
338         cfs_list_for_each_entry(tmp, &(nrs)->nrs_policy_list, pol_list) {
339                 if (strncmp(tmp->pol_name, name, NRS_POL_NAME_MAX) == 0) {
340                         nrs_policy_get_locked(tmp);
341                         return tmp;
342                 }
343         }
344         return NULL;
345 }
346
347 /**
348  * Release references for the resource hierarchy moving upwards towards the
349  * policy instance resource.
350  */
351 static void
352 nrs_resource_put(struct ptlrpc_nrs_resource *res)
353 {
354         struct ptlrpc_nrs_policy *policy = res->res_policy;
355
356         if (policy->pol_ops->op_res_put != NULL) {
357                 struct ptlrpc_nrs_resource *parent;
358
359                 for (; res != NULL; res = parent) {
360                         parent = res->res_parent;
361                         policy->pol_ops->op_res_put(policy, res);
362                 }
363         }
364 }
365
366 /**
367  * Obtains references for each resource in the resource hierarchy for request
368  * \a nrq if it is to be handled by \a policy.
369  *
370  * \param[in] policy      The policy
371  * \param[in] nrq         The request
372  * \param[in] moving_req  Denotes whether this is a call to the function by
373  *                        ldlm_lock_reorder_req(), in order to move \a nrq to
374  *                        the high-priority NRS head; we should not sleep when
375  *                        set.
376  *
377  * \retval NULL Resource hierarchy references not obtained
378  * \retval valid-pointer  The bottom level of the resource hierarchy
379  *
380  * \see ptlrpc_nrs_pol_ops::op_res_get()
381  */
382 static struct ptlrpc_nrs_resource *
383 nrs_resource_get(struct ptlrpc_nrs_policy *policy,
384                  struct ptlrpc_nrs_request *nrq, bool moving_req)
385 {
386         /**
387          * Set to NULL to traverse the resource hierarchy from the top.
388          */
389         struct ptlrpc_nrs_resource *res = NULL;
390         struct ptlrpc_nrs_resource *tmp = NULL;
391         int                         rc;
392
393         while (1) {
394                 rc = policy->pol_ops->op_res_get(policy, nrq, res, &tmp,
395                                                  moving_req);
396                 if (rc < 0) {
397                         if (res != NULL)
398                                 nrs_resource_put(res);
399                         return NULL;
400                 }
401
402                 LASSERT(tmp != NULL);
403                 tmp->res_parent = res;
404                 tmp->res_policy = policy;
405                 res = tmp;
406                 tmp = NULL;
407                 /**
408                  * Return once we have obtained a reference to the bottom level
409                  * of the resource hierarchy.
410                  */
411                 if (rc > 0)
412                         return res;
413         }
414 }
415
416 /**
417  * Obtains resources for the resource hierarchies and policy references for
418  * the fallback and current primary policy (if any), that will later be used
419  * to handle request \a nrq.
420  *
421  * \param[in]  nrs  The NRS head instance that will be handling request \a nrq.
422  * \param[in]  nrq  The request that is being handled.
423  * \param[out] resp The array where references to the resource hierarchy are
424  *                  stored.
425  * \param[in]  moving_req  Is set when obtaining resources while moving a
426  *                         request from a policy on the regular NRS head to a
427  *                         policy on the HP NRS head (via
428  *                         ldlm_lock_reorder_req()). It signifies that
429  *                         allocations to get resources should be atomic; for
430  *                         a full explanation, see comment in
431  *                         ptlrpc_nrs_pol_ops::op_res_get().
432  */
433 static void
434 nrs_resource_get_safe(struct ptlrpc_nrs *nrs, struct ptlrpc_nrs_request *nrq,
435                       struct ptlrpc_nrs_resource **resp, bool moving_req)
436 {
437         struct ptlrpc_nrs_policy   *primary = NULL;
438         struct ptlrpc_nrs_policy   *fallback = NULL;
439
440         memset(resp, 0, sizeof(resp[0]) * NRS_RES_MAX);
441
442         /**
443          * Obtain policy references.
444          */
445         spin_lock(&nrs->nrs_lock);
446
447         fallback = nrs->nrs_policy_fallback;
448         nrs_policy_get_locked(fallback);
449
450         primary = nrs->nrs_policy_primary;
451         if (primary != NULL)
452                 nrs_policy_get_locked(primary);
453
454         spin_unlock(&nrs->nrs_lock);
455
456         /**
457          * Obtain resource hierarchy references.
458          */
459         resp[NRS_RES_FALLBACK] = nrs_resource_get(fallback, nrq, moving_req);
460         LASSERT(resp[NRS_RES_FALLBACK] != NULL);
461
462         if (primary != NULL) {
463                 resp[NRS_RES_PRIMARY] = nrs_resource_get(primary, nrq,
464                                                          moving_req);
465                 /**
466                  * A primary policy may exist which may not wish to serve a
467                  * particular request for different reasons; release the
468                  * reference on the policy as it will not be used for this
469                  * request.
470                  */
471                 if (resp[NRS_RES_PRIMARY] == NULL)
472                         nrs_policy_put(primary);
473         }
474 }
475
476 /**
477  * Releases references to resource hierarchies and policies, because they are no
478  * longer required; used when request handling has been completed, ot the
479  * request is moving to the high priority NRS head.
480  *
481  * \param resp The resource hierarchy that is being released
482  *
483  * \see ptlrpcnrs_req_hp_move()
484  * \see ptlrpc_nrs_req_finalize()
485  */
486 static void
487 nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
488 {
489         struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
490         struct ptlrpc_nrs        *nrs = NULL;
491         int                       i;
492
493         for (i = 0; i < NRS_RES_MAX; i++) {
494                 if (resp[i] != NULL) {
495                         pols[i] = resp[i]->res_policy;
496                         nrs_resource_put(resp[i]);
497                         resp[i] = NULL;
498                 } else {
499                         pols[i] = NULL;
500                 }
501         }
502
503         for (i = 0; i < NRS_RES_MAX; i++) {
504                 if (pols[i] == NULL)
505                         continue;
506
507                 if (nrs == NULL) {
508                         nrs = pols[i]->pol_nrs;
509                         spin_lock(&nrs->nrs_lock);
510                 }
511                 nrs_policy_put_locked(pols[i]);
512         }
513
514         if (nrs != NULL)
515                 spin_unlock(&nrs->nrs_lock);
516 }
517
518 /**
519  * Obtains an NRS request from \a policy for handling via polling.
520  *
521  * \param[in] policy    The policy being polled
522  * \param[in,out] arg   Reserved parameter
523  */
524 static struct ptlrpc_nrs_request *
525 nrs_request_poll(struct ptlrpc_nrs_policy *policy)
526 {
527         struct ptlrpc_nrs_request *nrq;
528
529         LASSERT(policy->pol_req_queued > 0);
530
531         nrq = policy->pol_ops->op_req_poll(policy);
532
533         LASSERT(nrq != NULL);
534         LASSERT(nrs_request_policy(nrq) == policy);
535
536         return nrq;
537 }
538
539 /**
540  * Enqueues request \a nrq for later handling, via one one the policies for
541  * which resources where earlier obtained via nrs_resource_get_safe(). The
542  * function attempts to enqueue the request first on the primary policy
543  * (if any), since this is the preferred choice.
544  *
545  * \param nrq The request being enqueued
546  *
547  * \see nrs_resource_get_safe()
548  */
549 static void
550 nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
551 {
552         struct ptlrpc_nrs_policy *policy;
553         int                       rc;
554         int                       i;
555
556         /**
557          * Try in descending order, because the primary policy (if any) is
558          * the preferred choice.
559          */
560         for (i = NRS_RES_MAX - 1; i >= 0; i--) {
561                 if (nrq->nr_res_ptrs[i] == NULL)
562                         continue;
563
564                 nrq->nr_res_idx = i;
565                 policy = nrq->nr_res_ptrs[i]->res_policy;
566
567                 rc = policy->pol_ops->op_req_enqueue(policy, nrq);
568                 if (rc == 0) {
569                         policy->pol_nrs->nrs_req_queued++;
570                         policy->pol_req_queued++;
571                         return;
572                 }
573         }
574         /**
575          * Should never get here, as at least the primary policy's
576          * ptlrpc_nrs_pol_ops::op_req_enqueue() implementation should always
577          * succeed.
578          */
579         LBUG();
580 }
581
582 /**
583  * Dequeues request \a nrq from the policy which was used for handling it
584  *
585  * \param nrq The request being dequeued
586  *
587  * \see ptlrpc_nrs_req_del_nolock()
588  */
589 static void
590 nrs_request_dequeue(struct ptlrpc_nrs_request *nrq)
591 {
592         struct ptlrpc_nrs_policy *policy;
593
594         policy = nrs_request_policy(nrq);
595
596         policy->pol_ops->op_req_dequeue(policy, nrq);
597
598         LASSERT(policy->pol_nrs->nrs_req_queued > 0);
599         LASSERT(policy->pol_req_queued > 0);
600
601         policy->pol_nrs->nrs_req_queued--;
602         policy->pol_req_queued--;
603 }
604
605 /**
606  * Is called when the request starts being handled, after it has been enqueued,
607  * polled and dequeued.
608  *
609  * \param[in] nrs The NRS request that is starting to be handled; can be used
610  *                for job/resource control.
611  *
612  * \see ptlrpc_nrs_req_start_nolock()
613  */
614 static void
615 nrs_request_start(struct ptlrpc_nrs_request *nrq)
616 {
617         struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
618
619         policy->pol_req_started++;
620         policy->pol_nrs->nrs_req_started++;
621         if (policy->pol_ops->op_req_start)
622                 policy->pol_ops->op_req_start(policy, nrq);
623 }
624
625 /**
626  * Called when a request has been handled
627  *
628  * \param[in] nrs The request that has been handled; can be used for
629  *                job/resource control.
630  *
631  * \see ptlrpc_nrs_req_stop_nolock()
632  */
633 static void
634 nrs_request_stop(struct ptlrpc_nrs_request *nrq)
635 {
636         struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
637
638         if (policy->pol_ops->op_req_stop)
639                 policy->pol_ops->op_req_stop(policy, nrq);
640
641         LASSERT(policy->pol_nrs->nrs_req_started > 0);
642         LASSERT(policy->pol_req_started > 0);
643
644         policy->pol_nrs->nrs_req_started--;
645         policy->pol_req_started--;
646 }
647
648 /**
649  * Handler for operations that can be carried out on policies.
650  *
651  * Handles opcodes that are common to all policy types within NRS core, and
652  * passes any unknown opcodes to the policy-specific control function.
653  *
654  * \param[in]     nrs  The NRS head this policy belongs to.
655  * \param[in]     name The human-readable policy name; should be the same as
656  *                     ptlrpc_nrs_pol_desc::pd_name.
657  * \param[in]     opc  The opcode of the operation being carried out.
658  * \param[in,out] arg  Can be used to pass information in and out between when
659  *                     carrying an operation; usually data that is private to
660  *                     the policy at some level, or generic policy status
661  *                     information.
662  *
663  * \retval -ve error condition
664  * \retval   0 operation was carried out successfully
665  */
666 static int
667 nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name, enum ptlrpc_nrs_ctl opc,
668                void *arg)
669 {
670         struct ptlrpc_nrs_policy       *policy;
671         int                             rc = 0;
672
673         spin_lock(&nrs->nrs_lock);
674
675         policy = nrs_policy_find_locked(nrs, name);
676         if (policy == NULL)
677                 GOTO(out, rc = -ENOENT);
678
679         switch (opc) {
680                 /**
681                  * Unknown opcode, pass it down to the policy-specific control
682                  * function for handling.
683                  */
684         default:
685                 rc = nrs_policy_ctl_locked(policy, opc, arg);
686                 break;
687
688                 /**
689                  * Start \e policy
690                  */
691         case PTLRPC_NRS_CTL_START:
692                 rc = nrs_policy_start_locked(policy);
693                 break;
694
695                 /**
696                  * TODO: This may need to be augmented for resource deallocation
697                  * used by the policies.
698                  */
699         case PTLRPC_NRS_CTL_SHRINK:
700                 rc = -ENOSYS;
701                 break;
702         }
703 out:
704         if (policy != NULL)
705                 nrs_policy_put_locked(policy);
706
707         spin_unlock(&nrs->nrs_lock);
708
709         return rc;
710 }
711
712 /**
713  * Unregisters a policy by name.
714  *
715  * \param[in] nrs  The NRS head this policy belongs to.
716  * \param[in] name The human-readable policy name; should be the same as
717  *                 ptlrpc_nrs_pol_desc::pd_name
718  *
719  * \retval -ve error
720  * \retval   0 success
721  */
722 static int
723 nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
724 {
725         struct ptlrpc_nrs_policy *policy = NULL;
726         ENTRY;
727
728         spin_lock(&nrs->nrs_lock);
729
730         policy = nrs_policy_find_locked(nrs, name);
731         if (policy == NULL) {
732                 spin_unlock(&nrs->nrs_lock);
733
734                 CERROR("Can't find NRS policy %s\n", name);
735                 RETURN(-ENOENT);
736         }
737
738         if (policy->pol_ref > 1) {
739                 CERROR("Policy %s is busy with %d references\n", name,
740                        (int)policy->pol_ref);
741                 nrs_policy_put_locked(policy);
742
743                 spin_unlock(&nrs->nrs_lock);
744                 RETURN(-EBUSY);
745         }
746
747         LASSERT(policy->pol_req_queued == 0);
748         LASSERT(policy->pol_req_started == 0);
749
750         if (policy->pol_state != NRS_POL_STATE_STOPPED) {
751                 nrs_policy_stop_locked(policy);
752                 LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED);
753         }
754
755         cfs_list_del(&policy->pol_list);
756         nrs->nrs_num_pols--;
757
758         nrs_policy_put_locked(policy);
759
760         spin_unlock(&nrs->nrs_lock);
761
762         nrs_policy_fini(policy);
763
764         LASSERT(policy->pol_private == NULL);
765         OBD_FREE_PTR(policy);
766
767         RETURN(0);
768 }
769
770 /**
771  * Register a policy from \policy descriptor \a desc with NRS head \a nrs.
772  *
773  * \param[in] nrs   The NRS head on which the policy will be registered.
774  * \param[in] desc  The policy descriptor from which the information will be
775  *                  obtained to register the policy.
776  *
777  * \retval -ve error
778  * \retval   0 success
779  */
780 static int
781 nrs_policy_register(struct ptlrpc_nrs *nrs,
782                     struct ptlrpc_nrs_pol_desc *desc)
783 {
784         struct ptlrpc_nrs_policy       *policy;
785         struct ptlrpc_nrs_policy       *tmp;
786         struct ptlrpc_service_part     *svcpt = nrs->nrs_svcpt;
787         int                             rc;
788         ENTRY;
789
790         LASSERT(svcpt != NULL);
791         LASSERT(desc->pd_ops != NULL);
792         LASSERT(desc->pd_ops->op_res_get != NULL);
793         LASSERT(desc->pd_ops->op_req_poll != NULL);
794         LASSERT(desc->pd_ops->op_req_enqueue != NULL);
795         LASSERT(desc->pd_ops->op_req_dequeue != NULL);
796         LASSERT(desc->pd_compat != NULL);
797
798         OBD_CPT_ALLOC_GFP(policy, svcpt->scp_service->srv_cptable,
799                           svcpt->scp_cpt, sizeof(*policy), CFS_ALLOC_IO);
800         if (policy == NULL)
801                 RETURN(-ENOMEM);
802
803         policy->pol_nrs     = nrs;
804         policy->pol_name    = desc->pd_name;
805         policy->pol_ops     = desc->pd_ops;
806         policy->pol_state   = desc->pd_flags & PTLRPC_NRS_FL_REG_EXTERN ?
807                               NRS_POL_STATE_UNAVAIL : NRS_POL_STATE_STOPPED;
808         policy->pol_flags   = desc->pd_flags & ~PTLRPC_NRS_FL_REG_EXTERN;
809
810         CFS_INIT_LIST_HEAD(&policy->pol_list);
811         CFS_INIT_LIST_HEAD(&policy->pol_list_queued);
812
813         rc = nrs_policy_init(policy);
814         if (rc != 0) {
815                 OBD_FREE_PTR(policy);
816                 RETURN(rc);
817         }
818
819         spin_lock(&nrs->nrs_lock);
820
821         tmp = nrs_policy_find_locked(nrs, policy->pol_name);
822         if (tmp != NULL) {
823                 CERROR("NRS policy %s has been registered, can't register it "
824                        "for %s\n",
825                        policy->pol_name, svcpt->scp_service->srv_name);
826                 nrs_policy_put_locked(tmp);
827
828                 spin_unlock(&nrs->nrs_lock);
829                 nrs_policy_fini(policy);
830                 OBD_FREE_PTR(policy);
831
832                 RETURN(-EEXIST);
833         }
834
835         cfs_list_add_tail(&policy->pol_list, &nrs->nrs_policy_list);
836         nrs->nrs_num_pols++;
837
838         if (policy->pol_flags & PTLRPC_NRS_FL_REG_START)
839                 rc = nrs_policy_start_locked(policy);
840
841         spin_unlock(&nrs->nrs_lock);
842
843         if (rc != 0)
844                 (void) nrs_policy_unregister(nrs, policy->pol_name);
845
846         RETURN(rc);
847 }
848
849 /**
850  * Enqueue request \a req using one of the policies its resources are referring
851  * to.
852  *
853  * \param[in] req The request to enqueue.
854  */
855 static void
856 ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
857 {
858         struct ptlrpc_nrs_policy       *policy;
859
860         LASSERT(req->rq_nrq.nr_initialized);
861         LASSERT(!req->rq_nrq.nr_enqueued);
862
863         nrs_request_enqueue(&req->rq_nrq);
864         req->rq_nrq.nr_enqueued = 1;
865
866         policy = nrs_request_policy(&req->rq_nrq);
867         /**
868          * Add the policy to the NRS head's list of policies with enqueued
869          * requests, if it has not been added there.
870          */
871         if (cfs_list_empty(&policy->pol_list_queued))
872                 cfs_list_add_tail(&policy->pol_list_queued,
873                                   &policy->pol_nrs->nrs_policy_queued);
874 }
875
876 /**
877  * Enqueue a request on the high priority NRS head.
878  *
879  * \param req The request to enqueue.
880  */
881 static void
882 ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
883 {
884         int     opc = lustre_msg_get_opc(req->rq_reqmsg);
885         ENTRY;
886
887         spin_lock(&req->rq_lock);
888         req->rq_hp = 1;
889         ptlrpc_nrs_req_add_nolock(req);
890         if (opc != OBD_PING)
891                 DEBUG_REQ(D_NET, req, "high priority req");
892         spin_unlock(&req->rq_lock);
893         EXIT;
894 }
895
896 /* ptlrpc/nrs_fifo.c */
897 extern struct ptlrpc_nrs_pol_desc ptlrpc_nrs_fifo_desc;
898
899 /**
900  * Array of policies that ship alongside NRS core; i.e. ones that do not
901  * register externally using ptlrpc_nrs_policy_register().
902  */
903 static struct ptlrpc_nrs_pol_desc *nrs_pols_builtin[] = {
904         &ptlrpc_nrs_fifo_desc,
905 };
906
907 /**
908  * Returns a boolean predicate indicating whether the policy described by
909  * \a desc is adequate for use with service \a svc.
910  *
911  * \param[in] nrs    The service
912  * \param[in] desc  The policy descriptor
913  *
914  * \retval false The policy is not compatible with the service partition
915  * \retval true  The policy is compatible with the service partition
916  */
917 static inline bool
918 nrs_policy_compatible(struct ptlrpc_service *svc,
919                       const struct ptlrpc_nrs_pol_desc *desc)
920 {
921         return desc->pd_compat(svc, desc);
922 }
923
924 /**
925  * Registers all compatible policies in nrs_core.nrs_policies, for NRS head
926  * \a nrs.
927  *
928  * \param[in] nrs The NRS head
929  *
930  * \retval -ve error
931  * \retval   0 success
932  *
933  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
934  *
935  * \see ptlrpc_service_nrs_setup()
936  */
937 static int
938 nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
939 {
940         struct ptlrpc_nrs_pol_desc *desc;
941         /* For convenience */
942         struct ptlrpc_service_part       *svcpt = nrs->nrs_svcpt;
943         struct ptlrpc_service            *svc = svcpt->scp_service;
944         int                               rc = -EINVAL;
945         ENTRY;
946
947         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
948
949         cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
950                 if (nrs_policy_compatible(svc, desc)) {
951                         rc = nrs_policy_register(nrs, desc);
952                         if (rc != 0) {
953                                 CERROR("Failed to register NRS policy %s for "
954                                        "partition %d of service %s: %d\n",
955                                        desc->pd_name, svcpt->scp_cpt,
956                                        svc->srv_name, rc);
957                                 /**
958                                  * Fail registration if any of the policies'
959                                  * registration fails.
960                                  */
961                                 break;
962                         }
963                 }
964         }
965
966         RETURN(rc);
967 }
968
969 /**
970  * Initializes NRS head \a nrs of service partition \a svcpt, and registers all
971  * compatible policies in NRS core, with the NRS head.
972  *
973  * \param[in] nrs   The NRS head
974  * \param[in] svcpt The PTLRPC service partition to setup
975  *
976  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
977  */
978 static int
979 nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
980                         struct ptlrpc_service_part *svcpt)
981 {
982         int                             rc;
983         enum ptlrpc_nrs_queue_type      queue;
984
985         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
986
987         if (nrs == &svcpt->scp_nrs_reg)
988                 queue = PTLRPC_NRS_QUEUE_REG;
989         else if (nrs == svcpt->scp_nrs_hp)
990                 queue = PTLRPC_NRS_QUEUE_HP;
991         else
992                 LBUG();
993
994         nrs->nrs_svcpt = svcpt;
995         nrs->nrs_queue_type = queue;
996         spin_lock_init(&nrs->nrs_lock);
997         CFS_INIT_LIST_HEAD(&nrs->nrs_heads);
998         CFS_INIT_LIST_HEAD(&nrs->nrs_policy_list);
999         CFS_INIT_LIST_HEAD(&nrs->nrs_policy_queued);
1000
1001         cfs_list_add_tail(&nrs->nrs_heads, &nrs_core.nrs_heads);
1002
1003         rc = nrs_register_policies_locked(nrs);
1004
1005         RETURN(rc);
1006 }
1007
1008 /**
1009  * Allocates a regular and optionally a high-priority NRS head (if the service
1010  * handles high-priority RPCs), and then registers all available compatible
1011  * policies on those NRS heads.
1012  *
1013  * \param[n] svcpt The PTLRPC service partition to setup
1014  *
1015  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
1016  */
1017 static int
1018 nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
1019 {
1020         struct ptlrpc_nrs              *nrs;
1021         int                             rc;
1022         ENTRY;
1023
1024         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
1025
1026         /**
1027          * Initialize the regular NRS head.
1028          */
1029         nrs = nrs_svcpt2nrs(svcpt, false);
1030         rc = nrs_svcpt_setup_locked0(nrs, svcpt);
1031         if (rc)
1032                 GOTO(out, rc);
1033
1034         /**
1035          * Optionally allocate a high-priority NRS head.
1036          */
1037         if (svcpt->scp_service->srv_ops.so_hpreq_handler == NULL)
1038                 GOTO(out, rc);
1039
1040         OBD_CPT_ALLOC_PTR(svcpt->scp_nrs_hp,
1041                           svcpt->scp_service->srv_cptable,
1042                           svcpt->scp_cpt);
1043         if (svcpt->scp_nrs_hp == NULL)
1044                 GOTO(out, rc = -ENOMEM);
1045
1046         nrs = nrs_svcpt2nrs(svcpt, true);
1047         rc = nrs_svcpt_setup_locked0(nrs, svcpt);
1048
1049 out:
1050         RETURN(rc);
1051 }
1052
1053 /**
1054  * Unregisters all policies on all available NRS heads in a service partition;
1055  * called at PTLRPC service unregistration time.
1056  *
1057  * \param[in] svcpt The PTLRPC service partition
1058  *
1059  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
1060  */
1061 static void
1062 nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
1063 {
1064         struct ptlrpc_nrs              *nrs;
1065         struct ptlrpc_nrs_policy       *policy;
1066         struct ptlrpc_nrs_policy       *tmp;
1067         int                             rc;
1068         bool                            hp = false;
1069         ENTRY;
1070
1071         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
1072
1073 again:
1074         nrs = nrs_svcpt2nrs(svcpt, hp);
1075         nrs->nrs_stopping = 1;
1076
1077         cfs_list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list,
1078                                      pol_list) {
1079                 rc = nrs_policy_unregister(nrs, policy->pol_name);
1080                 LASSERT(rc == 0);
1081         }
1082
1083         cfs_list_del(&nrs->nrs_heads);
1084
1085         /**
1086          * If the service partition has an HP NRS head, clean that up as well.
1087          */
1088         if (!hp && nrs_svcpt_has_hp(svcpt)) {
1089                 hp = true;
1090                 goto again;
1091         }
1092
1093         if (hp)
1094                 OBD_FREE_PTR(nrs);
1095
1096         EXIT;
1097 }
1098
1099 /**
1100  * Checks whether the policy in \a desc has been added to NRS core's list of
1101  * policies, \e nrs_core.nrs_policies.
1102  *
1103  * \param[in] desc The policy descriptor
1104  *
1105  * \retval true The policy is present
1106  * \retval false The policy is not present
1107  */
1108 static bool
1109 nrs_policy_exists_locked(const struct ptlrpc_nrs_pol_desc *desc)
1110 {
1111         struct ptlrpc_nrs_pol_desc     *tmp;
1112         ENTRY;
1113
1114         cfs_list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
1115                 if (strncmp(tmp->pd_name, desc->pd_name, NRS_POL_NAME_MAX) == 0)
1116                         RETURN(true);
1117         }
1118         RETURN(false);
1119 }
1120
1121 /**
1122  * Removes the policy from all supported NRS heads.
1123  *
1124  * \param[in] desc The policy descriptor to unregister
1125  *
1126  * \retval -ve error
1127  * \retval  0  successfully unregistered policy on all supported NRS heads
1128  *
1129  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
1130  */
1131 static int
1132 nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
1133 {
1134         struct ptlrpc_nrs      *nrs;
1135         int                     rc = 0;
1136         ENTRY;
1137
1138         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
1139
1140         cfs_list_for_each_entry(nrs, &nrs_core.nrs_heads, nrs_heads) {
1141                 if (!nrs_policy_compatible(nrs->nrs_svcpt->scp_service, desc)) {
1142                         /**
1143                          * The policy may only have registered on compatible
1144                          * NRS heads.
1145                          */
1146                         continue;
1147                 }
1148
1149                 rc = nrs_policy_unregister(nrs, desc->pd_name);
1150
1151                 /**
1152                  * Ignore -ENOENT as the policy may not have registered
1153                  * successfully on all service partitions.
1154                  */
1155                 if (rc == -ENOENT) {
1156                         rc = 0;
1157                 } else if (rc != 0) {
1158                         CERROR("Failed to unregister NRS policy %s for "
1159                                "partition %d of service %s: %d\n",
1160                                desc->pd_name, nrs->nrs_svcpt->scp_cpt,
1161                                nrs->nrs_svcpt->scp_service->srv_name, rc);
1162                         break;
1163                 }
1164         }
1165         RETURN(rc);
1166 }
1167
1168 /**
1169  * Transitions a policy from ptlrpc_nrs_pol_state::NRS_POL_STATE_UNAVAIL to
1170  * ptlrpc_nrs_pol_state::STOPPED; is used to prevent policies that are
1171  * registering externally using ptlrpc_nrs_policy_register from starting
1172  * before they have successfully registered on all compatible service
1173  * partitions.
1174  *
1175  * \param[in] nrs  The NRS head that the policy belongs to
1176  * \param[in] name The human-readable policy name
1177  */
1178 static void
1179 nrs_pol_make_available0(struct ptlrpc_nrs *nrs, char *name)
1180 {
1181         struct ptlrpc_nrs_policy *pol;
1182
1183         LASSERT(nrs);
1184         LASSERT(name);
1185
1186         spin_lock(&nrs->nrs_lock);
1187         pol = nrs_policy_find_locked(nrs, name);
1188         if (pol) {
1189                 LASSERT(pol->pol_state == NRS_POL_STATE_UNAVAIL);
1190                 pol->pol_state = NRS_POL_STATE_STOPPED;
1191                 nrs_policy_put_locked(pol);
1192         }
1193         spin_unlock(&nrs->nrs_lock);
1194 }
1195
1196 /**
1197  * Make the policy available on all compatible service partitions of all PTLRPC
1198  * services.
1199  *
1200  * \param[in] desc The descriptor for the policy that is to be made available
1201  *
1202  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
1203  *
1204  * \see nrs_pol_make_available0()
1205  */
1206 static void
1207 nrs_pol_make_available_locked(struct ptlrpc_nrs_pol_desc *desc)
1208 {
1209         struct ptlrpc_nrs        *nrs;
1210         ENTRY;
1211
1212         LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
1213
1214          /**
1215           * Cycle through all registered instances of the policy and place them
1216           * at the STOPPED state.
1217           */
1218         cfs_list_for_each_entry(nrs, &nrs_core.nrs_heads, nrs_heads) {
1219                 if (!nrs_policy_compatible(nrs->nrs_svcpt->scp_service, desc))
1220                         continue;
1221                 nrs_pol_make_available0(nrs, desc->pd_name);
1222         }
1223         EXIT;
1224 }
1225
1226 /**
1227  * Registers a new policy with NRS core.
1228  *
1229  * Used for policies that register externally with NRS core, i.e. ones that are
1230  * not part of \e nrs_pols_builtin[]. The function will only succeed if policy
1231  * registration with all compatible service partitions is successful.
1232  *
1233  * \param[in] desc The policy descriptor to register
1234  *
1235  * \retval -ve error
1236  * \retval   0 success
1237  */
1238 int
1239 ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_desc *desc)
1240 {
1241         struct ptlrpc_nrs              *nrs;
1242         struct ptlrpc_service          *svc;
1243         struct ptlrpc_service_part     *svcpt;
1244         int                             i;
1245         int                             rc;
1246         int                             rc2;
1247         ENTRY;
1248
1249         LASSERT(desc != NULL);
1250
1251         desc->pd_name[NRS_POL_NAME_MAX - 1] = '\0';
1252
1253         if (desc->pd_flags & (PTLRPC_NRS_FL_FALLBACK |
1254                               PTLRPC_NRS_FL_REG_START)) {
1255                 CERROR("Failing to register NRS policy %s; re-check policy "
1256                        "flags, externally-registered policies cannot act as "
1257                        "fallback policies or be started immediately without "
1258                        "interaction with lprocfs.\n", desc->pd_name);
1259                 RETURN(-EINVAL);
1260         }
1261
1262         desc->pd_flags |= PTLRPC_NRS_FL_REG_EXTERN;
1263
1264         mutex_lock(&nrs_core.nrs_mutex);
1265
1266         rc = nrs_policy_exists_locked(desc);
1267         if (rc) {
1268                 CERROR("Failing to register NRS policy %s which has "
1269                        "already been registered with NRS core!\n",
1270                        desc->pd_name);
1271                 GOTO(fail, rc = -EEXIST);
1272         }
1273
1274         /**
1275          * Register the new policy on all compatible services
1276          */
1277         mutex_lock(&ptlrpc_all_services_mutex);
1278
1279         cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
1280
1281                 if (unlikely(svc->srv_is_stopping)) {
1282                         mutex_unlock(&ptlrpc_all_services_mutex);
1283                         GOTO(fail, rc = -ESRCH);
1284                 }
1285
1286                 if (!nrs_policy_compatible(svc, desc)) {
1287                         /**
1288                          * Attempt to register the policy if it is
1289                          * compatible, otherwise try the next service.
1290                          */
1291                         continue;
1292                 }
1293                 ptlrpc_service_for_each_part(svcpt, i, svc) {
1294                         bool hp = false;
1295
1296 again:
1297                         nrs = nrs_svcpt2nrs(svcpt, hp);
1298                         rc = nrs_policy_register(nrs, desc);
1299                         if (rc != 0) {
1300                                 CERROR("Failed to register NRS policy %s for "
1301                                        "partition %d of service %s: %d\n",
1302                                        desc->pd_name, nrs->nrs_svcpt->scp_cpt,
1303                                        nrs->nrs_svcpt->scp_service->srv_name,
1304                                        rc);
1305
1306                                 rc2 = nrs_policy_unregister_locked(desc);
1307                                 /**
1308                                  * Should not fail at this point
1309                                  */
1310                                 LASSERT(rc2 == 0);
1311                                 mutex_unlock(&ptlrpc_all_services_mutex);
1312                                 GOTO(fail, rc);
1313                         }
1314
1315                         if (!hp && nrs_svc_has_hp(svc)) {
1316                                 hp = true;
1317                                 goto again;
1318                         }
1319                 }
1320                 if (desc->pd_ops->op_lprocfs_init != NULL) {
1321                         rc = desc->pd_ops->op_lprocfs_init(svc);
1322                         if (rc != 0) {
1323                                 rc2 = nrs_policy_unregister_locked(desc);
1324                                 /**
1325                                  * Should not fail at this point
1326                                  */
1327                                 LASSERT(rc2 == 0);
1328                                 mutex_unlock(&ptlrpc_all_services_mutex);
1329                                 GOTO(fail, rc);
1330                         }
1331                 }
1332         }
1333
1334         mutex_unlock(&ptlrpc_all_services_mutex);
1335
1336         /**
1337          * The policy has successfully registered with all service partitions,
1338          * so mark the policy instances at the NRS heads as available.
1339          */
1340         nrs_pol_make_available_locked(desc);
1341
1342         cfs_list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
1343 fail:
1344         mutex_unlock(&nrs_core.nrs_mutex);
1345
1346         RETURN(rc);
1347 }
1348 EXPORT_SYMBOL(ptlrpc_nrs_policy_register);
1349
1350 /**
1351  * Unregisters a previously registered policy with NRS core. All instances of
1352  * the policy on all NRS heads of all supported services are removed.
1353  *
1354  * \param[in] desc The descriptor of the policy to unregister
1355  *
1356  * \retval -ve error
1357  * \retval   0 success
1358  */
1359 int
1360 ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_desc *desc)
1361 {
1362         int                    rc;
1363         struct ptlrpc_service *svc;
1364         ENTRY;
1365
1366         LASSERT(desc != NULL);
1367
1368         if (desc->pd_flags & PTLRPC_NRS_FL_FALLBACK) {
1369                 CERROR("Unable to unregister a fallback policy, unless the "
1370                        "PTLRPC service is stopping.\n");
1371                 RETURN(-EPERM);
1372         }
1373
1374         desc->pd_name[NRS_POL_NAME_MAX - 1] = '\0';
1375
1376         mutex_lock(&nrs_core.nrs_mutex);
1377
1378         rc = nrs_policy_exists_locked(desc);
1379         if (!rc) {
1380                 CERROR("Failing to unregister NRS policy %s which has "
1381                        "not been registered with NRS core!\n",
1382                        desc->pd_name);
1383                 GOTO(fail, rc = -ENOENT);
1384         }
1385
1386         rc = nrs_policy_unregister_locked(desc);
1387         if (rc == -EBUSY) {
1388                 CERROR("Please first stop policy %s on all service partitions "
1389                        "and then retry to unregister the policy.\n",
1390                        desc->pd_name);
1391                 GOTO(fail, rc);
1392         }
1393         CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n",
1394                desc->pd_name);
1395
1396         cfs_list_del(&desc->pd_list);
1397
1398         /**
1399          * Unregister the policy's lprocfs interface from all compatible
1400          * services.
1401          */
1402         mutex_lock(&ptlrpc_all_services_mutex);
1403
1404         cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
1405                 if (!nrs_policy_compatible(svc, desc))
1406                         continue;
1407
1408                 if (desc->pd_ops->op_lprocfs_fini != NULL)
1409                         desc->pd_ops->op_lprocfs_fini(svc);
1410         }
1411
1412         mutex_unlock(&ptlrpc_all_services_mutex);
1413
1414 fail:
1415         mutex_unlock(&nrs_core.nrs_mutex);
1416
1417         RETURN(rc);
1418 }
1419 EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister);
1420
1421 /**
1422  * Setup NRS heads on all service partitions of service \a svc, and register
1423  * all compatible policies on those NRS heads.
1424  *
1425  * \param[in] svc  The service to setup
1426  *
1427  * \retval -ve error, the calling logic should eventually call
1428  *                    ptlrpc_service_nrs_cleanup() to undo any work performed
1429  *                    by this function.
1430  *
1431  * \see ptlrpc_register_service()
1432  * \see ptlrpc_service_nrs_cleanup()
1433  */
1434 int
1435 ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
1436 {
1437         struct ptlrpc_service_part             *svcpt;
1438         const struct ptlrpc_nrs_pol_desc       *desc;
1439         int                                     i;
1440         int                                     rc = 0;
1441
1442         mutex_lock(&nrs_core.nrs_mutex);
1443
1444         /**
1445          * Initialize NRS heads on all service CPTs.
1446          */
1447         ptlrpc_service_for_each_part(svcpt, i, svc) {
1448                 rc = nrs_svcpt_setup_locked(svcpt);
1449                 if (rc != 0)
1450                         GOTO(failed, rc);
1451         }
1452
1453         /*
1454          * Set up lprocfs interfaces for all supported policies for the
1455          * service.
1456          */
1457         cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
1458                 if (!nrs_policy_compatible(svc, desc))
1459                         continue;
1460
1461                 if (desc->pd_ops->op_lprocfs_init != NULL) {
1462                         rc = desc->pd_ops->op_lprocfs_init(svc);
1463                         if (rc != 0)
1464                                 GOTO(failed, rc);
1465                 }
1466         }
1467
1468 failed:
1469
1470         mutex_unlock(&nrs_core.nrs_mutex);
1471
1472         RETURN(rc);
1473 }
1474
1475 /**
1476  * Unregisters all policies on all service partitions of service \a svc.
1477  *
1478  * \param[in] svc The PTLRPC service to unregister
1479  */
1480 void
1481 ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
1482 {
1483         struct ptlrpc_service_part           *svcpt;
1484         const struct ptlrpc_nrs_pol_desc     *desc;
1485         int                                   i;
1486
1487         mutex_lock(&nrs_core.nrs_mutex);
1488
1489         /**
1490          * Clean up NRS heads on all service partitions
1491          */
1492         ptlrpc_service_for_each_part(svcpt, i, svc)
1493                 nrs_svcpt_cleanup_locked(svcpt);
1494
1495         /**
1496          * Clean up lprocfs interfaces for all supported policies for the
1497          * service.
1498          */
1499         cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
1500                 if (!nrs_policy_compatible(svc, desc))
1501                         continue;
1502
1503                 if (desc->pd_ops->op_lprocfs_fini != NULL)
1504                         desc->pd_ops->op_lprocfs_fini(svc);
1505         }
1506
1507         mutex_unlock(&nrs_core.nrs_mutex);
1508 }
1509
1510 /**
1511  * Obtains NRS head resources for request \a req.
1512  *
1513  * These could be either on the regular or HP NRS head of \a svcpt; resources
1514  * taken on the regular head can later be swapped for HP head resources by
1515  * ldlm_lock_reorder_req().
1516  *
1517  * \param[in] svcpt The service partition
1518  * \param[in] req   The request
1519  * \param[in] hp    Which NRS head of \a svcpt to use
1520  */
1521 void
1522 ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
1523                           struct ptlrpc_request *req, bool hp)
1524 {
1525         struct ptlrpc_nrs       *nrs = nrs_svcpt2nrs(svcpt, hp);
1526
1527         memset(&req->rq_nrq, 0, sizeof(req->rq_nrq));
1528         nrs_resource_get_safe(nrs, &req->rq_nrq, req->rq_nrq.nr_res_ptrs,
1529                               false);
1530
1531         /**
1532          * It is fine to access \e nr_initialized without locking as there is
1533          * no contention at this early stage.
1534          */
1535         req->rq_nrq.nr_initialized = 1;
1536 }
1537
1538 /**
1539  * Releases resources for a request; is called after the request has been
1540  * handled.
1541  *
1542  * \param[in] req The request
1543  *
1544  * \see ptlrpc_server_finish_request()
1545  */
1546 void
1547 ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
1548 {
1549         if (req->rq_nrq.nr_initialized) {
1550                 nrs_resource_put_safe(req->rq_nrq.nr_res_ptrs);
1551                 /* no protection on bit nr_initialized because no
1552                  * contention at this late stage */
1553                 req->rq_nrq.nr_finalized = 1;
1554         }
1555 }
1556
1557 void
1558 ptlrpc_nrs_req_start_nolock(struct ptlrpc_request *req)
1559 {
1560         req->rq_nrq.nr_started = 1;
1561         nrs_request_start(&req->rq_nrq);
1562 }
1563
1564 void
1565 ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
1566 {
1567         if (req->rq_nrq.nr_started)
1568                 nrs_request_stop(&req->rq_nrq);
1569 }
1570
1571 /**
1572  * Enqueues request \a req on either the regular or high-priority NRS head
1573  * of service partition \a svcpt.
1574  *
1575  * \param[in] svcpt The service partition
1576  * \param[in] req   The request to be enqueued
1577  * \param[in] hp    Whether to enqueue the request on the regular or
1578  *                  high-priority NRS head.
1579  */
1580 void
1581 ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
1582                    struct ptlrpc_request *req, bool hp)
1583 {
1584         spin_lock(&svcpt->scp_req_lock);
1585
1586         if (hp)
1587                 ptlrpc_nrs_hpreq_add_nolock(req);
1588         else
1589                 ptlrpc_nrs_req_add_nolock(req);
1590
1591         spin_unlock(&svcpt->scp_req_lock);
1592 }
1593
1594 /**
1595  * Obtains a request for handling from an NRS head of service partition
1596  * \a svcpt.
1597  *
1598  * \param[in] svcpt The service partition
1599  * \param[in] hp    Whether to obtain a request from the regular or
1600  *                  high-priority NRS head.
1601  *
1602  * \retval the request to be handled
1603  * \retval NULL on failure
1604  */
1605 struct ptlrpc_request *
1606 ptlrpc_nrs_req_poll_nolock(struct ptlrpc_service_part *svcpt, bool hp)
1607 {
1608         struct ptlrpc_nrs         *nrs = nrs_svcpt2nrs(svcpt, hp);
1609         struct ptlrpc_nrs_policy  *policy;
1610         struct ptlrpc_nrs_request *nrq;
1611
1612         if (unlikely(nrs->nrs_req_queued == 0))
1613                 return NULL;
1614
1615         /**
1616          * Always try to drain requests from all NRS polices even if they are
1617          * inactive, because the user can change policy status at runtime.
1618          */
1619         cfs_list_for_each_entry(policy, &(nrs)->nrs_policy_queued,
1620                                 pol_list_queued) {
1621                 nrq = nrs_request_poll(policy);
1622                 if (likely(nrq != NULL))
1623                         return container_of(nrq, struct ptlrpc_request, rq_nrq);
1624         }
1625
1626         return NULL;
1627 }
1628
1629 /**
1630  * Dequeues a request that was previously obtained via ptlrpc_nrs_req_poll() and
1631  * is about to be handled.
1632  *
1633  * \param[in] req The request
1634  */
1635 void
1636 ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req)
1637 {
1638         struct ptlrpc_nrs_policy  *policy;
1639
1640         LASSERT(req->rq_nrq.nr_enqueued);
1641         LASSERT(!req->rq_nrq.nr_dequeued);
1642
1643         policy = nrs_request_policy(&req->rq_nrq);
1644         nrs_request_dequeue(&req->rq_nrq);
1645         req->rq_nrq.nr_dequeued = 1;
1646
1647         /**
1648          * If the policy has no more requests queued, remove it from
1649          * ptlrpc_nrs::nrs_policy_queued.
1650          */
1651         if (policy->pol_req_queued == 0) {
1652                 cfs_list_del_init(&policy->pol_list_queued);
1653
1654                 /**
1655                  * If there are other policies with queued requests, move the
1656                  * current policy to the end so that we can round robin over
1657                  * all policies and drain the requests.
1658                  */
1659         } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) {
1660                 LASSERT(policy->pol_req_queued <
1661                         policy->pol_nrs->nrs_req_queued);
1662
1663                 cfs_list_move_tail(&policy->pol_list_queued,
1664                                    &policy->pol_nrs->nrs_policy_queued);
1665         }
1666 }
1667
1668 /**
1669  * Returns whether there are any requests currently enqueued on any of the
1670  * policies of service partition's \a svcpt NRS head specified by \a hp. Should
1671  * be called while holding ptlrpc_service_part::scp_req_lock to get a reliable
1672  * result.
1673  *
1674  * \param[in] svcpt The service partition to enquire.
1675  * \param[in] hp    Whether the regular or high-priority NRS head is to be
1676  *                  enquired.
1677  *
1678  * \retval false The indicated NRS head has no enqueued requests.
1679  * \retval true  The indicated NRS head has some enqueued requests.
1680  */
1681 bool
1682 ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
1683 {
1684         struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1685
1686         return nrs->nrs_req_queued > 0;
1687 };
1688
1689 /**
1690  * Moves request \a req from the regular to the high-priority NRS head.
1691  *
1692  * \param[in] req The request to move
1693  */
1694 void
1695 ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req)
1696 {
1697         struct ptlrpc_service_part      *svcpt = req->rq_rqbd->rqbd_svcpt;
1698         struct ptlrpc_nrs               *nrs = nrs_svcpt2nrs(svcpt, true);
1699         struct ptlrpc_nrs_request       *nrq = &req->rq_nrq;
1700         struct ptlrpc_nrs_resource      *res1[NRS_RES_MAX];
1701         struct ptlrpc_nrs_resource      *res2[NRS_RES_MAX];
1702         ENTRY;
1703
1704         /**
1705          * Obtain the high-priority NRS head resources.
1706          * XXX: Maybe want to remove nrs_resource[get|put]_safe() dance
1707          * when request cannot actually move; move this further down?
1708          */
1709         nrs_resource_get_safe(nrs, nrq, res1, true);
1710
1711         spin_lock(&svcpt->scp_req_lock);
1712
1713         if (!ptlrpc_nrs_req_can_move(req))
1714                 goto out;
1715
1716         ptlrpc_nrs_req_del_nolock(req);
1717         nrq->nr_enqueued = nrq->nr_dequeued = 0;
1718
1719         memcpy(res2, nrq->nr_res_ptrs, NRS_RES_MAX * sizeof(res2[0]));
1720         memcpy(nrq->nr_res_ptrs, res1, NRS_RES_MAX * sizeof(res1[0]));
1721
1722         ptlrpc_nrs_hpreq_add_nolock(req);
1723
1724         memcpy(res1, res2, NRS_RES_MAX * sizeof(res1[0]));
1725 out:
1726         spin_unlock(&svcpt->scp_req_lock);
1727
1728         /**
1729          * Release either the regular NRS head resources if we moved the
1730          * request, or the high-priority NRS head resources if we took a
1731          * reference earlier in this function and ptlrpc_nrs_req_can_move()
1732          * returned false.
1733          */
1734         nrs_resource_put_safe(res1);
1735         EXIT;
1736 }
1737
1738 /**
1739  * Carries out a control operation \a opc on the policy identified by the
1740  * human-readable \a name, on either all partitions, or only on the first
1741  * partition of service \a svc.
1742  *
1743  * \param[in]     svc    The service the policy belongs to.
1744  * \param[in]     queue  Whether to carry out the command on the policy which
1745  *                       belongs to the regular, high-priority, or both NRS
1746  *                       heads of service partitions of \a svc.
1747  * \param[in]     name   The policy to act upon, by human-readable name
1748  * \param[in]     opc    The opcode of the operation to carry out
1749  * \param[in]     single When set, the operation will only be carried out on the
1750  *                       NRS heads of the first service partition of \a svc.
1751  *                       This is useful for some policies which e.g. share
1752  *                       identical values on the same parameters of different
1753  *                       service partitions; when reading these parameters via
1754  *                       lprocfs, these policies may just want to obtain and
1755  *                       print out the values from the first service partition.
1756  *                       Storing these values centrally elsewhere then could be
1757  *                       another solution for this.
1758  * \param[in,out] arg    Can be used as a generic in/out buffer between control
1759  *                       operations and the user environment.
1760  *
1761  *\retval -ve error condition
1762  *\retval   0 operation was carried out successfully
1763  */
1764 int
1765 ptlrpc_nrs_policy_control(struct ptlrpc_service *svc,
1766                           enum ptlrpc_nrs_queue_type queue, char *name,
1767                           enum ptlrpc_nrs_ctl opc, bool single, void *arg)
1768 {
1769         struct ptlrpc_service_part     *svcpt;
1770         int                             i;
1771         int                             rc = 0;
1772         ENTRY;
1773
1774         ptlrpc_service_for_each_part(svcpt, i, svc) {
1775                 switch (queue) {
1776                 default:
1777                         return -EINVAL;
1778
1779                 case PTLRPC_NRS_QUEUE_BOTH:
1780                 case PTLRPC_NRS_QUEUE_REG:
1781                         rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, false), name,
1782                                             opc, arg);
1783                         if (rc != 0 || (queue == PTLRPC_NRS_QUEUE_REG &&
1784                                         single))
1785                                 GOTO(out, rc);
1786
1787                         if (queue == PTLRPC_NRS_QUEUE_REG)
1788                                 break;
1789
1790                         /* fallthrough */
1791
1792                 case PTLRPC_NRS_QUEUE_HP:
1793                         /**
1794                          * XXX: We could optionally check for
1795                          * nrs_svc_has_hp(svc) here, and return an error if it
1796                          * is false. Right now we rely on the policies' lprocfs
1797                          * handlers that call the present function to make this
1798                          * check; if they fail to do so, they might hit the
1799                          * assertion inside nrs_svcpt2nrs() below.
1800                          */
1801                         rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, true), name,
1802                                             opc, arg);
1803                         if (rc != 0 || single)
1804                                 GOTO(out, rc);
1805
1806                         break;
1807                 }
1808         }
1809 out:
1810         RETURN(rc);
1811 }
1812
1813 /**
1814  * Adds all policies that ship with NRS, i.e. those in the \e nrs_pols_builtin
1815  * array, to NRS core's list of policies \e nrs_core.nrs_policies.
1816  *
1817  * \retval 0 All policy descriptors in \e nrs_pols_builtin have been added
1818  *           successfully to \e nrs_core.nrs_policies
1819  */
1820 int
1821 ptlrpc_nrs_init(void)
1822 {
1823         int     rc = -EINVAL;
1824         int     i;
1825         ENTRY;
1826
1827         /**
1828          * Initialize the NRS core object.
1829          */
1830         mutex_init(&nrs_core.nrs_mutex);
1831         CFS_INIT_LIST_HEAD(&nrs_core.nrs_heads);
1832         CFS_INIT_LIST_HEAD(&nrs_core.nrs_policies);
1833
1834         for (i = 0; i < ARRAY_SIZE(nrs_pols_builtin); i++) {
1835                 /**
1836                  * No need to take nrs_core.nrs_mutex as there is no contention at
1837                  * this early stage.
1838                  */
1839                 rc = nrs_policy_exists_locked(nrs_pols_builtin[i]);
1840                 /**
1841                  * This should not fail for in-tree policies.
1842                  */
1843                 LASSERT(rc == false);
1844                 cfs_list_add_tail(&nrs_pols_builtin[i]->pd_list,
1845                                   &nrs_core.nrs_policies);
1846         }
1847
1848         RETURN(rc);
1849 }
1850
1851 /**
1852  * Stub finalization function
1853  */
1854 void
1855 ptlrpc_nrs_fini(void)
1856 {
1857 }
1858
1859 /** @} nrs */