Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Taken (read or write) around scans and updates of obd_devs[] below. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device's slot index is its obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache backing struct obd_device, created by obd_init_caches(). */
static struct kmem_cache *obd_device_cachep;
/* kobject type attached to every struct obd_type; defined further down. */
static struct kobj_type class_ktype;
/* NOTE(review): not used in this part of the file; presumably the queue
 * on which zombie exports/imports are destroyed - confirm.
 */
static struct workqueue_struct *zombie_wq;

/* Forward declarations for helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* NOTE(review): stale-export bookkeeping; presumably the spinlock guards
 * the list and the counter tracks its length - confirm against users.
 */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
64
65 /*
66  * support functions: we could use inter-module communication, but this
67  * is more portable to other OS's
68  */
69 static struct obd_device *obd_device_alloc(void)
70 {
71         struct obd_device *obd;
72
73         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74         if (obd != NULL) {
75                 obd->obd_magic = OBD_DEVICE_MAGIC;
76         }
77         return obd;
78 }
79
80 static void obd_device_free(struct obd_device *obd)
81 {
82         LASSERT(obd != NULL);
83         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85         if (obd->obd_namespace != NULL) {
86                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87                        obd, obd->obd_namespace, obd->obd_force);
88                 LBUG();
89         }
90         lu_ref_fini(&obd->obd_reference);
91         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
92 }
93
94 struct obd_type *class_search_type(const char *name)
95 {
96         struct kobject *kobj = kset_find_obj(lustre_kset, name);
97
98         if (kobj && kobj->ktype == &class_ktype)
99                 return container_of(kobj, struct obd_type, typ_kobj);
100
101         kobject_put(kobj);
102         return NULL;
103 }
104 EXPORT_SYMBOL(class_search_type);
105
106 struct obd_type *class_get_type(const char *name)
107 {
108         struct obd_type *type;
109
110         type = class_search_type(name);
111 #ifdef HAVE_MODULE_LOADING_SUPPORT
112         if (!type) {
113                 const char *modname = name;
114
115 #ifdef HAVE_SERVER_SUPPORT
116                 if (strcmp(modname, "obdfilter") == 0)
117                         modname = "ofd";
118
119                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
120                         modname = LUSTRE_OSP_NAME;
121
122                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
123                         modname = LUSTRE_MDT_NAME;
124 #endif /* HAVE_SERVER_SUPPORT */
125
126                 if (!request_module("%s", modname)) {
127                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
128                         type = class_search_type(name);
129                 } else {
130                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
131                                            modname);
132                 }
133         }
134 #endif
135         if (type) {
136                 if (try_module_get(type->typ_dt_ops->o_owner)) {
137                         atomic_inc(&type->typ_refcnt);
138                         /* class_search_type() returned a counted reference,
139                          * but we don't need that count any more as
140                          * we have one through typ_refcnt.
141                          */
142                         kobject_put(&type->typ_kobj);
143                 } else {
144                         kobject_put(&type->typ_kobj);
145                         type = NULL;
146                 }
147         }
148         return type;
149 }
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         module_put(type->typ_dt_ops->o_owner);
155         atomic_dec(&type->typ_refcnt);
156 }
157
158 static void class_sysfs_release(struct kobject *kobj)
159 {
160         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
161
162         debugfs_remove_recursive(type->typ_debugfs_entry);
163         type->typ_debugfs_entry = NULL;
164
165         if (type->typ_lu)
166                 lu_device_type_fini(type->typ_lu);
167
168 #ifdef CONFIG_PROC_FS
169         if (type->typ_name && type->typ_procroot)
170                 remove_proc_subtree(type->typ_name, proc_lustre_root);
171 #endif
172         OBD_FREE(type, sizeof(*type));
173 }
174
/* kobject type used for every struct obd_type: sysfs attribute access
 * goes through lustre_sysfs_ops and dropping the last kobject reference
 * runs class_sysfs_release(), which frees the obd_type.
 */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
