Whamcloud - gitweb
LU-14736 utils: update leak-finder.pl for new format
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
48 DEFINE_RWLOCK(obd_dev_lock);
49 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
50
51 static struct kmem_cache *obd_device_cachep;
52 static struct kobj_type class_ktype;
53 static struct workqueue_struct *zombie_wq;
54
55 static void obd_zombie_export_add(struct obd_export *exp);
56 static void obd_zombie_import_add(struct obd_import *imp);
57 static void print_export_data(struct obd_export *exp,
58                               const char *status, int locks, int debug_level);
59
60 static LIST_HEAD(obd_stale_exports);
61 static DEFINE_SPINLOCK(obd_stale_export_lock);
62 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
63
64 /*
65  * support functions: we could use inter-module communication, but this
66  * is more portable to other OS's
67  */
68 static struct obd_device *obd_device_alloc(void)
69 {
70         struct obd_device *obd;
71
72         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
73         if (obd != NULL) {
74                 obd->obd_magic = OBD_DEVICE_MAGIC;
75         }
76         return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81         LASSERT(obd != NULL);
82         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84         if (obd->obd_namespace != NULL) {
85                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86                        obd, obd->obd_namespace, obd->obd_force);
87                 LBUG();
88         }
89         lu_ref_fini(&obd->obd_reference);
90         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95         struct kobject *kobj = kset_find_obj(lustre_kset, name);
96
97         if (kobj && kobj->ktype == &class_ktype)
98                 return container_of(kobj, struct obd_type, typ_kobj);
99
100         kobject_put(kobj);
101         return NULL;
102 }
103 EXPORT_SYMBOL(class_search_type);
104
105 struct obd_type *class_get_type(const char *name)
106 {
107         struct obd_type *type;
108
109         type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
111         if (!type) {
112                 const char *modname = name;
113
114 #ifdef HAVE_SERVER_SUPPORT
115                 if (strcmp(modname, "obdfilter") == 0)
116                         modname = "ofd";
117
118                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119                         modname = LUSTRE_OSP_NAME;
120
121                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122                         modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
124
125                 if (!request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 if (try_module_get(type->typ_dt_ops->o_owner)) {
136                         atomic_inc(&type->typ_refcnt);
137                         /* class_search_type() returned a counted reference,
138                          * but we don't need that count any more as
139                          * we have one through typ_refcnt.
140                          */
141                         kobject_put(&type->typ_kobj);
142                 } else {
143                         kobject_put(&type->typ_kobj);
144                         type = NULL;
145                 }
146         }
147         return type;
148 }
149 EXPORT_SYMBOL(class_get_type);
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         module_put(type->typ_dt_ops->o_owner);
155         atomic_dec(&type->typ_refcnt);
156 }
157 EXPORT_SYMBOL(class_put_type);
158
159 static void class_sysfs_release(struct kobject *kobj)
160 {
161         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
162
163         debugfs_remove_recursive(type->typ_debugfs_entry);
164         type->typ_debugfs_entry = NULL;
165
166         if (type->typ_lu)
167                 lu_device_type_fini(type->typ_lu);
168
169 #ifdef CONFIG_PROC_FS
170         if (type->typ_name && type->typ_procroot)
171                 remove_proc_subtree(type->typ_name, proc_lustre_root);
172 #endif
173         OBD_FREE(type, sizeof(*type));
174 }
175
176 static struct kobj_type class_ktype = {
177         .sysfs_ops      = &lustre_sysfs_ops,
178         .release        = class_sysfs_release,
179 };
180
#ifdef HAVE_SERVER_SUPPORT
/**
 * Create a placeholder obd type that only provides the sysfs/debugfs
 * (and optionally procfs) directories for \a name.
 *
 * The placeholder is marked with typ_sym_filter so that a later
 * class_register_type() for the same name can adopt it.
 *
 * \param[in] name        type name to create directories for
 * \param[in] enable_proc also create the compat procfs directory
 *
 * \retval pointer to the new obd_type
 * \retval ERR_PTR(-EEXIST) a type with this name already exists
 * \retval ERR_PTR(-ENOMEM) allocation failed
 * \retval ERR_PTR(errno)   kobject registration failed
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
	struct dentry *symlink;
	struct obd_type *type;
	int rc;

	type = class_search_type(name);
	if (type) {
		kobject_put(&type->typ_kobj);
		return ERR_PTR(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (!type)
		return ERR_PTR(-ENOMEM);

	type->typ_kobj.kset = lustre_kset;
	rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
				  &lustre_kset->kobj, "%s", name);
	if (rc) {
		/* Once kobject_init_and_add() has run, the object must be
		 * released through kobject_put() even on failure; the
		 * release callback frees both the name and \a type.
		 */
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}

	symlink = debugfs_create_dir(name, debugfs_lustre_root);
	type->typ_debugfs_entry = symlink;
	type->typ_sym_filter = true;

	if (enable_proc) {
		type->typ_procroot = lprocfs_register(name, proc_lustre_root,
						      NULL, NULL);
		if (IS_ERR(type->typ_procroot)) {
			/* the compat proc entry is best-effort only */
			CERROR("%s: can't create compat proc entry: %d\n",
			       name, (int)PTR_ERR(type->typ_procroot));
			type->typ_procroot = NULL;
		}
	}

	return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
