Whamcloud - gitweb
LU-17038 tests: remove mlink utility
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
/* Lock protecting obd_devs[]: taken for read by the lookup helpers in this
 * file and for write when a slot is assigned or cleared.
 */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered obd devices, indexed by the device's obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache backing struct obd_device allocations (see obd_init_caches()). */
static struct kmem_cache *obd_device_cachep;
/* Forward declaration; the definition follows class_sysfs_release(). */
static struct kobj_type class_ktype;
/* NOTE(review): presumably the workqueue used to destroy zombie
 * exports/imports; its users are not visible in this part of the file.
 */
static struct workqueue_struct *zombie_wq;

/* Forward declarations of helpers defined later in the file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
			      const char *status, int locks, int debug_level);

/* NOTE(review): list, lock and counter for "stale" exports; presumably
 * obd_stale_export_lock guards obd_stale_exports - confirm against the
 * users further down the file.
 */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
63
64 /*
65  * support functions: we could use inter-module communication, but this
66  * is more portable to other OS's
67  */
68 static struct obd_device *obd_device_alloc(void)
69 {
70         struct obd_device *obd;
71
72         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
73         if (obd != NULL) {
74                 obd->obd_magic = OBD_DEVICE_MAGIC;
75         }
76         return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81         LASSERT(obd != NULL);
82         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84         if (obd->obd_namespace != NULL) {
85                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86                        obd, obd->obd_namespace, obd->obd_force);
87                 LBUG();
88         }
89         lu_ref_fini(&obd->obd_reference);
90         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95         struct kobject *kobj = kset_find_obj(lustre_kset, name);
96
97         if (kobj && kobj->ktype == &class_ktype)
98                 return container_of(kobj, struct obd_type, typ_kobj);
99
100         kobject_put(kobj);
101         return NULL;
102 }
103 EXPORT_SYMBOL(class_search_type);
104
105 struct obd_type *class_get_type(const char *name)
106 {
107         struct obd_type *type;
108
109         type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
111         if (!type) {
112                 const char *modname = name;
113
114 #ifdef HAVE_SERVER_SUPPORT
115                 if (strcmp(modname, "obdfilter") == 0)
116                         modname = "ofd";
117
118                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119                         modname = LUSTRE_OSP_NAME;
120
121                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122                         modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
124
125                 if (!request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 if (try_module_get(type->typ_dt_ops->o_owner)) {
136                         atomic_inc(&type->typ_refcnt);
137                         /* class_search_type() returned a counted reference,
138                          * but we don't need that count any more as
139                          * we have one through typ_refcnt.
140                          */
141                         kobject_put(&type->typ_kobj);
142                 } else {
143                         kobject_put(&type->typ_kobj);
144                         type = NULL;
145                 }
146         }
147         return type;
148 }
149 EXPORT_SYMBOL(class_get_type);
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         module_put(type->typ_dt_ops->o_owner);
155         atomic_dec(&type->typ_refcnt);
156 }
157 EXPORT_SYMBOL(class_put_type);
158
159 static void class_sysfs_release(struct kobject *kobj)
160 {
161         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
162
163         debugfs_remove_recursive(type->typ_debugfs_entry);
164         type->typ_debugfs_entry = NULL;
165
166         if (type->typ_lu)
167                 lu_device_type_fini(type->typ_lu);
168
169 #ifdef CONFIG_PROC_FS
170         if (type->typ_name && type->typ_procroot)
171                 remove_proc_subtree(type->typ_name, proc_lustre_root);
172 #endif
173         OBD_FREE(type, sizeof(*type));
174 }
175
176 static struct kobj_type class_ktype = {
177         .sysfs_ops      = &lustre_sysfs_ops,
178         .release        = class_sysfs_release,
179 };
180
#ifdef HAVE_SERVER_SUPPORT
/*
 * Create a placeholder obd_type called \a name.
 *
 * The placeholder is added to lustre_kset, gets a debugfs directory under
 * debugfs_lustre_root and, if \a enable_proc is set, a compat entry under
 * proc_lustre_root.  It is marked with typ_sym_filter so that a later
 * class_register_type() for the same name adopts it instead of failing
 * with -EEXIST.  A failure to create the proc entry is only logged.
 *
 * Returns the new type, ERR_PTR(-EEXIST) if the name is already taken,
 * ERR_PTR(-ENOMEM) on allocation failure, or ERR_PTR(rc) if the kobject
 * could not be added.
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
	struct dentry *symlink;
	struct obd_type *type;
	int rc;

	type = class_search_type(name);
	if (type) {
		kobject_put(&type->typ_kobj);
		return ERR_PTR(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (!type)
		return ERR_PTR(-ENOMEM);

	type->typ_kobj.kset = lustre_kset;
	rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
				  &lustre_kset->kobj, "%s", name);
	if (rc) {
		/* A failed kobject_init_and_add() still leaves an
		 * initialised kobject behind; dropping it runs
		 * class_sysfs_release(), which frees the type.
		 */
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}

	symlink = debugfs_create_dir(name, debugfs_lustre_root);
	type->typ_debugfs_entry = symlink;
	type->typ_sym_filter = true;

	if (enable_proc) {
		type->typ_procroot = lprocfs_register(name, proc_lustre_root,
						      NULL, NULL);
		if (IS_ERR(type->typ_procroot)) {
			CERROR("%s: can't create compat proc entry: %d\n",
			       name, (int)PTR_ERR(type->typ_procroot));
			type->typ_procroot = NULL;
		}
	}

	return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