179
180 #ifdef HAVE_SERVER_SUPPORT
181 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
182 {
183         struct dentry *symlink;
184         struct obd_type *type;
185         int rc;
186
187         type = class_search_type(name);
188         if (type) {
189                 kobject_put(&type->typ_kobj);
190                 return ERR_PTR(-EEXIST);
191         }
192
193         OBD_ALLOC(type, sizeof(*type));
194         if (!type)
195                 return ERR_PTR(-ENOMEM);
196
197         type->typ_kobj.kset = lustre_kset;
198         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
199                                   &lustre_kset->kobj, "%s", name);
200         if (rc)
201                 return ERR_PTR(rc);
202
203         symlink = debugfs_create_dir(name, debugfs_lustre_root);
204         type->typ_debugfs_entry = symlink;
205         type->typ_sym_filter = true;
206
207         if (enable_proc) {
208                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
209                                                       NULL, NULL);
210                 if (IS_ERR(type->typ_procroot)) {
211                         CERROR("%s: can't create compat proc entry: %d\n",
212                                name, (int)PTR_ERR(type->typ_procroot));
213                         type->typ_procroot = NULL;
214                 }
215         }
216
217         return type;
218 }
219 EXPORT_SYMBOL(class_add_symlinks);
220 #endif /* HAVE_SERVER_SUPPORT */
221
222 #define CLASS_MAX_NAME 1024
223
/* Register an obd type called @name with the given operation tables.
 *
 * With server support, a placeholder created earlier by
 * class_add_symlinks() (typ_sym_filter set) is adopted instead of
 * allocating a new obd_type.
 *
 * Returns 0 on success, -EEXIST if the type is already registered,
 * -ENOMEM on allocation failure, or the error reported by
 * lprocfs_register(), kobject_add() or lu_device_type_init().
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* placeholder from class_add_symlinks(): reuse it, still
                 * holding the reference class_search_type() returned
                 */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        /* typ_lu holds OBD_LU_TYPE_SETUP until lu_device_type_init()
         * below has finished
         */
        type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        if (type->typ_sym_filter) {
                /* the placeholder's kobject and directories already exist:
                 * drop the lookup reference and go straight to ldt setup
                 */
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* keep the release callback away from the ERR_PTR */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                rc = lu_device_type_init(ldt);
                /* publish the final typ_lu value (NULL on failure) and wake
                 * anything sleeping on it; NOTE(review): the waiters are
                 * outside this part of the file - confirm
                 */
                smp_store_release(&type->typ_lu, rc ? NULL : ldt);
                wake_up_var(&type->typ_lu);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* dropping the reference lets class_sysfs_release() clean up and
         * free the type once no other reference remains
         */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
303 EXPORT_SYMBOL(class_register_type);
304
305 int class_unregister_type(const char *name)
306 {
307         struct obd_type *type = class_search_type(name);
308         int rc = 0;
309         ENTRY;
310
311         if (!type) {
312                 CERROR("unknown obd type\n");
313                 RETURN(-EINVAL);
314         }
315
316         if (atomic_read(&type->typ_refcnt)) {
317                 CERROR("type %s has refcount (%d)\n", name,
318                        atomic_read(&type->typ_refcnt));
319                 /* This is a bad situation, let's make the best of it */
320                 /* Remove ops, but leave the name for debugging */
321                 type->typ_dt_ops = NULL;
322                 type->typ_md_ops = NULL;
323                 GOTO(out_put, rc = -EBUSY);
324         }
325
326         /* Put the final ref */
327         kobject_put(&type->typ_kobj);
328 out_put:
329         /* Put the ref returned by class_search_type() */
330         kobject_put(&type->typ_kobj);
331
332         RETURN(rc);
333 } /* class_unregister_type */
334 EXPORT_SYMBOL(class_unregister_type);
335
336 /**
337  * Create a new obd device.
338  *
339  * Allocate the new obd_device and initialize it.
340  *
341  * \param[in] type_name obd device type string.
342  * \param[in] name      obd device name.
343  * \param[in] uuid      obd device UUID
344  *
345  * \retval newdev         pointer to created obd_device
346  * \retval ERR_PTR(errno) on error
347  */
348 struct obd_device *class_newdev(const char *type_name, const char *name,
349                                 const char *uuid)
350 {
351         struct obd_device *newdev;
352         struct obd_type *type = NULL;
353         ENTRY;
354
355         if (strlen(name) >= MAX_OBD_NAME) {
356                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
357                 RETURN(ERR_PTR(-EINVAL));
358         }
359
360         type = class_get_type(type_name);
361         if (type == NULL){
362                 CERROR("OBD: unknown type: %s\n", type_name);
363                 RETURN(ERR_PTR(-ENODEV));
364         }
365
366         newdev = obd_device_alloc();
367         if (newdev == NULL) {
368                 class_put_type(type);
369                 RETURN(ERR_PTR(-ENOMEM));
370         }
371         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
372         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
373         newdev->obd_type = type;
374         newdev->obd_minor = -1;
375
376         rwlock_init(&newdev->obd_pool_lock);
377         newdev->obd_pool_limit = 0;
378         newdev->obd_pool_slv = 0;
379
380         INIT_LIST_HEAD(&newdev->obd_exports);
381         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
382         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
383         INIT_LIST_HEAD(&newdev->obd_exports_timed);
384         INIT_LIST_HEAD(&newdev->obd_nid_stats);
385         spin_lock_init(&newdev->obd_nid_lock);
386         spin_lock_init(&newdev->obd_dev_lock);
387         mutex_init(&newdev->obd_dev_mutex);
388         spin_lock_init(&newdev->obd_osfs_lock);
389         /* newdev->obd_osfs_age must be set to a value in the distant
390          * past to guarantee a fresh statfs is fetched on mount. */
391         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
392
393         /* XXX belongs in setup not attach  */
394         init_rwsem(&newdev->obd_observer_link_sem);
395         /* recovery data */
396         spin_lock_init(&newdev->obd_recovery_task_lock);
397         init_waitqueue_head(&newdev->obd_next_transno_waitq);
398         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
399         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
400         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
401         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
402         INIT_LIST_HEAD(&newdev->obd_evict_list);
403         INIT_LIST_HEAD(&newdev->obd_lwp_list);
404
405         llog_group_init(&newdev->obd_olg);
406         /* Detach drops this */
407         atomic_set(&newdev->obd_refcount, 1);
408         lu_ref_init(&newdev->obd_reference);
409         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
410
411         newdev->obd_conn_inprogress = 0;
412
413         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
414
415         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
416                newdev->obd_name, newdev);
417
418         return newdev;
419 }
420
421 /**
422  * Free obd device.
423  *
424  * \param[in] obd obd_device to be freed
425  *
426  * \retval none
427  */
428 void class_free_dev(struct obd_device *obd)
429 {
430         struct obd_type *obd_type = obd->obd_type;
431
432         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
433                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
434         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
435                  "obd %p != obd_devs[%d] %p\n",
436                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
437         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
438                  "obd_refcount should be 0, not %d\n",
439                  atomic_read(&obd->obd_refcount));
440         LASSERT(obd_type != NULL);
441
442         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
443                obd->obd_name, obd->obd_type->typ_name);
444
445         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
446                          obd->obd_name, obd->obd_uuid.uuid);
447         if (obd->obd_stopping) {
448                 int err;
449
450                 /* If we're not stopping, we were never set up */
451                 err = obd_cleanup(obd);
452                 if (err)
453                         CERROR("Cleanup %s returned %d\n",
454                                 obd->obd_name, err);
455         }
456
457         obd_device_free(obd);
458
459         class_put_type(obd_type);
460 }
461
462 /**
463  * Unregister obd device.
464  *
465  * Free slot in obd_dev[] used by \a obd.
466  *
467  * \param[in] new_obd obd_device to be unregistered
468  *
469  * \retval none
470  */
471 void class_unregister_device(struct obd_device *obd)
472 {
473         write_lock(&obd_dev_lock);
474         if (obd->obd_minor >= 0) {
475                 LASSERT(obd_devs[obd->obd_minor] == obd);
476                 obd_devs[obd->obd_minor] = NULL;
477                 obd->obd_minor = -1;
478         }
479         write_unlock(&obd_dev_lock);
480 }
481
482 /**
483  * Register obd device.
484  *
485  * Find free slot in obd_devs[], fills it with \a new_obd.
486  *
487  * \param[in] new_obd obd_device to be registered
488  *
489  * \retval 0          success
490  * \retval -EEXIST    device with this name is registered
491  * \retval -EOVERFLOW obd_devs[] is full
492  */
493 int class_register_device(struct obd_device *new_obd)
494 {
495         int ret = 0;
496         int i;
497         int new_obd_minor = 0;
498         bool minor_assign = false;
499         bool retried = false;
500
501 again:
502         write_lock(&obd_dev_lock);
503         for (i = 0; i < class_devno_max(); i++) {
504                 struct obd_device *obd = class_num2obd(i);
505
506                 if (obd != NULL &&
507                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
508
509                         if (!retried) {
510                                 write_unlock(&obd_dev_lock);
511
512                                 /* the obd_device could be waited to be
513                                  * destroyed by the "obd_zombie_impexp_thread".
514                                  */
515                                 obd_zombie_barrier();
516                                 retried = true;
517                                 goto again;
518                         }
519
520                         CERROR("%s: already exists, won't add\n",
521                                obd->obd_name);
522                         /* in case we found a free slot before duplicate */
523                         minor_assign = false;
524                         ret = -EEXIST;
525                         break;
526                 }
527                 if (!minor_assign && obd == NULL) {
528                         new_obd_minor = i;
529                         minor_assign = true;
530                 }
531         }
532
533         if (minor_assign) {
534                 new_obd->obd_minor = new_obd_minor;
535                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
536                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
537                 obd_devs[new_obd_minor] = new_obd;
538         } else {
539                 if (ret == 0) {
540                         ret = -EOVERFLOW;
541                         CERROR("%s: all %u/%u devices used, increase "
542                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
543                                i, class_devno_max(), ret);
544                 }
545         }
546         write_unlock(&obd_dev_lock);
547
548         RETURN(ret);
549 }
550
551 static int class_name2dev_nolock(const char *name)
552 {
553         int i;
554
555         if (!name)
556                 return -1;
557
558         for (i = 0; i < class_devno_max(); i++) {
559                 struct obd_device *obd = class_num2obd(i);
560
561                 if (obd && strcmp(name, obd->obd_name) == 0) {
562                         /* Make sure we finished attaching before we give
563                            out any references */
564                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
565                         if (obd->obd_attached) {
566                                 return i;
567                         }
568                         break;
569                 }
570         }
571
572         return -1;
573 }
574
575 int class_name2dev(const char *name)
576 {
577         int i;
578
579         if (!name)
580                 return -1;
581
582         read_lock(&obd_dev_lock);
583         i = class_name2dev_nolock(name);
584         read_unlock(&obd_dev_lock);
585
586         return i;
587 }
588 EXPORT_SYMBOL(class_name2dev);
589
590 struct obd_device *class_name2obd(const char *name)
591 {
592         int dev = class_name2dev(name);
593
594         if (dev < 0 || dev > class_devno_max())
595                 return NULL;
596         return class_num2obd(dev);
597 }
598 EXPORT_SYMBOL(class_name2obd);
599
600 int class_uuid2dev_nolock(struct obd_uuid *uuid)
601 {
602         int i;
603
604         for (i = 0; i < class_devno_max(); i++) {
605                 struct obd_device *obd = class_num2obd(i);
606
607                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
608                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
609                         return i;
610                 }
611         }
612
613         return -1;
614 }
615
616 int class_uuid2dev(struct obd_uuid *uuid)
617 {
618         int i;
619
620         read_lock(&obd_dev_lock);
621         i = class_uuid2dev_nolock(uuid);
622         read_unlock(&obd_dev_lock);
623
624         return i;
625 }
626 EXPORT_SYMBOL(class_uuid2dev);
627
628 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
629 {
630         int dev = class_uuid2dev(uuid);
631         if (dev < 0)
632                 return NULL;
633         return class_num2obd(dev);
634 }
635 EXPORT_SYMBOL(class_uuid2obd);
636
637 /**
638  * Get obd device from ::obd_devs[]
639  *
640  * \param num [in] array index
641  *
642  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
643  *         otherwise return the obd device there.
644  */
645 struct obd_device *class_num2obd(int num)
646 {
647         struct obd_device *obd = NULL;
648
649         if (num < class_devno_max()) {
650                 obd = obd_devs[num];
651                 if (obd == NULL)
652                         return NULL;
653
654                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
655                          "%p obd_magic %08x != %08x\n",
656                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
657                 LASSERTF(obd->obd_minor == num,
658                          "%p obd_minor %0d != %0d\n",
659                          obd, obd->obd_minor, num);
660         }
661
662         return obd;
663 }
664 EXPORT_SYMBOL(class_num2obd);
665
666 /**
667  * Find obd in obd_dev[] by name or uuid.
668  *
669  * Increment obd's refcount if found.
670  *
671  * \param[in] str obd name or uuid
672  *
673  * \retval NULL    if not found
674  * \retval target  pointer to found obd_device
675  */
676 struct obd_device *class_dev_by_str(const char *str)
677 {
678         struct obd_device *target = NULL;
679         struct obd_uuid tgtuuid;
680         int rc;
681
682         obd_str2uuid(&tgtuuid, str);
683
684         read_lock(&obd_dev_lock);
685         rc = class_uuid2dev_nolock(&tgtuuid);
686         if (rc < 0)
687                 rc = class_name2dev_nolock(str);
688
689         if (rc >= 0)
690                 target = class_num2obd(rc);
691
692         if (target != NULL)
693                 class_incref(target, "find", current);
694         read_unlock(&obd_dev_lock);
695
696         RETURN(target);
697 }
698 EXPORT_SYMBOL(class_dev_by_str);
699
700 /**
701  * Get obd devices count. Device in any
702  *    state are counted
703  * \retval obd device count
704  */
705 int get_devices_count(void)
706 {
707         int index, max_index = class_devno_max(), dev_count = 0;
708
709         read_lock(&obd_dev_lock);
710         for (index = 0; index <= max_index; index++) {
711                 struct obd_device *obd = class_num2obd(index);
712                 if (obd != NULL)
713                         dev_count++;
714         }
715         read_unlock(&obd_dev_lock);
716
717         return dev_count;
718 }
719 EXPORT_SYMBOL(get_devices_count);
720
721 void class_obd_list(void)
722 {
723         char *status;
724         int i;
725
726         read_lock(&obd_dev_lock);
727         for (i = 0; i < class_devno_max(); i++) {
728                 struct obd_device *obd = class_num2obd(i);
729
730                 if (obd == NULL)
731                         continue;
732                 if (obd->obd_stopping)
733                         status = "ST";
734                 else if (obd->obd_set_up)
735                         status = "UP";
736                 else if (obd->obd_attached)
737                         status = "AT";
738                 else
739                         status = "--";
740                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
741                          i, status, obd->obd_type->typ_name,
742                          obd->obd_name, obd->obd_uuid.uuid,
743                          atomic_read(&obd->obd_refcount));
744         }
745         read_unlock(&obd_dev_lock);
746 }
747
748 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
749  * specified, then only the client with that uuid is returned,
750  * otherwise any client connected to the tgt is returned.
751  */
752 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
753                                          const char *type_name,
754                                          struct obd_uuid *grp_uuid)
755 {
756         int i;
757
758         read_lock(&obd_dev_lock);
759         for (i = 0; i < class_devno_max(); i++) {
760                 struct obd_device *obd = class_num2obd(i);
761
762                 if (obd == NULL)
763                         continue;
764                 if ((strncmp(obd->obd_type->typ_name, type_name,
765                              strlen(type_name)) == 0)) {
766                         if (obd_uuid_equals(tgt_uuid,
767                                             &obd->u.cli.cl_target_uuid) &&
768                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
769                                                          &obd->obd_uuid) : 1)) {
770                                 read_unlock(&obd_dev_lock);
771                                 return obd;
772                         }
773                 }
774         }
775         read_unlock(&obd_dev_lock);
776
777         return NULL;
778 }
779 EXPORT_SYMBOL(class_find_client_obd);
780
781 /* Iterate the obd_device list looking devices have grp_uuid. Start
782  * searching at *next, and if a device is found, the next index to look
783  * at is saved in *next. If next is NULL, then the first matching device
784  * will always be returned.
785  */
786 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
787 {
788         int i;
789
790         if (next == NULL)
791                 i = 0;
792         else if (*next >= 0 && *next < class_devno_max())
793                 i = *next;
794         else
795                 return NULL;
796
797         read_lock(&obd_dev_lock);
798         for (; i < class_devno_max(); i++) {
799                 struct obd_device *obd = class_num2obd(i);
800
801                 if (obd == NULL)
802                         continue;
803                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
804                         if (next != NULL)
805                                 *next = i+1;
806                         read_unlock(&obd_dev_lock);
807                         return obd;
808                 }
809         }
810         read_unlock(&obd_dev_lock);
811
812         return NULL;
813 }
814 EXPORT_SYMBOL(class_devices_in_group);
815
816 /**
817  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
818  * adjust sptlrpc settings accordingly.
819  */
820 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
821 {
822         struct obd_device  *obd;
823         const char         *type;
824         int                 i, rc = 0, rc2;
825
826         LASSERT(namelen > 0);
827
828         read_lock(&obd_dev_lock);
829         for (i = 0; i < class_devno_max(); i++) {
830                 obd = class_num2obd(i);
831
832                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
833                         continue;
834
835                 /* only notify mdc, osc, osp, lwp, mdt, ost
836                  * because only these have a -sptlrpc llog */
837                 type = obd->obd_type->typ_name;
838                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
839                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
840                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
841                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
842                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
843                     strcmp(type, LUSTRE_OST_NAME) != 0)
844                         continue;
845
846                 if (strncmp(obd->obd_name, fsname, namelen))
847                         continue;
848
849                 class_incref(obd, __FUNCTION__, obd);
850                 read_unlock(&obd_dev_lock);
851                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
852                                          sizeof(KEY_SPTLRPC_CONF),
853                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
854                 rc = rc ? rc : rc2;
855                 class_decref(obd, __FUNCTION__, obd);
856                 read_lock(&obd_dev_lock);
857         }
858         read_unlock(&obd_dev_lock);
859         return rc;
860 }
861 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
862
863 void obd_cleanup_caches(void)
864 {
865         ENTRY;
866         if (obd_device_cachep) {
867                 kmem_cache_destroy(obd_device_cachep);
868                 obd_device_cachep = NULL;
869         }
870
871         EXIT;
872 }
873
874 int obd_init_caches(void)
875 {
876         int rc;
877         ENTRY;
878
879         LASSERT(obd_device_cachep == NULL);
880         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
881                                 sizeof(struct obd_device),
882                                 0, 0, 0, sizeof(struct obd_device), NULL);
883         if (!obd_device_cachep)
884                 GOTO(out, rc = -ENOMEM);
885
886         RETURN(0);
887 out:
888         obd_cleanup_caches();
889         RETURN(rc);
890 }
891
/* Owner tag passed to class_handle2object() when an export is looked up
 * by its handle cookie.
 */