222
223 #define CLASS_MAX_NAME 1024
224
225 int class_register_type(const struct obd_ops *dt_ops,
226                         const struct md_ops *md_ops,
227                         bool enable_proc,
228                         const char *name, struct lu_device_type *ldt)
229 {
230         struct obd_type *type;
231         int rc;
232
233         ENTRY;
234         /* sanity check */
235         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
236
237         type = class_search_type(name);
238         if (type) {
239 #ifdef HAVE_SERVER_SUPPORT
240                 if (type->typ_sym_filter)
241                         goto dir_exist;
242 #endif /* HAVE_SERVER_SUPPORT */
243                 kobject_put(&type->typ_kobj);
244                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
245                 RETURN(-EEXIST);
246         }
247
248         OBD_ALLOC(type, sizeof(*type));
249         if (type == NULL)
250                 RETURN(-ENOMEM);
251
252         type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
253         type->typ_kobj.kset = lustre_kset;
254         kobject_init(&type->typ_kobj, &class_ktype);
255 #ifdef HAVE_SERVER_SUPPORT
256 dir_exist:
257 #endif /* HAVE_SERVER_SUPPORT */
258
259         type->typ_dt_ops = dt_ops;
260         type->typ_md_ops = md_ops;
261
262 #ifdef HAVE_SERVER_SUPPORT
263         if (type->typ_sym_filter) {
264                 type->typ_sym_filter = false;
265                 kobject_put(&type->typ_kobj);
266                 goto setup_ldt;
267         }
268 #endif
269 #ifdef CONFIG_PROC_FS
270         if (enable_proc && !type->typ_procroot) {
271                 type->typ_procroot = lprocfs_register(name,
272                                                       proc_lustre_root,
273                                                       NULL, type);
274                 if (IS_ERR(type->typ_procroot)) {
275                         rc = PTR_ERR(type->typ_procroot);
276                         type->typ_procroot = NULL;
277                         GOTO(failed, rc);
278                 }
279         }
280 #endif
281         type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
282
283         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
284         if (rc)
285                 GOTO(failed, rc);
286 #ifdef HAVE_SERVER_SUPPORT
287 setup_ldt:
288 #endif
289         if (ldt) {
290                 rc = lu_device_type_init(ldt);
291                 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
292                 wake_up_var(&type->typ_lu);
293                 if (rc)
294                         GOTO(failed, rc);
295         }
296
297         RETURN(0);
298
299 failed:
300         kobject_put(&type->typ_kobj);
301
302         RETURN(rc);
303 }
304 EXPORT_SYMBOL(class_register_type);
305
306 int class_unregister_type(const char *name)
307 {
308         struct obd_type *type = class_search_type(name);
309         int rc = 0;
310         ENTRY;
311
312         if (!type) {
313                 CERROR("unknown obd type\n");
314                 RETURN(-EINVAL);
315         }
316
317         if (atomic_read(&type->typ_refcnt)) {
318                 CERROR("type %s has refcount (%d)\n", name,
319                        atomic_read(&type->typ_refcnt));
320                 /* This is a bad situation, let's make the best of it */
321                 /* Remove ops, but leave the name for debugging */
322                 type->typ_dt_ops = NULL;
323                 type->typ_md_ops = NULL;
324                 GOTO(out_put, rc = -EBUSY);
325         }
326
327         /* Put the final ref */
328         kobject_put(&type->typ_kobj);
329 out_put:
330         /* Put the ref returned by class_search_type() */
331         kobject_put(&type->typ_kobj);
332
333         RETURN(rc);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
336
337 /**
338  * Create a new obd device.
339  *
340  * Allocate the new obd_device and initialize it.
341  *
342  * \param[in] type_name obd device type string.
343  * \param[in] name      obd device name.
344  * \param[in] uuid      obd device UUID
345  *
346  * \retval newdev         pointer to created obd_device
347  * \retval ERR_PTR(errno) on error
348  */
349 struct obd_device *class_newdev(const char *type_name, const char *name,
350                                 const char *uuid)
351 {
352         struct obd_device *newdev;
353         struct obd_type *type = NULL;
354         ENTRY;
355
356         if (strlen(name) >= MAX_OBD_NAME) {
357                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358                 RETURN(ERR_PTR(-EINVAL));
359         }
360
361         type = class_get_type(type_name);
362         if (type == NULL){
363                 CERROR("OBD: unknown type: %s\n", type_name);
364                 RETURN(ERR_PTR(-ENODEV));
365         }
366
367         newdev = obd_device_alloc();
368         if (newdev == NULL) {
369                 class_put_type(type);
370                 RETURN(ERR_PTR(-ENOMEM));
371         }
372         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374         newdev->obd_type = type;
375         newdev->obd_minor = -1;
376
377         rwlock_init(&newdev->obd_pool_lock);
378         newdev->obd_pool_limit = 0;
379         newdev->obd_pool_slv = 0;
380
381         INIT_LIST_HEAD(&newdev->obd_exports);
382         newdev->obd_num_exports = 0;
383         newdev->obd_grant_check_threshold = 100;
384         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
385         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
386         INIT_LIST_HEAD(&newdev->obd_exports_timed);
387         INIT_LIST_HEAD(&newdev->obd_nid_stats);
388         spin_lock_init(&newdev->obd_nid_lock);
389         spin_lock_init(&newdev->obd_dev_lock);
390         mutex_init(&newdev->obd_dev_mutex);
391         spin_lock_init(&newdev->obd_osfs_lock);
392         /* newdev->obd_osfs_age must be set to a value in the distant
393          * past to guarantee a fresh statfs is fetched on mount. */
394         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
395
396         /* XXX belongs in setup not attach  */
397         init_rwsem(&newdev->obd_observer_link_sem);
398         /* recovery data */
399         spin_lock_init(&newdev->obd_recovery_task_lock);
400         init_waitqueue_head(&newdev->obd_next_transno_waitq);
401         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
402         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
403         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
404         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
405         INIT_LIST_HEAD(&newdev->obd_evict_list);
406         INIT_LIST_HEAD(&newdev->obd_lwp_list);
407
408         llog_group_init(&newdev->obd_olg);
409         /* Detach drops this */
410         atomic_set(&newdev->obd_refcount, 1);
411         lu_ref_init(&newdev->obd_reference);
412         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
413
414         newdev->obd_conn_inprogress = 0;
415
416         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
417
418         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
419                newdev->obd_name, newdev);
420
421         return newdev;
422 }
423
424 /**
425  * Free obd device.
426  *
427  * \param[in] obd obd_device to be freed
428  *
429  * \retval none
430  */
431 void class_free_dev(struct obd_device *obd)
432 {
433         struct obd_type *obd_type = obd->obd_type;
434
435         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
436                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
437         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
438                  "obd %p != obd_devs[%d] %p\n",
439                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
440         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
441                  "obd_refcount should be 0, not %d\n",
442                  atomic_read(&obd->obd_refcount));
443         LASSERT(obd_type != NULL);
444
445         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
446                obd->obd_name, obd->obd_type->typ_name);
447
448         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
449                          obd->obd_name, obd->obd_uuid.uuid);
450         if (obd->obd_stopping) {
451                 int err;
452
453                 /* If we're not stopping, we were never set up */
454                 err = obd_cleanup(obd);
455                 if (err)
456                         CERROR("Cleanup %s returned %d\n",
457                                 obd->obd_name, err);
458         }
459
460         obd_device_free(obd);
461
462         class_put_type(obd_type);
463 }
464
465 /**
466  * Unregister obd device.
467  *
468  * Free slot in obd_dev[] used by \a obd.
469  *
470  * \param[in] new_obd obd_device to be unregistered
471  *
472  * \retval none
473  */
474 void class_unregister_device(struct obd_device *obd)
475 {
476         write_lock(&obd_dev_lock);
477         if (obd->obd_minor >= 0) {
478                 LASSERT(obd_devs[obd->obd_minor] == obd);
479                 obd_devs[obd->obd_minor] = NULL;
480                 obd->obd_minor = -1;
481         }
482         write_unlock(&obd_dev_lock);
483 }
484
485 /**
486  * Register obd device.
487  *
488  * Find free slot in obd_devs[], fills it with \a new_obd.
489  *
490  * \param[in] new_obd obd_device to be registered
491  *
492  * \retval 0          success
493  * \retval -EEXIST    device with this name is registered
494  * \retval -EOVERFLOW obd_devs[] is full
495  */
496 int class_register_device(struct obd_device *new_obd)
497 {
498         int ret = 0;
499         int i;
500         int new_obd_minor = 0;
501         bool minor_assign = false;
502         bool retried = false;
503
504 again:
505         write_lock(&obd_dev_lock);
506         for (i = 0; i < class_devno_max(); i++) {
507                 struct obd_device *obd = class_num2obd(i);
508
509                 if (obd != NULL &&
510                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
511
512                         if (!retried) {
513                                 write_unlock(&obd_dev_lock);
514
515                                 /* the obd_device could be waited to be
516                                  * destroyed by the "obd_zombie_impexp_thread".
517                                  */
518                                 obd_zombie_barrier();
519                                 retried = true;
520                                 goto again;
521                         }
522
523                         CERROR("%s: already exists, won't add\n",
524                                obd->obd_name);
525                         /* in case we found a free slot before duplicate */
526                         minor_assign = false;
527                         ret = -EEXIST;
528                         break;
529                 }
530                 if (!minor_assign && obd == NULL) {
531                         new_obd_minor = i;
532                         minor_assign = true;
533                 }
534         }
535
536         if (minor_assign) {
537                 new_obd->obd_minor = new_obd_minor;
538                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
539                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
540                 obd_devs[new_obd_minor] = new_obd;
541         } else {
542                 if (ret == 0) {
543                         ret = -EOVERFLOW;
544                         CERROR("%s: all %u/%u devices used, increase "
545                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
546                                i, class_devno_max(), ret);
547                 }
548         }
549         write_unlock(&obd_dev_lock);
550
551         RETURN(ret);
552 }
553
554 static int class_name2dev_nolock(const char *name)
555 {
556         int i;
557
558         if (!name)
559                 return -1;
560
561         for (i = 0; i < class_devno_max(); i++) {
562                 struct obd_device *obd = class_num2obd(i);
563
564                 if (obd && strcmp(name, obd->obd_name) == 0) {
565                         /* Make sure we finished attaching before we give
566                            out any references */
567                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
568                         if (obd->obd_attached) {
569                                 return i;
570                         }
571                         break;
572                 }
573         }
574
575         return -1;
576 }
577
578 int class_name2dev(const char *name)
579 {
580         int i;
581
582         if (!name)
583                 return -1;
584
585         read_lock(&obd_dev_lock);
586         i = class_name2dev_nolock(name);
587         read_unlock(&obd_dev_lock);
588
589         return i;
590 }
591 EXPORT_SYMBOL(class_name2dev);
592
593 struct obd_device *class_name2obd(const char *name)
594 {
595         int dev = class_name2dev(name);
596
597         if (dev < 0 || dev > class_devno_max())
598                 return NULL;
599         return class_num2obd(dev);
600 }
601 EXPORT_SYMBOL(class_name2obd);
602
603 int class_uuid2dev_nolock(struct obd_uuid *uuid)
604 {
605         int i;
606
607         for (i = 0; i < class_devno_max(); i++) {
608                 struct obd_device *obd = class_num2obd(i);
609
610                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
611                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
612                         return i;
613                 }
614         }
615
616         return -1;
617 }
618
619 int class_uuid2dev(struct obd_uuid *uuid)
620 {
621         int i;
622
623         read_lock(&obd_dev_lock);
624         i = class_uuid2dev_nolock(uuid);
625         read_unlock(&obd_dev_lock);
626
627         return i;
628 }
629 EXPORT_SYMBOL(class_uuid2dev);
630
631 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
632 {
633         int dev = class_uuid2dev(uuid);
634         if (dev < 0)
635                 return NULL;
636         return class_num2obd(dev);
637 }
638 EXPORT_SYMBOL(class_uuid2obd);
639
640 /**
641  * Get obd device from ::obd_devs[]
642  *
643  * \param num [in] array index
644  *
645  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
646  *         otherwise return the obd device there.
647  */
648 struct obd_device *class_num2obd(int num)
649 {
650         struct obd_device *obd = NULL;
651
652         if (num < class_devno_max()) {
653                 obd = obd_devs[num];
654                 if (obd == NULL)
655                         return NULL;
656
657                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
658                          "%p obd_magic %08x != %08x\n",
659                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
660                 LASSERTF(obd->obd_minor == num,
661                          "%p obd_minor %0d != %0d\n",
662                          obd, obd->obd_minor, num);
663         }
664
665         return obd;
666 }
667 EXPORT_SYMBOL(class_num2obd);
668
669 /**
670  * Find obd in obd_dev[] by name or uuid.
671  *
672  * Increment obd's refcount if found.
673  *
674  * \param[in] str obd name or uuid
675  *
676  * \retval NULL    if not found
677  * \retval target  pointer to found obd_device
678  */
679 struct obd_device *class_dev_by_str(const char *str)
680 {
681         struct obd_device *target = NULL;
682         struct obd_uuid tgtuuid;
683         int rc;
684
685         obd_str2uuid(&tgtuuid, str);
686
687         read_lock(&obd_dev_lock);
688         rc = class_uuid2dev_nolock(&tgtuuid);
689         if (rc < 0)
690                 rc = class_name2dev_nolock(str);
691
692         if (rc >= 0)
693                 target = class_num2obd(rc);
694
695         if (target != NULL)
696                 class_incref(target, "find", current);
697         read_unlock(&obd_dev_lock);
698
699         RETURN(target);
700 }
701 EXPORT_SYMBOL(class_dev_by_str);
702
703 /**
704  * Get obd devices count. Device in any
705  *    state are counted
706  * \retval obd device count
707  */
708 int get_devices_count(void)
709 {
710         int index, max_index = class_devno_max(), dev_count = 0;
711
712         read_lock(&obd_dev_lock);
713         for (index = 0; index <= max_index; index++) {
714                 struct obd_device *obd = class_num2obd(index);
715                 if (obd != NULL)
716                         dev_count++;
717         }
718         read_unlock(&obd_dev_lock);
719
720         return dev_count;
721 }
722 EXPORT_SYMBOL(get_devices_count);
723
724 void class_obd_list(void)
725 {
726         char *status;
727         int i;
728
729         read_lock(&obd_dev_lock);
730         for (i = 0; i < class_devno_max(); i++) {
731                 struct obd_device *obd = class_num2obd(i);
732
733                 if (obd == NULL)
734                         continue;
735                 if (obd->obd_stopping)
736                         status = "ST";
737                 else if (obd->obd_set_up)
738                         status = "UP";
739                 else if (obd->obd_attached)
740                         status = "AT";
741                 else
742                         status = "--";
743                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
744                          i, status, obd->obd_type->typ_name,
745                          obd->obd_name, obd->obd_uuid.uuid,
746                          atomic_read(&obd->obd_refcount));
747         }
748         read_unlock(&obd_dev_lock);
749 }
750
751 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
752  * specified, then only the client with that uuid is returned,
753  * otherwise any client connected to the tgt is returned.
754  */
755 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
756                                          const char *type_name,
757                                          struct obd_uuid *grp_uuid)
758 {
759         int i;
760
761         read_lock(&obd_dev_lock);
762         for (i = 0; i < class_devno_max(); i++) {
763                 struct obd_device *obd = class_num2obd(i);
764
765                 if (obd == NULL)
766                         continue;
767                 if ((strncmp(obd->obd_type->typ_name, type_name,
768                              strlen(type_name)) == 0)) {
769                         if (obd_uuid_equals(tgt_uuid,
770                                             &obd->u.cli.cl_target_uuid) &&
771                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
772                                                          &obd->obd_uuid) : 1)) {
773                                 read_unlock(&obd_dev_lock);
774                                 return obd;
775                         }
776                 }
777         }
778         read_unlock(&obd_dev_lock);
779
780         return NULL;
781 }
782 EXPORT_SYMBOL(class_find_client_obd);
783
784 /* Iterate the obd_device list looking devices have grp_uuid. Start
785  * searching at *next, and if a device is found, the next index to look
786  * at is saved in *next. If next is NULL, then the first matching device
787  * will always be returned.
788  */
789 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
790 {
791         int i;
792
793         if (next == NULL)
794                 i = 0;
795         else if (*next >= 0 && *next < class_devno_max())
796                 i = *next;
797         else
798                 return NULL;
799
800         read_lock(&obd_dev_lock);
801         for (; i < class_devno_max(); i++) {
802                 struct obd_device *obd = class_num2obd(i);
803
804                 if (obd == NULL)
805                         continue;
806                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
807                         if (next != NULL)
808                                 *next = i+1;
809                         read_unlock(&obd_dev_lock);
810                         return obd;
811                 }
812         }
813         read_unlock(&obd_dev_lock);
814
815         return NULL;
816 }
817 EXPORT_SYMBOL(class_devices_in_group);
818
819 /**
820  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
821  * adjust sptlrpc settings accordingly.
822  */
823 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
824 {
825         struct obd_device  *obd;
826         const char         *type;
827         int                 i, rc = 0, rc2;
828
829         LASSERT(namelen > 0);
830
831         read_lock(&obd_dev_lock);
832         for (i = 0; i < class_devno_max(); i++) {
833                 obd = class_num2obd(i);
834
835                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
836                         continue;
837
838                 /* only notify mdc, osc, osp, lwp, mdt, ost
839                  * because only these have a -sptlrpc llog */
840                 type = obd->obd_type->typ_name;
841                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
842                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
843                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
844                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
845                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
846                     strcmp(type, LUSTRE_OST_NAME) != 0)
847                         continue;
848
849                 if (strncmp(obd->obd_name, fsname, namelen))
850                         continue;
851
852                 class_incref(obd, __FUNCTION__, obd);
853                 read_unlock(&obd_dev_lock);
854                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
855                                          sizeof(KEY_SPTLRPC_CONF),
856                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
857                 rc = rc ? rc : rc2;
858                 class_decref(obd, __FUNCTION__, obd);
859                 read_lock(&obd_dev_lock);
860         }
861         read_unlock(&obd_dev_lock);
862         return rc;
863 }
864 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
865
866 void obd_cleanup_caches(void)
867 {
868         ENTRY;
869         if (obd_device_cachep) {
870                 kmem_cache_destroy(obd_device_cachep);
871                 obd_device_cachep = NULL;
872         }
873
874         EXIT;
875 }
876
877 int obd_init_caches(void)
878 {
879         int rc;
880         ENTRY;
881
882         LASSERT(obd_device_cachep == NULL);
883         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
884                                 sizeof(struct obd_device),
885                                 0, 0, 0, sizeof(struct obd_device), NULL);
886         if (!obd_device_cachep)
887                 GOTO(out, rc = -ENOMEM);
888
889         RETURN(0);
890 out:
891         obd_cleanup_caches();
892         RETURN(rc);
893 }
894
895 static const char export_handle_owner[] = "export";
896
897 /* map connection to client */
898 struct obd_export *class_conn2export(struct lustre_handle *conn)
899 {
900         struct obd_export *export;
901         ENTRY;
902
903         if (!conn) {
904                 CDEBUG(D_CACHE, "looking for null handle\n");
905                 RETURN(NULL);
906         }
907
908         if (conn->cookie == -1) {  /* this means assign a new connection */
909                 CDEBUG(D_CACHE, "want a new connection\n");
910                 RETURN(NULL);
911         }
912
913         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
914         export = class_handle2object(conn->cookie, export_handle_owner);
915         RETURN(export);
916 }
917 EXPORT_SYMBOL(class_conn2export);
918
919 struct obd_device *class_exp2obd(struct obd_export *exp)
920 {
921         if (exp)
922                 return exp->exp_obd;
923         return NULL;
924 }
925 EXPORT_SYMBOL(class_exp2obd);
926
927 struct obd_import *class_exp2cliimp(struct obd_export *exp)
928 {
929         struct obd_device *obd = exp->exp_obd;
930         if (obd == NULL)
931                 return NULL;
932         return obd->u.cli.cl_import;
933 }
934 EXPORT_SYMBOL(class_exp2cliimp);
935
936 /* Export management functions */
937 static void class_export_destroy(struct obd_export *exp)
938 {
939         struct obd_device *obd = exp->exp_obd;
940         ENTRY;
941
942         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
943         LASSERT(obd != NULL);
944
945         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
946                exp->exp_client_uuid.uuid, obd->obd_name);
947
948         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
949         ptlrpc_connection_put(exp->exp_connection);
950
951         LASSERT(list_empty(&exp->exp_outstanding_replies));
952         LASSERT(list_empty(&exp->exp_uncommitted_replies));
953         LASSERT(list_empty(&exp->exp_req_replay_queue));
954         LASSERT(list_empty(&exp->exp_hp_rpcs));
955         obd_destroy_export(exp);
956         /* self export doesn't hold a reference to an obd, although it
957          * exists until freeing of the obd */
958         if (exp != obd->obd_self_export)
959                 class_decref(obd, "export", exp);
960
961         OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
962         kfree_rcu(exp, exp_handle.h_rcu);
963         EXIT;
964 }
965
966 struct obd_export *class_export_get(struct obd_export *exp)
967 {
968         refcount_inc(&exp->exp_handle.h_ref);
969         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
970                refcount_read(&exp->exp_handle.h_ref));
971         return exp;
972 }
973 EXPORT_SYMBOL(class_export_get);
974
975 void class_export_put(struct obd_export *exp)
976 {
977         LASSERT(exp != NULL);
978         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
979         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
980         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
981                refcount_read(&exp->exp_handle.h_ref) - 1);
982
983         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
984                 struct obd_device *obd = exp->exp_obd;
985
986                 CDEBUG(D_IOCTL, "final put %p/%s\n",
987                        exp, exp->exp_client_uuid.uuid);
988
989                 /* release nid stat refererence */
990                 lprocfs_exp_cleanup(exp);
991
992                 if (exp == obd->obd_self_export) {
993                         /* self export should be destroyed without
994                          * zombie thread as it doesn't hold a
995                          * reference to obd and doesn't hold any
996                          * resources */
997                         class_export_destroy(exp);
998                         /* self export is destroyed, no class
999                          * references exist and it is safe to free
1000                          * obd */
1001                         class_free_dev(obd);
1002                 } else {
1003                         LASSERT(!list_empty(&exp->exp_obd_chain));
1004                         obd_zombie_export_add(exp);
1005                 }
1006
1007         }
1008 }
1009 EXPORT_SYMBOL(class_export_put);
1010
1011 static void obd_zombie_exp_cull(struct work_struct *ws)
1012 {
1013         struct obd_export *export;
1014
1015         export = container_of(ws, struct obd_export, exp_zombie_work);
1016         class_export_destroy(export);
1017 }
1018
1019 /* Creates a new export, adds it to the hash table, and returns a
1020  * pointer to it. The refcount is 2: one for the hash reference, and
1021  * one for the pointer returned by this function. */
1022 struct obd_export *__class_new_export(struct obd_device *obd,
1023                                       struct obd_uuid *cluuid, bool is_self)
1024 {
1025         struct obd_export *export;
1026         int rc = 0;
1027         ENTRY;
1028
1029         OBD_ALLOC_PTR(export);
1030         if (!export)
1031                 return ERR_PTR(-ENOMEM);
1032
1033         export->exp_conn_cnt = 0;
1034         export->exp_lock_hash = NULL;
1035         export->exp_flock_hash = NULL;
1036         /* 2 = class_handle_hash + last */
1037         refcount_set(&export->exp_handle.h_ref, 2);
1038         atomic_set(&export->exp_rpc_count, 0);
1039         atomic_set(&export->exp_cb_count, 0);
1040         atomic_set(&export->exp_locks_count, 0);
1041 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1042         INIT_LIST_HEAD(&export->exp_locks_list);
1043         spin_lock_init(&export->exp_locks_list_guard);
1044 #endif
1045         atomic_set(&export->exp_replay_count, 0);
1046         export->exp_obd = obd;
1047         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1048         spin_lock_init(&export->exp_uncommitted_replies_lock);
1049         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1050         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1051         INIT_HLIST_NODE(&export->exp_handle.h_link);
1052         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1053         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1054         class_handle_hash(&export->exp_handle, export_handle_owner);
1055         export->exp_last_request_time = ktime_get_real_seconds();
1056         spin_lock_init(&export->exp_lock);
1057         spin_lock_init(&export->exp_rpc_lock);
1058         INIT_HLIST_NODE(&export->exp_gen_hash);
1059         spin_lock_init(&export->exp_bl_list_lock);
1060         INIT_LIST_HEAD(&export->exp_bl_list);
1061         INIT_LIST_HEAD(&export->exp_stale_list);
1062         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1063
1064         export->exp_sp_peer = LUSTRE_SP_ANY;
1065         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1066         export->exp_client_uuid = *cluuid;
1067         obd_init_export(export);
1068
1069         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1070
1071         spin_lock(&obd->obd_dev_lock);
1072         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1073                 /* shouldn't happen, but might race */
1074                 if (obd->obd_stopping)
1075                         GOTO(exit_unlock, rc = -ENODEV);
1076
1077                 rc = obd_uuid_add(obd, export);
1078                 if (rc != 0) {
1079                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1080                                       obd->obd_name, cluuid->uuid, rc);
1081                         GOTO(exit_unlock, rc = -EALREADY);
1082                 }
1083         }
1084
1085         if (!is_self) {
1086                 class_incref(obd, "export", export);
1087                 list_add_tail(&export->exp_obd_chain_timed,
1088                               &obd->obd_exports_timed);
1089                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1090                 obd->obd_num_exports++;
1091         } else {
1092                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1093                 INIT_LIST_HEAD(&export->exp_obd_chain);
1094         }
1095         spin_unlock(&obd->obd_dev_lock);
1096         RETURN(export);
1097
1098 exit_unlock:
1099         spin_unlock(&obd->obd_dev_lock);
1100         class_handle_unhash(&export->exp_handle);
1101         obd_destroy_export(export);
1102         OBD_FREE_PTR(export);
1103         return ERR_PTR(rc);
1104 }
1105
1106 struct obd_export *class_new_export(struct obd_device *obd,
1107                                     struct obd_uuid *uuid)
1108 {
1109         return __class_new_export(obd, uuid, false);
1110 }
1111 EXPORT_SYMBOL(class_new_export);
1112
1113 struct obd_export *class_new_export_self(struct obd_device *obd,
1114                                          struct obd_uuid *uuid)
1115 {
1116         return __class_new_export(obd, uuid, true);
1117 }
1118
1119 void class_unlink_export(struct obd_export *exp)
1120 {
1121         class_handle_unhash(&exp->exp_handle);
1122
1123         if (exp->exp_obd->obd_self_export == exp) {
1124                 class_export_put(exp);
1125                 return;
1126         }
1127
1128         spin_lock(&exp->exp_obd->obd_dev_lock);
1129         /* delete an uuid-export hashitem from hashtables */
1130         if (exp != exp->exp_obd->obd_self_export)
1131                 obd_uuid_del(exp->exp_obd, exp);
1132
1133 #ifdef HAVE_SERVER_SUPPORT
1134         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1135                 struct tg_export_data   *ted = &exp->exp_target_data;
1136                 struct cfs_hash         *hash;
1137
1138                 /* Because obd_gen_hash will not be released until
1139                  * class_cleanup(), so hash should never be NULL here */
1140                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1141                 LASSERT(hash != NULL);
1142                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1143                              &exp->exp_gen_hash);
1144                 cfs_hash_putref(hash);
1145         }
1146 #endif /* HAVE_SERVER_SUPPORT */
1147
1148         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1149         list_del_init(&exp->exp_obd_chain_timed);
1150         exp->exp_obd->obd_num_exports--;
1151         spin_unlock(&exp->exp_obd->obd_dev_lock);
1152         atomic_inc(&obd_stale_export_num);
1153
1154         /* A reference is kept by obd_stale_exports list */
1155         obd_stale_export_put(exp);
1156 }
1157 EXPORT_SYMBOL(class_unlink_export);
1158
1159 /* Import management functions */
1160 static void obd_zombie_import_free(struct obd_import *imp)
1161 {
1162         struct obd_import_conn *imp_conn;
1163
1164         ENTRY;
1165         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1166                imp->imp_obd->obd_name);
1167
1168         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1169
1170         ptlrpc_connection_put(imp->imp_connection);
1171
1172         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1173                                                     struct obd_import_conn,
1174                                                     oic_item)) != NULL) {
1175                 list_del_init(&imp_conn->oic_item);
1176                 ptlrpc_connection_put(imp_conn->oic_conn);
1177                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1178         }
1179
1180         LASSERT(imp->imp_sec == NULL);
1181         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1182                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1183         class_decref(imp->imp_obd, "import", imp);
1184         OBD_FREE_PTR(imp);
1185         EXIT;
1186 }
1187
1188 struct obd_import *class_import_get(struct obd_import *import)
1189 {
1190         refcount_inc(&import->imp_refcount);
1191         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1192                refcount_read(&import->imp_refcount),
1193                import->imp_obd->obd_name);
1194         return import;
1195 }
1196 EXPORT_SYMBOL(class_import_get);
1197
1198 void class_import_put(struct obd_import *imp)
1199 {
1200         ENTRY;
1201
1202         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1203
1204         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1205                refcount_read(&imp->imp_refcount) - 1,
1206                imp->imp_obd->obd_name);
1207
1208         if (refcount_dec_and_test(&imp->imp_refcount)) {
1209                 CDEBUG(D_INFO, "final put import %p\n", imp);
1210                 obd_zombie_import_add(imp);
1211         }
1212
1213         EXIT;
1214 }
1215 EXPORT_SYMBOL(class_import_put);
1216
1217 static void init_imp_at(struct imp_at *at) {
1218         int i;
1219         at_init(&at->iat_net_latency, 0, 0);
1220         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1221                 /* max service estimates are tracked on the server side, so
1222                    don't use the AT history here, just use the last reported
1223                    val. (But keep hist for proc histogram, worst_ever) */
1224                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1225                         AT_FLG_NOHIST);
1226         }
1227 }
1228
1229 static void obd_zombie_imp_cull(struct work_struct *ws)
1230 {
1231         struct obd_import *import;
1232
1233         import = container_of(ws, struct obd_import, imp_zombie_work);
1234         obd_zombie_import_free(import);
1235 }
1236
1237 struct obd_import *class_new_import(struct obd_device *obd)
1238 {
1239         struct obd_import *imp;
1240         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1241
1242         OBD_ALLOC(imp, sizeof(*imp));
1243         if (imp == NULL)
1244                 return NULL;
1245
1246         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1247         INIT_LIST_HEAD(&imp->imp_replay_list);
1248         INIT_LIST_HEAD(&imp->imp_sending_list);
1249         INIT_LIST_HEAD(&imp->imp_delayed_list);
1250         INIT_LIST_HEAD(&imp->imp_committed_list);
1251         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1252         imp->imp_known_replied_xid = 0;
1253         imp->imp_replay_cursor = &imp->imp_committed_list;
1254         spin_lock_init(&imp->imp_lock);
1255         imp->imp_last_success_conn = 0;
1256         imp->imp_state = LUSTRE_IMP_NEW;
1257         imp->imp_obd = class_incref(obd, "import", imp);
1258         rwlock_init(&imp->imp_sec_lock);
1259         init_waitqueue_head(&imp->imp_recovery_waitq);
1260         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1261
1262         if (curr_pid_ns && curr_pid_ns->child_reaper)
1263                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1264         else
1265                 imp->imp_sec_refpid = 1;
1266
1267         refcount_set(&imp->imp_refcount, 2);
1268         atomic_set(&imp->imp_unregistering, 0);
1269         atomic_set(&imp->imp_reqs, 0);
1270         atomic_set(&imp->imp_inflight, 0);
1271         atomic_set(&imp->imp_replay_inflight, 0);
1272         init_waitqueue_head(&imp->imp_replay_waitq);
1273         atomic_set(&imp->imp_inval_count, 0);
1274         atomic_set(&imp->imp_waiting, 0);
1275         INIT_LIST_HEAD(&imp->imp_conn_list);
1276         init_imp_at(&imp->imp_at);
1277
1278         /* the default magic is V2, will be used in connect RPC, and
1279          * then adjusted according to the flags in request/reply. */
1280         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1281
1282         return imp;
1283 }
1284 EXPORT_SYMBOL(class_new_import);
1285
1286 void class_destroy_import(struct obd_import *import)
1287 {
1288         LASSERT(import != NULL);
1289         LASSERT(import != LP_POISON);
1290
1291         spin_lock(&import->imp_lock);
1292         import->imp_generation++;
1293         spin_unlock(&import->imp_lock);
1294         class_import_put(import);
1295 }
1296 EXPORT_SYMBOL(class_destroy_import);
1297
1298 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1299
1300 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1301 {
1302         spin_lock(&exp->exp_locks_list_guard);
1303
1304         LASSERT(lock->l_exp_refs_nr >= 0);
1305
1306         if (lock->l_exp_refs_target != NULL &&
1307             lock->l_exp_refs_target != exp) {
1308                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1309                               exp, lock, lock->l_exp_refs_target);
1310         }
1311         if ((lock->l_exp_refs_nr ++) == 0) {
1312                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1313                 lock->l_exp_refs_target = exp;
1314         }
1315         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1316                lock, exp, lock->l_exp_refs_nr);
1317         spin_unlock(&exp->exp_locks_list_guard);
1318 }
1319 EXPORT_SYMBOL(__class_export_add_lock_ref);
1320
1321 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1322 {
1323         spin_lock(&exp->exp_locks_list_guard);
1324         LASSERT(lock->l_exp_refs_nr > 0);
1325         if (lock->l_exp_refs_target != exp) {
1326                 LCONSOLE_WARN("lock %p, "
1327                               "mismatching export pointers: %p, %p\n",
1328                               lock, lock->l_exp_refs_target, exp);
1329         }
1330         if (-- lock->l_exp_refs_nr == 0) {
1331                 list_del_init(&lock->l_exp_refs_link);
1332                 lock->l_exp_refs_target = NULL;
1333         }
1334         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1335                lock, exp, lock->l_exp_refs_nr);
1336         spin_unlock(&exp->exp_locks_list_guard);
1337 }
1338 EXPORT_SYMBOL(__class_export_del_lock_ref);
1339 #endif
1340
1341 /* A connection defines an export context in which preallocation can
1342    be managed. This releases the export pointer reference, and returns
1343    the export handle, so the export refcount is 1 when this function
1344    returns. */
1345 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1346                   struct obd_uuid *cluuid)
1347 {
1348         struct obd_export *export;
1349         LASSERT(conn != NULL);
1350         LASSERT(obd != NULL);
1351         LASSERT(cluuid != NULL);
1352         ENTRY;
1353
1354         export = class_new_export(obd, cluuid);
1355         if (IS_ERR(export))
1356                 RETURN(PTR_ERR(export));
1357
1358         conn->cookie = export->exp_handle.h_cookie;
1359         class_export_put(export);
1360
1361         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1362                cluuid->uuid, conn->cookie);
1363         RETURN(0);
1364 }
1365 EXPORT_SYMBOL(class_connect);
1366
1367 /* if export is involved in recovery then clean up related things */
1368 static void class_export_recovery_cleanup(struct obd_export *exp)
1369 {
1370         struct obd_device *obd = exp->exp_obd;
1371
1372         spin_lock(&obd->obd_recovery_task_lock);
1373         if (obd->obd_recovering) {
1374                 if (exp->exp_in_recovery) {
1375                         spin_lock(&exp->exp_lock);
1376                         exp->exp_in_recovery = 0;
1377                         spin_unlock(&exp->exp_lock);
1378                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1379                         atomic_dec(&obd->obd_connected_clients);
1380                 }
1381
1382                 /* if called during recovery then should update
1383                  * obd_stale_clients counter,
1384                  * lightweight exports are not counted */
1385                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1386                         exp->exp_obd->obd_stale_clients++;
1387         }
1388         spin_unlock(&obd->obd_recovery_task_lock);
1389
1390         spin_lock(&exp->exp_lock);
1391         /** Cleanup req replay fields */
1392         if (exp->exp_req_replay_needed) {
1393                 exp->exp_req_replay_needed = 0;
1394
1395                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1396                 atomic_dec(&obd->obd_req_replay_clients);
1397         }
1398
1399         /** Cleanup lock replay data */
1400         if (exp->exp_lock_replay_needed) {
1401                 exp->exp_lock_replay_needed = 0;
1402
1403                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1404                 atomic_dec(&obd->obd_lock_replay_clients);
1405         }
1406         spin_unlock(&exp->exp_lock);
1407 }
1408
1409 /* This function removes 1-3 references from the export:
1410  * 1 - for export pointer passed
1411  * and if disconnect really need
1412  * 2 - removing from hash
1413  * 3 - in client_unlink_export
1414  * The export pointer passed to this function can destroyed */
1415 int class_disconnect(struct obd_export *export)
1416 {
1417         int already_disconnected;
1418         ENTRY;
1419
1420         if (export == NULL) {
1421                 CWARN("attempting to free NULL export %p\n", export);
1422                 RETURN(-EINVAL);
1423         }
1424
1425         spin_lock(&export->exp_lock);
1426         already_disconnected = export->exp_disconnected;
1427         export->exp_disconnected = 1;
1428 #ifdef HAVE_SERVER_SUPPORT
1429         /*  We hold references of export for uuid hash
1430          *  and nid_hash and export link at least. So
1431          *  it is safe to call rh*table_remove_fast in
1432          *  there.
1433          */
1434         obd_nid_del(export->exp_obd, export);
1435 #endif /* HAVE_SERVER_SUPPORT */
1436         spin_unlock(&export->exp_lock);
1437
1438         /* class_cleanup(), abort_recovery(), and class_fail_export()
1439          * all end up in here, and if any of them race we shouldn't
1440          * call extra class_export_puts(). */
1441         if (already_disconnected)
1442                 GOTO(no_disconn, already_disconnected);
1443
1444         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1445                export->exp_handle.h_cookie);
1446
1447         class_export_recovery_cleanup(export);
1448         class_unlink_export(export);
1449 no_disconn:
1450         class_export_put(export);
1451         RETURN(0);
1452 }
1453 EXPORT_SYMBOL(class_disconnect);
1454
1455 /* Return non-zero for a fully connected export */
1456 int class_connected_export(struct obd_export *exp)
1457 {
1458         int connected = 0;
1459
1460         if (exp) {
1461                 spin_lock(&exp->exp_lock);
1462                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1463                 spin_unlock(&exp->exp_lock);
1464         }
1465         return connected;
1466 }
1467 EXPORT_SYMBOL(class_connected_export);
1468
1469 static void class_disconnect_export_list(struct list_head *list,
1470                                          enum obd_option flags)
1471 {
1472         int rc;
1473         struct obd_export *exp;
1474         ENTRY;
1475
1476         /* It's possible that an export may disconnect itself, but
1477          * nothing else will be added to this list.
1478          */
1479         while ((exp = list_first_entry_or_null(list, struct obd_export,
1480                                                exp_obd_chain)) != NULL) {
1481                 /* need for safe call CDEBUG after obd_disconnect */
1482                 class_export_get(exp);
1483
1484                 spin_lock(&exp->exp_lock);
1485                 exp->exp_flags = flags;
1486                 spin_unlock(&exp->exp_lock);
1487
1488                 if (obd_uuid_equals(&exp->exp_client_uuid,
1489                                     &exp->exp_obd->obd_uuid)) {
1490                         CDEBUG(D_HA,
1491                                "exp %p export uuid == obd uuid, don't discon\n",
1492                                exp);
1493                         /* Need to delete this now so we don't end up pointing
1494                          * to work_list later when this export is cleaned up. */
1495                         list_del_init(&exp->exp_obd_chain);
1496                         class_export_put(exp);
1497                         continue;
1498                 }
1499
1500                 class_export_get(exp);
1501                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1502                        "last request at %lld\n",
1503                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1504                        exp, exp->exp_last_request_time);
1505                 /* release one export reference anyway */
1506                 rc = obd_disconnect(exp);
1507
1508                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1509                        obd_export_nid2str(exp), exp, rc);
1510                 class_export_put(exp);
1511         }
1512         EXIT;
1513 }
1514
1515 void class_disconnect_exports(struct obd_device *obd)
1516 {
1517         LIST_HEAD(work_list);
1518         ENTRY;
1519
1520         /* Move all of the exports from obd_exports to a work list, en masse. */
1521         spin_lock(&obd->obd_dev_lock);
1522         list_splice_init(&obd->obd_exports, &work_list);
1523         list_splice_init(&obd->obd_delayed_exports, &work_list);
1524         spin_unlock(&obd->obd_dev_lock);
1525
1526         if (!list_empty(&work_list)) {
1527                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1528                        "disconnecting them\n", obd->obd_minor, obd);
1529                 class_disconnect_export_list(&work_list,
1530                                              exp_flags_from_obd(obd));
1531         } else
1532                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1533                        obd->obd_minor, obd);
1534         EXIT;
1535 }
1536 EXPORT_SYMBOL(class_disconnect_exports);
1537
1538 /* Remove exports that have not completed recovery.
1539  */
1540 void class_disconnect_stale_exports(struct obd_device *obd,
1541                                     int (*test_export)(struct obd_export *))
1542 {
1543         LIST_HEAD(work_list);
1544         struct obd_export *exp, *n;
1545         int evicted = 0;
1546         ENTRY;
1547
1548         spin_lock(&obd->obd_dev_lock);
1549         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1550                                  exp_obd_chain) {
1551                 /* don't count self-export as client */
1552                 if (obd_uuid_equals(&exp->exp_client_uuid,
1553                                     &exp->exp_obd->obd_uuid))
1554                         continue;
1555
1556                 /* don't evict clients which have no slot in last_rcvd
1557                  * (e.g. lightweight connection) */
1558                 if (exp->exp_target_data.ted_lr_idx == -1)
1559                         continue;
1560
1561                 spin_lock(&exp->exp_lock);
1562                 if (exp->exp_failed || test_export(exp)) {
1563                         spin_unlock(&exp->exp_lock);
1564                         continue;
1565                 }
1566                 exp->exp_failed = 1;
1567                 spin_unlock(&exp->exp_lock);
1568
1569                 list_move(&exp->exp_obd_chain, &work_list);
1570                 evicted++;
1571                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1572                        obd->obd_name, exp->exp_client_uuid.uuid,
1573                        obd_export_nid2str(exp));
1574                 print_export_data(exp, "EVICTING", 0, D_HA);
1575         }
1576         spin_unlock(&obd->obd_dev_lock);
1577
1578         if (evicted)
1579                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1580                               obd->obd_name, evicted);
1581
1582         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1583                                                  OBD_OPT_ABORT_RECOV);
1584         EXIT;
1585 }
1586 EXPORT_SYMBOL(class_disconnect_stale_exports);
1587
1588 void class_fail_export(struct obd_export *exp)
1589 {
1590         int rc, already_failed;
1591
1592         spin_lock(&exp->exp_lock);
1593         already_failed = exp->exp_failed;
1594         exp->exp_failed = 1;
1595         spin_unlock(&exp->exp_lock);
1596
1597         if (already_failed) {
1598                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1599                        exp, exp->exp_client_uuid.uuid);
1600                 return;
1601         }
1602
1603         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1604                exp, exp->exp_client_uuid.uuid);
1605
1606         if (obd_dump_on_timeout)
1607                 libcfs_debug_dumplog();
1608
1609         /* need for safe call CDEBUG after obd_disconnect */
1610         class_export_get(exp);
1611
1612         /* Most callers into obd_disconnect are removing their own reference
1613          * (request, for example) in addition to the one from the hash table.
1614          * We don't have such a reference here, so make one. */
1615         class_export_get(exp);
1616         rc = obd_disconnect(exp);
1617         if (rc)
1618                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1619         else
1620                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1621                        exp, exp->exp_client_uuid.uuid);
1622         class_export_put(exp);
1623 }
1624 EXPORT_SYMBOL(class_fail_export);
1625
1626 #ifdef HAVE_SERVER_SUPPORT
1627
1628 static int take_first(struct obd_export *exp, void *data)
1629 {
1630         struct obd_export **expp = data;
1631
1632         if (*expp)
1633                 /* already have one */
1634                 return 0;
1635         if (exp->exp_failed)
1636                 /* Don't want this one */
1637                 return 0;
1638         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1639                 /* Cannot get a ref on this one */
1640                 return 0;
1641         *expp = exp;
1642         return 1;
1643 }
1644
1645 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1646 {
1647         struct lnet_nid nid_key;
1648         struct obd_export *doomed_exp;
1649         int exports_evicted = 0;
1650
1651         libcfs_strnid(&nid_key, nid);
1652
1653         spin_lock(&obd->obd_dev_lock);
1654         /* umount has run already, so evict thread should leave
1655          * its task to umount thread now */
1656         if (obd->obd_stopping) {
1657                 spin_unlock(&obd->obd_dev_lock);
1658                 return exports_evicted;
1659         }
1660         spin_unlock(&obd->obd_dev_lock);
1661
1662         doomed_exp = NULL;
1663         while (obd_nid_export_for_each(obd, &nid_key,
1664                                        take_first, &doomed_exp) > 0) {
1665
1666                 LASSERTF(doomed_exp != obd->obd_self_export,
1667                          "self-export is hashed by NID?\n");
1668
1669                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1670                               obd->obd_name,
1671                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1672                               obd_export_nid2str(doomed_exp));
1673
1674                 class_fail_export(doomed_exp);
1675                 class_export_put(doomed_exp);
1676                 exports_evicted++;
1677                 doomed_exp = NULL;
1678         }
1679
1680         if (!exports_evicted)
1681                 CDEBUG(D_HA,
1682                        "%s: can't disconnect NID '%s': no exports found\n",
1683                        obd->obd_name, nid);
1684         return exports_evicted;
1685 }
1686 EXPORT_SYMBOL(obd_export_evict_by_nid);
1687
1688 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1689 {
1690         struct obd_export *doomed_exp = NULL;
1691         struct obd_uuid doomed_uuid;
1692         int exports_evicted = 0;
1693
1694         spin_lock(&obd->obd_dev_lock);
1695         if (obd->obd_stopping) {
1696                 spin_unlock(&obd->obd_dev_lock);
1697                 return exports_evicted;
1698         }
1699         spin_unlock(&obd->obd_dev_lock);
1700
1701         obd_str2uuid(&doomed_uuid, uuid);
1702         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1703                 CERROR("%s: can't evict myself\n", obd->obd_name);
1704                 return exports_evicted;
1705         }
1706
1707         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1708         if (doomed_exp == NULL) {
1709                 CERROR("%s: can't disconnect %s: no exports found\n",
1710                        obd->obd_name, uuid);
1711         } else {
1712                 CWARN("%s: evicting %s at adminstrative request\n",
1713                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1714                 class_fail_export(doomed_exp);
1715                 class_export_put(doomed_exp);
1716                 obd_uuid_del(obd, doomed_exp);
1717                 exports_evicted++;
1718         }
1719
1720         return exports_evicted;
1721 }
1722 #endif /* HAVE_SERVER_SUPPORT */
1723
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() for each export when
 * lock dumping is requested.  Unset (NULL) until something installs it.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1728
