Whamcloud - gitweb
2223c14f10adedcafd69eb7539927318728ed8a1
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Guards obd_devs[]: taken for write when a slot is filled or cleared,
 * and for read by the lookup helpers in this file. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices indexed by obd_minor; NULL marks a free slot. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache that obd_device structures are allocated from. */
static struct kmem_cache *obd_device_cachep;
/* kobject type used for every obd_type; initialised further down. */
static struct kobj_type class_ktype;
static struct workqueue_struct *zombie_wq;

/* Forward declarations of helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* Stale export bookkeeping: list head, its spinlock and an element counter. */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
64
65 /*
66  * support functions: we could use inter-module communication, but this
67  * is more portable to other OS's
68  */
69 static struct obd_device *obd_device_alloc(void)
70 {
71         struct obd_device *obd;
72
73         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74         if (obd != NULL) {
75                 obd->obd_magic = OBD_DEVICE_MAGIC;
76         }
77         return obd;
78 }
79
80 static void obd_device_free(struct obd_device *obd)
81 {
82         LASSERT(obd != NULL);
83         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85         if (obd->obd_namespace != NULL) {
86                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87                        obd, obd->obd_namespace, obd->obd_force);
88                 LBUG();
89         }
90         lu_ref_fini(&obd->obd_reference);
91         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
92 }
93
94 struct obd_type *class_search_type(const char *name)
95 {
96         struct kobject *kobj = kset_find_obj(lustre_kset, name);
97
98         if (kobj && kobj->ktype == &class_ktype)
99                 return container_of(kobj, struct obd_type, typ_kobj);
100
101         kobject_put(kobj);
102         return NULL;
103 }
104 EXPORT_SYMBOL(class_search_type);
105
106 struct obd_type *class_get_type(const char *name)
107 {
108         struct obd_type *type;
109
110         type = class_search_type(name);
111 #ifdef HAVE_MODULE_LOADING_SUPPORT
112         if (!type) {
113                 const char *modname = name;
114
115 #ifdef HAVE_SERVER_SUPPORT
116                 if (strcmp(modname, "obdfilter") == 0)
117                         modname = "ofd";
118
119                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
120                         modname = LUSTRE_OSP_NAME;
121
122                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
123                         modname = LUSTRE_MDT_NAME;
124 #endif /* HAVE_SERVER_SUPPORT */
125
126                 if (!request_module("%s", modname)) {
127                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
128                         type = class_search_type(name);
129                 } else {
130                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
131                                            modname);
132                 }
133         }
134 #endif
135         if (type) {
136                 if (try_module_get(type->typ_dt_ops->o_owner)) {
137                         atomic_inc(&type->typ_refcnt);
138                         /* class_search_type() returned a counted reference,
139                          * but we don't need that count any more as
140                          * we have one through typ_refcnt.
141                          */
142                         kobject_put(&type->typ_kobj);
143                 } else {
144                         kobject_put(&type->typ_kobj);
145                         type = NULL;
146                 }
147         }
148         return type;
149 }
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         module_put(type->typ_dt_ops->o_owner);
155         atomic_dec(&type->typ_refcnt);
156 }
157
158 static void class_sysfs_release(struct kobject *kobj)
159 {
160         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
161
162         debugfs_remove_recursive(type->typ_debugfs_entry);
163         type->typ_debugfs_entry = NULL;
164
165         if (type->typ_lu)
166                 lu_device_type_fini(type->typ_lu);
167
168 #ifdef CONFIG_PROC_FS
169         if (type->typ_name && type->typ_procroot)
170                 remove_proc_subtree(type->typ_name, proc_lustre_root);
171 #endif
172         OBD_FREE(type, sizeof(*type));
173 }
174
/* kobject type of every obd_type: class_search_type() uses it to recognise
 * obd types among the kobjects in lustre_kset, and class_sysfs_release()
 * runs when the last kobject reference is dropped. */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
179
#ifdef HAVE_SERVER_SUPPORT
/**
 * Create a placeholder obd type that only provides the sysfs/debugfs
 * (and optionally procfs) directories for \a name.
 *
 * The placeholder is marked with typ_sym_filter so that a later
 * class_register_type() for the same name takes it over instead of
 * failing with -EEXIST.
 *
 * \param[in] name        type name, used for all created directories
 * \param[in] enable_proc also register a procfs directory
 *
 * \retval type             the new placeholder type
 * \retval ERR_PTR(-EEXIST) a type with this name already exists
 * \retval ERR_PTR(-ENOMEM) allocation failed
 * \retval ERR_PTR(rc)      kobject_init_and_add() failed with \a rc
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
        struct dentry *symlink;
        struct obd_type *type;
        int rc;

        type = class_search_type(name);
        if (type) {
                kobject_put(&type->typ_kobj);
                return ERR_PTR(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (!type)
                return ERR_PTR(-ENOMEM);

        type->typ_kobj.kset = lustre_kset;
        rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
                                  &lustre_kset->kobj, "%s", name);
        if (rc) {
                /* A kobject that went through kobject_init_and_add() must
                 * be released with kobject_put() even when the call fails;
                 * the release callback then frees the type.
                 */
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }

        symlink = debugfs_create_dir(name, debugfs_lustre_root);
        type->typ_debugfs_entry = symlink;
        type->typ_sym_filter = true;

        if (enable_proc) {
                type->typ_procroot = lprocfs_register(name, proc_lustre_root,
                                                      NULL, NULL);
                /* a missing compat proc entry is reported but not fatal */
                if (IS_ERR(type->typ_procroot)) {
                        CERROR("%s: can't create compat proc entry: %d\n",
                               name, (int)PTR_ERR(type->typ_procroot));
                        type->typ_procroot = NULL;
                }
        }

        return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
221
/* Upper bound on the length of an obd type name accepted by
 * class_register_type(). */
#define CLASS_MAX_NAME 1024