static const char export_handle_owner[] = "export";
893
894 /* map connection to client */
895 struct obd_export *class_conn2export(struct lustre_handle *conn)
896 {
897         struct obd_export *export;
898         ENTRY;
899
900         if (!conn) {
901                 CDEBUG(D_CACHE, "looking for null handle\n");
902                 RETURN(NULL);
903         }
904
905         if (conn->cookie == -1) {  /* this means assign a new connection */
906                 CDEBUG(D_CACHE, "want a new connection\n");
907                 RETURN(NULL);
908         }
909
910         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
911         export = class_handle2object(conn->cookie, export_handle_owner);
912         RETURN(export);
913 }
914 EXPORT_SYMBOL(class_conn2export);
915
916 struct obd_device *class_exp2obd(struct obd_export *exp)
917 {
918         if (exp)
919                 return exp->exp_obd;
920         return NULL;
921 }
922 EXPORT_SYMBOL(class_exp2obd);
923
924 struct obd_import *class_exp2cliimp(struct obd_export *exp)
925 {
926         struct obd_device *obd = exp->exp_obd;
927         if (obd == NULL)
928                 return NULL;
929         return obd->u.cli.cl_import;
930 }
931 EXPORT_SYMBOL(class_exp2cliimp);
932
933 /* Export management functions */
934 static void class_export_destroy(struct obd_export *exp)
935 {
936         struct obd_device *obd = exp->exp_obd;
937         ENTRY;
938
939         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
940         LASSERT(obd != NULL);
941
942         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
943                exp->exp_client_uuid.uuid, obd->obd_name);
944
945         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
946         ptlrpc_connection_put(exp->exp_connection);
947
948         LASSERT(list_empty(&exp->exp_outstanding_replies));
949         LASSERT(list_empty(&exp->exp_uncommitted_replies));
950         LASSERT(list_empty(&exp->exp_req_replay_queue));
951         LASSERT(list_empty(&exp->exp_hp_rpcs));
952         obd_destroy_export(exp);
953         /* self export doesn't hold a reference to an obd, although it
954          * exists until freeing of the obd */
955         if (exp != obd->obd_self_export)
956                 class_decref(obd, "export", exp);
957
958         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
959         kfree_rcu(exp, exp_handle.h_rcu);
960         EXIT;
961 }
962
963 struct obd_export *class_export_get(struct obd_export *exp)
964 {
965         refcount_inc(&exp->exp_handle.h_ref);
966         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
967                refcount_read(&exp->exp_handle.h_ref));
968         return exp;
969 }
970 EXPORT_SYMBOL(class_export_get);
971
972 void class_export_put(struct obd_export *exp)
973 {
974         LASSERT(exp != NULL);
975         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
976         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
977         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
978                refcount_read(&exp->exp_handle.h_ref) - 1);
979
980         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
981                 struct obd_device *obd = exp->exp_obd;
982
983                 CDEBUG(D_IOCTL, "final put %p/%s\n",
984                        exp, exp->exp_client_uuid.uuid);
985
986                 /* release nid stat refererence */
987                 lprocfs_exp_cleanup(exp);
988
989                 if (exp == obd->obd_self_export) {
990                         /* self export should be destroyed without
991                          * zombie thread as it doesn't hold a
992                          * reference to obd and doesn't hold any
993                          * resources */
994                         class_export_destroy(exp);
995                         /* self export is destroyed, no class
996                          * references exist and it is safe to free
997                          * obd */
998                         class_free_dev(obd);
999                 } else {
1000                         LASSERT(!list_empty(&exp->exp_obd_chain));
1001                         obd_zombie_export_add(exp);
1002                 }
1003
1004         }
1005 }
1006 EXPORT_SYMBOL(class_export_put);
1007
1008 static void obd_zombie_exp_cull(struct work_struct *ws)
1009 {
1010         struct obd_export *export;
1011
1012         export = container_of(ws, struct obd_export, exp_zombie_work);
1013         class_export_destroy(export);
1014 }
1015
1016 /* Creates a new export, adds it to the hash table, and returns a
1017  * pointer to it. The refcount is 2: one for the hash reference, and
1018  * one for the pointer returned by this function. */
1019 struct obd_export *__class_new_export(struct obd_device *obd,
1020                                       struct obd_uuid *cluuid, bool is_self)
1021 {
1022         struct obd_export *export;
1023         int rc = 0;
1024         ENTRY;
1025
1026         OBD_ALLOC_PTR(export);
1027         if (!export)
1028                 return ERR_PTR(-ENOMEM);
1029
1030         export->exp_conn_cnt = 0;
1031         export->exp_lock_hash = NULL;
1032         export->exp_flock_hash = NULL;
1033         /* 2 = class_handle_hash + last */
1034         refcount_set(&export->exp_handle.h_ref, 2);
1035         atomic_set(&export->exp_rpc_count, 0);
1036         atomic_set(&export->exp_cb_count, 0);
1037         atomic_set(&export->exp_locks_count, 0);
1038 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1039         INIT_LIST_HEAD(&export->exp_locks_list);
1040         spin_lock_init(&export->exp_locks_list_guard);
1041 #endif
1042         atomic_set(&export->exp_replay_count, 0);
1043         export->exp_obd = obd;
1044         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1045         spin_lock_init(&export->exp_uncommitted_replies_lock);
1046         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1047         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1048         INIT_HLIST_NODE(&export->exp_handle.h_link);
1049         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1050         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1051         class_handle_hash(&export->exp_handle, export_handle_owner);
1052         export->exp_last_request_time = ktime_get_real_seconds();
1053         spin_lock_init(&export->exp_lock);
1054         spin_lock_init(&export->exp_rpc_lock);
1055         INIT_HLIST_NODE(&export->exp_gen_hash);
1056         spin_lock_init(&export->exp_bl_list_lock);
1057         INIT_LIST_HEAD(&export->exp_bl_list);
1058         INIT_LIST_HEAD(&export->exp_stale_list);
1059         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1060
1061         export->exp_sp_peer = LUSTRE_SP_ANY;
1062         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1063         export->exp_client_uuid = *cluuid;
1064         obd_init_export(export);
1065
1066         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1067
1068         spin_lock(&obd->obd_dev_lock);
1069         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1070                 /* shouldn't happen, but might race */
1071                 if (obd->obd_stopping)
1072                         GOTO(exit_unlock, rc = -ENODEV);
1073
1074                 rc = obd_uuid_add(obd, export);
1075                 if (rc != 0) {
1076                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1077                                       obd->obd_name, cluuid->uuid, rc);
1078                         GOTO(exit_unlock, rc = -EALREADY);
1079                 }
1080         }
1081
1082         if (!is_self) {
1083                 class_incref(obd, "export", export);
1084                 list_add_tail(&export->exp_obd_chain_timed,
1085                               &obd->obd_exports_timed);
1086                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1087                 obd->obd_num_exports++;
1088         } else {
1089                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1090                 INIT_LIST_HEAD(&export->exp_obd_chain);
1091         }
1092         spin_unlock(&obd->obd_dev_lock);
1093         RETURN(export);
1094
1095 exit_unlock:
1096         spin_unlock(&obd->obd_dev_lock);
1097         class_handle_unhash(&export->exp_handle);
1098         obd_destroy_export(export);
1099         OBD_FREE_PTR(export);
1100         return ERR_PTR(rc);
1101 }
1102
1103 struct obd_export *class_new_export(struct obd_device *obd,
1104                                     struct obd_uuid *uuid)
1105 {
1106         return __class_new_export(obd, uuid, false);
1107 }
1108 EXPORT_SYMBOL(class_new_export);
1109
1110 struct obd_export *class_new_export_self(struct obd_device *obd,
1111                                          struct obd_uuid *uuid)
1112 {
1113         return __class_new_export(obd, uuid, true);
1114 }
1115
1116 void class_unlink_export(struct obd_export *exp)
1117 {
1118         class_handle_unhash(&exp->exp_handle);
1119
1120         if (exp->exp_obd->obd_self_export == exp) {
1121                 class_export_put(exp);
1122                 return;
1123         }
1124
1125         spin_lock(&exp->exp_obd->obd_dev_lock);
1126         /* delete an uuid-export hashitem from hashtables */
1127         if (exp != exp->exp_obd->obd_self_export)
1128                 obd_uuid_del(exp->exp_obd, exp);
1129
1130 #ifdef HAVE_SERVER_SUPPORT
1131         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1132                 struct tg_export_data   *ted = &exp->exp_target_data;
1133                 struct cfs_hash         *hash;
1134
1135                 /* Because obd_gen_hash will not be released until
1136                  * class_cleanup(), so hash should never be NULL here */
1137                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1138                 LASSERT(hash != NULL);
1139                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1140                              &exp->exp_gen_hash);
1141                 cfs_hash_putref(hash);
1142         }
1143 #endif /* HAVE_SERVER_SUPPORT */
1144
1145         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1146         list_del_init(&exp->exp_obd_chain_timed);
1147         exp->exp_obd->obd_num_exports--;
1148         spin_unlock(&exp->exp_obd->obd_dev_lock);
1149         atomic_inc(&obd_stale_export_num);
1150
1151         /* A reference is kept by obd_stale_exports list */
1152         obd_stale_export_put(exp);
1153 }
1154 EXPORT_SYMBOL(class_unlink_export);
1155
1156 /* Import management functions */
1157 static void obd_zombie_import_free(struct obd_import *imp)
1158 {
1159         ENTRY;
1160
1161         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1162                imp->imp_obd->obd_name);
1163
1164         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1165
1166         ptlrpc_connection_put(imp->imp_connection);
1167
1168         while (!list_empty(&imp->imp_conn_list)) {
1169                 struct obd_import_conn *imp_conn;
1170
1171                 imp_conn = list_first_entry(&imp->imp_conn_list,
1172                                             struct obd_import_conn, oic_item);
1173                 list_del_init(&imp_conn->oic_item);
1174                 ptlrpc_connection_put(imp_conn->oic_conn);
1175                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1176         }
1177
1178         LASSERT(imp->imp_sec == NULL);
1179         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1180                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1181         class_decref(imp->imp_obd, "import", imp);
1182         OBD_FREE_PTR(imp);
1183         EXIT;
1184 }
1185
1186 struct obd_import *class_import_get(struct obd_import *import)
1187 {
1188         refcount_inc(&import->imp_refcount);
1189         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1190                refcount_read(&import->imp_refcount),
1191                import->imp_obd->obd_name);
1192         return import;
1193 }
1194 EXPORT_SYMBOL(class_import_get);
1195
1196 void class_import_put(struct obd_import *imp)
1197 {
1198         ENTRY;
1199
1200         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1201
1202         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1203                refcount_read(&imp->imp_refcount) - 1,
1204                imp->imp_obd->obd_name);
1205
1206         if (refcount_dec_and_test(&imp->imp_refcount)) {
1207                 CDEBUG(D_INFO, "final put import %p\n", imp);
1208                 obd_zombie_import_add(imp);
1209         }
1210
1211         EXIT;
1212 }
1213 EXPORT_SYMBOL(class_import_put);
1214
1215 static void init_imp_at(struct imp_at *at) {
1216         int i;
1217         at_init(&at->iat_net_latency, 0, 0);
1218         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1219                 /* max service estimates are tracked on the server side, so
1220                    don't use the AT history here, just use the last reported
1221                    val. (But keep hist for proc histogram, worst_ever) */
1222                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1223                         AT_FLG_NOHIST);
1224         }
1225 }
1226
1227 static void obd_zombie_imp_cull(struct work_struct *ws)
1228 {
1229         struct obd_import *import;
1230
1231         import = container_of(ws, struct obd_import, imp_zombie_work);
1232         obd_zombie_import_free(import);
1233 }
1234
1235 struct obd_import *class_new_import(struct obd_device *obd)
1236 {
1237         struct obd_import *imp;
1238         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1239
1240         OBD_ALLOC(imp, sizeof(*imp));
1241         if (imp == NULL)
1242                 return NULL;
1243
1244         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1245         INIT_LIST_HEAD(&imp->imp_replay_list);
1246         INIT_LIST_HEAD(&imp->imp_sending_list);
1247         INIT_LIST_HEAD(&imp->imp_delayed_list);
1248         INIT_LIST_HEAD(&imp->imp_committed_list);
1249         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1250         imp->imp_known_replied_xid = 0;
1251         imp->imp_replay_cursor = &imp->imp_committed_list;
1252         spin_lock_init(&imp->imp_lock);
1253         imp->imp_last_success_conn = 0;
1254         imp->imp_state = LUSTRE_IMP_NEW;
1255         imp->imp_obd = class_incref(obd, "import", imp);
1256         rwlock_init(&imp->imp_sec_lock);
1257         init_waitqueue_head(&imp->imp_recovery_waitq);
1258         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1259
1260         if (curr_pid_ns && curr_pid_ns->child_reaper)
1261                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1262         else
1263                 imp->imp_sec_refpid = 1;
1264
1265         refcount_set(&imp->imp_refcount, 2);
1266         atomic_set(&imp->imp_unregistering, 0);
1267         atomic_set(&imp->imp_reqs, 0);
1268         atomic_set(&imp->imp_inflight, 0);
1269         atomic_set(&imp->imp_replay_inflight, 0);
1270         init_waitqueue_head(&imp->imp_replay_waitq);
1271         atomic_set(&imp->imp_inval_count, 0);
1272         INIT_LIST_HEAD(&imp->imp_conn_list);
1273         init_imp_at(&imp->imp_at);
1274
1275         /* the default magic is V2, will be used in connect RPC, and
1276          * then adjusted according to the flags in request/reply. */
1277         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1278
1279         return imp;
1280 }
1281 EXPORT_SYMBOL(class_new_import);
1282
1283 void class_destroy_import(struct obd_import *import)
1284 {
1285         LASSERT(import != NULL);
1286         LASSERT(import != LP_POISON);
1287
1288         spin_lock(&import->imp_lock);
1289         import->imp_generation++;
1290         spin_unlock(&import->imp_lock);
1291         class_import_put(import);
1292 }
1293 EXPORT_SYMBOL(class_destroy_import);
1294
1295 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1296
1297 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1298 {
1299         spin_lock(&exp->exp_locks_list_guard);
1300
1301         LASSERT(lock->l_exp_refs_nr >= 0);
1302
1303         if (lock->l_exp_refs_target != NULL &&
1304             lock->l_exp_refs_target != exp) {
1305                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1306                               exp, lock, lock->l_exp_refs_target);
1307         }
1308         if ((lock->l_exp_refs_nr ++) == 0) {
1309                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1310                 lock->l_exp_refs_target = exp;
1311         }
1312         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1313                lock, exp, lock->l_exp_refs_nr);
1314         spin_unlock(&exp->exp_locks_list_guard);
1315 }
1316 EXPORT_SYMBOL(__class_export_add_lock_ref);
1317
1318 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1319 {
1320         spin_lock(&exp->exp_locks_list_guard);
1321         LASSERT(lock->l_exp_refs_nr > 0);
1322         if (lock->l_exp_refs_target != exp) {
1323                 LCONSOLE_WARN("lock %p, "
1324                               "mismatching export pointers: %p, %p\n",
1325                               lock, lock->l_exp_refs_target, exp);
1326         }
1327         if (-- lock->l_exp_refs_nr == 0) {
1328                 list_del_init(&lock->l_exp_refs_link);
1329                 lock->l_exp_refs_target = NULL;
1330         }
1331         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1332                lock, exp, lock->l_exp_refs_nr);
1333         spin_unlock(&exp->exp_locks_list_guard);
1334 }
1335 EXPORT_SYMBOL(__class_export_del_lock_ref);
1336 #endif
1337
1338 /* A connection defines an export context in which preallocation can
1339    be managed. This releases the export pointer reference, and returns
1340    the export handle, so the export refcount is 1 when this function
1341    returns. */
1342 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1343                   struct obd_uuid *cluuid)
1344 {
1345         struct obd_export *export;
1346         LASSERT(conn != NULL);
1347         LASSERT(obd != NULL);
1348         LASSERT(cluuid != NULL);
1349         ENTRY;
1350
1351         export = class_new_export(obd, cluuid);
1352         if (IS_ERR(export))
1353                 RETURN(PTR_ERR(export));
1354
1355         conn->cookie = export->exp_handle.h_cookie;
1356         class_export_put(export);
1357
1358         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1359                cluuid->uuid, conn->cookie);
1360         RETURN(0);
1361 }
1362 EXPORT_SYMBOL(class_connect);
1363
1364 /* if export is involved in recovery then clean up related things */
1365 static void class_export_recovery_cleanup(struct obd_export *exp)
1366 {
1367         struct obd_device *obd = exp->exp_obd;
1368
1369         spin_lock(&obd->obd_recovery_task_lock);
1370         if (obd->obd_recovering) {
1371                 if (exp->exp_in_recovery) {
1372                         spin_lock(&exp->exp_lock);
1373                         exp->exp_in_recovery = 0;
1374                         spin_unlock(&exp->exp_lock);
1375                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1376                         atomic_dec(&obd->obd_connected_clients);
1377                 }
1378
1379                 /* if called during recovery then should update
1380                  * obd_stale_clients counter,
1381                  * lightweight exports are not counted */
1382                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1383                         exp->exp_obd->obd_stale_clients++;
1384         }
1385         spin_unlock(&obd->obd_recovery_task_lock);
1386
1387         spin_lock(&exp->exp_lock);
1388         /** Cleanup req replay fields */
1389         if (exp->exp_req_replay_needed) {
1390                 exp->exp_req_replay_needed = 0;
1391
1392                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1393                 atomic_dec(&obd->obd_req_replay_clients);
1394         }
1395
1396         /** Cleanup lock replay data */
1397         if (exp->exp_lock_replay_needed) {
1398                 exp->exp_lock_replay_needed = 0;
1399
1400                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1401                 atomic_dec(&obd->obd_lock_replay_clients);
1402         }
1403         spin_unlock(&exp->exp_lock);
1404 }
1405
1406 /* This function removes 1-3 references from the export:
1407  * 1 - for export pointer passed
1408  * and if disconnect really need
1409  * 2 - removing from hash
1410  * 3 - in client_unlink_export
1411  * The export pointer passed to this function can destroyed */
1412 int class_disconnect(struct obd_export *export)
1413 {
1414         int already_disconnected;
1415         ENTRY;
1416
1417         if (export == NULL) {
1418                 CWARN("attempting to free NULL export %p\n", export);
1419                 RETURN(-EINVAL);
1420         }
1421
1422         spin_lock(&export->exp_lock);
1423         already_disconnected = export->exp_disconnected;
1424         export->exp_disconnected = 1;
1425 #ifdef HAVE_SERVER_SUPPORT
1426         /*  We hold references of export for uuid hash
1427          *  and nid_hash and export link at least. So
1428          *  it is safe to call rh*table_remove_fast in
1429          *  there.
1430          */
1431         obd_nid_del(export->exp_obd, export);
1432 #endif /* HAVE_SERVER_SUPPORT */
1433         spin_unlock(&export->exp_lock);
1434
1435         /* class_cleanup(), abort_recovery(), and class_fail_export()
1436          * all end up in here, and if any of them race we shouldn't
1437          * call extra class_export_puts(). */
1438         if (already_disconnected)
1439                 GOTO(no_disconn, already_disconnected);
1440
1441         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1442                export->exp_handle.h_cookie);
1443
1444         class_export_recovery_cleanup(export);
1445         class_unlink_export(export);
1446 no_disconn:
1447         class_export_put(export);
1448         RETURN(0);
1449 }
1450 EXPORT_SYMBOL(class_disconnect);
1451
1452 /* Return non-zero for a fully connected export */
1453 int class_connected_export(struct obd_export *exp)
1454 {
1455         int connected = 0;
1456
1457         if (exp) {
1458                 spin_lock(&exp->exp_lock);
1459                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1460                 spin_unlock(&exp->exp_lock);
1461         }
1462         return connected;
1463 }
1464 EXPORT_SYMBOL(class_connected_export);
1465
1466 static void class_disconnect_export_list(struct list_head *list,
1467                                          enum obd_option flags)
1468 {
1469         int rc;
1470         struct obd_export *exp;
1471         ENTRY;
1472
1473         /* It's possible that an export may disconnect itself, but
1474          * nothing else will be added to this list. */
1475         while (!list_empty(list)) {
1476                 exp = list_first_entry(list, struct obd_export,
1477                                        exp_obd_chain);
1478                 /* need for safe call CDEBUG after obd_disconnect */
1479                 class_export_get(exp);
1480
1481                 spin_lock(&exp->exp_lock);
1482                 exp->exp_flags = flags;
1483                 spin_unlock(&exp->exp_lock);
1484
1485                 if (obd_uuid_equals(&exp->exp_client_uuid,
1486                                     &exp->exp_obd->obd_uuid)) {
1487                         CDEBUG(D_HA,
1488                                "exp %p export uuid == obd uuid, don't discon\n",
1489                                exp);
1490                         /* Need to delete this now so we don't end up pointing
1491                          * to work_list later when this export is cleaned up. */
1492                         list_del_init(&exp->exp_obd_chain);
1493                         class_export_put(exp);
1494                         continue;
1495                 }
1496
1497                 class_export_get(exp);
1498                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1499                        "last request at %lld\n",
1500                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1501                        exp, exp->exp_last_request_time);
1502                 /* release one export reference anyway */
1503                 rc = obd_disconnect(exp);
1504
1505                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1506                        obd_export_nid2str(exp), exp, rc);
1507                 class_export_put(exp);
1508         }
1509         EXIT;
1510 }
1511
1512 void class_disconnect_exports(struct obd_device *obd)
1513 {
1514         LIST_HEAD(work_list);
1515         ENTRY;
1516
1517         /* Move all of the exports from obd_exports to a work list, en masse. */
1518         spin_lock(&obd->obd_dev_lock);
1519         list_splice_init(&obd->obd_exports, &work_list);
1520         list_splice_init(&obd->obd_delayed_exports, &work_list);
1521         spin_unlock(&obd->obd_dev_lock);
1522
1523         if (!list_empty(&work_list)) {
1524                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1525                        "disconnecting them\n", obd->obd_minor, obd);
1526                 class_disconnect_export_list(&work_list,
1527                                              exp_flags_from_obd(obd));
1528         } else
1529                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1530                        obd->obd_minor, obd);
1531         EXIT;
1532 }
1533 EXPORT_SYMBOL(class_disconnect_exports);
1534
1535 /* Remove exports that have not completed recovery.
1536  */
1537 void class_disconnect_stale_exports(struct obd_device *obd,
1538                                     int (*test_export)(struct obd_export *))
1539 {
1540         LIST_HEAD(work_list);
1541         struct obd_export *exp, *n;
1542         int evicted = 0;
1543         ENTRY;
1544
1545         spin_lock(&obd->obd_dev_lock);
1546         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1547                                  exp_obd_chain) {
1548                 /* don't count self-export as client */
1549                 if (obd_uuid_equals(&exp->exp_client_uuid,
1550                                     &exp->exp_obd->obd_uuid))
1551                         continue;
1552
1553                 /* don't evict clients which have no slot in last_rcvd
1554                  * (e.g. lightweight connection) */
1555                 if (exp->exp_target_data.ted_lr_idx == -1)
1556                         continue;
1557
1558                 spin_lock(&exp->exp_lock);
1559                 if (exp->exp_failed || test_export(exp)) {
1560                         spin_unlock(&exp->exp_lock);
1561                         continue;
1562                 }
1563                 exp->exp_failed = 1;
1564                 spin_unlock(&exp->exp_lock);
1565
1566                 list_move(&exp->exp_obd_chain, &work_list);
1567                 evicted++;
1568                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1569                        obd->obd_name, exp->exp_client_uuid.uuid,
1570                        obd_export_nid2str(exp));
1571                 print_export_data(exp, "EVICTING", 0, D_HA);
1572         }
1573         spin_unlock(&obd->obd_dev_lock);
1574
1575         if (evicted)
1576                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1577                               obd->obd_name, evicted);
1578
1579         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1580                                                  OBD_OPT_ABORT_RECOV);
1581         EXIT;
1582 }
1583 EXPORT_SYMBOL(class_disconnect_stale_exports);
1584
1585 void class_fail_export(struct obd_export *exp)
1586 {
1587         int rc, already_failed;
1588
1589         spin_lock(&exp->exp_lock);
1590         already_failed = exp->exp_failed;
1591         exp->exp_failed = 1;
1592         spin_unlock(&exp->exp_lock);
1593
1594         if (already_failed) {
1595                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1596                        exp, exp->exp_client_uuid.uuid);
1597                 return;
1598         }
1599
1600         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1601                exp, exp->exp_client_uuid.uuid);
1602
1603         if (obd_dump_on_timeout)
1604                 libcfs_debug_dumplog();
1605
1606         /* need for safe call CDEBUG after obd_disconnect */
1607         class_export_get(exp);
1608
1609         /* Most callers into obd_disconnect are removing their own reference
1610          * (request, for example) in addition to the one from the hash table.
1611          * We don't have such a reference here, so make one. */
1612         class_export_get(exp);
1613         rc = obd_disconnect(exp);
1614         if (rc)
1615                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1616         else
1617                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1618                        exp, exp->exp_client_uuid.uuid);
1619         class_export_put(exp);
1620 }
1621 EXPORT_SYMBOL(class_fail_export);
1622
1623 #ifdef HAVE_SERVER_SUPPORT
1624
1625 static int take_first(struct obd_export *exp, void *data)
1626 {
1627         struct obd_export **expp = data;
1628
1629         if (*expp)
1630                 /* already have one */
1631                 return 0;
1632         if (exp->exp_failed)
1633                 /* Don't want this one */
1634                 return 0;
1635         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1636                 /* Cannot get a ref on this one */
1637                 return 0;
1638         *expp = exp;
1639         return 1;
1640 }
1641
1642 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1643 {
1644         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1645         struct obd_export *doomed_exp;
1646         int exports_evicted = 0;
1647
1648         spin_lock(&obd->obd_dev_lock);
1649         /* umount has run already, so evict thread should leave
1650          * its task to umount thread now */
1651         if (obd->obd_stopping) {
1652                 spin_unlock(&obd->obd_dev_lock);
1653                 return exports_evicted;
1654         }
1655         spin_unlock(&obd->obd_dev_lock);
1656
1657         doomed_exp = NULL;
1658         while (obd_nid_export_for_each(obd, nid_key,
1659                                        take_first, &doomed_exp) > 0) {
1660
1661                 LASSERTF(doomed_exp != obd->obd_self_export,
1662                          "self-export is hashed by NID?\n");
1663
1664                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1665                               obd->obd_name,
1666                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1667                               obd_export_nid2str(doomed_exp));
1668
1669                 class_fail_export(doomed_exp);
1670                 class_export_put(doomed_exp);
1671                 exports_evicted++;
1672                 doomed_exp = NULL;
1673         }
1674
1675         if (!exports_evicted)
1676                 CDEBUG(D_HA,
1677                        "%s: can't disconnect NID '%s': no exports found\n",
1678                        obd->obd_name, nid);
1679         return exports_evicted;
1680 }
1681 EXPORT_SYMBOL(obd_export_evict_by_nid);
1682
1683 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1684 {
1685         struct obd_export *doomed_exp = NULL;
1686         struct obd_uuid doomed_uuid;
1687         int exports_evicted = 0;
1688
1689         spin_lock(&obd->obd_dev_lock);
1690         if (obd->obd_stopping) {
1691                 spin_unlock(&obd->obd_dev_lock);
1692                 return exports_evicted;
1693         }
1694         spin_unlock(&obd->obd_dev_lock);
1695
1696         obd_str2uuid(&doomed_uuid, uuid);
1697         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1698                 CERROR("%s: can't evict myself\n", obd->obd_name);
1699                 return exports_evicted;
1700         }
1701
1702         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1703         if (doomed_exp == NULL) {
1704                 CERROR("%s: can't disconnect %s: no exports found\n",
1705                        obd->obd_name, uuid);
1706         } else {
1707                 CWARN("%s: evicting %s at adminstrative request\n",
1708                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1709                 class_fail_export(doomed_exp);
1710                 class_export_put(doomed_exp);
1711                 obd_uuid_del(obd, doomed_exp);
1712                 exports_evicted++;
1713         }
1714
1715         return exports_evicted;
1716 }
1717 #endif /* HAVE_SERVER_SUPPORT */
1718
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() for each export when
 * lock dumping is requested; unset (NULL) until some other code assigns it.
 */