1729 static void print_export_data(struct obd_export *exp, const char *status,
1730                               int locks, int debug_level)
1731 {
1732         struct ptlrpc_reply_state *rs;
1733         struct ptlrpc_reply_state *first_reply = NULL;
1734         int nreplies = 0;
1735
1736         spin_lock(&exp->exp_lock);
1737         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1738                             rs_exp_list) {
1739                 if (nreplies == 0)
1740                         first_reply = rs;
1741                 nreplies++;
1742         }
1743         spin_unlock(&exp->exp_lock);
1744
1745         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1746                "%p %s %llu stale:%d\n",
1747                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1748                obd_export_nid2str(exp),
1749                refcount_read(&exp->exp_handle.h_ref),
1750                atomic_read(&exp->exp_rpc_count),
1751                atomic_read(&exp->exp_cb_count),
1752                atomic_read(&exp->exp_locks_count),
1753                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1754                nreplies, first_reply, nreplies > 3 ? "..." : "",
1755                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1756 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1757         if (locks && class_export_dump_hook != NULL)
1758                 class_export_dump_hook(exp);
1759 #endif
1760 }
1761
1762 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1763 {
1764         struct obd_export *exp;
1765
1766         spin_lock(&obd->obd_dev_lock);
1767         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1768                 print_export_data(exp, "ACTIVE", locks, debug_level);
1769         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1770                 print_export_data(exp, "UNLINKED", locks, debug_level);
1771         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1772                 print_export_data(exp, "DELAYED", locks, debug_level);
1773         spin_unlock(&obd->obd_dev_lock);
1774 }
1775
1776 void obd_exports_barrier(struct obd_device *obd)
1777 {
1778         int waited = 2;
1779         LASSERT(list_empty(&obd->obd_exports));
1780         spin_lock(&obd->obd_dev_lock);
1781         while (!list_empty(&obd->obd_unlinked_exports)) {
1782                 spin_unlock(&obd->obd_dev_lock);
1783                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1784                 if (waited > 5 && is_power_of_2(waited)) {
1785                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1786                                       "more than %d seconds. "
1787                                       "The obd refcount = %d. Is it stuck?\n",
1788                                       obd->obd_name, waited,
1789                                       atomic_read(&obd->obd_refcount));
1790                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1791                 }
1792                 waited *= 2;
1793                 spin_lock(&obd->obd_dev_lock);
1794         }
1795         spin_unlock(&obd->obd_dev_lock);
1796 }
1797 EXPORT_SYMBOL(obd_exports_barrier);
1798
1799 /**
1800  * Add export to the obd_zombe thread and notify it.
1801  */
1802 static void obd_zombie_export_add(struct obd_export *exp) {
1803         atomic_dec(&obd_stale_export_num);
1804         spin_lock(&exp->exp_obd->obd_dev_lock);
1805         LASSERT(!list_empty(&exp->exp_obd_chain));
1806         list_del_init(&exp->exp_obd_chain);
1807         spin_unlock(&exp->exp_obd->obd_dev_lock);
1808
1809         queue_work(zombie_wq, &exp->exp_zombie_work);
1810 }
1811
1812 /**
1813  * Add import to the obd_zombe thread and notify it.
1814  */
1815 static void obd_zombie_import_add(struct obd_import *imp) {
1816         LASSERT(imp->imp_sec == NULL);
1817
1818         queue_work(zombie_wq, &imp->imp_zombie_work);
1819 }
1820
1821 /**
1822  * wait when obd_zombie import/export queues become empty
1823  */
1824 void obd_zombie_barrier(void)
1825 {
1826         flush_workqueue(zombie_wq);
1827 }
1828 EXPORT_SYMBOL(obd_zombie_barrier);
1829
1830
1831 struct obd_export *obd_stale_export_get(void)
1832 {
1833         struct obd_export *exp = NULL;
1834         ENTRY;
1835
1836         spin_lock(&obd_stale_export_lock);
1837         if (!list_empty(&obd_stale_exports)) {
1838                 exp = list_first_entry(&obd_stale_exports,
1839                                        struct obd_export, exp_stale_list);
1840                 list_del_init(&exp->exp_stale_list);
1841         }
1842         spin_unlock(&obd_stale_export_lock);
1843
1844         if (exp) {
1845                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1846                        atomic_read(&obd_stale_export_num));
1847         }
1848         RETURN(exp);
1849 }
1850 EXPORT_SYMBOL(obd_stale_export_get);
1851
1852 void obd_stale_export_put(struct obd_export *exp)
1853 {
1854         ENTRY;
1855
1856         LASSERT(list_empty(&exp->exp_stale_list));
1857         if (exp->exp_lock_hash &&
1858             atomic_read(&exp->exp_lock_hash->hs_count)) {
1859                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1860                        atomic_read(&obd_stale_export_num));
1861
1862                 spin_lock_bh(&exp->exp_bl_list_lock);
1863                 spin_lock(&obd_stale_export_lock);
1864                 /* Add to the tail if there is no blocked locks,
1865                  * to the head otherwise. */
1866                 if (list_empty(&exp->exp_bl_list))
1867                         list_add_tail(&exp->exp_stale_list,
1868                                       &obd_stale_exports);
1869                 else
1870                         list_add(&exp->exp_stale_list,
1871                                  &obd_stale_exports);
1872
1873                 spin_unlock(&obd_stale_export_lock);
1874                 spin_unlock_bh(&exp->exp_bl_list_lock);
1875         } else {
1876                 class_export_put(exp);
1877         }
1878         EXIT;
1879 }
1880 EXPORT_SYMBOL(obd_stale_export_put);
1881
1882 /**
1883  * Adjust the position of the export in the stale list,
1884  * i.e. move to the head of the list if is needed.
1885  **/
1886 void obd_stale_export_adjust(struct obd_export *exp)
1887 {
1888         LASSERT(exp != NULL);
1889         spin_lock_bh(&exp->exp_bl_list_lock);
1890         spin_lock(&obd_stale_export_lock);
1891
1892         if (!list_empty(&exp->exp_stale_list) &&
1893             !list_empty(&exp->exp_bl_list))
1894                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1895
1896         spin_unlock(&obd_stale_export_lock);
1897         spin_unlock_bh(&exp->exp_bl_list_lock);
1898 }
1899 EXPORT_SYMBOL(obd_stale_export_adjust);
1900
1901 /**
1902  * start destroy zombie import/export thread
1903  */
1904 int obd_zombie_impexp_init(void)
1905 {
1906         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1907                                            0, CFS_CPT_ANY,
1908                                            cfs_cpt_number(cfs_cpt_tab));
1909
1910         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1911 }
1912
1913 /**
1914  * stop destroy zombie import/export thread
1915  */
1916 void obd_zombie_impexp_stop(void)
1917 {
1918         destroy_workqueue(zombie_wq);
1919         LASSERT(list_empty(&obd_stale_exports));
1920 }
1921
1922 /***** Kernel-userspace comm helpers *******/
1923
1924 /* Get length of entire message, including header */
1925 int kuc_len(int payload_len)
1926 {
1927         return sizeof(struct kuc_hdr) + payload_len;
1928 }
1929 EXPORT_SYMBOL(kuc_len);
1930
1931 /* Get a pointer to kuc header, given a ptr to the payload
1932  * @param p Pointer to payload area
1933  * @returns Pointer to kuc header
1934  */
1935 struct kuc_hdr * kuc_ptr(void *p)
1936 {
1937         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1938         LASSERT(lh->kuc_magic == KUC_MAGIC);
1939         return lh;
1940 }
1941 EXPORT_SYMBOL(kuc_ptr);
1942
1943 /* Alloc space for a message, and fill in header
1944  * @return Pointer to payload area
1945  */
1946 void *kuc_alloc(int payload_len, int transport, int type)
1947 {
1948         struct kuc_hdr *lh;
1949         int len = kuc_len(payload_len);
1950
1951         OBD_ALLOC(lh, len);
1952         if (lh == NULL)
1953                 return ERR_PTR(-ENOMEM);
1954
1955         lh->kuc_magic = KUC_MAGIC;
1956         lh->kuc_transport = transport;
1957         lh->kuc_msgtype = type;
1958         lh->kuc_msglen = len;
1959
1960         return (void *)(lh + 1);
1961 }
1962 EXPORT_SYMBOL(kuc_alloc);
1963
/* Free a message given the payload pointer @p handed out by kuc_alloc()
 * and the same @payload_len that was used to allocate it.
 */