222
223 #define CLASS_MAX_NAME 1024
224
225 int class_register_type(const struct obd_ops *dt_ops,
226                         const struct md_ops *md_ops,
227                         bool enable_proc,
228                         const char *name, struct lu_device_type *ldt)
229 {
230         struct obd_type *type;
231         int rc;
232
233         ENTRY;
234         /* sanity check */
235         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
236
237         type = class_search_type(name);
238         if (type) {
239 #ifdef HAVE_SERVER_SUPPORT
240                 if (type->typ_sym_filter)
241                         goto dir_exist;
242 #endif /* HAVE_SERVER_SUPPORT */
243                 kobject_put(&type->typ_kobj);
244                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
245                 RETURN(-EEXIST);
246         }
247
248         OBD_ALLOC(type, sizeof(*type));
249         if (type == NULL)
250                 RETURN(-ENOMEM);
251
252         type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
253         type->typ_kobj.kset = lustre_kset;
254         kobject_init(&type->typ_kobj, &class_ktype);
255 #ifdef HAVE_SERVER_SUPPORT
256 dir_exist:
257 #endif /* HAVE_SERVER_SUPPORT */
258
259         type->typ_dt_ops = dt_ops;
260         type->typ_md_ops = md_ops;
261
262 #ifdef HAVE_SERVER_SUPPORT
263         if (type->typ_sym_filter) {
264                 type->typ_sym_filter = false;
265                 kobject_put(&type->typ_kobj);
266                 goto setup_ldt;
267         }
268 #endif
269 #ifdef CONFIG_PROC_FS
270         if (enable_proc && !type->typ_procroot) {
271                 type->typ_procroot = lprocfs_register(name,
272                                                       proc_lustre_root,
273                                                       NULL, type);
274                 if (IS_ERR(type->typ_procroot)) {
275                         rc = PTR_ERR(type->typ_procroot);
276                         type->typ_procroot = NULL;
277                         GOTO(failed, rc);
278                 }
279         }
280 #endif
281         type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
282
283         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
284         if (rc)
285                 GOTO(failed, rc);
286 #ifdef HAVE_SERVER_SUPPORT
287 setup_ldt:
288 #endif
289         if (ldt) {
290                 rc = lu_device_type_init(ldt);
291                 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
292                 wake_up_var(&type->typ_lu);
293                 if (rc)
294                         GOTO(failed, rc);
295         }
296
297         RETURN(0);
298
299 failed:
300         kobject_put(&type->typ_kobj);
301
302         RETURN(rc);
303 }
304 EXPORT_SYMBOL(class_register_type);
305
306 int class_unregister_type(const char *name)
307 {
308         struct obd_type *type = class_search_type(name);
309         int rc = 0;
310         ENTRY;
311
312         if (!type) {
313                 CERROR("unknown obd type\n");
314                 RETURN(-EINVAL);
315         }
316
317         if (atomic_read(&type->typ_refcnt)) {
318                 CERROR("type %s has refcount (%d)\n", name,
319                        atomic_read(&type->typ_refcnt));
320                 /* This is a bad situation, let's make the best of it */
321                 /* Remove ops, but leave the name for debugging */
322                 type->typ_dt_ops = NULL;
323                 type->typ_md_ops = NULL;
324                 GOTO(out_put, rc = -EBUSY);
325         }
326
327         /* Put the final ref */
328         kobject_put(&type->typ_kobj);
329 out_put:
330         /* Put the ref returned by class_search_type() */
331         kobject_put(&type->typ_kobj);
332
333         RETURN(rc);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
336
337 /**
338  * Create a new obd device.
339  *
340  * Allocate the new obd_device and initialize it.
341  *
342  * \param[in] type_name obd device type string.
343  * \param[in] name      obd device name.
344  * \param[in] uuid      obd device UUID
345  *
346  * \retval newdev         pointer to created obd_device
347  * \retval ERR_PTR(errno) on error
348  */
349 struct obd_device *class_newdev(const char *type_name, const char *name,
350                                 const char *uuid)
351 {
352         struct obd_device *newdev;
353         struct obd_type *type = NULL;
354         ENTRY;
355
356         if (strlen(name) >= MAX_OBD_NAME) {
357                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358                 RETURN(ERR_PTR(-EINVAL));
359         }
360
361         type = class_get_type(type_name);
362         if (type == NULL){
363                 CERROR("OBD: unknown type: %s\n", type_name);
364                 RETURN(ERR_PTR(-ENODEV));
365         }
366
367         newdev = obd_device_alloc();
368         if (newdev == NULL) {
369                 class_put_type(type);
370                 RETURN(ERR_PTR(-ENOMEM));
371         }
372         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374         newdev->obd_type = type;
375         newdev->obd_minor = -1;
376
377         rwlock_init(&newdev->obd_pool_lock);
378         newdev->obd_pool_limit = 0;
379         newdev->obd_pool_slv = 0;
380
381         INIT_LIST_HEAD(&newdev->obd_exports);
382         newdev->obd_num_exports = 0;
383         newdev->obd_grant_check_threshold = 100;
384         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
385         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
386         INIT_LIST_HEAD(&newdev->obd_exports_timed);
387         INIT_LIST_HEAD(&newdev->obd_nid_stats);
388         spin_lock_init(&newdev->obd_nid_lock);
389         spin_lock_init(&newdev->obd_dev_lock);
390         mutex_init(&newdev->obd_dev_mutex);
391         spin_lock_init(&newdev->obd_osfs_lock);
392         /* newdev->obd_osfs_age must be set to a value in the distant
393          * past to guarantee a fresh statfs is fetched on mount. */
394         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
395
396         /* XXX belongs in setup not attach  */
397         init_rwsem(&newdev->obd_observer_link_sem);
398         /* recovery data */
399         spin_lock_init(&newdev->obd_recovery_task_lock);
400         init_waitqueue_head(&newdev->obd_next_transno_waitq);
401         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
402         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
403         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
404         INIT_LIST_HEAD(&newdev->obd_evict_list);
405         INIT_LIST_HEAD(&newdev->obd_lwp_list);
406
407         llog_group_init(&newdev->obd_olg);
408         /* Detach drops this */
409         atomic_set(&newdev->obd_refcount, 1);
410         lu_ref_init(&newdev->obd_reference);
411         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
412
413         newdev->obd_conn_inprogress = 0;
414
415         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
416
417         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
418                newdev->obd_name, newdev);
419
420         return newdev;
421 }
422
423 /**
424  * Free obd device.
425  *
426  * \param[in] obd obd_device to be freed
427  *
428  * \retval none
429  */
430 void class_free_dev(struct obd_device *obd)
431 {
432         struct obd_type *obd_type = obd->obd_type;
433
434         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
435                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
436         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
437                  "obd %p != obd_devs[%d] %p\n",
438                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
439         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
440                  "obd_refcount should be 0, not %d\n",
441                  atomic_read(&obd->obd_refcount));
442         LASSERT(obd_type != NULL);
443
444         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
445                obd->obd_name, obd->obd_type->typ_name);
446
447         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
448                          obd->obd_name, obd->obd_uuid.uuid);
449         if (obd->obd_stopping) {
450                 int err;
451
452                 /* If we're not stopping, we were never set up */
453                 err = obd_cleanup(obd);
454                 if (err)
455                         CERROR("Cleanup %s returned %d\n",
456                                 obd->obd_name, err);
457         }
458
459         obd_device_free(obd);
460
461         class_put_type(obd_type);
462 }
463
464 /**
465  * Unregister obd device.
466  *
467  * Free slot in obd_dev[] used by \a obd.
468  *
469  * \param[in] new_obd obd_device to be unregistered
470  *
471  * \retval none
472  */
473 void class_unregister_device(struct obd_device *obd)
474 {
475         write_lock(&obd_dev_lock);
476         if (obd->obd_minor >= 0) {
477                 LASSERT(obd_devs[obd->obd_minor] == obd);
478                 obd_devs[obd->obd_minor] = NULL;
479                 obd->obd_minor = -1;
480         }
481         write_unlock(&obd_dev_lock);
482 }
483
484 /**
485  * Register obd device.
486  *
487  * Find free slot in obd_devs[], fills it with \a new_obd.
488  *
489  * \param[in] new_obd obd_device to be registered
490  *
491  * \retval 0          success
492  * \retval -EEXIST    device with this name is registered
493  * \retval -EOVERFLOW obd_devs[] is full
494  */
495 int class_register_device(struct obd_device *new_obd)
496 {
497         int ret = 0;
498         int i;
499         int new_obd_minor = 0;
500         bool minor_assign = false;
501         bool retried = false;
502
503 again:
504         write_lock(&obd_dev_lock);
505         for (i = 0; i < class_devno_max(); i++) {
506                 struct obd_device *obd = class_num2obd(i);
507
508                 if (obd != NULL &&
509                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
510
511                         if (!retried) {
512                                 write_unlock(&obd_dev_lock);
513
514                                 /* the obd_device could be waited to be
515                                  * destroyed by the "obd_zombie_impexp_thread".
516                                  */
517                                 obd_zombie_barrier();
518                                 retried = true;
519                                 goto again;
520                         }
521
522                         CERROR("%s: already exists, won't add\n",
523                                obd->obd_name);
524                         /* in case we found a free slot before duplicate */
525                         minor_assign = false;
526                         ret = -EEXIST;
527                         break;
528                 }
529                 if (!minor_assign && obd == NULL) {
530                         new_obd_minor = i;
531                         minor_assign = true;
532                 }
533         }
534
535         if (minor_assign) {
536                 new_obd->obd_minor = new_obd_minor;
537                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
538                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
539                 obd_devs[new_obd_minor] = new_obd;
540         } else {
541                 if (ret == 0) {
542                         ret = -EOVERFLOW;
543                         CERROR("%s: all %u/%u devices used, increase "
544                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
545                                i, class_devno_max(), ret);
546                 }
547         }
548         write_unlock(&obd_dev_lock);
549
550         RETURN(ret);
551 }
552
553 static int class_name2dev_nolock(const char *name)
554 {
555         int i;
556
557         if (!name)
558                 return -1;
559
560         for (i = 0; i < class_devno_max(); i++) {
561                 struct obd_device *obd = class_num2obd(i);
562
563                 if (obd && strcmp(name, obd->obd_name) == 0) {
564                         /* Make sure we finished attaching before we give
565                            out any references */
566                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
567                         if (obd->obd_attached) {
568                                 return i;
569                         }
570                         break;
571                 }
572         }
573
574         return -1;
575 }
576
577 int class_name2dev(const char *name)
578 {
579         int i;
580
581         if (!name)
582                 return -1;
583
584         read_lock(&obd_dev_lock);
585         i = class_name2dev_nolock(name);
586         read_unlock(&obd_dev_lock);
587
588         return i;
589 }
590 EXPORT_SYMBOL(class_name2dev);
591
592 struct obd_device *class_name2obd(const char *name)
593 {
594         int dev = class_name2dev(name);
595
596         if (dev < 0 || dev > class_devno_max())
597                 return NULL;
598         return class_num2obd(dev);
599 }
600 EXPORT_SYMBOL(class_name2obd);
601
602 static int class_uuid2dev_nolock(struct obd_uuid *uuid)
603 {
604         int i;
605
606         for (i = 0; i < class_devno_max(); i++) {
607                 struct obd_device *obd = class_num2obd(i);
608
609                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
610                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
611                         return i;
612                 }
613         }
614
615         return -1;
616 }
617
618 int class_uuid2dev(struct obd_uuid *uuid)
619 {
620         int i;
621
622         read_lock(&obd_dev_lock);
623         i = class_uuid2dev_nolock(uuid);
624         read_unlock(&obd_dev_lock);
625
626         return i;
627 }
628 EXPORT_SYMBOL(class_uuid2dev);
629
630 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
631 {
632         int dev = class_uuid2dev(uuid);
633         if (dev < 0)
634                 return NULL;
635         return class_num2obd(dev);
636 }
637 EXPORT_SYMBOL(class_uuid2obd);
638
639 /**
640  * Get obd device from ::obd_devs[]
641  *
642  * \param num [in] array index
643  *
644  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
645  *         otherwise return the obd device there.
646  */
647 struct obd_device *class_num2obd(int num)
648 {
649         struct obd_device *obd = NULL;
650
651         if (num < class_devno_max()) {
652                 obd = obd_devs[num];
653                 if (obd == NULL)
654                         return NULL;
655
656                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
657                          "%p obd_magic %08x != %08x\n",
658                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
659                 LASSERTF(obd->obd_minor == num,
660                          "%p obd_minor %0d != %0d\n",
661                          obd, obd->obd_minor, num);
662         }
663
664         return obd;
665 }
666 EXPORT_SYMBOL(class_num2obd);
667
668 /**
669  * Find obd in obd_dev[] by name or uuid.
670  *
671  * Increment obd's refcount if found.
672  *
673  * \param[in] str obd name or uuid
674  *
675  * \retval NULL    if not found
676  * \retval target  pointer to found obd_device
677  */
678 struct obd_device *class_dev_by_str(const char *str)
679 {
680         struct obd_device *target = NULL;
681         struct obd_uuid tgtuuid;
682         int rc;
683
684         obd_str2uuid(&tgtuuid, str);
685
686         read_lock(&obd_dev_lock);
687         rc = class_uuid2dev_nolock(&tgtuuid);
688         if (rc < 0)
689                 rc = class_name2dev_nolock(str);
690
691         if (rc >= 0)
692                 target = class_num2obd(rc);
693
694         if (target != NULL)
695                 class_incref(target, "find", current);
696         read_unlock(&obd_dev_lock);
697
698         RETURN(target);
699 }
700 EXPORT_SYMBOL(class_dev_by_str);
701
702 /**
703  * Get obd devices count. Device in any
704  *    state are counted
705  * \retval obd device count
706  */
707 int get_devices_count(void)
708 {
709         int index, max_index = class_devno_max(), dev_count = 0;
710
711         read_lock(&obd_dev_lock);
712         for (index = 0; index <= max_index; index++) {
713                 struct obd_device *obd = class_num2obd(index);
714                 if (obd != NULL)
715                         dev_count++;
716         }
717         read_unlock(&obd_dev_lock);
718
719         return dev_count;
720 }
721 EXPORT_SYMBOL(get_devices_count);
722
723 void class_obd_list(void)
724 {
725         char *status;
726         int i;
727
728         read_lock(&obd_dev_lock);
729         for (i = 0; i < class_devno_max(); i++) {
730                 struct obd_device *obd = class_num2obd(i);
731
732                 if (obd == NULL)
733                         continue;
734                 if (obd->obd_stopping)
735                         status = "ST";
736                 else if (obd->obd_set_up)
737                         status = "UP";
738                 else if (obd->obd_attached)
739                         status = "AT";
740                 else
741                         status = "--";
742                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
743                          i, status, obd->obd_type->typ_name,
744                          obd->obd_name, obd->obd_uuid.uuid,
745                          atomic_read(&obd->obd_refcount));
746         }
747         read_unlock(&obd_dev_lock);
748 }
749
750 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
751  * specified, then only the client with that uuid is returned,
752  * otherwise any client connected to the tgt is returned.
753  */
754 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
755                                          const char *type_name,
756                                          struct obd_uuid *grp_uuid)
757 {
758         int i;
759
760         read_lock(&obd_dev_lock);
761         for (i = 0; i < class_devno_max(); i++) {
762                 struct obd_device *obd = class_num2obd(i);
763
764                 if (obd == NULL)
765                         continue;
766                 if ((strncmp(obd->obd_type->typ_name, type_name,
767                              strlen(type_name)) == 0)) {
768                         if (obd_uuid_equals(tgt_uuid,
769                                             &obd->u.cli.cl_target_uuid) &&
770                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
771                                                          &obd->obd_uuid) : 1)) {
772                                 read_unlock(&obd_dev_lock);
773                                 return obd;
774                         }
775                 }
776         }
777         read_unlock(&obd_dev_lock);
778
779         return NULL;
780 }
781 EXPORT_SYMBOL(class_find_client_obd);
782
783 /* Iterate the obd_device list looking devices have grp_uuid. Start
784  * searching at *next, and if a device is found, the next index to look
785  * at is saved in *next. If next is NULL, then the first matching device
786  * will always be returned.
787  */
788 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
789 {
790         int i;
791
792         if (next == NULL)
793                 i = 0;
794         else if (*next >= 0 && *next < class_devno_max())
795                 i = *next;
796         else
797                 return NULL;
798
799         read_lock(&obd_dev_lock);
800         for (; i < class_devno_max(); i++) {
801                 struct obd_device *obd = class_num2obd(i);
802
803                 if (obd == NULL)
804                         continue;
805                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
806                         if (next != NULL)
807                                 *next = i+1;
808                         read_unlock(&obd_dev_lock);
809                         return obd;
810                 }
811         }
812         read_unlock(&obd_dev_lock);
813
814         return NULL;
815 }
816 EXPORT_SYMBOL(class_devices_in_group);
817
818 /**
819  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
820  * adjust sptlrpc settings accordingly.
821  */
822 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
823 {
824         struct obd_device  *obd;
825         const char         *type;
826         int                 i, rc = 0, rc2;
827
828         LASSERT(namelen > 0);
829
830         read_lock(&obd_dev_lock);
831         for (i = 0; i < class_devno_max(); i++) {
832                 obd = class_num2obd(i);
833
834                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
835                         continue;
836
837                 /* only notify mdc, osc, osp, lwp, mdt, ost
838                  * because only these have a -sptlrpc llog */
839                 type = obd->obd_type->typ_name;
840                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
841                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
842                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
843                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
844                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
845                     strcmp(type, LUSTRE_OST_NAME) != 0)
846                         continue;
847
848                 if (strncmp(obd->obd_name, fsname, namelen))
849                         continue;
850
851                 class_incref(obd, __FUNCTION__, obd);
852                 read_unlock(&obd_dev_lock);
853                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
854                                          sizeof(KEY_SPTLRPC_CONF),
855                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
856                 rc = rc ? rc : rc2;
857                 class_decref(obd, __FUNCTION__, obd);
858                 read_lock(&obd_dev_lock);
859         }
860         read_unlock(&obd_dev_lock);
861         return rc;
862 }
863 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
864
865 void obd_cleanup_caches(void)
866 {
867         ENTRY;
868         if (obd_device_cachep) {
869                 kmem_cache_destroy(obd_device_cachep);
870                 obd_device_cachep = NULL;
871         }
872
873         EXIT;
874 }
875
876 int obd_init_caches(void)
877 {
878         int rc;
879         ENTRY;
880
881         LASSERT(obd_device_cachep == NULL);
882         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
883                                 sizeof(struct obd_device),
884                                 0, 0, 0, sizeof(struct obd_device), NULL);
885         if (!obd_device_cachep)
886                 GOTO(out, rc = -ENOMEM);
887
888         RETURN(0);
889 out:
890         obd_cleanup_caches();
891         RETURN(rc);
892 }
893
894 static const char export_handle_owner[] = "export";
895
896 /* map connection to client */
897 struct obd_export *class_conn2export(struct lustre_handle *conn)
898 {
899         struct obd_export *export;
900         ENTRY;
901
902         if (!conn) {
903                 CDEBUG(D_CACHE, "looking for null handle\n");
904                 RETURN(NULL);
905         }
906
907         if (conn->cookie == -1) {  /* this means assign a new connection */
908                 CDEBUG(D_CACHE, "want a new connection\n");
909                 RETURN(NULL);
910         }
911
912         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
913         export = class_handle2object(conn->cookie, export_handle_owner);
914         RETURN(export);
915 }
916 EXPORT_SYMBOL(class_conn2export);
917
918 struct obd_device *class_exp2obd(struct obd_export *exp)
919 {
920         if (exp)
921                 return exp->exp_obd;
922         return NULL;
923 }
924 EXPORT_SYMBOL(class_exp2obd);
925
926 struct obd_import *class_exp2cliimp(struct obd_export *exp)
927 {
928         struct obd_device *obd = exp->exp_obd;
929         if (obd == NULL)
930                 return NULL;
931         return obd->u.cli.cl_import;
932 }
933 EXPORT_SYMBOL(class_exp2cliimp);
934
935 /* Export management functions */
936 static void class_export_destroy(struct obd_export *exp)
937 {
938         struct obd_device *obd = exp->exp_obd;
939         ENTRY;
940
941         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
942         LASSERT(obd != NULL);
943
944         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
945                exp->exp_client_uuid.uuid, obd->obd_name);
946
947         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
948         ptlrpc_connection_put(exp->exp_connection);
949
950         LASSERT(list_empty(&exp->exp_outstanding_replies));
951         LASSERT(list_empty(&exp->exp_uncommitted_replies));
952         LASSERT(list_empty(&exp->exp_req_replay_queue));
953         LASSERT(list_empty(&exp->exp_hp_rpcs));
954         obd_destroy_export(exp);
955         /* self export doesn't hold a reference to an obd, although it
956          * exists until freeing of the obd */
957         if (exp != obd->obd_self_export)
958                 class_decref(obd, "export", exp);
959
960         OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
961         kfree_rcu(exp, exp_handle.h_rcu);
962         EXIT;
963 }
964
965 struct obd_export *class_export_get(struct obd_export *exp)
966 {
967         refcount_inc(&exp->exp_handle.h_ref);
968         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
969                refcount_read(&exp->exp_handle.h_ref));
970         return exp;
971 }
972 EXPORT_SYMBOL(class_export_get);
973
974 void class_export_put(struct obd_export *exp)
975 {
976         LASSERT(exp != NULL);
977         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
978         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
979         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
980                refcount_read(&exp->exp_handle.h_ref) - 1);
981
982         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
983                 struct obd_device *obd = exp->exp_obd;
984
985                 CDEBUG(D_IOCTL, "final put %p/%s\n",
986                        exp, exp->exp_client_uuid.uuid);
987
988                 /* release nid stat refererence */
989                 lprocfs_exp_cleanup(exp);
990
991                 if (exp == obd->obd_self_export) {
992                         /* self export should be destroyed without
993                          * zombie thread as it doesn't hold a
994                          * reference to obd and doesn't hold any
995                          * resources */
996                         class_export_destroy(exp);
997                         /* self export is destroyed, no class
998                          * references exist and it is safe to free
999                          * obd */
1000                         class_free_dev(obd);
1001                 } else {
1002                         LASSERT(!list_empty(&exp->exp_obd_chain));
1003                         obd_zombie_export_add(exp);
1004                 }
1005
1006         }
1007 }
1008 EXPORT_SYMBOL(class_export_put);
1009
1010 static void obd_zombie_exp_cull(struct work_struct *ws)
1011 {
1012         struct obd_export *export;
1013
1014         export = container_of(ws, struct obd_export, exp_zombie_work);
1015         class_export_destroy(export);
1016         LASSERT(atomic_read(&obd_stale_export_num) > 0);
1017         if (atomic_dec_and_test(&obd_stale_export_num))
1018                 wake_up_var(&obd_stale_export_num);
1019 }
1020
1021 /* Creates a new export, adds it to the hash table, and returns a
1022  * pointer to it. The refcount is 2: one for the hash reference, and
1023  * one for the pointer returned by this function. */
1024 static struct obd_export *__class_new_export(struct obd_device *obd,
1025                                              struct obd_uuid *cluuid,
1026                                              bool is_self)
1027 {
1028         struct obd_export *export;
1029         int rc = 0;
1030         ENTRY;
1031
1032         OBD_ALLOC_PTR(export);
1033         if (!export)
1034                 return ERR_PTR(-ENOMEM);
1035
1036         export->exp_conn_cnt = 0;
1037         export->exp_lock_hash = NULL;
1038         export->exp_flock_hash = NULL;
1039         /* 2 = class_handle_hash + last */
1040         refcount_set(&export->exp_handle.h_ref, 2);
1041         atomic_set(&export->exp_rpc_count, 0);
1042         atomic_set(&export->exp_cb_count, 0);
1043         atomic_set(&export->exp_locks_count, 0);
1044 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1045         INIT_LIST_HEAD(&export->exp_locks_list);
1046         spin_lock_init(&export->exp_locks_list_guard);
1047 #endif
1048         atomic_set(&export->exp_replay_count, 0);
1049         export->exp_obd = obd;
1050         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1051         spin_lock_init(&export->exp_uncommitted_replies_lock);
1052         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1053         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1054         INIT_HLIST_NODE(&export->exp_handle.h_link);
1055         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1056         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1057         class_handle_hash(&export->exp_handle, export_handle_owner);
1058         export->exp_last_request_time = ktime_get_real_seconds();
1059         spin_lock_init(&export->exp_lock);
1060         spin_lock_init(&export->exp_rpc_lock);
1061         INIT_HLIST_NODE(&export->exp_gen_hash);
1062         spin_lock_init(&export->exp_bl_list_lock);
1063         INIT_LIST_HEAD(&export->exp_bl_list);
1064         INIT_LIST_HEAD(&export->exp_stale_list);
1065         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1066
1067         export->exp_sp_peer = LUSTRE_SP_ANY;
1068         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1069         export->exp_client_uuid = *cluuid;
1070         obd_init_export(export);
1071
1072         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1073         export->exp_root_fid.f_seq = 0;
1074         export->exp_root_fid.f_oid = 0;
1075         export->exp_root_fid.f_ver = 0;
1076
1077         spin_lock(&obd->obd_dev_lock);
1078         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1079                 /* shouldn't happen, but might race */
1080                 if (obd->obd_stopping)
1081                         GOTO(exit_unlock, rc = -ENODEV);
1082
1083                 rc = obd_uuid_add(obd, export);
1084                 if (rc != 0) {
1085                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1086                                       obd->obd_name, cluuid->uuid, rc);
1087                         GOTO(exit_unlock, rc = -EALREADY);
1088                 }
1089         }
1090
1091         if (!is_self) {
1092                 class_incref(obd, "export", export);
1093                 list_add_tail(&export->exp_obd_chain_timed,
1094                               &obd->obd_exports_timed);
1095                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1096                 obd->obd_num_exports++;
1097         } else {
1098                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1099                 INIT_LIST_HEAD(&export->exp_obd_chain);
1100         }
1101         spin_unlock(&obd->obd_dev_lock);
1102         RETURN(export);
1103
1104 exit_unlock:
1105         spin_unlock(&obd->obd_dev_lock);
1106         class_handle_unhash(&export->exp_handle);
1107         obd_destroy_export(export);
1108         OBD_FREE_PTR(export);
1109         return ERR_PTR(rc);
1110 }
1111
1112 struct obd_export *class_new_export(struct obd_device *obd,
1113                                     struct obd_uuid *uuid)
1114 {
1115         return __class_new_export(obd, uuid, false);
1116 }
1117 EXPORT_SYMBOL(class_new_export);
1118
1119 struct obd_export *class_new_export_self(struct obd_device *obd,
1120                                          struct obd_uuid *uuid)
1121 {
1122         return __class_new_export(obd, uuid, true);
1123 }
1124
1125 void class_unlink_export(struct obd_export *exp)
1126 {
1127         class_handle_unhash(&exp->exp_handle);
1128
1129         if (exp->exp_obd->obd_self_export == exp) {
1130                 class_export_put(exp);
1131                 return;
1132         }
1133
1134         spin_lock(&exp->exp_obd->obd_dev_lock);
1135         /* delete an uuid-export hashitem from hashtables */
1136         if (exp != exp->exp_obd->obd_self_export)
1137                 obd_uuid_del(exp->exp_obd, exp);
1138
1139 #ifdef HAVE_SERVER_SUPPORT
1140         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1141                 struct tg_export_data   *ted = &exp->exp_target_data;
1142                 struct cfs_hash         *hash;
1143
1144                 /* Because obd_gen_hash will not be released until
1145                  * class_cleanup(), so hash should never be NULL here */
1146                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1147                 LASSERT(hash != NULL);
1148                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1149                              &exp->exp_gen_hash);
1150                 cfs_hash_putref(hash);
1151         }
1152 #endif /* HAVE_SERVER_SUPPORT */
1153
1154         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1155         list_del_init(&exp->exp_obd_chain_timed);
1156         exp->exp_obd->obd_num_exports--;
1157         spin_unlock(&exp->exp_obd->obd_dev_lock);
1158
1159         /* A reference is kept by obd_stale_exports list */
1160         obd_stale_export_put(exp);
1161 }
1162 EXPORT_SYMBOL(class_unlink_export);
1163
1164 /* Import management functions */
1165 static void obd_zombie_import_free(struct obd_import *imp)
1166 {
1167         struct obd_import_conn *imp_conn;
1168
1169         ENTRY;
1170         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1171                imp->imp_obd->obd_name);
1172
1173         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1174
1175         ptlrpc_connection_put(imp->imp_connection);
1176
1177         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1178                                                     struct obd_import_conn,
1179                                                     oic_item)) != NULL) {
1180                 list_del_init(&imp_conn->oic_item);
1181                 ptlrpc_connection_put(imp_conn->oic_conn);
1182                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1183         }
1184
1185         LASSERT(imp->imp_sec == NULL);
1186         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1187                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1188         class_decref(imp->imp_obd, "import", imp);
1189         OBD_FREE_PTR(imp);
1190         EXIT;
1191 }
1192
1193 struct obd_import *class_import_get(struct obd_import *import)
1194 {
1195         refcount_inc(&import->imp_refcount);
1196         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1197                refcount_read(&import->imp_refcount),
1198                import->imp_obd->obd_name);
1199         return import;
1200 }
1201 EXPORT_SYMBOL(class_import_get);
1202
1203 void class_import_put(struct obd_import *imp)
1204 {
1205         ENTRY;
1206
1207         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1208
1209         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1210                refcount_read(&imp->imp_refcount) - 1,
1211                imp->imp_obd->obd_name);
1212
1213         if (refcount_dec_and_test(&imp->imp_refcount)) {
1214                 CDEBUG(D_INFO, "final put import %p\n", imp);
1215                 obd_zombie_import_add(imp);
1216         }
1217
1218         EXIT;
1219 }
1220 EXPORT_SYMBOL(class_import_put);
1221
1222 static void init_imp_at(struct imp_at *at) {
1223         int i;
1224         at_init(&at->iat_net_latency, 0, 0);
1225         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1226                 /* max service estimates are tracked on the server side, so
1227                    don't use the AT history here, just use the last reported
1228                    val. (But keep hist for proc histogram, worst_ever) */
1229                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1230                         AT_FLG_NOHIST);
1231         }
1232 }
1233
1234 static void obd_zombie_imp_cull(struct work_struct *ws)
1235 {
1236         struct obd_import *import;
1237
1238         import = container_of(ws, struct obd_import, imp_zombie_work);
1239         obd_zombie_import_free(import);
1240 }
1241
1242 struct obd_import *class_new_import(struct obd_device *obd)
1243 {
1244         struct obd_import *imp;
1245         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1246
1247         OBD_ALLOC(imp, sizeof(*imp));
1248         if (imp == NULL)
1249                 return NULL;
1250
1251         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1252         INIT_LIST_HEAD(&imp->imp_replay_list);
1253         INIT_LIST_HEAD(&imp->imp_sending_list);
1254         INIT_LIST_HEAD(&imp->imp_delayed_list);
1255         INIT_LIST_HEAD(&imp->imp_committed_list);
1256         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1257         imp->imp_known_replied_xid = 0;
1258         imp->imp_replay_cursor = &imp->imp_committed_list;
1259         spin_lock_init(&imp->imp_lock);
1260         imp->imp_last_success_conn = 0;
1261         imp->imp_state = LUSTRE_IMP_NEW;
1262         imp->imp_obd = class_incref(obd, "import", imp);
1263         rwlock_init(&imp->imp_sec_lock);
1264         init_waitqueue_head(&imp->imp_recovery_waitq);
1265         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1266
1267         if (curr_pid_ns && curr_pid_ns->child_reaper)
1268                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1269         else
1270                 imp->imp_sec_refpid = 1;
1271
1272         refcount_set(&imp->imp_refcount, 2);
1273         atomic_set(&imp->imp_unregistering, 0);
1274         atomic_set(&imp->imp_reqs, 0);
1275         atomic_set(&imp->imp_inflight, 0);
1276         atomic_set(&imp->imp_replay_inflight, 0);
1277         init_waitqueue_head(&imp->imp_replay_waitq);
1278         atomic_set(&imp->imp_inval_count, 0);
1279         atomic_set(&imp->imp_waiting, 0);
1280         INIT_LIST_HEAD(&imp->imp_conn_list);
1281         init_imp_at(&imp->imp_at);
1282
1283         /* the default magic is V2, will be used in connect RPC, and
1284          * then adjusted according to the flags in request/reply. */
1285         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1286
1287         return imp;
1288 }
1289 EXPORT_SYMBOL(class_new_import);
1290
1291 void class_destroy_import(struct obd_import *import)
1292 {
1293         LASSERT(import != NULL);
1294         LASSERT(import != LP_POISON);
1295
1296         spin_lock(&import->imp_lock);
1297         import->imp_generation++;
1298         spin_unlock(&import->imp_lock);
1299         class_import_put(import);
1300 }
1301 EXPORT_SYMBOL(class_destroy_import);
1302
1303 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1304
1305 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1306 {
1307         spin_lock(&exp->exp_locks_list_guard);
1308
1309         LASSERT(lock->l_exp_refs_nr >= 0);
1310
1311         if (lock->l_exp_refs_target != NULL &&
1312             lock->l_exp_refs_target != exp) {
1313                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1314                               exp, lock, lock->l_exp_refs_target);
1315         }
1316         if ((lock->l_exp_refs_nr ++) == 0) {
1317                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1318                 lock->l_exp_refs_target = exp;
1319         }
1320         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1321                lock, exp, lock->l_exp_refs_nr);
1322         spin_unlock(&exp->exp_locks_list_guard);
1323 }
1324 EXPORT_SYMBOL(__class_export_add_lock_ref);
1325
1326 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1327 {
1328         spin_lock(&exp->exp_locks_list_guard);
1329         LASSERT(lock->l_exp_refs_nr > 0);
1330         if (lock->l_exp_refs_target != exp) {
1331                 LCONSOLE_WARN("lock %p, "
1332                               "mismatching export pointers: %p, %p\n",
1333                               lock, lock->l_exp_refs_target, exp);
1334         }
1335         if (-- lock->l_exp_refs_nr == 0) {
1336                 list_del_init(&lock->l_exp_refs_link);
1337                 lock->l_exp_refs_target = NULL;
1338         }
1339         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1340                lock, exp, lock->l_exp_refs_nr);
1341         spin_unlock(&exp->exp_locks_list_guard);
1342 }
1343 EXPORT_SYMBOL(__class_export_del_lock_ref);
1344 #endif
1345
1346 /* A connection defines an export context in which preallocation can
1347    be managed. This releases the export pointer reference, and returns
1348    the export handle, so the export refcount is 1 when this function
1349    returns. */
1350 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1351                   struct obd_uuid *cluuid)
1352 {
1353         struct obd_export *export;
1354         LASSERT(conn != NULL);
1355         LASSERT(obd != NULL);
1356         LASSERT(cluuid != NULL);
1357         ENTRY;
1358
1359         export = class_new_export(obd, cluuid);
1360         if (IS_ERR(export))
1361                 RETURN(PTR_ERR(export));
1362
1363         conn->cookie = export->exp_handle.h_cookie;
1364         class_export_put(export);
1365
1366         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1367                cluuid->uuid, conn->cookie);
1368         RETURN(0);
1369 }
1370 EXPORT_SYMBOL(class_connect);
1371
1372 /* if export is involved in recovery then clean up related things */
1373 static void class_export_recovery_cleanup(struct obd_export *exp)
1374 {
1375         struct obd_device *obd = exp->exp_obd;
1376
1377         spin_lock(&obd->obd_recovery_task_lock);
1378         if (obd->obd_recovering) {
1379                 if (exp->exp_in_recovery) {
1380                         spin_lock(&exp->exp_lock);
1381                         exp->exp_in_recovery = 0;
1382                         spin_unlock(&exp->exp_lock);
1383                         LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1384                         atomic_dec(&obd->obd_connected_clients);
1385                 }
1386
1387                 /* if called during recovery then should update
1388                  * obd_stale_clients counter,
1389                  * lightweight exports are not counted */
1390                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1391                         exp->exp_obd->obd_stale_clients++;
1392         }
1393         spin_unlock(&obd->obd_recovery_task_lock);
1394
1395         spin_lock(&exp->exp_lock);
1396         /** Cleanup req replay fields */
1397         if (exp->exp_req_replay_needed) {
1398                 exp->exp_req_replay_needed = 0;
1399
1400                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1401                 atomic_dec(&obd->obd_req_replay_clients);
1402         }
1403
1404         /** Cleanup lock replay data */
1405         if (exp->exp_lock_replay_needed) {
1406                 exp->exp_lock_replay_needed = 0;
1407
1408                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1409                 atomic_dec(&obd->obd_lock_replay_clients);
1410         }
1411         spin_unlock(&exp->exp_lock);
1412 }
1413
1414 /* This function removes 1-3 references from the export:
1415  * 1 - for export pointer passed
1416  * and if disconnect really need
1417  * 2 - removing from hash
1418  * 3 - in client_unlink_export
1419  * The export pointer passed to this function can destroyed */
1420 int class_disconnect(struct obd_export *export)
1421 {
1422         int already_disconnected;
1423         ENTRY;
1424
1425         if (export == NULL) {
1426                 CWARN("attempting to free NULL export %p\n", export);
1427                 RETURN(-EINVAL);
1428         }
1429
1430         spin_lock(&export->exp_lock);
1431         already_disconnected = export->exp_disconnected;
1432         export->exp_disconnected = 1;
1433 #ifdef HAVE_SERVER_SUPPORT
1434         /*  We hold references of export for uuid hash
1435          *  and nid_hash and export link at least. So
1436          *  it is safe to call rh*table_remove_fast in
1437          *  there.
1438          */
1439         obd_nid_del(export->exp_obd, export);
1440 #endif /* HAVE_SERVER_SUPPORT */
1441         spin_unlock(&export->exp_lock);
1442
1443         /* class_cleanup(), abort_recovery(), and class_fail_export()
1444          * all end up in here, and if any of them race we shouldn't
1445          * call extra class_export_puts(). */
1446         if (already_disconnected)
1447                 GOTO(no_disconn, already_disconnected);
1448
1449         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1450                export->exp_handle.h_cookie);
1451
1452         class_export_recovery_cleanup(export);
1453         class_unlink_export(export);
1454 no_disconn:
1455         class_export_put(export);
1456         RETURN(0);
1457 }
1458 EXPORT_SYMBOL(class_disconnect);
1459
1460 /* Return non-zero for a fully connected export */
1461 int class_connected_export(struct obd_export *exp)
1462 {
1463         int connected = 0;
1464
1465         if (exp) {
1466                 spin_lock(&exp->exp_lock);
1467                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1468                 spin_unlock(&exp->exp_lock);
1469         }
1470         return connected;
1471 }
1472 EXPORT_SYMBOL(class_connected_export);
1473
1474 static void class_disconnect_export_list(struct list_head *list,
1475                                          enum obd_option flags)
1476 {
1477         int rc;
1478         struct obd_export *exp;
1479         ENTRY;
1480
1481         /* It's possible that an export may disconnect itself, but
1482          * nothing else will be added to this list.
1483          */
1484         while ((exp = list_first_entry_or_null(list, struct obd_export,
1485                                                exp_obd_chain)) != NULL) {
1486                 /* need for safe call CDEBUG after obd_disconnect */
1487                 class_export_get(exp);
1488
1489                 spin_lock(&exp->exp_lock);
1490                 exp->exp_flags = flags;
1491                 spin_unlock(&exp->exp_lock);
1492
1493                 if (obd_uuid_equals(&exp->exp_client_uuid,
1494                                     &exp->exp_obd->obd_uuid)) {
1495                         CDEBUG(D_HA,
1496                                "exp %p export uuid == obd uuid, don't discon\n",
1497                                exp);
1498                         /* Need to delete this now so we don't end up pointing
1499                          * to work_list later when this export is cleaned up. */
1500                         list_del_init(&exp->exp_obd_chain);
1501                         class_export_put(exp);
1502                         continue;
1503                 }
1504
1505                 class_export_get(exp);
1506                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1507                        "last request at %lld\n",
1508                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1509                        exp, exp->exp_last_request_time);
1510                 /* release one export reference anyway */
1511                 rc = obd_disconnect(exp);
1512
1513                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1514                        obd_export_nid2str(exp), exp, rc);
1515                 class_export_put(exp);
1516         }
1517         EXIT;
1518 }
1519
1520 void class_disconnect_exports(struct obd_device *obd)
1521 {
1522         LIST_HEAD(work_list);
1523         ENTRY;
1524
1525         /* Move all of the exports from obd_exports to a work list, en masse. */
1526         spin_lock(&obd->obd_dev_lock);
1527         list_splice_init(&obd->obd_exports, &work_list);
1528         list_splice_init(&obd->obd_delayed_exports, &work_list);
1529         spin_unlock(&obd->obd_dev_lock);
1530
1531         if (!list_empty(&work_list)) {
1532                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1533                        "disconnecting them\n", obd->obd_minor, obd);
1534                 class_disconnect_export_list(&work_list,
1535                                              exp_flags_from_obd(obd));
1536         } else
1537                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1538                        obd->obd_minor, obd);
1539         EXIT;
1540 }
1541 EXPORT_SYMBOL(class_disconnect_exports);
1542
1543 /* Remove exports that have not completed recovery.
1544  */
1545 void class_disconnect_stale_exports(struct obd_device *obd,
1546                                     int (*test_export)(struct obd_export *))
1547 {
1548         LIST_HEAD(work_list);
1549         struct obd_export *exp, *n;
1550         int evicted = 0;
1551         ENTRY;
1552
1553         spin_lock(&obd->obd_dev_lock);
1554         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1555                                  exp_obd_chain) {
1556                 /* don't count self-export as client */
1557                 if (obd_uuid_equals(&exp->exp_client_uuid,
1558                                     &exp->exp_obd->obd_uuid))
1559                         continue;
1560
1561                 /* don't evict clients which have no slot in last_rcvd
1562                  * (e.g. lightweight connection) */
1563                 if (exp->exp_target_data.ted_lr_idx == -1)
1564                         continue;
1565
1566                 spin_lock(&exp->exp_lock);
1567                 if (exp->exp_failed || test_export(exp)) {
1568                         spin_unlock(&exp->exp_lock);
1569                         continue;
1570                 }
1571                 exp->exp_failed = 1;
1572                 atomic_inc(&exp->exp_obd->obd_eviction_count);
1573                 spin_unlock(&exp->exp_lock);
1574
1575                 list_move(&exp->exp_obd_chain, &work_list);
1576                 evicted++;
1577                 CWARN("%s: disconnect stale client %s@%s\n",
1578                       obd->obd_name, exp->exp_client_uuid.uuid,
1579                       obd_export_nid2str(exp));
1580                 print_export_data(exp, "EVICTING", 0, D_HA);
1581         }
1582         spin_unlock(&obd->obd_dev_lock);
1583
1584         if (evicted)
1585                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1586                               obd->obd_name, evicted);
1587
1588         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1589                                                  OBD_OPT_ABORT_RECOV);
1590         EXIT;
1591 }
1592 EXPORT_SYMBOL(class_disconnect_stale_exports);
1593
1594 void class_fail_export(struct obd_export *exp)
1595 {
1596         int rc, already_failed;
1597
1598         spin_lock(&exp->exp_lock);
1599         already_failed = exp->exp_failed;
1600         exp->exp_failed = 1;
1601         spin_unlock(&exp->exp_lock);
1602
1603         if (already_failed) {
1604                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1605                        exp, exp->exp_client_uuid.uuid);
1606                 return;
1607         }
1608
1609         atomic_inc(&exp->exp_obd->obd_eviction_count);
1610
1611         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1612                exp, exp->exp_client_uuid.uuid);
1613
1614         if (obd_dump_on_timeout)
1615                 libcfs_debug_dumplog();
1616
1617         /* need for safe call CDEBUG after obd_disconnect */
1618         class_export_get(exp);
1619
1620         /* Most callers into obd_disconnect are removing their own reference
1621          * (request, for example) in addition to the one from the hash table.
1622          * We don't have such a reference here, so make one. */
1623         class_export_get(exp);
1624         rc = obd_disconnect(exp);
1625         if (rc)
1626                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1627         else
1628                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1629                        exp, exp->exp_client_uuid.uuid);
1630         class_export_put(exp);
1631 }
1632 EXPORT_SYMBOL(class_fail_export);
1633
1634 #ifdef HAVE_SERVER_SUPPORT
1635
1636 static int take_first(struct obd_export *exp, void *data)
1637 {
1638         struct obd_export **expp = data;
1639
1640         if (*expp)
1641                 /* already have one */
1642                 return 0;
1643         if (exp->exp_failed)
1644                 /* Don't want this one */
1645                 return 0;
1646         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1647                 /* Cannot get a ref on this one */
1648                 return 0;
1649         *expp = exp;
1650         return 1;
1651 }
1652
1653 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1654 {
1655         struct lnet_nid nid_key;
1656         struct obd_export *doomed_exp;
1657         int exports_evicted = 0;
1658
1659         libcfs_strnid(&nid_key, nid);
1660
1661         spin_lock(&obd->obd_dev_lock);
1662         /* umount has run already, so evict thread should leave
1663          * its task to umount thread now */
1664         if (obd->obd_stopping) {
1665                 spin_unlock(&obd->obd_dev_lock);
1666                 return exports_evicted;
1667         }
1668         spin_unlock(&obd->obd_dev_lock);
1669
1670         doomed_exp = NULL;
1671         while (obd_nid_export_for_each(obd, &nid_key,
1672                                        take_first, &doomed_exp) > 0) {
1673
1674                 LASSERTF(doomed_exp != obd->obd_self_export,
1675                          "self-export is hashed by NID?\n");
1676
1677                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1678                               obd->obd_name,
1679                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1680                               obd_export_nid2str(doomed_exp));
1681
1682                 class_fail_export(doomed_exp);
1683                 class_export_put(doomed_exp);
1684                 exports_evicted++;
1685                 doomed_exp = NULL;
1686         }
1687
1688         if (!exports_evicted)
1689                 CDEBUG(D_HA,
1690                        "%s: can't disconnect NID '%s': no exports found\n",
1691                        obd->obd_name, nid);
1692         return exports_evicted;
1693 }
1694 EXPORT_SYMBOL(obd_export_evict_by_nid);
1695
1696 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1697 {
1698         struct obd_export *doomed_exp = NULL;
1699         struct obd_uuid doomed_uuid;
1700         int exports_evicted = 0;
1701
1702         spin_lock(&obd->obd_dev_lock);
1703         if (obd->obd_stopping) {
1704                 spin_unlock(&obd->obd_dev_lock);
1705                 return exports_evicted;
1706         }
1707         spin_unlock(&obd->obd_dev_lock);
1708
1709         obd_str2uuid(&doomed_uuid, uuid);
1710         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1711                 CERROR("%s: can't evict myself\n", obd->obd_name);
1712                 return exports_evicted;
1713         }
1714
1715         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1716         if (doomed_exp == NULL) {
1717                 CERROR("%s: can't disconnect %s: no exports found\n",
1718                        obd->obd_name, uuid);
1719         } else {
1720                 CWARN("%s: evicting %s at adminstrative request\n",
1721                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1722                 class_fail_export(doomed_exp);
1723                 class_export_put(doomed_exp);
1724                 obd_uuid_del(obd, doomed_exp);
1725                 exports_evicted++;
1726         }
1727
1728         return exports_evicted;
1729 }
1730 #endif /* HAVE_SERVER_SUPPORT */
1731
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; print_export_data() calls it when it
 * is set and lock dumping was requested.
 */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1736