/**
 * Register an obd type under lustre_kset.
 *
 * A new obd_type is allocated unless (on server builds) a placeholder with
 * typ_sym_filter set already exists for \a name, in which case that
 * placeholder is taken over.  For a fresh type the procfs directory
 * (if \a enable_proc), the debugfs directory with \a vars, and the kobject
 * are created.  If \a ldt is given it is initialised and published
 * through typ_lu.
 *
 * \param[in] dt_ops      device operations stored in typ_dt_ops
 * \param[in] md_ops      metadata operations stored in typ_md_ops
 * \param[in] enable_proc register a procfs directory for the type
 * \param[in] vars        variables added to the type's debugfs directory
 * \param[in] name        type name, shorter than CLASS_MAX_NAME
 * \param[in] ldt         lu device type to initialise, may be NULL
 *
 * \retval 0        success
 * \retval -EEXIST  a non-placeholder type with this name is registered
 * \retval -ENOMEM  allocation failed
 * \retval negative error from lprocfs_register(), kobject_add() or
 *                  lu_device_type_init()
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check: the name must be terminated within CLASS_MAX_NAME */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* a placeholder is reused; the search reference is kept
                 * until the typ_sym_filter block below drops it */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        /* typ_lu holds the OBD_LU_TYPE_SETUP marker until the store at
         * setup_ldt below replaces it with ldt or NULL */
        type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        if (type->typ_sym_filter) {
                /* placeholder becomes a real type: its directories and
                 * kobject already exist, so skip straight to the ldt setup
                 * after dropping the class_search_type() reference */
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* cleared so the release callback does not try to
                         * remove a proc subtree that was never created */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
        ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                rc = lu_device_type_init(ldt);
                /* publish the result (ldt, or NULL on failure) and wake
                 * anyone waiting on typ_lu */
                smp_store_release(&type->typ_lu, rc ? NULL : ldt);
                wake_up_var(&type->typ_lu);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* NOTE(review): when this is reached before setup_ldt with a
         * non-NULL ldt, typ_lu still holds OBD_LU_TYPE_SETUP and the
         * release callback passes any non-NULL typ_lu to
         * lu_device_type_fini() - confirm that is safe. */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
305
306 int class_unregister_type(const char *name)
307 {
308         struct obd_type *type = class_search_type(name);
309         int rc = 0;
310         ENTRY;
311
312         if (!type) {
313                 CERROR("unknown obd type\n");
314                 RETURN(-EINVAL);
315         }
316
317         if (atomic_read(&type->typ_refcnt)) {
318                 CERROR("type %s has refcount (%d)\n", name,
319                        atomic_read(&type->typ_refcnt));
320                 /* This is a bad situation, let's make the best of it */
321                 /* Remove ops, but leave the name for debugging */
322                 type->typ_dt_ops = NULL;
323                 type->typ_md_ops = NULL;
324                 GOTO(out_put, rc = -EBUSY);
325         }
326
327         /* Put the final ref */
328         kobject_put(&type->typ_kobj);
329 out_put:
330         /* Put the ref returned by class_search_type() */
331         kobject_put(&type->typ_kobj);
332
333         RETURN(rc);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
336
337 /**
338  * Create a new obd device.
339  *
340  * Allocate the new obd_device and initialize it.
341  *
342  * \param[in] type_name obd device type string.
343  * \param[in] name      obd device name.
344  * \param[in] uuid      obd device UUID
345  *
346  * \retval newdev         pointer to created obd_device
347  * \retval ERR_PTR(errno) on error
348  */
349 struct obd_device *class_newdev(const char *type_name, const char *name,
350                                 const char *uuid)
351 {
352         struct obd_device *newdev;
353         struct obd_type *type = NULL;
354         ENTRY;
355
356         if (strlen(name) >= MAX_OBD_NAME) {
357                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358                 RETURN(ERR_PTR(-EINVAL));
359         }
360
361         type = class_get_type(type_name);
362         if (type == NULL){
363                 CERROR("OBD: unknown type: %s\n", type_name);
364                 RETURN(ERR_PTR(-ENODEV));
365         }
366
367         newdev = obd_device_alloc();
368         if (newdev == NULL) {
369                 class_put_type(type);
370                 RETURN(ERR_PTR(-ENOMEM));
371         }
372         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374         newdev->obd_type = type;
375         newdev->obd_minor = -1;
376
377         rwlock_init(&newdev->obd_pool_lock);
378         newdev->obd_pool_limit = 0;
379         newdev->obd_pool_slv = 0;
380
381         INIT_LIST_HEAD(&newdev->obd_exports);
382         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
383         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
384         INIT_LIST_HEAD(&newdev->obd_exports_timed);
385         INIT_LIST_HEAD(&newdev->obd_nid_stats);
386         spin_lock_init(&newdev->obd_nid_lock);
387         spin_lock_init(&newdev->obd_dev_lock);
388         mutex_init(&newdev->obd_dev_mutex);
389         spin_lock_init(&newdev->obd_osfs_lock);
390         /* newdev->obd_osfs_age must be set to a value in the distant
391          * past to guarantee a fresh statfs is fetched on mount. */
392         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
393
394         /* XXX belongs in setup not attach  */
395         init_rwsem(&newdev->obd_observer_link_sem);
396         /* recovery data */
397         spin_lock_init(&newdev->obd_recovery_task_lock);
398         init_waitqueue_head(&newdev->obd_next_transno_waitq);
399         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
400         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
401         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
402         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
403         INIT_LIST_HEAD(&newdev->obd_evict_list);
404         INIT_LIST_HEAD(&newdev->obd_lwp_list);
405
406         llog_group_init(&newdev->obd_olg);
407         /* Detach drops this */
408         atomic_set(&newdev->obd_refcount, 1);
409         lu_ref_init(&newdev->obd_reference);
410         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
411
412         newdev->obd_conn_inprogress = 0;
413
414         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
415
416         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
417                newdev->obd_name, newdev);
418
419         return newdev;
420 }
421
422 /**
423  * Free obd device.
424  *
425  * \param[in] obd obd_device to be freed
426  *
427  * \retval none
428  */
429 void class_free_dev(struct obd_device *obd)
430 {
431         struct obd_type *obd_type = obd->obd_type;
432
433         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
434                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
435         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
436                  "obd %p != obd_devs[%d] %p\n",
437                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
438         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
439                  "obd_refcount should be 0, not %d\n",
440                  atomic_read(&obd->obd_refcount));
441         LASSERT(obd_type != NULL);
442
443         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
444                obd->obd_name, obd->obd_type->typ_name);
445
446         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
447                          obd->obd_name, obd->obd_uuid.uuid);
448         if (obd->obd_stopping) {
449                 int err;
450
451                 /* If we're not stopping, we were never set up */
452                 err = obd_cleanup(obd);
453                 if (err)
454                         CERROR("Cleanup %s returned %d\n",
455                                 obd->obd_name, err);
456         }
457
458         obd_device_free(obd);
459
460         class_put_type(obd_type);
461 }
462
463 /**
464  * Unregister obd device.
465  *
466  * Free slot in obd_dev[] used by \a obd.
467  *
468  * \param[in] new_obd obd_device to be unregistered
469  *
470  * \retval none
471  */
472 void class_unregister_device(struct obd_device *obd)
473 {
474         write_lock(&obd_dev_lock);
475         if (obd->obd_minor >= 0) {
476                 LASSERT(obd_devs[obd->obd_minor] == obd);
477                 obd_devs[obd->obd_minor] = NULL;
478                 obd->obd_minor = -1;
479         }
480         write_unlock(&obd_dev_lock);
481 }
482
483 /**
484  * Register obd device.
485  *
486  * Find free slot in obd_devs[], fills it with \a new_obd.
487  *
488  * \param[in] new_obd obd_device to be registered
489  *
490  * \retval 0          success
491  * \retval -EEXIST    device with this name is registered
492  * \retval -EOVERFLOW obd_devs[] is full
493  */
494 int class_register_device(struct obd_device *new_obd)
495 {
496         int ret = 0;
497         int i;
498         int new_obd_minor = 0;
499         bool minor_assign = false;
500         bool retried = false;
501
502 again:
503         write_lock(&obd_dev_lock);
504         for (i = 0; i < class_devno_max(); i++) {
505                 struct obd_device *obd = class_num2obd(i);
506
507                 if (obd != NULL &&
508                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
509
510                         if (!retried) {
511                                 write_unlock(&obd_dev_lock);
512
513                                 /* the obd_device could be waited to be
514                                  * destroyed by the "obd_zombie_impexp_thread".
515                                  */
516                                 obd_zombie_barrier();
517                                 retried = true;
518                                 goto again;
519                         }
520
521                         CERROR("%s: already exists, won't add\n",
522                                obd->obd_name);
523                         /* in case we found a free slot before duplicate */
524                         minor_assign = false;
525                         ret = -EEXIST;
526                         break;
527                 }
528                 if (!minor_assign && obd == NULL) {
529                         new_obd_minor = i;
530                         minor_assign = true;
531                 }
532         }
533
534         if (minor_assign) {
535                 new_obd->obd_minor = new_obd_minor;
536                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
537                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
538                 obd_devs[new_obd_minor] = new_obd;
539         } else {
540                 if (ret == 0) {
541                         ret = -EOVERFLOW;
542                         CERROR("%s: all %u/%u devices used, increase "
543                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
544                                i, class_devno_max(), ret);
545                 }
546         }
547         write_unlock(&obd_dev_lock);
548
549         RETURN(ret);
550 }
551
552 static int class_name2dev_nolock(const char *name)
553 {
554         int i;
555
556         if (!name)
557                 return -1;
558
559         for (i = 0; i < class_devno_max(); i++) {
560                 struct obd_device *obd = class_num2obd(i);
561
562                 if (obd && strcmp(name, obd->obd_name) == 0) {
563                         /* Make sure we finished attaching before we give
564                            out any references */
565                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
566                         if (obd->obd_attached) {
567                                 return i;
568                         }
569                         break;
570                 }
571         }
572
573         return -1;
574 }
575
576 int class_name2dev(const char *name)
577 {
578         int i;
579
580         if (!name)
581                 return -1;
582
583         read_lock(&obd_dev_lock);
584         i = class_name2dev_nolock(name);
585         read_unlock(&obd_dev_lock);
586
587         return i;
588 }
589 EXPORT_SYMBOL(class_name2dev);
590
591 struct obd_device *class_name2obd(const char *name)
592 {
593         int dev = class_name2dev(name);
594
595         if (dev < 0 || dev > class_devno_max())
596                 return NULL;
597         return class_num2obd(dev);
598 }
599 EXPORT_SYMBOL(class_name2obd);
600
601 int class_uuid2dev_nolock(struct obd_uuid *uuid)
602 {
603         int i;
604
605         for (i = 0; i < class_devno_max(); i++) {
606                 struct obd_device *obd = class_num2obd(i);
607
608                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
609                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
610                         return i;
611                 }
612         }
613
614         return -1;
615 }
616
617 int class_uuid2dev(struct obd_uuid *uuid)
618 {
619         int i;
620
621         read_lock(&obd_dev_lock);
622         i = class_uuid2dev_nolock(uuid);
623         read_unlock(&obd_dev_lock);
624
625         return i;
626 }
627 EXPORT_SYMBOL(class_uuid2dev);
628
629 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
630 {
631         int dev = class_uuid2dev(uuid);
632         if (dev < 0)
633                 return NULL;
634         return class_num2obd(dev);
635 }
636 EXPORT_SYMBOL(class_uuid2obd);
637
638 /**
639  * Get obd device from ::obd_devs[]
640  *
641  * \param num [in] array index
642  *
643  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
644  *         otherwise return the obd device there.
645  */
646 struct obd_device *class_num2obd(int num)
647 {
648         struct obd_device *obd = NULL;
649
650         if (num < class_devno_max()) {
651                 obd = obd_devs[num];
652                 if (obd == NULL)
653                         return NULL;
654
655                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
656                          "%p obd_magic %08x != %08x\n",
657                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
658                 LASSERTF(obd->obd_minor == num,
659                          "%p obd_minor %0d != %0d\n",
660                          obd, obd->obd_minor, num);
661         }
662
663         return obd;
664 }
665 EXPORT_SYMBOL(class_num2obd);
666
667 /**
668  * Find obd in obd_dev[] by name or uuid.
669  *
670  * Increment obd's refcount if found.
671  *
672  * \param[in] str obd name or uuid
673  *
674  * \retval NULL    if not found
675  * \retval target  pointer to found obd_device
676  */
677 struct obd_device *class_dev_by_str(const char *str)
678 {
679         struct obd_device *target = NULL;
680         struct obd_uuid tgtuuid;
681         int rc;
682
683         obd_str2uuid(&tgtuuid, str);
684
685         read_lock(&obd_dev_lock);
686         rc = class_uuid2dev_nolock(&tgtuuid);
687         if (rc < 0)
688                 rc = class_name2dev_nolock(str);
689
690         if (rc >= 0)
691                 target = class_num2obd(rc);
692
693         if (target != NULL)
694                 class_incref(target, "find", current);
695         read_unlock(&obd_dev_lock);
696
697         RETURN(target);
698 }
699 EXPORT_SYMBOL(class_dev_by_str);
700
701 /**
702  * Get obd devices count. Device in any
703  *    state are counted
704  * \retval obd device count
705  */
706 int get_devices_count(void)
707 {
708         int index, max_index = class_devno_max(), dev_count = 0;
709
710         read_lock(&obd_dev_lock);
711         for (index = 0; index <= max_index; index++) {
712                 struct obd_device *obd = class_num2obd(index);
713                 if (obd != NULL)
714                         dev_count++;
715         }
716         read_unlock(&obd_dev_lock);
717
718         return dev_count;
719 }
720 EXPORT_SYMBOL(get_devices_count);
721
722 void class_obd_list(void)
723 {
724         char *status;
725         int i;
726
727         read_lock(&obd_dev_lock);
728         for (i = 0; i < class_devno_max(); i++) {
729                 struct obd_device *obd = class_num2obd(i);
730
731                 if (obd == NULL)
732                         continue;
733                 if (obd->obd_stopping)
734                         status = "ST";
735                 else if (obd->obd_set_up)
736                         status = "UP";
737                 else if (obd->obd_attached)
738                         status = "AT";
739                 else
740                         status = "--";
741                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
742                          i, status, obd->obd_type->typ_name,
743                          obd->obd_name, obd->obd_uuid.uuid,
744                          atomic_read(&obd->obd_refcount));
745         }
746         read_unlock(&obd_dev_lock);
747 }
748
749 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
750  * specified, then only the client with that uuid is returned,
751  * otherwise any client connected to the tgt is returned.
752  */
753 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
754                                          const char *type_name,
755                                          struct obd_uuid *grp_uuid)
756 {
757         int i;
758
759         read_lock(&obd_dev_lock);
760         for (i = 0; i < class_devno_max(); i++) {
761                 struct obd_device *obd = class_num2obd(i);
762
763                 if (obd == NULL)
764                         continue;
765                 if ((strncmp(obd->obd_type->typ_name, type_name,
766                              strlen(type_name)) == 0)) {
767                         if (obd_uuid_equals(tgt_uuid,
768                                             &obd->u.cli.cl_target_uuid) &&
769                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
770                                                          &obd->obd_uuid) : 1)) {
771                                 read_unlock(&obd_dev_lock);
772                                 return obd;
773                         }
774                 }
775         }
776         read_unlock(&obd_dev_lock);
777
778         return NULL;
779 }
780 EXPORT_SYMBOL(class_find_client_obd);
781
782 /* Iterate the obd_device list looking devices have grp_uuid. Start
783  * searching at *next, and if a device is found, the next index to look
784  * at is saved in *next. If next is NULL, then the first matching device
785  * will always be returned.
786  */
787 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
788 {
789         int i;
790
791         if (next == NULL)
792                 i = 0;
793         else if (*next >= 0 && *next < class_devno_max())
794                 i = *next;
795         else
796                 return NULL;
797
798         read_lock(&obd_dev_lock);
799         for (; i < class_devno_max(); i++) {
800                 struct obd_device *obd = class_num2obd(i);
801
802                 if (obd == NULL)
803                         continue;
804                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
805                         if (next != NULL)
806                                 *next = i+1;
807                         read_unlock(&obd_dev_lock);
808                         return obd;
809                 }
810         }
811         read_unlock(&obd_dev_lock);
812
813         return NULL;
814 }
815 EXPORT_SYMBOL(class_devices_in_group);
816
817 /**
818  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
819  * adjust sptlrpc settings accordingly.
820  */
821 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
822 {
823         struct obd_device  *obd;
824         const char         *type;
825         int                 i, rc = 0, rc2;
826
827         LASSERT(namelen > 0);
828
829         read_lock(&obd_dev_lock);
830         for (i = 0; i < class_devno_max(); i++) {
831                 obd = class_num2obd(i);
832
833                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
834                         continue;
835
836                 /* only notify mdc, osc, osp, lwp, mdt, ost
837                  * because only these have a -sptlrpc llog */
838                 type = obd->obd_type->typ_name;
839                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
840                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
841                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
842                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
843                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
844                     strcmp(type, LUSTRE_OST_NAME) != 0)
845                         continue;
846
847                 if (strncmp(obd->obd_name, fsname, namelen))
848                         continue;
849
850                 class_incref(obd, __FUNCTION__, obd);
851                 read_unlock(&obd_dev_lock);
852                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
853                                          sizeof(KEY_SPTLRPC_CONF),
854                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
855                 rc = rc ? rc : rc2;
856                 class_decref(obd, __FUNCTION__, obd);
857                 read_lock(&obd_dev_lock);
858         }
859         read_unlock(&obd_dev_lock);
860         return rc;
861 }
862 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
863
864 void obd_cleanup_caches(void)
865 {
866         ENTRY;
867         if (obd_device_cachep) {
868                 kmem_cache_destroy(obd_device_cachep);
869                 obd_device_cachep = NULL;
870         }
871
872         EXIT;
873 }
874
875 int obd_init_caches(void)
876 {
877         int rc;
878         ENTRY;
879
880         LASSERT(obd_device_cachep == NULL);
881         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
882                                 sizeof(struct obd_device),
883                                 0, 0, 0, sizeof(struct obd_device), NULL);
884         if (!obd_device_cachep)
885                 GOTO(out, rc = -ENOMEM);
886
887         RETURN(0);
888 out:
889         obd_cleanup_caches();
890         RETURN(rc);
891 }
892
893 static const char export_handle_owner[] = "export";
894
895 /* map connection to client */
896 struct obd_export *class_conn2export(struct lustre_handle *conn)
897 {
898         struct obd_export *export;
899         ENTRY;
900
901         if (!conn) {
902                 CDEBUG(D_CACHE, "looking for null handle\n");
903                 RETURN(NULL);
904         }
905
906         if (conn->cookie == -1) {  /* this means assign a new connection */
907                 CDEBUG(D_CACHE, "want a new connection\n");
908                 RETURN(NULL);
909         }
910
911         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
912         export = class_handle2object(conn->cookie, export_handle_owner);
913         RETURN(export);
914 }
915 EXPORT_SYMBOL(class_conn2export);
916
917 struct obd_device *class_exp2obd(struct obd_export *exp)
918 {
919         if (exp)
920                 return exp->exp_obd;
921         return NULL;
922 }
923 EXPORT_SYMBOL(class_exp2obd);
924
925 struct obd_import *class_exp2cliimp(struct obd_export *exp)
926 {
927         struct obd_device *obd = exp->exp_obd;
928         if (obd == NULL)
929                 return NULL;
930         return obd->u.cli.cl_import;
931 }
932 EXPORT_SYMBOL(class_exp2cliimp);
933
934 /* Export management functions */
935 static void class_export_destroy(struct obd_export *exp)
936 {
937         struct obd_device *obd = exp->exp_obd;
938         ENTRY;
939
940         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
941         LASSERT(obd != NULL);
942
943         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
944                exp->exp_client_uuid.uuid, obd->obd_name);
945
946         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
947         ptlrpc_connection_put(exp->exp_connection);
948
949         LASSERT(list_empty(&exp->exp_outstanding_replies));
950         LASSERT(list_empty(&exp->exp_uncommitted_replies));
951         LASSERT(list_empty(&exp->exp_req_replay_queue));
952         LASSERT(list_empty(&exp->exp_hp_rpcs));
953         obd_destroy_export(exp);
954         /* self export doesn't hold a reference to an obd, although it
955          * exists until freeing of the obd */
956         if (exp != obd->obd_self_export)
957                 class_decref(obd, "export", exp);
958
959         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
960         kfree_rcu(exp, exp_handle.h_rcu);
961         EXIT;
962 }
963
964 struct obd_export *class_export_get(struct obd_export *exp)
965 {
966         refcount_inc(&exp->exp_handle.h_ref);
967         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
968                refcount_read(&exp->exp_handle.h_ref));
969         return exp;
970 }
971 EXPORT_SYMBOL(class_export_get);
972
973 void class_export_put(struct obd_export *exp)
974 {
975         LASSERT(exp != NULL);
976         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
977         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
978         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
979                refcount_read(&exp->exp_handle.h_ref) - 1);
980
981         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
982                 struct obd_device *obd = exp->exp_obd;
983
984                 CDEBUG(D_IOCTL, "final put %p/%s\n",
985                        exp, exp->exp_client_uuid.uuid);
986
987                 /* release nid stat refererence */
988                 lprocfs_exp_cleanup(exp);
989
990                 if (exp == obd->obd_self_export) {
991                         /* self export should be destroyed without
992                          * zombie thread as it doesn't hold a
993                          * reference to obd and doesn't hold any
994                          * resources */
995                         class_export_destroy(exp);
996                         /* self export is destroyed, no class
997                          * references exist and it is safe to free
998                          * obd */
999                         class_free_dev(obd);
1000                 } else {
1001                         LASSERT(!list_empty(&exp->exp_obd_chain));
1002                         obd_zombie_export_add(exp);
1003                 }
1004
1005         }
1006 }
1007 EXPORT_SYMBOL(class_export_put);
1008
1009 static void obd_zombie_exp_cull(struct work_struct *ws)
1010 {
1011         struct obd_export *export;
1012
1013         export = container_of(ws, struct obd_export, exp_zombie_work);
1014         class_export_destroy(export);
1015 }
1016
1017 /* Creates a new export, adds it to the hash table, and returns a
1018  * pointer to it. The refcount is 2: one for the hash reference, and
1019  * one for the pointer returned by this function. */
1020 struct obd_export *__class_new_export(struct obd_device *obd,
1021                                       struct obd_uuid *cluuid, bool is_self)
1022 {
1023         struct obd_export *export;
1024         int rc = 0;
1025         ENTRY;
1026
1027         OBD_ALLOC_PTR(export);
1028         if (!export)
1029                 return ERR_PTR(-ENOMEM);
1030
1031         export->exp_conn_cnt = 0;
1032         export->exp_lock_hash = NULL;
1033         export->exp_flock_hash = NULL;
1034         /* 2 = class_handle_hash + last */
1035         refcount_set(&export->exp_handle.h_ref, 2);
1036         atomic_set(&export->exp_rpc_count, 0);
1037         atomic_set(&export->exp_cb_count, 0);
1038         atomic_set(&export->exp_locks_count, 0);
1039 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1040         INIT_LIST_HEAD(&export->exp_locks_list);
1041         spin_lock_init(&export->exp_locks_list_guard);
1042 #endif
1043         atomic_set(&export->exp_replay_count, 0);
1044         export->exp_obd = obd;
1045         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1046         spin_lock_init(&export->exp_uncommitted_replies_lock);
1047         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1048         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1049         INIT_HLIST_NODE(&export->exp_handle.h_link);
1050         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1051         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1052         class_handle_hash(&export->exp_handle, export_handle_owner);
1053         export->exp_last_request_time = ktime_get_real_seconds();
1054         spin_lock_init(&export->exp_lock);
1055         spin_lock_init(&export->exp_rpc_lock);
1056         INIT_HLIST_NODE(&export->exp_gen_hash);
1057         spin_lock_init(&export->exp_bl_list_lock);
1058         INIT_LIST_HEAD(&export->exp_bl_list);
1059         INIT_LIST_HEAD(&export->exp_stale_list);
1060         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1061
1062         export->exp_sp_peer = LUSTRE_SP_ANY;
1063         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1064         export->exp_client_uuid = *cluuid;
1065         obd_init_export(export);
1066
1067         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1068
1069         spin_lock(&obd->obd_dev_lock);
1070         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1071                 /* shouldn't happen, but might race */
1072                 if (obd->obd_stopping)
1073                         GOTO(exit_unlock, rc = -ENODEV);
1074
1075                 rc = obd_uuid_add(obd, export);
1076                 if (rc != 0) {
1077                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1078                                       obd->obd_name, cluuid->uuid, rc);
1079                         GOTO(exit_unlock, rc = -EALREADY);
1080                 }
1081         }
1082
1083         if (!is_self) {
1084                 class_incref(obd, "export", export);
1085                 list_add_tail(&export->exp_obd_chain_timed,
1086                               &obd->obd_exports_timed);
1087                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1088                 obd->obd_num_exports++;
1089         } else {
1090                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1091                 INIT_LIST_HEAD(&export->exp_obd_chain);
1092         }
1093         spin_unlock(&obd->obd_dev_lock);
1094         RETURN(export);
1095
1096 exit_unlock:
1097         spin_unlock(&obd->obd_dev_lock);
1098         class_handle_unhash(&export->exp_handle);
1099         obd_destroy_export(export);
1100         OBD_FREE_PTR(export);
1101         return ERR_PTR(rc);
1102 }
1103
1104 struct obd_export *class_new_export(struct obd_device *obd,
1105                                     struct obd_uuid *uuid)
1106 {
1107         return __class_new_export(obd, uuid, false);
1108 }
1109 EXPORT_SYMBOL(class_new_export);
1110
1111 struct obd_export *class_new_export_self(struct obd_device *obd,
1112                                          struct obd_uuid *uuid)
1113 {
1114         return __class_new_export(obd, uuid, true);
1115 }
1116
1117 void class_unlink_export(struct obd_export *exp)
1118 {
1119         class_handle_unhash(&exp->exp_handle);
1120
1121         if (exp->exp_obd->obd_self_export == exp) {
1122                 class_export_put(exp);
1123                 return;
1124         }
1125
1126         spin_lock(&exp->exp_obd->obd_dev_lock);
1127         /* delete an uuid-export hashitem from hashtables */
1128         if (exp != exp->exp_obd->obd_self_export)
1129                 obd_uuid_del(exp->exp_obd, exp);
1130
1131 #ifdef HAVE_SERVER_SUPPORT
1132         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1133                 struct tg_export_data   *ted = &exp->exp_target_data;
1134                 struct cfs_hash         *hash;
1135
1136                 /* Because obd_gen_hash will not be released until
1137                  * class_cleanup(), so hash should never be NULL here */
1138                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1139                 LASSERT(hash != NULL);
1140                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1141                              &exp->exp_gen_hash);
1142                 cfs_hash_putref(hash);
1143         }
1144 #endif /* HAVE_SERVER_SUPPORT */
1145
1146         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1147         list_del_init(&exp->exp_obd_chain_timed);
1148         exp->exp_obd->obd_num_exports--;
1149         spin_unlock(&exp->exp_obd->obd_dev_lock);
1150         atomic_inc(&obd_stale_export_num);
1151
1152         /* A reference is kept by obd_stale_exports list */
1153         obd_stale_export_put(exp);
1154 }
1155 EXPORT_SYMBOL(class_unlink_export);
1156
1157 /* Import management functions */
1158 static void obd_zombie_import_free(struct obd_import *imp)
1159 {
1160         ENTRY;
1161
1162         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1163                imp->imp_obd->obd_name);
1164
1165         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1166
1167         ptlrpc_connection_put(imp->imp_connection);
1168
1169         while (!list_empty(&imp->imp_conn_list)) {
1170                 struct obd_import_conn *imp_conn;
1171
1172                 imp_conn = list_first_entry(&imp->imp_conn_list,
1173                                             struct obd_import_conn, oic_item);
1174                 list_del_init(&imp_conn->oic_item);
1175                 ptlrpc_connection_put(imp_conn->oic_conn);
1176                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1177         }
1178
1179         LASSERT(imp->imp_sec == NULL);
1180         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1181                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1182         class_decref(imp->imp_obd, "import", imp);
1183         OBD_FREE_PTR(imp);
1184         EXIT;
1185 }
1186
1187 struct obd_import *class_import_get(struct obd_import *import)
1188 {
1189         refcount_inc(&import->imp_refcount);
1190         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1191                refcount_read(&import->imp_refcount),
1192                import->imp_obd->obd_name);
1193         return import;
1194 }
1195 EXPORT_SYMBOL(class_import_get);
1196
1197 void class_import_put(struct obd_import *imp)
1198 {
1199         ENTRY;
1200
1201         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1202
1203         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1204                refcount_read(&imp->imp_refcount) - 1,
1205                imp->imp_obd->obd_name);
1206
1207         if (refcount_dec_and_test(&imp->imp_refcount)) {
1208                 CDEBUG(D_INFO, "final put import %p\n", imp);
1209                 obd_zombie_import_add(imp);
1210         }
1211
1212         EXIT;
1213 }
1214 EXPORT_SYMBOL(class_import_put);
1215
1216 static void init_imp_at(struct imp_at *at) {
1217         int i;
1218         at_init(&at->iat_net_latency, 0, 0);
1219         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1220                 /* max service estimates are tracked on the server side, so
1221                    don't use the AT history here, just use the last reported
1222                    val. (But keep hist for proc histogram, worst_ever) */
1223                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1224                         AT_FLG_NOHIST);
1225         }
1226 }
1227
1228 static void obd_zombie_imp_cull(struct work_struct *ws)
1229 {
1230         struct obd_import *import;
1231
1232         import = container_of(ws, struct obd_import, imp_zombie_work);
1233         obd_zombie_import_free(import);
1234 }
1235
1236 struct obd_import *class_new_import(struct obd_device *obd)
1237 {
1238         struct obd_import *imp;
1239         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1240
1241         OBD_ALLOC(imp, sizeof(*imp));
1242         if (imp == NULL)
1243                 return NULL;
1244
1245         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1246         INIT_LIST_HEAD(&imp->imp_replay_list);
1247         INIT_LIST_HEAD(&imp->imp_sending_list);
1248         INIT_LIST_HEAD(&imp->imp_delayed_list);
1249         INIT_LIST_HEAD(&imp->imp_committed_list);
1250         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1251         imp->imp_known_replied_xid = 0;
1252         imp->imp_replay_cursor = &imp->imp_committed_list;
1253         spin_lock_init(&imp->imp_lock);
1254         imp->imp_last_success_conn = 0;
1255         imp->imp_state = LUSTRE_IMP_NEW;
1256         imp->imp_obd = class_incref(obd, "import", imp);
1257         rwlock_init(&imp->imp_sec_lock);
1258         init_waitqueue_head(&imp->imp_recovery_waitq);
1259         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1260
1261         if (curr_pid_ns && curr_pid_ns->child_reaper)
1262                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1263         else
1264                 imp->imp_sec_refpid = 1;
1265
1266         refcount_set(&imp->imp_refcount, 2);
1267         atomic_set(&imp->imp_unregistering, 0);
1268         atomic_set(&imp->imp_reqs, 0);
1269         atomic_set(&imp->imp_inflight, 0);
1270         atomic_set(&imp->imp_replay_inflight, 0);
1271         init_waitqueue_head(&imp->imp_replay_waitq);
1272         atomic_set(&imp->imp_inval_count, 0);
1273         INIT_LIST_HEAD(&imp->imp_conn_list);
1274         init_imp_at(&imp->imp_at);
1275
1276         /* the default magic is V2, will be used in connect RPC, and
1277          * then adjusted according to the flags in request/reply. */
1278         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1279
1280         return imp;
1281 }
1282 EXPORT_SYMBOL(class_new_import);
1283
1284 void class_destroy_import(struct obd_import *import)
1285 {
1286         LASSERT(import != NULL);
1287         LASSERT(import != LP_POISON);
1288
1289         spin_lock(&import->imp_lock);
1290         import->imp_generation++;
1291         spin_unlock(&import->imp_lock);
1292         class_import_put(import);
1293 }
1294 EXPORT_SYMBOL(class_destroy_import);
1295
1296 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1297
1298 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1299 {
1300         spin_lock(&exp->exp_locks_list_guard);
1301
1302         LASSERT(lock->l_exp_refs_nr >= 0);
1303
1304         if (lock->l_exp_refs_target != NULL &&
1305             lock->l_exp_refs_target != exp) {
1306                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1307                               exp, lock, lock->l_exp_refs_target);
1308         }
1309         if ((lock->l_exp_refs_nr ++) == 0) {
1310                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1311                 lock->l_exp_refs_target = exp;
1312         }
1313         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1314                lock, exp, lock->l_exp_refs_nr);
1315         spin_unlock(&exp->exp_locks_list_guard);
1316 }
1317 EXPORT_SYMBOL(__class_export_add_lock_ref);
1318
1319 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1320 {
1321         spin_lock(&exp->exp_locks_list_guard);
1322         LASSERT(lock->l_exp_refs_nr > 0);
1323         if (lock->l_exp_refs_target != exp) {
1324                 LCONSOLE_WARN("lock %p, "
1325                               "mismatching export pointers: %p, %p\n",
1326                               lock, lock->l_exp_refs_target, exp);
1327         }
1328         if (-- lock->l_exp_refs_nr == 0) {
1329                 list_del_init(&lock->l_exp_refs_link);
1330                 lock->l_exp_refs_target = NULL;
1331         }
1332         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1333                lock, exp, lock->l_exp_refs_nr);
1334         spin_unlock(&exp->exp_locks_list_guard);
1335 }
1336 EXPORT_SYMBOL(__class_export_del_lock_ref);
1337 #endif
1338
1339 /* A connection defines an export context in which preallocation can
1340    be managed. This releases the export pointer reference, and returns
1341    the export handle, so the export refcount is 1 when this function
1342    returns. */
1343 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1344                   struct obd_uuid *cluuid)
1345 {
1346         struct obd_export *export;
1347         LASSERT(conn != NULL);
1348         LASSERT(obd != NULL);
1349         LASSERT(cluuid != NULL);
1350         ENTRY;
1351
1352         export = class_new_export(obd, cluuid);
1353         if (IS_ERR(export))
1354                 RETURN(PTR_ERR(export));
1355
1356         conn->cookie = export->exp_handle.h_cookie;
1357         class_export_put(export);
1358
1359         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1360                cluuid->uuid, conn->cookie);
1361         RETURN(0);
1362 }
1363 EXPORT_SYMBOL(class_connect);
1364
1365 /* if export is involved in recovery then clean up related things */
1366 static void class_export_recovery_cleanup(struct obd_export *exp)
1367 {
1368         struct obd_device *obd = exp->exp_obd;
1369
1370         spin_lock(&obd->obd_recovery_task_lock);
1371         if (obd->obd_recovering) {
1372                 if (exp->exp_in_recovery) {
1373                         spin_lock(&exp->exp_lock);
1374                         exp->exp_in_recovery = 0;
1375                         spin_unlock(&exp->exp_lock);
1376                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1377                         atomic_dec(&obd->obd_connected_clients);
1378                 }
1379
1380                 /* if called during recovery then should update
1381                  * obd_stale_clients counter,
1382                  * lightweight exports are not counted */
1383                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1384                         exp->exp_obd->obd_stale_clients++;
1385         }
1386         spin_unlock(&obd->obd_recovery_task_lock);
1387
1388         spin_lock(&exp->exp_lock);
1389         /** Cleanup req replay fields */
1390         if (exp->exp_req_replay_needed) {
1391                 exp->exp_req_replay_needed = 0;
1392
1393                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1394                 atomic_dec(&obd->obd_req_replay_clients);
1395         }
1396
1397         /** Cleanup lock replay data */
1398         if (exp->exp_lock_replay_needed) {
1399                 exp->exp_lock_replay_needed = 0;
1400
1401                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1402                 atomic_dec(&obd->obd_lock_replay_clients);
1403         }
1404         spin_unlock(&exp->exp_lock);
1405 }
1406
1407 /* This function removes 1-3 references from the export:
1408  * 1 - for export pointer passed
1409  * and if disconnect really need
1410  * 2 - removing from hash
1411  * 3 - in client_unlink_export
1412  * The export pointer passed to this function can destroyed */
1413 int class_disconnect(struct obd_export *export)
1414 {
1415         int already_disconnected;
1416         ENTRY;
1417
1418         if (export == NULL) {
1419                 CWARN("attempting to free NULL export %p\n", export);
1420                 RETURN(-EINVAL);
1421         }
1422
1423         spin_lock(&export->exp_lock);
1424         already_disconnected = export->exp_disconnected;
1425         export->exp_disconnected = 1;
1426 #ifdef HAVE_SERVER_SUPPORT
1427         /*  We hold references of export for uuid hash
1428          *  and nid_hash and export link at least. So
1429          *  it is safe to call rh*table_remove_fast in
1430          *  there.
1431          */
1432         obd_nid_del(export->exp_obd, export);
1433 #endif /* HAVE_SERVER_SUPPORT */
1434         spin_unlock(&export->exp_lock);
1435
1436         /* class_cleanup(), abort_recovery(), and class_fail_export()
1437          * all end up in here, and if any of them race we shouldn't
1438          * call extra class_export_puts(). */
1439         if (already_disconnected)
1440                 GOTO(no_disconn, already_disconnected);
1441
1442         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1443                export->exp_handle.h_cookie);
1444
1445         class_export_recovery_cleanup(export);
1446         class_unlink_export(export);
1447 no_disconn:
1448         class_export_put(export);
1449         RETURN(0);
1450 }
1451 EXPORT_SYMBOL(class_disconnect);
1452
1453 /* Return non-zero for a fully connected export */
1454 int class_connected_export(struct obd_export *exp)
1455 {
1456         int connected = 0;
1457
1458         if (exp) {
1459                 spin_lock(&exp->exp_lock);
1460                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1461                 spin_unlock(&exp->exp_lock);
1462         }
1463         return connected;
1464 }
1465 EXPORT_SYMBOL(class_connected_export);
1466
1467 static void class_disconnect_export_list(struct list_head *list,
1468                                          enum obd_option flags)
1469 {
1470         int rc;
1471         struct obd_export *exp;
1472         ENTRY;
1473
1474         /* It's possible that an export may disconnect itself, but
1475          * nothing else will be added to this list. */
1476         while (!list_empty(list)) {
1477                 exp = list_first_entry(list, struct obd_export,
1478                                        exp_obd_chain);
1479                 /* need for safe call CDEBUG after obd_disconnect */
1480                 class_export_get(exp);
1481
1482                 spin_lock(&exp->exp_lock);
1483                 exp->exp_flags = flags;
1484                 spin_unlock(&exp->exp_lock);
1485
1486                 if (obd_uuid_equals(&exp->exp_client_uuid,
1487                                     &exp->exp_obd->obd_uuid)) {
1488                         CDEBUG(D_HA,
1489                                "exp %p export uuid == obd uuid, don't discon\n",
1490                                exp);
1491                         /* Need to delete this now so we don't end up pointing
1492                          * to work_list later when this export is cleaned up. */
1493                         list_del_init(&exp->exp_obd_chain);
1494                         class_export_put(exp);
1495                         continue;
1496                 }
1497
1498                 class_export_get(exp);
1499                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1500                        "last request at %lld\n",
1501                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1502                        exp, exp->exp_last_request_time);
1503                 /* release one export reference anyway */
1504                 rc = obd_disconnect(exp);
1505
1506                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1507                        obd_export_nid2str(exp), exp, rc);
1508                 class_export_put(exp);
1509         }
1510         EXIT;
1511 }
1512
1513 void class_disconnect_exports(struct obd_device *obd)
1514 {
1515         LIST_HEAD(work_list);
1516         ENTRY;
1517
1518         /* Move all of the exports from obd_exports to a work list, en masse. */
1519         spin_lock(&obd->obd_dev_lock);
1520         list_splice_init(&obd->obd_exports, &work_list);
1521         list_splice_init(&obd->obd_delayed_exports, &work_list);
1522         spin_unlock(&obd->obd_dev_lock);
1523
1524         if (!list_empty(&work_list)) {
1525                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1526                        "disconnecting them\n", obd->obd_minor, obd);
1527                 class_disconnect_export_list(&work_list,
1528                                              exp_flags_from_obd(obd));
1529         } else
1530                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1531                        obd->obd_minor, obd);
1532         EXIT;
1533 }
1534 EXPORT_SYMBOL(class_disconnect_exports);
1535
1536 /* Remove exports that have not completed recovery.
1537  */
1538 void class_disconnect_stale_exports(struct obd_device *obd,
1539                                     int (*test_export)(struct obd_export *))
1540 {
1541         LIST_HEAD(work_list);
1542         struct obd_export *exp, *n;
1543         int evicted = 0;
1544         ENTRY;
1545
1546         spin_lock(&obd->obd_dev_lock);
1547         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1548                                  exp_obd_chain) {
1549                 /* don't count self-export as client */
1550                 if (obd_uuid_equals(&exp->exp_client_uuid,
1551                                     &exp->exp_obd->obd_uuid))
1552                         continue;
1553
1554                 /* don't evict clients which have no slot in last_rcvd
1555                  * (e.g. lightweight connection) */
1556                 if (exp->exp_target_data.ted_lr_idx == -1)
1557                         continue;
1558
1559                 spin_lock(&exp->exp_lock);
1560                 if (exp->exp_failed || test_export(exp)) {
1561                         spin_unlock(&exp->exp_lock);
1562                         continue;
1563                 }
1564                 exp->exp_failed = 1;
1565                 spin_unlock(&exp->exp_lock);
1566
1567                 list_move(&exp->exp_obd_chain, &work_list);
1568                 evicted++;
1569                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1570                        obd->obd_name, exp->exp_client_uuid.uuid,
1571                        obd_export_nid2str(exp));
1572                 print_export_data(exp, "EVICTING", 0, D_HA);
1573         }
1574         spin_unlock(&obd->obd_dev_lock);
1575
1576         if (evicted)
1577                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1578                               obd->obd_name, evicted);
1579
1580         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1581                                                  OBD_OPT_ABORT_RECOV);
1582         EXIT;
1583 }
1584 EXPORT_SYMBOL(class_disconnect_stale_exports);
1585
1586 void class_fail_export(struct obd_export *exp)
1587 {
1588         int rc, already_failed;
1589
1590         spin_lock(&exp->exp_lock);
1591         already_failed = exp->exp_failed;
1592         exp->exp_failed = 1;
1593         spin_unlock(&exp->exp_lock);
1594
1595         if (already_failed) {
1596                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1597                        exp, exp->exp_client_uuid.uuid);
1598                 return;
1599         }
1600
1601         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1602                exp, exp->exp_client_uuid.uuid);
1603
1604         if (obd_dump_on_timeout)
1605                 libcfs_debug_dumplog();
1606
1607         /* need for safe call CDEBUG after obd_disconnect */
1608         class_export_get(exp);
1609
1610         /* Most callers into obd_disconnect are removing their own reference
1611          * (request, for example) in addition to the one from the hash table.
1612          * We don't have such a reference here, so make one. */
1613         class_export_get(exp);
1614         rc = obd_disconnect(exp);
1615         if (rc)
1616                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1617         else
1618                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1619                        exp, exp->exp_client_uuid.uuid);
1620         class_export_put(exp);
1621 }
1622 EXPORT_SYMBOL(class_fail_export);
1623
1624 #ifdef HAVE_SERVER_SUPPORT
1625 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1626 {
1627         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1628         struct obd_export *doomed_exp;
1629         struct rhashtable_iter iter;
1630         int exports_evicted = 0;
1631
1632         spin_lock(&obd->obd_dev_lock);
1633         /* umount has run already, so evict thread should leave
1634          * its task to umount thread now */
1635         if (obd->obd_stopping) {
1636                 spin_unlock(&obd->obd_dev_lock);
1637                 return exports_evicted;
1638         }
1639         spin_unlock(&obd->obd_dev_lock);
1640
1641         rhltable_walk_enter(&obd->obd_nid_hash, &iter);
1642         rhashtable_walk_start(&iter);
1643         while ((doomed_exp = rhashtable_walk_next(&iter)) != NULL) {
1644                 if (IS_ERR(doomed_exp))
1645                         continue;
1646
1647                 if (!doomed_exp->exp_connection ||
1648                     doomed_exp->exp_connection->c_peer.nid != nid_key)
1649                         continue;
1650
1651                 if (!refcount_inc_not_zero(&doomed_exp->exp_handle.h_ref))
1652                         continue;
1653
1654                 rhashtable_walk_stop(&iter);
1655
1656                 LASSERTF(doomed_exp != obd->obd_self_export,
1657                          "self-export is hashed by NID?\n");
1658
1659                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1660                               obd->obd_name,
1661                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1662                               obd_export_nid2str(doomed_exp));
1663
1664                 class_fail_export(doomed_exp);
1665                 class_export_put(doomed_exp);
1666                 exports_evicted++;
1667
1668                 rhashtable_walk_start(&iter);
1669         }
1670         rhashtable_walk_stop(&iter);
1671         rhashtable_walk_exit(&iter);
1672
1673         if (!exports_evicted)
1674                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1675                        obd->obd_name, nid);
1676         return exports_evicted;
1677 }
1678 EXPORT_SYMBOL(obd_export_evict_by_nid);
1679
1680 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1681 {
1682         struct obd_export *doomed_exp = NULL;
1683         struct obd_uuid doomed_uuid;
1684         int exports_evicted = 0;
1685
1686         spin_lock(&obd->obd_dev_lock);
1687         if (obd->obd_stopping) {
1688                 spin_unlock(&obd->obd_dev_lock);
1689                 return exports_evicted;
1690         }
1691         spin_unlock(&obd->obd_dev_lock);
1692
1693         obd_str2uuid(&doomed_uuid, uuid);
1694         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1695                 CERROR("%s: can't evict myself\n", obd->obd_name);
1696                 return exports_evicted;
1697         }
1698
1699         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1700         if (doomed_exp == NULL) {
1701                 CERROR("%s: can't disconnect %s: no exports found\n",
1702                        obd->obd_name, uuid);
1703         } else {
1704                 CWARN("%s: evicting %s at adminstrative request\n",
1705                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1706                 class_fail_export(doomed_exp);
1707                 class_export_put(doomed_exp);
1708                 obd_uuid_del(obd, doomed_exp);
1709                 exports_evicted++;
1710         }
1711
1712         return exports_evicted;
1713 }
1714 #endif /* HAVE_SERVER_SUPPORT */
1715
1716 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export callback used by print_export_data() when lock
 * dumping is requested; left unset (NULL) until someone installs one.
 */