void kuc_free(void *p, int payload_len)
{
	int total = kuc_len(payload_len);
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, total);
}
1970 EXPORT_SYMBOL(kuc_free);
1971
1972 struct obd_request_slot_waiter {
1973         struct list_head        orsw_entry;
1974         wait_queue_head_t       orsw_waitq;
1975         bool                    orsw_signaled;
1976 };
1977
1978 static bool obd_request_slot_avail(struct client_obd *cli,
1979                                    struct obd_request_slot_waiter *orsw)
1980 {
1981         bool avail;
1982
1983         spin_lock(&cli->cl_loi_list_lock);
1984         avail = !!list_empty(&orsw->orsw_entry);
1985         spin_unlock(&cli->cl_loi_list_lock);
1986
1987         return avail;
1988 };
1989
1990 /*
1991  * For network flow control, the RPC sponsor needs to acquire a credit
1992  * before sending the RPC. The credits count for a connection is defined
1993  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1994  * the subsequent RPC sponsors need to wait until others released their
1995  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1996  */
1997 int obd_get_request_slot(struct client_obd *cli)
1998 {
1999         struct obd_request_slot_waiter   orsw;
2000         int                              rc;
2001
2002         spin_lock(&cli->cl_loi_list_lock);
2003         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2004                 cli->cl_rpcs_in_flight++;
2005                 spin_unlock(&cli->cl_loi_list_lock);
2006                 return 0;
2007         }
2008
2009         init_waitqueue_head(&orsw.orsw_waitq);
2010         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2011         orsw.orsw_signaled = false;
2012         spin_unlock(&cli->cl_loi_list_lock);
2013
2014         rc = l_wait_event_abortable(orsw.orsw_waitq,
2015                                     obd_request_slot_avail(cli, &orsw) ||
2016                                     orsw.orsw_signaled);
2017
2018         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2019          * freed but other (such as obd_put_request_slot) is using it. */
2020         spin_lock(&cli->cl_loi_list_lock);
2021         if (rc != 0) {
2022                 if (!orsw.orsw_signaled) {
2023                         if (list_empty(&orsw.orsw_entry))
2024                                 cli->cl_rpcs_in_flight--;
2025                         else
2026                                 list_del(&orsw.orsw_entry);
2027                 }
2028                 rc = -EINTR;
2029         }
2030
2031         if (orsw.orsw_signaled) {
2032                 LASSERT(list_empty(&orsw.orsw_entry));
2033
2034                 rc = -EINTR;
2035         }
2036         spin_unlock(&cli->cl_loi_list_lock);
2037
2038         return rc;
2039 }
2040 EXPORT_SYMBOL(obd_get_request_slot);
2041
2042 void obd_put_request_slot(struct client_obd *cli)
2043 {
2044         struct obd_request_slot_waiter *orsw;
2045
2046         spin_lock(&cli->cl_loi_list_lock);
2047         cli->cl_rpcs_in_flight--;
2048
2049         /* If there is free slot, wakeup the first waiter. */
2050         if (!list_empty(&cli->cl_flight_waiters) &&
2051             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2052                 orsw = list_first_entry(&cli->cl_flight_waiters,
2053                                         struct obd_request_slot_waiter,
2054                                         orsw_entry);
2055                 list_del_init(&orsw->orsw_entry);
2056                 cli->cl_rpcs_in_flight++;
2057                 wake_up(&orsw->orsw_waitq);
2058         }
2059         spin_unlock(&cli->cl_loi_list_lock);
2060 }
2061 EXPORT_SYMBOL(obd_put_request_slot);
2062
2063 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2064 {
2065         return cli->cl_max_rpcs_in_flight;
2066 }
2067 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2068
2069 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2070 {
2071         struct obd_request_slot_waiter *orsw;
2072         __u32                           old;
2073         int                             diff;
2074         int                             i;
2075         int                             rc;
2076
2077         if (max > OBD_MAX_RIF_MAX || max < 1)
2078                 return -ERANGE;
2079
2080         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2081                cli->cl_import->imp_obd->obd_name, max,
2082                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2083
2084         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2085                    LUSTRE_MDC_NAME) == 0) {
2086                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2087                  * strictly lower that max_rpcs_in_flight */
2088                 if (max < 2) {
2089                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2090                                cli->cl_import->imp_obd->obd_name);
2091                         return -ERANGE;
2092                 }
2093                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2094                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2095                         if (rc != 0)
2096                                 return rc;
2097                 }
2098         }
2099
2100         spin_lock(&cli->cl_loi_list_lock);
2101         old = cli->cl_max_rpcs_in_flight;
2102         cli->cl_max_rpcs_in_flight = max;
2103         client_adjust_max_dirty(cli);
2104
2105         diff = max - old;
2106
2107         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2108         for (i = 0; i < diff; i++) {
2109                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2110                                                 struct obd_request_slot_waiter,
2111                                                 orsw_entry);
2112                 if (!orsw)
2113                         break;
2114
2115                 list_del_init(&orsw->orsw_entry);
2116                 cli->cl_rpcs_in_flight++;
2117                 wake_up(&orsw->orsw_waitq);
2118         }
2119         spin_unlock(&cli->cl_loi_list_lock);
2120
2121         return 0;
2122 }
2123 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2124
2125 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2126 {
2127         return cli->cl_max_mod_rpcs_in_flight;
2128 }
2129 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2130
2131 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2132 {
2133         struct obd_connect_data *ocd;
2134         __u16 maxmodrpcs;
2135         __u16 prev;
2136
2137         if (max > OBD_MAX_RIF_MAX || max < 1)
2138                 return -ERANGE;
2139
2140         ocd = &cli->cl_import->imp_connect_data;
2141         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2142                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2143                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2144
2145         if (max == OBD_MAX_RIF_MAX)
2146                 max = OBD_MAX_RIF_MAX - 1;
2147
2148         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2149          * increase this value, also bump up max_rpcs_in_flight to match.
2150          */
2151         if (max >= cli->cl_max_rpcs_in_flight) {
2152                 CDEBUG(D_INFO,
2153                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2154                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2155                 obd_set_max_rpcs_in_flight(cli, max + 1);
2156         }
2157
2158         /* cannot exceed max modify RPCs in flight supported by the server,
2159          * but verify ocd_connect_flags is at least initialized first.  If
2160          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2161          */
2162         if (!ocd->ocd_connect_flags) {
2163                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2164         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2165                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2166                 if (maxmodrpcs == 0) { /* connection not finished yet */
2167                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2168                         CDEBUG(D_INFO,
2169                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2170                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2171                 }
2172         } else {
2173                 maxmodrpcs = 1;
2174         }
2175         if (max > maxmodrpcs) {
2176                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2177                        cli->cl_import->imp_obd->obd_name,
2178                        max, maxmodrpcs);
2179                 return -ERANGE;
2180         }
2181
2182         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2183
2184         prev = cli->cl_max_mod_rpcs_in_flight;
2185         cli->cl_max_mod_rpcs_in_flight = max;
2186
2187         /* wakeup waiters if limit has been increased */
2188         if (cli->cl_max_mod_rpcs_in_flight > prev)
2189                 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2190
2191         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2192
2193         return 0;
2194 }
2195 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2196
2197 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2198                                struct seq_file *seq)
2199 {
2200         unsigned long mod_tot = 0, mod_cum;
2201         int i;
2202
2203         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2204         lprocfs_stats_header(seq, ktime_get(), cli->cl_mod_rpcs_init, 25,
2205                              ":", true);
2206         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2207                    cli->cl_mod_rpcs_in_flight);
2208
2209         seq_printf(seq, "\n\t\t\tmodify\n");
2210         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2211
2212         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2213
2214         mod_cum = 0;
2215         for (i = 0; i < OBD_HIST_MAX; i++) {
2216                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2217
2218                 mod_cum += mod;
2219                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2220                            i, mod, pct(mod, mod_tot),
2221                            pct(mod_cum, mod_tot));
2222                 if (mod_cum == mod_tot)
2223                         break;
2224         }
2225
2226         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2227
2228         return 0;
2229 }
2230 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2231
2232 /* The number of modify RPCs sent in parallel is limited
2233  * because the server has a finite number of slots per client to
2234  * store request result and ensure reply reconstruction when needed.
2235  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2236  * that takes into account server limit and cl_max_rpcs_in_flight
2237  * value.
2238  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2239  * one close request is allowed above the maximum.
2240  */
2241 struct mod_waiter {
2242         struct client_obd *cli;
2243         bool close_req;
2244         wait_queue_entry_t wqe;
2245 };
2246 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2247                                   unsigned int mode, int flags, void *key)
2248 {
2249         struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2250         struct client_obd *cli = w->cli;
2251         bool close_req = w->close_req;
2252         bool avail;
2253         int ret;
2254
2255         /* As woken_wake_function() doesn't remove us from the wait_queue,
2256          * we could get called twice for the same thread - take care.
2257          */
2258         if (wq_entry->flags & WQ_FLAG_WOKEN)
2259                 /* Already woke this thread, don't try again */
2260                 return 0;
2261
2262         /* A slot is available if
2263          * - number of modify RPCs in flight is less than the max
2264          * - it's a close RPC and no other close request is in flight
2265          */
2266         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2267                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2268         if (avail) {
2269                 cli->cl_mod_rpcs_in_flight++;
2270                 if (w->close_req)
2271                         cli->cl_close_rpcs_in_flight++;
2272                 ret = woken_wake_function(wq_entry, mode, flags, key);
2273         } else if (cli->cl_close_rpcs_in_flight)
2274                 /* No other waiter could be woken */
2275                 ret = -1;
2276         else if (key == NULL)
2277                 /* This was not a wakeup from a close completion, so there is no
2278                  * point seeing if there are close waiters to be woken
2279                  */
2280                 ret = -1;
2281         else
2282                 /* There might be be a close we could wake, keep looking */
2283                 ret = 0;
2284         return ret;
2285 }
2286
2287 /* Get a modify RPC slot from the obd client @cli according
2288  * to the kind of operation @opc that is going to be sent
2289  * and the intent @it of the operation if it applies.
2290  * If the maximum number of modify RPCs in flight is reached
2291  * the thread is put to sleep.
2292  * Returns the tag to be set in the request message. Tag 0
2293  * is reserved for non-modifying requests.
2294  */
2295 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2296 {
2297         struct mod_waiter wait = {
2298                 .cli = cli,
2299                 .close_req = (opc == MDS_CLOSE),
2300         };
2301         __u16                   i, max;
2302
2303         init_wait(&wait.wqe);
2304         wait.wqe.func = claim_mod_rpc_function;
2305
2306         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2307         __add_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2308         /* This wakeup will only succeed if the maximums haven't
2309          * been reached.  If that happens, WQ_FLAG_WOKEN will be cleared
2310          * and there will be no need to wait.
2311          */
2312         wake_up_locked(&cli->cl_mod_rpcs_waitq);
2313         if (!(wait.wqe.flags & WQ_FLAG_WOKEN)) {
2314                 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2315                 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2316                            MAX_SCHEDULE_TIMEOUT);
2317                 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2318         }
2319         __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2320
2321         max = cli->cl_max_mod_rpcs_in_flight;
2322         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2323                          cli->cl_mod_rpcs_in_flight);
2324         /* find a free tag */
2325         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2326                                 max + 1);
2327         LASSERT(i < OBD_MAX_RIF_MAX);
2328         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2329         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2330         /* tag 0 is reserved for non-modify RPCs */
2331
2332         CDEBUG(D_RPCTRACE,
2333                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2334                cli->cl_import->imp_obd->obd_name,
2335                i + 1, opc, max);
2336
2337         return i + 1;
2338 }
2339 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2340
2341 /* Put a modify RPC slot from the obd client @cli according
2342  * to the kind of operation @opc that has been sent.
2343  */
2344 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2345 {
2346         bool                    close_req = false;
2347
2348         if (tag == 0)
2349                 return;
2350
2351         if (opc == MDS_CLOSE)
2352                 close_req = true;
2353
2354         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2355         cli->cl_mod_rpcs_in_flight--;
2356         if (close_req)
2357                 cli->cl_close_rpcs_in_flight--;
2358         /* release the tag in the bitmap */
2359         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2360         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2361         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2362                              (void *)close_req);
2363         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2364 }
2365 EXPORT_SYMBOL(obd_put_mod_rpc_slot);