1737 static void print_export_data(struct obd_export *exp, const char *status,
1738                               int locks, int debug_level)
1739 {
1740         struct ptlrpc_reply_state *rs;
1741         struct ptlrpc_reply_state *first_reply = NULL;
1742         int nreplies = 0;
1743
1744         spin_lock(&exp->exp_lock);
1745         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1746                             rs_exp_list) {
1747                 if (nreplies == 0)
1748                         first_reply = rs;
1749                 nreplies++;
1750         }
1751         spin_unlock(&exp->exp_lock);
1752
1753         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1754                "%p %s %llu stale:%d\n",
1755                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1756                obd_export_nid2str(exp),
1757                refcount_read(&exp->exp_handle.h_ref),
1758                atomic_read(&exp->exp_rpc_count),
1759                atomic_read(&exp->exp_cb_count),
1760                atomic_read(&exp->exp_locks_count),
1761                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1762                nreplies, first_reply, nreplies > 3 ? "..." : "",
1763                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1764 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1765         if (locks && class_export_dump_hook != NULL)
1766                 class_export_dump_hook(exp);
1767 #endif
1768 }
1769
1770 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1771 {
1772         struct obd_export *exp;
1773
1774         spin_lock(&obd->obd_dev_lock);
1775         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1776                 print_export_data(exp, "ACTIVE", locks, debug_level);
1777         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1778                 print_export_data(exp, "UNLINKED", locks, debug_level);
1779         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1780                 print_export_data(exp, "DELAYED", locks, debug_level);
1781         spin_unlock(&obd->obd_dev_lock);
1782 }
1783
1784 void obd_exports_barrier(struct obd_device *obd)
1785 {
1786         int waited = 2;
1787         LASSERT(list_empty(&obd->obd_exports));
1788         spin_lock(&obd->obd_dev_lock);
1789         while (!list_empty(&obd->obd_unlinked_exports)) {
1790                 spin_unlock(&obd->obd_dev_lock);
1791                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1792                 if (waited > 5 && is_power_of_2(waited)) {
1793                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1794                                       "more than %d seconds. "
1795                                       "The obd refcount = %d. Is it stuck?\n",
1796                                       obd->obd_name, waited,
1797                                       atomic_read(&obd->obd_refcount));
1798                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1799                 }
1800                 waited *= 2;
1801                 spin_lock(&obd->obd_dev_lock);
1802         }
1803         spin_unlock(&obd->obd_dev_lock);
1804 }
1805 EXPORT_SYMBOL(obd_exports_barrier);
1806
1807 /**
1808  * Add export to the obd_zombe thread and notify it.
1809  */
1810 static void obd_zombie_export_add(struct obd_export *exp) {
1811         atomic_inc(&obd_stale_export_num);
1812         spin_lock(&exp->exp_obd->obd_dev_lock);
1813         LASSERT(!list_empty(&exp->exp_obd_chain));
1814         list_del_init(&exp->exp_obd_chain);
1815         spin_unlock(&exp->exp_obd->obd_dev_lock);
1816         queue_work(zombie_wq, &exp->exp_zombie_work);
1817 }
1818
1819 /**
1820  * Add import to the obd_zombe thread and notify it.
1821  */
1822 static void obd_zombie_import_add(struct obd_import *imp) {
1823         LASSERT(imp->imp_sec == NULL);
1824
1825         queue_work(zombie_wq, &imp->imp_zombie_work);
1826 }
1827
1828 /**
1829  * wait when obd_zombie import/export queues become empty
1830  */
1831 void obd_zombie_barrier(void)
1832 {
1833         wait_var_event(&obd_stale_export_num,
1834                         atomic_read(&obd_stale_export_num) == 0);
1835         flush_workqueue(zombie_wq);
1836 }
1837 EXPORT_SYMBOL(obd_zombie_barrier);
1838
1839
1840 struct obd_export *obd_stale_export_get(void)
1841 {
1842         struct obd_export *exp = NULL;
1843         ENTRY;
1844
1845         spin_lock(&obd_stale_export_lock);
1846         if (!list_empty(&obd_stale_exports)) {
1847                 exp = list_first_entry(&obd_stale_exports,
1848                                        struct obd_export, exp_stale_list);
1849                 list_del_init(&exp->exp_stale_list);
1850         }
1851         spin_unlock(&obd_stale_export_lock);
1852
1853         if (exp) {
1854                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1855                        atomic_read(&obd_stale_export_num));
1856         }
1857         RETURN(exp);
1858 }
1859 EXPORT_SYMBOL(obd_stale_export_get);
1860
1861 void obd_stale_export_put(struct obd_export *exp)
1862 {
1863         ENTRY;
1864
1865         LASSERT(list_empty(&exp->exp_stale_list));
1866         if (exp->exp_lock_hash &&
1867             atomic_read(&exp->exp_lock_hash->hs_count)) {
1868                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1869                        atomic_read(&obd_stale_export_num));
1870
1871                 spin_lock_bh(&exp->exp_bl_list_lock);
1872                 spin_lock(&obd_stale_export_lock);
1873                 /* Add to the tail if there is no blocked locks,
1874                  * to the head otherwise. */
1875                 if (list_empty(&exp->exp_bl_list))
1876                         list_add_tail(&exp->exp_stale_list,
1877                                       &obd_stale_exports);
1878                 else
1879                         list_add(&exp->exp_stale_list,
1880                                  &obd_stale_exports);
1881
1882                 spin_unlock(&obd_stale_export_lock);
1883                 spin_unlock_bh(&exp->exp_bl_list_lock);
1884         } else {
1885                 class_export_put(exp);
1886         }
1887         EXIT;
1888 }
1889 EXPORT_SYMBOL(obd_stale_export_put);
1890
1891 /**
1892  * Adjust the position of the export in the stale list,
1893  * i.e. move to the head of the list if is needed.
1894  **/
1895 void obd_stale_export_adjust(struct obd_export *exp)
1896 {
1897         LASSERT(exp != NULL);
1898         spin_lock_bh(&exp->exp_bl_list_lock);
1899         spin_lock(&obd_stale_export_lock);
1900
1901         if (!list_empty(&exp->exp_stale_list) &&
1902             !list_empty(&exp->exp_bl_list))
1903                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1904
1905         spin_unlock(&obd_stale_export_lock);
1906         spin_unlock_bh(&exp->exp_bl_list_lock);
1907 }
1908 EXPORT_SYMBOL(obd_stale_export_adjust);
1909
1910 /**
1911  * start destroy zombie import/export thread
1912  */
1913 int obd_zombie_impexp_init(void)
1914 {
1915         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1916                                            0, CFS_CPT_ANY,
1917                                            cfs_cpt_number(cfs_cpt_tab));
1918
1919         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1920 }
1921
1922 /**
1923  * stop destroy zombie import/export thread
1924  */
1925 void obd_zombie_impexp_stop(void)
1926 {
1927         destroy_workqueue(zombie_wq);
1928         LASSERT(list_empty(&obd_stale_exports));
1929 }
1930
1931 /***** Kernel-userspace comm helpers *******/
1932
1933 /* Get length of entire message, including header */
1934 int kuc_len(int payload_len)
1935 {
1936         return sizeof(struct kuc_hdr) + payload_len;
1937 }
1938 EXPORT_SYMBOL(kuc_len);
1939
1940 /* Get a pointer to kuc header, given a ptr to the payload
1941  * @param p Pointer to payload area
1942  * @returns Pointer to kuc header
1943  */
1944 struct kuc_hdr * kuc_ptr(void *p)
1945 {
1946         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1947         LASSERT(lh->kuc_magic == KUC_MAGIC);
1948         return lh;
1949 }
1950 EXPORT_SYMBOL(kuc_ptr);
1951
1952 /* Alloc space for a message, and fill in header
1953  * @return Pointer to payload area
1954  */
1955 void *kuc_alloc(int payload_len, int transport, int type)
1956 {
1957         struct kuc_hdr *lh;
1958         int len = kuc_len(payload_len);
1959
1960         OBD_ALLOC(lh, len);
1961         if (lh == NULL)
1962                 return ERR_PTR(-ENOMEM);
1963
1964         lh->kuc_magic = KUC_MAGIC;
1965         lh->kuc_transport = transport;
1966         lh->kuc_msgtype = type;
1967         lh->kuc_msglen = len;
1968
1969         return (void *)(lh + 1);
1970 }
1971 EXPORT_SYMBOL(kuc_alloc);
1972
/* Free a message given the pointer to its payload area @p and the
 * @payload_len it carries; the freed size is kuc_len(@payload_len).
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr;

	hdr = kuc_ptr(p);
	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1980
1981 struct obd_request_slot_waiter {
1982         struct list_head        orsw_entry;
1983         wait_queue_head_t       orsw_waitq;
1984         bool                    orsw_signaled;
1985 };
1986
1987 static bool obd_request_slot_avail(struct client_obd *cli,
1988                                    struct obd_request_slot_waiter *orsw)
1989 {
1990         bool avail;
1991
1992         spin_lock(&cli->cl_loi_list_lock);
1993         avail = !!list_empty(&orsw->orsw_entry);
1994         spin_unlock(&cli->cl_loi_list_lock);
1995
1996         return avail;
1997 };
1998
1999 /*
2000  * For network flow control, the RPC sponsor needs to acquire a credit
2001  * before sending the RPC. The credits count for a connection is defined
2002  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2003  * the subsequent RPC sponsors need to wait until others released their
2004  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2005  */
2006 int obd_get_request_slot(struct client_obd *cli)
2007 {
2008         struct obd_request_slot_waiter   orsw;
2009         int                              rc;
2010
2011         spin_lock(&cli->cl_loi_list_lock);
2012         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2013                 cli->cl_rpcs_in_flight++;
2014                 spin_unlock(&cli->cl_loi_list_lock);
2015                 return 0;
2016         }
2017
2018         init_waitqueue_head(&orsw.orsw_waitq);
2019         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2020         orsw.orsw_signaled = false;
2021         spin_unlock(&cli->cl_loi_list_lock);
2022
2023         rc = l_wait_event_abortable(orsw.orsw_waitq,
2024                                     obd_request_slot_avail(cli, &orsw) ||
2025                                     orsw.orsw_signaled);
2026
2027         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2028          * freed but other (such as obd_put_request_slot) is using it. */
2029         spin_lock(&cli->cl_loi_list_lock);
2030         if (rc != 0) {
2031                 if (!orsw.orsw_signaled) {
2032                         if (list_empty(&orsw.orsw_entry))
2033                                 cli->cl_rpcs_in_flight--;
2034                         else
2035                                 list_del(&orsw.orsw_entry);
2036                 }
2037                 rc = -EINTR;
2038         }
2039
2040         if (orsw.orsw_signaled) {
2041                 LASSERT(list_empty(&orsw.orsw_entry));
2042
2043                 rc = -EINTR;
2044         }
2045         spin_unlock(&cli->cl_loi_list_lock);
2046
2047         return rc;
2048 }
2049 EXPORT_SYMBOL(obd_get_request_slot);
2050
2051 void obd_put_request_slot(struct client_obd *cli)
2052 {
2053         struct obd_request_slot_waiter *orsw;
2054
2055         spin_lock(&cli->cl_loi_list_lock);
2056         cli->cl_rpcs_in_flight--;
2057
2058         /* If there is free slot, wakeup the first waiter. */
2059         if (!list_empty(&cli->cl_flight_waiters) &&
2060             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2061                 orsw = list_first_entry(&cli->cl_flight_waiters,
2062                                         struct obd_request_slot_waiter,
2063                                         orsw_entry);
2064                 list_del_init(&orsw->orsw_entry);
2065                 cli->cl_rpcs_in_flight++;
2066                 wake_up(&orsw->orsw_waitq);
2067         }
2068         spin_unlock(&cli->cl_loi_list_lock);
2069 }
2070 EXPORT_SYMBOL(obd_put_request_slot);
2071
2072 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2073 {
2074         return cli->cl_max_rpcs_in_flight;
2075 }
2076 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2077
2078 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2079 {
2080         struct obd_request_slot_waiter *orsw;
2081         __u32                           old;
2082         int                             diff;
2083         int                             i;
2084         int                             rc;
2085
2086         if (max > OBD_MAX_RIF_MAX || max < 1)
2087                 return -ERANGE;
2088
2089         CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
2090                cli->cl_import->imp_obd->obd_name, max,
2091                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2092
2093         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2094                    LUSTRE_MDC_NAME) == 0) {
2095                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2096                  * strictly lower that max_rpcs_in_flight */
2097                 if (max < 2) {
2098                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2099                                cli->cl_import->imp_obd->obd_name);
2100                         return -ERANGE;
2101                 }
2102                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2103                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2104                         if (rc != 0)
2105                                 return rc;
2106                 }
2107         }
2108
2109         spin_lock(&cli->cl_loi_list_lock);
2110         old = cli->cl_max_rpcs_in_flight;
2111         cli->cl_max_rpcs_in_flight = max;
2112         client_adjust_max_dirty(cli);
2113
2114         diff = max - old;
2115
2116         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2117         for (i = 0; i < diff; i++) {
2118                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2119                                                 struct obd_request_slot_waiter,
2120                                                 orsw_entry);
2121                 if (!orsw)
2122                         break;
2123
2124                 list_del_init(&orsw->orsw_entry);
2125                 cli->cl_rpcs_in_flight++;
2126                 wake_up(&orsw->orsw_waitq);
2127         }
2128         spin_unlock(&cli->cl_loi_list_lock);
2129
2130         return 0;
2131 }
2132 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2133
2134 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2135 {
2136         return cli->cl_max_mod_rpcs_in_flight;
2137 }
2138 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2139
2140 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2141 {
2142         struct obd_connect_data *ocd;
2143         __u16 maxmodrpcs;
2144         __u16 prev;
2145
2146         if (max > OBD_MAX_RIF_MAX || max < 1)
2147                 return -ERANGE;
2148
2149         ocd = &cli->cl_import->imp_connect_data;
2150         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2151                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2152                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2153
2154         if (max == OBD_MAX_RIF_MAX)
2155                 max = OBD_MAX_RIF_MAX - 1;
2156
2157         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2158          * increase this value, also bump up max_rpcs_in_flight to match.
2159          */
2160         if (max >= cli->cl_max_rpcs_in_flight) {
2161                 CDEBUG(D_INFO,
2162                        "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
2163                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2164                 obd_set_max_rpcs_in_flight(cli, max + 1);
2165         }
2166
2167         /* cannot exceed max modify RPCs in flight supported by the server,
2168          * but verify ocd_connect_flags is at least initialized first.  If
2169          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2170          */
2171         if (!ocd->ocd_connect_flags) {
2172                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2173         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2174                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2175                 if (maxmodrpcs == 0) { /* connection not finished yet */
2176                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2177                         CDEBUG(D_INFO,
2178                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2179                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2180                 }
2181         } else {
2182                 maxmodrpcs = 1;
2183         }
2184         if (max > maxmodrpcs) {
2185                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2186                        cli->cl_import->imp_obd->obd_name,
2187                        max, maxmodrpcs);
2188                 return -ERANGE;
2189         }
2190
2191         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2192
2193         prev = cli->cl_max_mod_rpcs_in_flight;
2194         cli->cl_max_mod_rpcs_in_flight = max;
2195
2196         /* wakeup waiters if limit has been increased */
2197         if (cli->cl_max_mod_rpcs_in_flight > prev)
2198                 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2199
2200         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2201
2202         return 0;
2203 }
2204 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2205
2206 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2207                                struct seq_file *seq)
2208 {
2209         unsigned long mod_tot = 0, mod_cum;
2210         int i;
2211
2212         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2213         lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2214                              ":", true, "");
2215         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2216                    cli->cl_mod_rpcs_in_flight);
2217
2218         seq_printf(seq, "\n\t\t\tmodify\n");
2219         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2220
2221         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2222
2223         mod_cum = 0;
2224         for (i = 0; i < OBD_HIST_MAX; i++) {
2225                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2226
2227                 mod_cum += mod;
2228                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2229                            i, mod, pct(mod, mod_tot),
2230                            pct(mod_cum, mod_tot));
2231                 if (mod_cum == mod_tot)
2232                         break;
2233         }
2234
2235         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2236
2237         return 0;
2238 }
2239 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2240
2241 /* The number of modify RPCs sent in parallel is limited
2242  * because the server has a finite number of slots per client to
2243  * store request result and ensure reply reconstruction when needed.
2244  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2245  * that takes into account server limit and cl_max_rpcs_in_flight
2246  * value.
2247  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2248  * one close request is allowed above the maximum.
2249  */
2250 struct mod_waiter {
2251         struct client_obd *cli;
2252         bool close_req;
2253         bool woken;
2254         wait_queue_entry_t wqe;
2255 };
2256 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2257                                   unsigned int mode, int flags, void *key)
2258 {
2259         struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2260         struct client_obd *cli = w->cli;
2261         bool close_req = w->close_req;
2262         bool avail;
2263         int ret;
2264
2265         /* As woken_wake_function() doesn't remove us from the wait_queue,
2266          * we use own flag to ensure we're called just once.
2267          */
2268         if (w->woken)
2269                 return 0;
2270
2271         /* A slot is available if
2272          * - number of modify RPCs in flight is less than the max
2273          * - it's a close RPC and no other close request is in flight
2274          */
2275         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2276                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2277         if (avail) {
2278                 cli->cl_mod_rpcs_in_flight++;
2279                 if (w->close_req)
2280                         cli->cl_close_rpcs_in_flight++;
2281                 ret = woken_wake_function(wq_entry, mode, flags, key);
2282                 w->woken = true;
2283         } else if (cli->cl_close_rpcs_in_flight)
2284                 /* No other waiter could be woken */
2285                 ret = -1;
2286         else if (key == NULL)
2287                 /* This was not a wakeup from a close completion, so there is no
2288                  * point seeing if there are close waiters to be woken
2289                  */
2290                 ret = -1;
2291         else
2292                 /* There might be be a close we could wake, keep looking */
2293                 ret = 0;
2294         return ret;
2295 }
2296
2297 /* Get a modify RPC slot from the obd client @cli according
2298  * to the kind of operation @opc that is going to be sent
2299  * and the intent @it of the operation if it applies.
2300  * If the maximum number of modify RPCs in flight is reached
2301  * the thread is put to sleep.
2302  * Returns the tag to be set in the request message. Tag 0
2303  * is reserved for non-modifying requests.
2304  */
2305 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2306 {
2307         struct mod_waiter wait = {
2308                 .cli = cli,
2309                 .close_req = (opc == MDS_CLOSE),
2310                 .woken = false,
2311         };
2312         __u16                   i, max;
2313
2314         init_wait(&wait.wqe);
2315         wait.wqe.func = claim_mod_rpc_function;
2316
2317         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2318         __add_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2319         /* This wakeup will only succeed if the maximums haven't
2320          * been reached.  If that happens, WQ_FLAG_WOKEN will be cleared
2321          * and there will be no need to wait.
2322          */
2323         wake_up_locked(&cli->cl_mod_rpcs_waitq);
2324         /* XXX: handle spurious wakeups (from unknown yet source */
2325         while (wait.woken == false) {
2326                 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2327                 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2328                            MAX_SCHEDULE_TIMEOUT);
2329                 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2330         }
2331         __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2332
2333         max = cli->cl_max_mod_rpcs_in_flight;
2334         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2335                          cli->cl_mod_rpcs_in_flight);
2336         /* find a free tag */
2337         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2338                                 max + 1);
2339         LASSERT(i < OBD_MAX_RIF_MAX);
2340         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2341         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2342         /* tag 0 is reserved for non-modify RPCs */
2343
2344         CDEBUG(D_RPCTRACE,
2345                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2346                cli->cl_import->imp_obd->obd_name,
2347                i + 1, opc, max);
2348
2349         return i + 1;
2350 }
2351 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2352
2353 /* Put a modify RPC slot from the obd client @cli according
2354  * to the kind of operation @opc that has been sent.
2355  */
2356 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2357 {
2358         bool                    close_req = false;
2359
2360         if (tag == 0)
2361                 return;
2362
2363         if (opc == MDS_CLOSE)
2364                 close_req = true;
2365
2366         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2367         cli->cl_mod_rpcs_in_flight--;
2368         if (close_req)
2369                 cli->cl_close_rpcs_in_flight--;
2370         /* release the tag in the bitmap */
2371         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2372         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2373         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2374                              (void *)close_req);
2375         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2376 }
2377 EXPORT_SYMBOL(obd_put_mod_rpc_slot);