void (*class_export_dump_hook)(struct obd_export *exp);
EXPORT_SYMBOL(class_export_dump_hook);
1719 #endif
1720
1721 static void print_export_data(struct obd_export *exp, const char *status,
1722                               int locks, int debug_level)
1723 {
1724         struct ptlrpc_reply_state *rs;
1725         struct ptlrpc_reply_state *first_reply = NULL;
1726         int nreplies = 0;
1727
1728         spin_lock(&exp->exp_lock);
1729         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1730                             rs_exp_list) {
1731                 if (nreplies == 0)
1732                         first_reply = rs;
1733                 nreplies++;
1734         }
1735         spin_unlock(&exp->exp_lock);
1736
1737         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1738                "%p %s %llu stale:%d\n",
1739                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1740                obd_export_nid2str(exp),
1741                refcount_read(&exp->exp_handle.h_ref),
1742                atomic_read(&exp->exp_rpc_count),
1743                atomic_read(&exp->exp_cb_count),
1744                atomic_read(&exp->exp_locks_count),
1745                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1746                nreplies, first_reply, nreplies > 3 ? "..." : "",
1747                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1748 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1749         if (locks && class_export_dump_hook != NULL)
1750                 class_export_dump_hook(exp);
1751 #endif
1752 }
1753
1754 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1755 {
1756         struct obd_export *exp;
1757
1758         spin_lock(&obd->obd_dev_lock);
1759         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1760                 print_export_data(exp, "ACTIVE", locks, debug_level);
1761         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1762                 print_export_data(exp, "UNLINKED", locks, debug_level);
1763         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1764                 print_export_data(exp, "DELAYED", locks, debug_level);
1765         spin_unlock(&obd->obd_dev_lock);
1766 }
1767
1768 void obd_exports_barrier(struct obd_device *obd)
1769 {
1770         int waited = 2;
1771         LASSERT(list_empty(&obd->obd_exports));
1772         spin_lock(&obd->obd_dev_lock);
1773         while (!list_empty(&obd->obd_unlinked_exports)) {
1774                 spin_unlock(&obd->obd_dev_lock);
1775                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1776                 if (waited > 5 && is_power_of_2(waited)) {
1777                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1778                                       "more than %d seconds. "
1779                                       "The obd refcount = %d. Is it stuck?\n",
1780                                       obd->obd_name, waited,
1781                                       atomic_read(&obd->obd_refcount));
1782                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1783                 }
1784                 waited *= 2;
1785                 spin_lock(&obd->obd_dev_lock);
1786         }
1787         spin_unlock(&obd->obd_dev_lock);
1788 }
1789 EXPORT_SYMBOL(obd_exports_barrier);
1790
1791 /**
1792  * Add export to the obd_zombe thread and notify it.
1793  */
1794 static void obd_zombie_export_add(struct obd_export *exp) {
1795         atomic_dec(&obd_stale_export_num);
1796         spin_lock(&exp->exp_obd->obd_dev_lock);
1797         LASSERT(!list_empty(&exp->exp_obd_chain));
1798         list_del_init(&exp->exp_obd_chain);
1799         spin_unlock(&exp->exp_obd->obd_dev_lock);
1800
1801         queue_work(zombie_wq, &exp->exp_zombie_work);
1802 }
1803
1804 /**
1805  * Add import to the obd_zombe thread and notify it.
1806  */
1807 static void obd_zombie_import_add(struct obd_import *imp) {
1808         LASSERT(imp->imp_sec == NULL);
1809
1810         queue_work(zombie_wq, &imp->imp_zombie_work);
1811 }
1812
1813 /**
1814  * wait when obd_zombie import/export queues become empty
1815  */
1816 void obd_zombie_barrier(void)
1817 {
1818         flush_workqueue(zombie_wq);
1819 }
1820 EXPORT_SYMBOL(obd_zombie_barrier);
1821
1822
1823 struct obd_export *obd_stale_export_get(void)
1824 {
1825         struct obd_export *exp = NULL;
1826         ENTRY;
1827
1828         spin_lock(&obd_stale_export_lock);
1829         if (!list_empty(&obd_stale_exports)) {
1830                 exp = list_first_entry(&obd_stale_exports,
1831                                        struct obd_export, exp_stale_list);
1832                 list_del_init(&exp->exp_stale_list);
1833         }
1834         spin_unlock(&obd_stale_export_lock);
1835
1836         if (exp) {
1837                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1838                        atomic_read(&obd_stale_export_num));
1839         }
1840         RETURN(exp);
1841 }
1842 EXPORT_SYMBOL(obd_stale_export_get);
1843
1844 void obd_stale_export_put(struct obd_export *exp)
1845 {
1846         ENTRY;
1847
1848         LASSERT(list_empty(&exp->exp_stale_list));
1849         if (exp->exp_lock_hash &&
1850             atomic_read(&exp->exp_lock_hash->hs_count)) {
1851                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1852                        atomic_read(&obd_stale_export_num));
1853
1854                 spin_lock_bh(&exp->exp_bl_list_lock);
1855                 spin_lock(&obd_stale_export_lock);
1856                 /* Add to the tail if there is no blocked locks,
1857                  * to the head otherwise. */
1858                 if (list_empty(&exp->exp_bl_list))
1859                         list_add_tail(&exp->exp_stale_list,
1860                                       &obd_stale_exports);
1861                 else
1862                         list_add(&exp->exp_stale_list,
1863                                  &obd_stale_exports);
1864
1865                 spin_unlock(&obd_stale_export_lock);
1866                 spin_unlock_bh(&exp->exp_bl_list_lock);
1867         } else {
1868                 class_export_put(exp);
1869         }
1870         EXIT;
1871 }
1872 EXPORT_SYMBOL(obd_stale_export_put);
1873
1874 /**
1875  * Adjust the position of the export in the stale list,
1876  * i.e. move to the head of the list if is needed.
1877  **/
1878 void obd_stale_export_adjust(struct obd_export *exp)
1879 {
1880         LASSERT(exp != NULL);
1881         spin_lock_bh(&exp->exp_bl_list_lock);
1882         spin_lock(&obd_stale_export_lock);
1883
1884         if (!list_empty(&exp->exp_stale_list) &&
1885             !list_empty(&exp->exp_bl_list))
1886                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1887
1888         spin_unlock(&obd_stale_export_lock);
1889         spin_unlock_bh(&exp->exp_bl_list_lock);
1890 }
1891 EXPORT_SYMBOL(obd_stale_export_adjust);
1892
1893 /**
1894  * start destroy zombie import/export thread
1895  */
1896 int obd_zombie_impexp_init(void)
1897 {
1898         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1899                                            0, CFS_CPT_ANY,
1900                                            cfs_cpt_number(cfs_cpt_tab));
1901
1902         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1903 }
1904
1905 /**
1906  * stop destroy zombie import/export thread
1907  */
1908 void obd_zombie_impexp_stop(void)
1909 {
1910         destroy_workqueue(zombie_wq);
1911         LASSERT(list_empty(&obd_stale_exports));
1912 }
1913
1914 /***** Kernel-userspace comm helpers *******/
1915
1916 /* Get length of entire message, including header */
1917 int kuc_len(int payload_len)
1918 {
1919         return sizeof(struct kuc_hdr) + payload_len;
1920 }
1921 EXPORT_SYMBOL(kuc_len);
1922
1923 /* Get a pointer to kuc header, given a ptr to the payload
1924  * @param p Pointer to payload area
1925  * @returns Pointer to kuc header
1926  */
1927 struct kuc_hdr * kuc_ptr(void *p)
1928 {
1929         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1930         LASSERT(lh->kuc_magic == KUC_MAGIC);
1931         return lh;
1932 }
1933 EXPORT_SYMBOL(kuc_ptr);
1934
1935 /* Alloc space for a message, and fill in header
1936  * @return Pointer to payload area
1937  */
1938 void *kuc_alloc(int payload_len, int transport, int type)
1939 {
1940         struct kuc_hdr *lh;
1941         int len = kuc_len(payload_len);
1942
1943         OBD_ALLOC(lh, len);
1944         if (lh == NULL)
1945                 return ERR_PTR(-ENOMEM);
1946
1947         lh->kuc_magic = KUC_MAGIC;
1948         lh->kuc_transport = transport;
1949         lh->kuc_msgtype = type;
1950         lh->kuc_msglen = len;
1951
1952         return (void *)(lh + 1);
1953 }
1954 EXPORT_SYMBOL(kuc_alloc);
1955
/* Free a message obtained from kuc_alloc().
 * @param p           Pointer to the payload area (not to the header)
 * @param payload_len Payload length; used to compute the size handed to
 *                    OBD_FREE()
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1963
1964 struct obd_request_slot_waiter {
1965         struct list_head        orsw_entry;
1966         wait_queue_head_t       orsw_waitq;
1967         bool                    orsw_signaled;
1968 };
1969
1970 static bool obd_request_slot_avail(struct client_obd *cli,
1971                                    struct obd_request_slot_waiter *orsw)
1972 {
1973         bool avail;
1974
1975         spin_lock(&cli->cl_loi_list_lock);
1976         avail = !!list_empty(&orsw->orsw_entry);
1977         spin_unlock(&cli->cl_loi_list_lock);
1978
1979         return avail;
1980 };
1981
1982 /*
1983  * For network flow control, the RPC sponsor needs to acquire a credit
1984  * before sending the RPC. The credits count for a connection is defined
1985  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1986  * the subsequent RPC sponsors need to wait until others released their
1987  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1988  */
1989 int obd_get_request_slot(struct client_obd *cli)
1990 {
1991         struct obd_request_slot_waiter   orsw;
1992         int                              rc;
1993
1994         spin_lock(&cli->cl_loi_list_lock);
1995         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1996                 cli->cl_rpcs_in_flight++;
1997                 spin_unlock(&cli->cl_loi_list_lock);
1998                 return 0;
1999         }
2000
2001         init_waitqueue_head(&orsw.orsw_waitq);
2002         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2003         orsw.orsw_signaled = false;
2004         spin_unlock(&cli->cl_loi_list_lock);
2005
2006         rc = l_wait_event_abortable(orsw.orsw_waitq,
2007                                     obd_request_slot_avail(cli, &orsw) ||
2008                                     orsw.orsw_signaled);
2009
2010         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2011          * freed but other (such as obd_put_request_slot) is using it. */
2012         spin_lock(&cli->cl_loi_list_lock);
2013         if (rc != 0) {
2014                 if (!orsw.orsw_signaled) {
2015                         if (list_empty(&orsw.orsw_entry))
2016                                 cli->cl_rpcs_in_flight--;
2017                         else
2018                                 list_del(&orsw.orsw_entry);
2019                 }
2020                 rc = -EINTR;
2021         }
2022
2023         if (orsw.orsw_signaled) {
2024                 LASSERT(list_empty(&orsw.orsw_entry));
2025
2026                 rc = -EINTR;
2027         }
2028         spin_unlock(&cli->cl_loi_list_lock);
2029
2030         return rc;
2031 }
2032 EXPORT_SYMBOL(obd_get_request_slot);
2033
2034 void obd_put_request_slot(struct client_obd *cli)
2035 {
2036         struct obd_request_slot_waiter *orsw;
2037
2038         spin_lock(&cli->cl_loi_list_lock);
2039         cli->cl_rpcs_in_flight--;
2040
2041         /* If there is free slot, wakeup the first waiter. */
2042         if (!list_empty(&cli->cl_flight_waiters) &&
2043             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2044                 orsw = list_first_entry(&cli->cl_flight_waiters,
2045                                         struct obd_request_slot_waiter,
2046                                         orsw_entry);
2047                 list_del_init(&orsw->orsw_entry);
2048                 cli->cl_rpcs_in_flight++;
2049                 wake_up(&orsw->orsw_waitq);
2050         }
2051         spin_unlock(&cli->cl_loi_list_lock);
2052 }
2053 EXPORT_SYMBOL(obd_put_request_slot);
2054
2055 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2056 {
2057         return cli->cl_max_rpcs_in_flight;
2058 }
2059 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2060
2061 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2062 {
2063         struct obd_request_slot_waiter *orsw;
2064         __u32                           old;
2065         int                             diff;
2066         int                             i;
2067         int                             rc;
2068
2069         if (max > OBD_MAX_RIF_MAX || max < 1)
2070                 return -ERANGE;
2071
2072         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2073                cli->cl_import->imp_obd->obd_name, max,
2074                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2075
2076         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2077                    LUSTRE_MDC_NAME) == 0) {
2078                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2079                  * strictly lower that max_rpcs_in_flight */
2080                 if (max < 2) {
2081                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2082                                cli->cl_import->imp_obd->obd_name);
2083                         return -ERANGE;
2084                 }
2085                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2086                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2087                         if (rc != 0)
2088                                 return rc;
2089                 }
2090         }
2091
2092         spin_lock(&cli->cl_loi_list_lock);
2093         old = cli->cl_max_rpcs_in_flight;
2094         cli->cl_max_rpcs_in_flight = max;
2095         client_adjust_max_dirty(cli);
2096
2097         diff = max - old;
2098
2099         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2100         for (i = 0; i < diff; i++) {
2101                 if (list_empty(&cli->cl_flight_waiters))
2102                         break;
2103
2104                 orsw = list_first_entry(&cli->cl_flight_waiters,
2105                                         struct obd_request_slot_waiter,
2106                                         orsw_entry);
2107                 list_del_init(&orsw->orsw_entry);
2108                 cli->cl_rpcs_in_flight++;
2109                 wake_up(&orsw->orsw_waitq);
2110         }
2111         spin_unlock(&cli->cl_loi_list_lock);
2112
2113         return 0;
2114 }
2115 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2116
2117 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2118 {
2119         return cli->cl_max_mod_rpcs_in_flight;
2120 }
2121 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2122
2123 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2124 {
2125         struct obd_connect_data *ocd;
2126         __u16 maxmodrpcs;
2127         __u16 prev;
2128
2129         if (max > OBD_MAX_RIF_MAX || max < 1)
2130                 return -ERANGE;
2131
2132         ocd = &cli->cl_import->imp_connect_data;
2133         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2134                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2135                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2136
2137         if (max == OBD_MAX_RIF_MAX)
2138                 max = OBD_MAX_RIF_MAX - 1;
2139
2140         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2141          * increase this value, also bump up max_rpcs_in_flight to match.
2142          */
2143         if (max >= cli->cl_max_rpcs_in_flight) {
2144                 CDEBUG(D_INFO,
2145                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2146                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2147                 obd_set_max_rpcs_in_flight(cli, max + 1);
2148         }
2149
2150         /* cannot exceed max modify RPCs in flight supported by the server,
2151          * but verify ocd_connect_flags is at least initialized first.  If
2152          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2153          */
2154         if (!ocd->ocd_connect_flags) {
2155                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2156         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2157                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2158                 if (maxmodrpcs == 0) { /* connection not finished yet */
2159                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2160                         CDEBUG(D_INFO,
2161                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2162                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2163                 }
2164         } else {
2165                 maxmodrpcs = 1;
2166         }
2167         if (max > maxmodrpcs) {
2168                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2169                        cli->cl_import->imp_obd->obd_name,
2170                        max, maxmodrpcs);
2171                 return -ERANGE;
2172         }
2173
2174         spin_lock(&cli->cl_mod_rpcs_lock);
2175
2176         prev = cli->cl_max_mod_rpcs_in_flight;
2177         cli->cl_max_mod_rpcs_in_flight = max;
2178
2179         /* wakeup waiters if limit has been increased */
2180         if (cli->cl_max_mod_rpcs_in_flight > prev)
2181                 wake_up(&cli->cl_mod_rpcs_waitq);
2182
2183         spin_unlock(&cli->cl_mod_rpcs_lock);
2184
2185         return 0;
2186 }
2187 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2188
2189 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2190                                struct seq_file *seq)
2191 {
2192         unsigned long mod_tot = 0, mod_cum;
2193         struct timespec64 now;
2194         int i;
2195
2196         ktime_get_real_ts64(&now);
2197
2198         spin_lock(&cli->cl_mod_rpcs_lock);
2199
2200         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2201                    (s64)now.tv_sec, now.tv_nsec);
2202         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2203                    cli->cl_mod_rpcs_in_flight);
2204
2205         seq_printf(seq, "\n\t\t\tmodify\n");
2206         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2207
2208         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2209
2210         mod_cum = 0;
2211         for (i = 0; i < OBD_HIST_MAX; i++) {
2212                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2213                 mod_cum += mod;
2214                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2215                            i, mod, pct(mod, mod_tot),
2216                            pct(mod_cum, mod_tot));
2217                 if (mod_cum == mod_tot)
2218                         break;
2219         }
2220
2221         spin_unlock(&cli->cl_mod_rpcs_lock);
2222
2223         return 0;
2224 }
2225 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2226
2227 /* The number of modify RPCs sent in parallel is limited
2228  * because the server has a finite number of slots per client to
2229  * store request result and ensure reply reconstruction when needed.
2230  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2231  * that takes into account server limit and cl_max_rpcs_in_flight
2232  * value.
2233  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2234  * one close request is allowed above the maximum.
2235  */
2236 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2237                                                  bool close_req)
2238 {
2239         bool avail;
2240
2241         /* A slot is available if
2242          * - number of modify RPCs in flight is less than the max
2243          * - it's a close RPC and no other close request is in flight
2244          */
2245         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2246                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2247
2248         return avail;
2249 }
2250
2251 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2252                                          bool close_req)
2253 {
2254         bool avail;
2255
2256         spin_lock(&cli->cl_mod_rpcs_lock);
2257         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2258         spin_unlock(&cli->cl_mod_rpcs_lock);
2259         return avail;
2260 }
2261
2262
2263 /* Get a modify RPC slot from the obd client @cli according
2264  * to the kind of operation @opc that is going to be sent
2265  * and the intent @it of the operation if it applies.
2266  * If the maximum number of modify RPCs in flight is reached
2267  * the thread is put to sleep.
2268  * Returns the tag to be set in the request message. Tag 0
2269  * is reserved for non-modifying requests.
2270  */
2271 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2272 {
2273         bool                    close_req = false;
2274         __u16                   i, max;
2275
2276         if (opc == MDS_CLOSE)
2277                 close_req = true;
2278
2279         do {
2280                 spin_lock(&cli->cl_mod_rpcs_lock);
2281                 max = cli->cl_max_mod_rpcs_in_flight;
2282                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2283                         /* there is a slot available */
2284                         cli->cl_mod_rpcs_in_flight++;
2285                         if (close_req)
2286                                 cli->cl_close_rpcs_in_flight++;
2287                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2288                                          cli->cl_mod_rpcs_in_flight);
2289                         /* find a free tag */
2290                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2291                                                 max + 1);
2292                         LASSERT(i < OBD_MAX_RIF_MAX);
2293                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2294                         spin_unlock(&cli->cl_mod_rpcs_lock);
2295                         /* tag 0 is reserved for non-modify RPCs */
2296
2297                         CDEBUG(D_RPCTRACE,
2298                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2299                                cli->cl_import->imp_obd->obd_name,
2300                                i + 1, opc, max);
2301
2302                         return i + 1;
2303                 }
2304                 spin_unlock(&cli->cl_mod_rpcs_lock);
2305
2306                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2307                        "opc %u, max %hu\n",
2308                        cli->cl_import->imp_obd->obd_name, opc, max);
2309
2310                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2311                                           obd_mod_rpc_slot_avail(cli,
2312                                                                  close_req));
2313         } while (true);
2314 }
2315 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2316
2317 /* Put a modify RPC slot from the obd client @cli according
2318  * to the kind of operation @opc that has been sent.
2319  */
2320 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2321 {
2322         bool                    close_req = false;
2323
2324         if (tag == 0)
2325                 return;
2326
2327         if (opc == MDS_CLOSE)
2328                 close_req = true;
2329
2330         spin_lock(&cli->cl_mod_rpcs_lock);
2331         cli->cl_mod_rpcs_in_flight--;
2332         if (close_req)
2333                 cli->cl_close_rpcs_in_flight--;
2334         /* release the tag in the bitmap */
2335         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2336         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2337         spin_unlock(&cli->cl_mod_rpcs_lock);
2338         wake_up(&cli->cl_mod_rpcs_waitq);
2339 }
2340 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2341