void (*class_export_dump_hook)(struct obd_export *exp);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1723
1724 static void print_export_data(struct obd_export *exp, const char *status,
1725                               int locks, int debug_level)
1726 {
1727         struct ptlrpc_reply_state *rs;
1728         struct ptlrpc_reply_state *first_reply = NULL;
1729         int nreplies = 0;
1730
1731         spin_lock(&exp->exp_lock);
1732         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1733                             rs_exp_list) {
1734                 if (nreplies == 0)
1735                         first_reply = rs;
1736                 nreplies++;
1737         }
1738         spin_unlock(&exp->exp_lock);
1739
1740         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1741                "%p %s %llu stale:%d\n",
1742                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1743                obd_export_nid2str(exp),
1744                refcount_read(&exp->exp_handle.h_ref),
1745                atomic_read(&exp->exp_rpc_count),
1746                atomic_read(&exp->exp_cb_count),
1747                atomic_read(&exp->exp_locks_count),
1748                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1749                nreplies, first_reply, nreplies > 3 ? "..." : "",
1750                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1751 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1752         if (locks && class_export_dump_hook != NULL)
1753                 class_export_dump_hook(exp);
1754 #endif
1755 }
1756
1757 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1758 {
1759         struct obd_export *exp;
1760
1761         spin_lock(&obd->obd_dev_lock);
1762         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1763                 print_export_data(exp, "ACTIVE", locks, debug_level);
1764         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1765                 print_export_data(exp, "UNLINKED", locks, debug_level);
1766         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1767                 print_export_data(exp, "DELAYED", locks, debug_level);
1768         spin_unlock(&obd->obd_dev_lock);
1769 }
1770
1771 void obd_exports_barrier(struct obd_device *obd)
1772 {
1773         int waited = 2;
1774         LASSERT(list_empty(&obd->obd_exports));
1775         spin_lock(&obd->obd_dev_lock);
1776         while (!list_empty(&obd->obd_unlinked_exports)) {
1777                 spin_unlock(&obd->obd_dev_lock);
1778                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1779                 if (waited > 5 && is_power_of_2(waited)) {
1780                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1781                                       "more than %d seconds. "
1782                                       "The obd refcount = %d. Is it stuck?\n",
1783                                       obd->obd_name, waited,
1784                                       atomic_read(&obd->obd_refcount));
1785                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1786                 }
1787                 waited *= 2;
1788                 spin_lock(&obd->obd_dev_lock);
1789         }
1790         spin_unlock(&obd->obd_dev_lock);
1791 }
1792 EXPORT_SYMBOL(obd_exports_barrier);
1793
1794 /**
1795  * Add export to the obd_zombe thread and notify it.
1796  */
1797 static void obd_zombie_export_add(struct obd_export *exp) {
1798         atomic_dec(&obd_stale_export_num);
1799         spin_lock(&exp->exp_obd->obd_dev_lock);
1800         LASSERT(!list_empty(&exp->exp_obd_chain));
1801         list_del_init(&exp->exp_obd_chain);
1802         spin_unlock(&exp->exp_obd->obd_dev_lock);
1803
1804         queue_work(zombie_wq, &exp->exp_zombie_work);
1805 }
1806
1807 /**
1808  * Add import to the obd_zombe thread and notify it.
1809  */
1810 static void obd_zombie_import_add(struct obd_import *imp) {
1811         LASSERT(imp->imp_sec == NULL);
1812
1813         queue_work(zombie_wq, &imp->imp_zombie_work);
1814 }
1815
1816 /**
1817  * wait when obd_zombie import/export queues become empty
1818  */
1819 void obd_zombie_barrier(void)
1820 {
1821         flush_workqueue(zombie_wq);
1822 }
1823 EXPORT_SYMBOL(obd_zombie_barrier);
1824
1825
1826 struct obd_export *obd_stale_export_get(void)
1827 {
1828         struct obd_export *exp = NULL;
1829         ENTRY;
1830
1831         spin_lock(&obd_stale_export_lock);
1832         if (!list_empty(&obd_stale_exports)) {
1833                 exp = list_first_entry(&obd_stale_exports,
1834                                        struct obd_export, exp_stale_list);
1835                 list_del_init(&exp->exp_stale_list);
1836         }
1837         spin_unlock(&obd_stale_export_lock);
1838
1839         if (exp) {
1840                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1841                        atomic_read(&obd_stale_export_num));
1842         }
1843         RETURN(exp);
1844 }
1845 EXPORT_SYMBOL(obd_stale_export_get);
1846
1847 void obd_stale_export_put(struct obd_export *exp)
1848 {
1849         ENTRY;
1850
1851         LASSERT(list_empty(&exp->exp_stale_list));
1852         if (exp->exp_lock_hash &&
1853             atomic_read(&exp->exp_lock_hash->hs_count)) {
1854                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1855                        atomic_read(&obd_stale_export_num));
1856
1857                 spin_lock_bh(&exp->exp_bl_list_lock);
1858                 spin_lock(&obd_stale_export_lock);
1859                 /* Add to the tail if there is no blocked locks,
1860                  * to the head otherwise. */
1861                 if (list_empty(&exp->exp_bl_list))
1862                         list_add_tail(&exp->exp_stale_list,
1863                                       &obd_stale_exports);
1864                 else
1865                         list_add(&exp->exp_stale_list,
1866                                  &obd_stale_exports);
1867
1868                 spin_unlock(&obd_stale_export_lock);
1869                 spin_unlock_bh(&exp->exp_bl_list_lock);
1870         } else {
1871                 class_export_put(exp);
1872         }
1873         EXIT;
1874 }
1875 EXPORT_SYMBOL(obd_stale_export_put);
1876
1877 /**
1878  * Adjust the position of the export in the stale list,
1879  * i.e. move to the head of the list if is needed.
1880  **/
1881 void obd_stale_export_adjust(struct obd_export *exp)
1882 {
1883         LASSERT(exp != NULL);
1884         spin_lock_bh(&exp->exp_bl_list_lock);
1885         spin_lock(&obd_stale_export_lock);
1886
1887         if (!list_empty(&exp->exp_stale_list) &&
1888             !list_empty(&exp->exp_bl_list))
1889                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1890
1891         spin_unlock(&obd_stale_export_lock);
1892         spin_unlock_bh(&exp->exp_bl_list_lock);
1893 }
1894 EXPORT_SYMBOL(obd_stale_export_adjust);
1895
1896 /**
1897  * start destroy zombie import/export thread
1898  */
1899 int obd_zombie_impexp_init(void)
1900 {
1901         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1902                                            0, CFS_CPT_ANY,
1903                                            cfs_cpt_number(cfs_cpt_tab));
1904
1905         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1906 }
1907
1908 /**
1909  * stop destroy zombie import/export thread
1910  */
1911 void obd_zombie_impexp_stop(void)
1912 {
1913         destroy_workqueue(zombie_wq);
1914         LASSERT(list_empty(&obd_stale_exports));
1915 }
1916
1917 /***** Kernel-userspace comm helpers *******/
1918
1919 /* Get length of entire message, including header */
1920 int kuc_len(int payload_len)
1921 {
1922         return sizeof(struct kuc_hdr) + payload_len;
1923 }
1924 EXPORT_SYMBOL(kuc_len);
1925
1926 /* Get a pointer to kuc header, given a ptr to the payload
1927  * @param p Pointer to payload area
1928  * @returns Pointer to kuc header
1929  */
1930 struct kuc_hdr * kuc_ptr(void *p)
1931 {
1932         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1933         LASSERT(lh->kuc_magic == KUC_MAGIC);
1934         return lh;
1935 }
1936 EXPORT_SYMBOL(kuc_ptr);
1937
1938 /* Alloc space for a message, and fill in header
1939  * @return Pointer to payload area
1940  */
1941 void *kuc_alloc(int payload_len, int transport, int type)
1942 {
1943         struct kuc_hdr *lh;
1944         int len = kuc_len(payload_len);
1945
1946         OBD_ALLOC(lh, len);
1947         if (lh == NULL)
1948                 return ERR_PTR(-ENOMEM);
1949
1950         lh->kuc_magic = KUC_MAGIC;
1951         lh->kuc_transport = transport;
1952         lh->kuc_msgtype = type;
1953         lh->kuc_msglen = len;
1954
1955         return (void *)(lh + 1);
1956 }
1957 EXPORT_SYMBOL(kuc_alloc);
1958
/* Release a message obtained from kuc_alloc().
 * @param p           Pointer to the payload area (not to the header)
 * @param payload_len Payload size that was passed to kuc_alloc()
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
1965 EXPORT_SYMBOL(kuc_free);
1966
1967 struct obd_request_slot_waiter {
1968         struct list_head        orsw_entry;
1969         wait_queue_head_t       orsw_waitq;
1970         bool                    orsw_signaled;
1971 };
1972
1973 static bool obd_request_slot_avail(struct client_obd *cli,
1974                                    struct obd_request_slot_waiter *orsw)
1975 {
1976         bool avail;
1977
1978         spin_lock(&cli->cl_loi_list_lock);
1979         avail = !!list_empty(&orsw->orsw_entry);
1980         spin_unlock(&cli->cl_loi_list_lock);
1981
1982         return avail;
1983 };
1984
1985 /*
1986  * For network flow control, the RPC sponsor needs to acquire a credit
1987  * before sending the RPC. The credits count for a connection is defined
1988  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1989  * the subsequent RPC sponsors need to wait until others released their
1990  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1991  */
1992 int obd_get_request_slot(struct client_obd *cli)
1993 {
1994         struct obd_request_slot_waiter   orsw;
1995         int                              rc;
1996
1997         spin_lock(&cli->cl_loi_list_lock);
1998         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1999                 cli->cl_rpcs_in_flight++;
2000                 spin_unlock(&cli->cl_loi_list_lock);
2001                 return 0;
2002         }
2003
2004         init_waitqueue_head(&orsw.orsw_waitq);
2005         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2006         orsw.orsw_signaled = false;
2007         spin_unlock(&cli->cl_loi_list_lock);
2008
2009         rc = l_wait_event_abortable(orsw.orsw_waitq,
2010                                     obd_request_slot_avail(cli, &orsw) ||
2011                                     orsw.orsw_signaled);
2012
2013         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2014          * freed but other (such as obd_put_request_slot) is using it. */
2015         spin_lock(&cli->cl_loi_list_lock);
2016         if (rc != 0) {
2017                 if (!orsw.orsw_signaled) {
2018                         if (list_empty(&orsw.orsw_entry))
2019                                 cli->cl_rpcs_in_flight--;
2020                         else
2021                                 list_del(&orsw.orsw_entry);
2022                 }
2023                 rc = -EINTR;
2024         }
2025
2026         if (orsw.orsw_signaled) {
2027                 LASSERT(list_empty(&orsw.orsw_entry));
2028
2029                 rc = -EINTR;
2030         }
2031         spin_unlock(&cli->cl_loi_list_lock);
2032
2033         return rc;
2034 }
2035 EXPORT_SYMBOL(obd_get_request_slot);
2036
2037 void obd_put_request_slot(struct client_obd *cli)
2038 {
2039         struct obd_request_slot_waiter *orsw;
2040
2041         spin_lock(&cli->cl_loi_list_lock);
2042         cli->cl_rpcs_in_flight--;
2043
2044         /* If there is free slot, wakeup the first waiter. */
2045         if (!list_empty(&cli->cl_flight_waiters) &&
2046             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2047                 orsw = list_first_entry(&cli->cl_flight_waiters,
2048                                         struct obd_request_slot_waiter,
2049                                         orsw_entry);
2050                 list_del_init(&orsw->orsw_entry);
2051                 cli->cl_rpcs_in_flight++;
2052                 wake_up(&orsw->orsw_waitq);
2053         }
2054         spin_unlock(&cli->cl_loi_list_lock);
2055 }
2056 EXPORT_SYMBOL(obd_put_request_slot);
2057
2058 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2059 {
2060         return cli->cl_max_rpcs_in_flight;
2061 }
2062 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2063
2064 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2065 {
2066         struct obd_request_slot_waiter *orsw;
2067         __u32                           old;
2068         int                             diff;
2069         int                             i;
2070         int                             rc;
2071
2072         if (max > OBD_MAX_RIF_MAX || max < 1)
2073                 return -ERANGE;
2074
2075         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2076                cli->cl_import->imp_obd->obd_name, max,
2077                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2078
2079         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2080                    LUSTRE_MDC_NAME) == 0) {
2081                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2082                  * strictly lower that max_rpcs_in_flight */
2083                 if (max < 2) {
2084                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2085                                cli->cl_import->imp_obd->obd_name);
2086                         return -ERANGE;
2087                 }
2088                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2089                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2090                         if (rc != 0)
2091                                 return rc;
2092                 }
2093         }
2094
2095         spin_lock(&cli->cl_loi_list_lock);
2096         old = cli->cl_max_rpcs_in_flight;
2097         cli->cl_max_rpcs_in_flight = max;
2098         client_adjust_max_dirty(cli);
2099
2100         diff = max - old;
2101
2102         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2103         for (i = 0; i < diff; i++) {
2104                 if (list_empty(&cli->cl_flight_waiters))
2105                         break;
2106
2107                 orsw = list_first_entry(&cli->cl_flight_waiters,
2108                                         struct obd_request_slot_waiter,
2109                                         orsw_entry);
2110                 list_del_init(&orsw->orsw_entry);
2111                 cli->cl_rpcs_in_flight++;
2112                 wake_up(&orsw->orsw_waitq);
2113         }
2114         spin_unlock(&cli->cl_loi_list_lock);
2115
2116         return 0;
2117 }
2118 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2119
2120 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2121 {
2122         return cli->cl_max_mod_rpcs_in_flight;
2123 }
2124 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2125
2126 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2127 {
2128         struct obd_connect_data *ocd;
2129         __u16 maxmodrpcs;
2130         __u16 prev;
2131
2132         if (max > OBD_MAX_RIF_MAX || max < 1)
2133                 return -ERANGE;
2134
2135         ocd = &cli->cl_import->imp_connect_data;
2136         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2137                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2138                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2139
2140         if (max == OBD_MAX_RIF_MAX)
2141                 max = OBD_MAX_RIF_MAX - 1;
2142
2143         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2144          * increase this value, also bump up max_rpcs_in_flight to match.
2145          */
2146         if (max >= cli->cl_max_rpcs_in_flight) {
2147                 CDEBUG(D_INFO,
2148                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2149                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2150                 obd_set_max_rpcs_in_flight(cli, max + 1);
2151         }
2152
2153         /* cannot exceed max modify RPCs in flight supported by the server,
2154          * but verify ocd_connect_flags is at least initialized first.  If
2155          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2156          */
2157         if (!ocd->ocd_connect_flags) {
2158                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2159         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2160                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2161                 if (maxmodrpcs == 0) { /* connection not finished yet */
2162                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2163                         CDEBUG(D_INFO,
2164                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2165                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2166                 }
2167         } else {
2168                 maxmodrpcs = 1;
2169         }
2170         if (max > maxmodrpcs) {
2171                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2172                        cli->cl_import->imp_obd->obd_name,
2173                        max, maxmodrpcs);
2174                 return -ERANGE;
2175         }
2176
2177         spin_lock(&cli->cl_mod_rpcs_lock);
2178
2179         prev = cli->cl_max_mod_rpcs_in_flight;
2180         cli->cl_max_mod_rpcs_in_flight = max;
2181
2182         /* wakeup waiters if limit has been increased */
2183         if (cli->cl_max_mod_rpcs_in_flight > prev)
2184                 wake_up(&cli->cl_mod_rpcs_waitq);
2185
2186         spin_unlock(&cli->cl_mod_rpcs_lock);
2187
2188         return 0;
2189 }
2190 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2191
2192 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2193                                struct seq_file *seq)
2194 {
2195         unsigned long mod_tot = 0, mod_cum;
2196         struct timespec64 now;
2197         int i;
2198
2199         ktime_get_real_ts64(&now);
2200
2201         spin_lock(&cli->cl_mod_rpcs_lock);
2202
2203         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2204                    (s64)now.tv_sec, now.tv_nsec);
2205         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2206                    cli->cl_mod_rpcs_in_flight);
2207
2208         seq_printf(seq, "\n\t\t\tmodify\n");
2209         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2210
2211         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2212
2213         mod_cum = 0;
2214         for (i = 0; i < OBD_HIST_MAX; i++) {
2215                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2216                 mod_cum += mod;
2217                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2218                            i, mod, pct(mod, mod_tot),
2219                            pct(mod_cum, mod_tot));
2220                 if (mod_cum == mod_tot)
2221                         break;
2222         }
2223
2224         spin_unlock(&cli->cl_mod_rpcs_lock);
2225
2226         return 0;
2227 }
2228 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2229
2230 /* The number of modify RPCs sent in parallel is limited
2231  * because the server has a finite number of slots per client to
2232  * store request result and ensure reply reconstruction when needed.
2233  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2234  * that takes into account server limit and cl_max_rpcs_in_flight
2235  * value.
2236  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2237  * one close request is allowed above the maximum.
2238  */
2239 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2240                                                  bool close_req)
2241 {
2242         bool avail;
2243
2244         /* A slot is available if
2245          * - number of modify RPCs in flight is less than the max
2246          * - it's a close RPC and no other close request is in flight
2247          */
2248         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2249                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2250
2251         return avail;
2252 }
2253
2254 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2255                                          bool close_req)
2256 {
2257         bool avail;
2258
2259         spin_lock(&cli->cl_mod_rpcs_lock);
2260         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2261         spin_unlock(&cli->cl_mod_rpcs_lock);
2262         return avail;
2263 }
2264
2265
2266 /* Get a modify RPC slot from the obd client @cli according
2267  * to the kind of operation @opc that is going to be sent
2268  * and the intent @it of the operation if it applies.
2269  * If the maximum number of modify RPCs in flight is reached
2270  * the thread is put to sleep.
2271  * Returns the tag to be set in the request message. Tag 0
2272  * is reserved for non-modifying requests.
2273  */
2274 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2275 {
2276         bool                    close_req = false;
2277         __u16                   i, max;
2278
2279         if (opc == MDS_CLOSE)
2280                 close_req = true;
2281
2282         do {
2283                 spin_lock(&cli->cl_mod_rpcs_lock);
2284                 max = cli->cl_max_mod_rpcs_in_flight;
2285                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2286                         /* there is a slot available */
2287                         cli->cl_mod_rpcs_in_flight++;
2288                         if (close_req)
2289                                 cli->cl_close_rpcs_in_flight++;
2290                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2291                                          cli->cl_mod_rpcs_in_flight);
2292                         /* find a free tag */
2293                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2294                                                 max + 1);
2295                         LASSERT(i < OBD_MAX_RIF_MAX);
2296                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2297                         spin_unlock(&cli->cl_mod_rpcs_lock);
2298                         /* tag 0 is reserved for non-modify RPCs */
2299
2300                         CDEBUG(D_RPCTRACE,
2301                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2302                                cli->cl_import->imp_obd->obd_name,
2303                                i + 1, opc, max);
2304
2305                         return i + 1;
2306                 }
2307                 spin_unlock(&cli->cl_mod_rpcs_lock);
2308
2309                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2310                        "opc %u, max %hu\n",
2311                        cli->cl_import->imp_obd->obd_name, opc, max);
2312
2313                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2314                                           obd_mod_rpc_slot_avail(cli,
2315                                                                  close_req));
2316         } while (true);
2317 }
2318 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2319
2320 /* Put a modify RPC slot from the obd client @cli according
2321  * to the kind of operation @opc that has been sent.
2322  */
2323 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2324 {
2325         bool                    close_req = false;
2326
2327         if (tag == 0)
2328                 return;
2329
2330         if (opc == MDS_CLOSE)
2331                 close_req = true;
2332
2333         spin_lock(&cli->cl_mod_rpcs_lock);
2334         cli->cl_mod_rpcs_in_flight--;
2335         if (close_req)
2336                 cli->cl_close_rpcs_in_flight--;
2337         /* release the tag in the bitmap */
2338         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2339         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2340         spin_unlock(&cli->cl_mod_rpcs_lock);
2341         wake_up(&cli->cl_mod_rpcs_waitq);
2342 }
2343 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2344