Whamcloud - gitweb
LU-9806 obdclass: wait for all exports to go
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
/* Taken around every lookup in, and update of, obd_devs[] in this file. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device sits at index obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache that obd_device_alloc()/obd_device_free() draw from. */
static struct kmem_cache *obd_device_cachep;
/* Forward declaration; the initializer follows class_sysfs_release(). */
static struct kobj_type class_ktype;
/* NOTE(review): presumably runs deferred export/import destruction;
 * its users are outside this part of the file - confirm.
 */
static struct workqueue_struct *zombie_wq;

/* Forward declarations of helpers defined later in the file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* NOTE(review): stale-export bookkeeping (list, its lock, and a counter);
 * the code using it is outside this part of the file - confirm semantics.
 */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
63
64 /*
65  * support functions: we could use inter-module communication, but this
66  * is more portable to other OS's
67  */
68 static struct obd_device *obd_device_alloc(void)
69 {
70         struct obd_device *obd;
71
72         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
73         if (obd != NULL) {
74                 obd->obd_magic = OBD_DEVICE_MAGIC;
75         }
76         return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81         LASSERT(obd != NULL);
82         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84         if (obd->obd_namespace != NULL) {
85                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86                        obd, obd->obd_namespace, obd->obd_force);
87                 LBUG();
88         }
89         lu_ref_fini(&obd->obd_reference);
90         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95         struct kobject *kobj = kset_find_obj(lustre_kset, name);
96
97         if (kobj && kobj->ktype == &class_ktype)
98                 return container_of(kobj, struct obd_type, typ_kobj);
99
100         kobject_put(kobj);
101         return NULL;
102 }
103 EXPORT_SYMBOL(class_search_type);
104
105 struct obd_type *class_get_type(const char *name)
106 {
107         struct obd_type *type;
108
109         type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
111         if (!type) {
112                 const char *modname = name;
113
114 #ifdef HAVE_SERVER_SUPPORT
115                 if (strcmp(modname, "obdfilter") == 0)
116                         modname = "ofd";
117
118                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119                         modname = LUSTRE_OSP_NAME;
120
121                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122                         modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
124
125                 if (!request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 if (try_module_get(type->typ_dt_ops->o_owner)) {
136                         atomic_inc(&type->typ_refcnt);
137                         /* class_search_type() returned a counted reference,
138                          * but we don't need that count any more as
139                          * we have one through typ_refcnt.
140                          */
141                         kobject_put(&type->typ_kobj);
142                 } else {
143                         kobject_put(&type->typ_kobj);
144                         type = NULL;
145                 }
146         }
147         return type;
148 }
149 EXPORT_SYMBOL(class_get_type);
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         module_put(type->typ_dt_ops->o_owner);
155         atomic_dec(&type->typ_refcnt);
156 }
157 EXPORT_SYMBOL(class_put_type);
158
159 static void class_sysfs_release(struct kobject *kobj)
160 {
161         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
162
163         debugfs_remove_recursive(type->typ_debugfs_entry);
164         type->typ_debugfs_entry = NULL;
165
166         if (type->typ_lu)
167                 lu_device_type_fini(type->typ_lu);
168
169 #ifdef CONFIG_PROC_FS
170         if (type->typ_name && type->typ_procroot)
171                 remove_proc_subtree(type->typ_name, proc_lustre_root);
172 #endif
173         OBD_FREE(type, sizeof(*type));
174 }
175
/*
 * kobject type used for every obd_type: attribute access goes through
 * lustre_sysfs_ops, and class_sysfs_release() is the release callback
 * that frees the obd_type.
 */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
180
181 #ifdef HAVE_SERVER_SUPPORT
182 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
183 {
184         struct dentry *symlink;
185         struct obd_type *type;
186         int rc;
187
188         type = class_search_type(name);
189         if (type) {
190                 kobject_put(&type->typ_kobj);
191                 return ERR_PTR(-EEXIST);
192         }
193
194         OBD_ALLOC(type, sizeof(*type));
195         if (!type)
196                 return ERR_PTR(-ENOMEM);
197
198         type->typ_kobj.kset = lustre_kset;
199         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
200                                   &lustre_kset->kobj, "%s", name);
201         if (rc)
202                 return ERR_PTR(rc);
203
204         symlink = debugfs_create_dir(name, debugfs_lustre_root);
205         type->typ_debugfs_entry = symlink;
206         type->typ_sym_filter = true;
207
208         if (enable_proc) {
209                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
210                                                       NULL, NULL);
211                 if (IS_ERR(type->typ_procroot)) {
212                         CERROR("%s: can't create compat proc entry: %d\n",
213                                name, (int)PTR_ERR(type->typ_procroot));
214                         type->typ_procroot = NULL;
215                 }
216         }
217
218         return type;
219 }
220 EXPORT_SYMBOL(class_add_symlinks);
221 #endif /* HAVE_SERVER_SUPPORT */
222
/* Upper bound on the length of a type name accepted by
 * class_register_type(); longer names trip its sanity assertion.
 */
#define CLASS_MAX_NAME 1024

/**
 * Register an obd type.
 *
 * Either allocates a new obd_type, or (server builds only) adopts a
 * placeholder with typ_sym_filter set, such as the one
 * class_add_symlinks() creates for the same name.
 *
 * \param[in] dt_ops       obd operations stored in typ_dt_ops
 * \param[in] md_ops       metadata operations stored in typ_md_ops
 * \param[in] enable_proc  create a procfs directory for the type
 *                         (only when CONFIG_PROC_FS is set)
 * \param[in] name         type name, shorter than CLASS_MAX_NAME
 * \param[in] ldt          optional lu_device_type to initialise
 *
 * \retval 0        success
 * \retval -EEXIST  a non-placeholder type with this name exists
 * \retval -ENOMEM  allocation failed
 * \retval rc       error from lprocfs_register(), kobject_add() or
 *                  lu_device_type_init()
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* a placeholder is adopted rather than rejected; the
                 * reference from class_search_type() is dropped below
                 */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        /* OBD_LU_TYPE_SETUP marks typ_lu as "being set up" until the
         * smp_store_release() below publishes the final value
         */
        type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        if (type->typ_sym_filter) {
                /* the placeholder already has its kobject and debugfs
                 * entry, so skip straight to the lu_device_type setup
                 * after dropping the lookup reference
                 */
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* clear the ERR_PTR so the release callback does
                         * not mistake it for a live proc root
                         */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                rc = lu_device_type_init(ldt);
                /* publish the outcome (ldt, or NULL on failure) and wake
                 * anyone waiting on typ_lu
                 */
                smp_store_release(&type->typ_lu, rc ? NULL : ldt);
                wake_up_var(&type->typ_lu);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* NOTE(review): presumably this drops the last reference, so that
         * class_sysfs_release() undoes the partial setup - confirm.
         */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
305
306 int class_unregister_type(const char *name)
307 {
308         struct obd_type *type = class_search_type(name);
309         int rc = 0;
310         ENTRY;
311
312         if (!type) {
313                 CERROR("unknown obd type\n");
314                 RETURN(-EINVAL);
315         }
316
317         if (atomic_read(&type->typ_refcnt)) {
318                 CERROR("type %s has refcount (%d)\n", name,
319                        atomic_read(&type->typ_refcnt));
320                 /* This is a bad situation, let's make the best of it */
321                 /* Remove ops, but leave the name for debugging */
322                 type->typ_dt_ops = NULL;
323                 type->typ_md_ops = NULL;
324                 GOTO(out_put, rc = -EBUSY);
325         }
326
327         /* Put the final ref */
328         kobject_put(&type->typ_kobj);
329 out_put:
330         /* Put the ref returned by class_search_type() */
331         kobject_put(&type->typ_kobj);
332
333         RETURN(rc);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
336
337 /**
338  * Create a new obd device.
339  *
340  * Allocate the new obd_device and initialize it.
341  *
342  * \param[in] type_name obd device type string.
343  * \param[in] name      obd device name.
344  * \param[in] uuid      obd device UUID
345  *
346  * \retval newdev         pointer to created obd_device
347  * \retval ERR_PTR(errno) on error
348  */
349 struct obd_device *class_newdev(const char *type_name, const char *name,
350                                 const char *uuid)
351 {
352         struct obd_device *newdev;
353         struct obd_type *type = NULL;
354         ENTRY;
355
356         if (strlen(name) >= MAX_OBD_NAME) {
357                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358                 RETURN(ERR_PTR(-EINVAL));
359         }
360
361         type = class_get_type(type_name);
362         if (type == NULL){
363                 CERROR("OBD: unknown type: %s\n", type_name);
364                 RETURN(ERR_PTR(-ENODEV));
365         }
366
367         newdev = obd_device_alloc();
368         if (newdev == NULL) {
369                 class_put_type(type);
370                 RETURN(ERR_PTR(-ENOMEM));
371         }
372         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374         newdev->obd_type = type;
375         newdev->obd_minor = -1;
376
377         rwlock_init(&newdev->obd_pool_lock);
378         newdev->obd_pool_limit = 0;
379         newdev->obd_pool_slv = 0;
380
381         INIT_LIST_HEAD(&newdev->obd_exports);
382         newdev->obd_num_exports = 0;
383         newdev->obd_grant_check_threshold = 100;
384         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
385         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
386         INIT_LIST_HEAD(&newdev->obd_exports_timed);
387         INIT_LIST_HEAD(&newdev->obd_nid_stats);
388         spin_lock_init(&newdev->obd_nid_lock);
389         spin_lock_init(&newdev->obd_dev_lock);
390         mutex_init(&newdev->obd_dev_mutex);
391         spin_lock_init(&newdev->obd_osfs_lock);
392         /* newdev->obd_osfs_age must be set to a value in the distant
393          * past to guarantee a fresh statfs is fetched on mount. */
394         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
395
396         /* XXX belongs in setup not attach  */
397         init_rwsem(&newdev->obd_observer_link_sem);
398         /* recovery data */
399         spin_lock_init(&newdev->obd_recovery_task_lock);
400         init_waitqueue_head(&newdev->obd_next_transno_waitq);
401         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
402         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
403         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
404         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
405         INIT_LIST_HEAD(&newdev->obd_evict_list);
406         INIT_LIST_HEAD(&newdev->obd_lwp_list);
407
408         llog_group_init(&newdev->obd_olg);
409         /* Detach drops this */
410         atomic_set(&newdev->obd_refcount, 1);
411         lu_ref_init(&newdev->obd_reference);
412         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
413
414         newdev->obd_conn_inprogress = 0;
415
416         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
417
418         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
419                newdev->obd_name, newdev);
420
421         return newdev;
422 }
423
424 /**
425  * Free obd device.
426  *
427  * \param[in] obd obd_device to be freed
428  *
429  * \retval none
430  */
431 void class_free_dev(struct obd_device *obd)
432 {
433         struct obd_type *obd_type = obd->obd_type;
434
435         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
436                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
437         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
438                  "obd %p != obd_devs[%d] %p\n",
439                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
440         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
441                  "obd_refcount should be 0, not %d\n",
442                  atomic_read(&obd->obd_refcount));
443         LASSERT(obd_type != NULL);
444
445         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
446                obd->obd_name, obd->obd_type->typ_name);
447
448         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
449                          obd->obd_name, obd->obd_uuid.uuid);
450         if (obd->obd_stopping) {
451                 int err;
452
453                 /* If we're not stopping, we were never set up */
454                 err = obd_cleanup(obd);
455                 if (err)
456                         CERROR("Cleanup %s returned %d\n",
457                                 obd->obd_name, err);
458         }
459
460         obd_device_free(obd);
461
462         class_put_type(obd_type);
463 }
464
465 /**
466  * Unregister obd device.
467  *
468  * Free slot in obd_dev[] used by \a obd.
469  *
470  * \param[in] new_obd obd_device to be unregistered
471  *
472  * \retval none
473  */
474 void class_unregister_device(struct obd_device *obd)
475 {
476         write_lock(&obd_dev_lock);
477         if (obd->obd_minor >= 0) {
478                 LASSERT(obd_devs[obd->obd_minor] == obd);
479                 obd_devs[obd->obd_minor] = NULL;
480                 obd->obd_minor = -1;
481         }
482         write_unlock(&obd_dev_lock);
483 }
484
485 /**
486  * Register obd device.
487  *
488  * Find free slot in obd_devs[], fills it with \a new_obd.
489  *
490  * \param[in] new_obd obd_device to be registered
491  *
492  * \retval 0          success
493  * \retval -EEXIST    device with this name is registered
494  * \retval -EOVERFLOW obd_devs[] is full
495  */
496 int class_register_device(struct obd_device *new_obd)
497 {
498         int ret = 0;
499         int i;
500         int new_obd_minor = 0;
501         bool minor_assign = false;
502         bool retried = false;
503
504 again:
505         write_lock(&obd_dev_lock);
506         for (i = 0; i < class_devno_max(); i++) {
507                 struct obd_device *obd = class_num2obd(i);
508
509                 if (obd != NULL &&
510                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
511
512                         if (!retried) {
513                                 write_unlock(&obd_dev_lock);
514
515                                 /* the obd_device could be waited to be
516                                  * destroyed by the "obd_zombie_impexp_thread".
517                                  */
518                                 obd_zombie_barrier();
519                                 retried = true;
520                                 goto again;
521                         }
522
523                         CERROR("%s: already exists, won't add\n",
524                                obd->obd_name);
525                         /* in case we found a free slot before duplicate */
526                         minor_assign = false;
527                         ret = -EEXIST;
528                         break;
529                 }
530                 if (!minor_assign && obd == NULL) {
531                         new_obd_minor = i;
532                         minor_assign = true;
533                 }
534         }
535
536         if (minor_assign) {
537                 new_obd->obd_minor = new_obd_minor;
538                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
539                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
540                 obd_devs[new_obd_minor] = new_obd;
541         } else {
542                 if (ret == 0) {
543                         ret = -EOVERFLOW;
544                         CERROR("%s: all %u/%u devices used, increase "
545                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
546                                i, class_devno_max(), ret);
547                 }
548         }
549         write_unlock(&obd_dev_lock);
550
551         RETURN(ret);
552 }
553
554 static int class_name2dev_nolock(const char *name)
555 {
556         int i;
557
558         if (!name)
559                 return -1;
560
561         for (i = 0; i < class_devno_max(); i++) {
562                 struct obd_device *obd = class_num2obd(i);
563
564                 if (obd && strcmp(name, obd->obd_name) == 0) {
565                         /* Make sure we finished attaching before we give
566                            out any references */
567                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
568                         if (obd->obd_attached) {
569                                 return i;
570                         }
571                         break;
572                 }
573         }
574
575         return -1;
576 }
577
578 int class_name2dev(const char *name)
579 {
580         int i;
581
582         if (!name)
583                 return -1;
584
585         read_lock(&obd_dev_lock);
586         i = class_name2dev_nolock(name);
587         read_unlock(&obd_dev_lock);
588
589         return i;
590 }
591 EXPORT_SYMBOL(class_name2dev);
592
593 struct obd_device *class_name2obd(const char *name)
594 {
595         int dev = class_name2dev(name);
596
597         if (dev < 0 || dev > class_devno_max())
598                 return NULL;
599         return class_num2obd(dev);
600 }
601 EXPORT_SYMBOL(class_name2obd);
602
603 static int class_uuid2dev_nolock(struct obd_uuid *uuid)
604 {
605         int i;
606
607         for (i = 0; i < class_devno_max(); i++) {
608                 struct obd_device *obd = class_num2obd(i);
609
610                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
611                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
612                         return i;
613                 }
614         }
615
616         return -1;
617 }
618
619 int class_uuid2dev(struct obd_uuid *uuid)
620 {
621         int i;
622
623         read_lock(&obd_dev_lock);
624         i = class_uuid2dev_nolock(uuid);
625         read_unlock(&obd_dev_lock);
626
627         return i;
628 }
629 EXPORT_SYMBOL(class_uuid2dev);
630
631 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
632 {
633         int dev = class_uuid2dev(uuid);
634         if (dev < 0)
635                 return NULL;
636         return class_num2obd(dev);
637 }
638 EXPORT_SYMBOL(class_uuid2obd);
639
640 /**
641  * Get obd device from ::obd_devs[]
642  *
643  * \param num [in] array index
644  *
645  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
646  *         otherwise return the obd device there.
647  */
648 struct obd_device *class_num2obd(int num)
649 {
650         struct obd_device *obd = NULL;
651
652         if (num < class_devno_max()) {
653                 obd = obd_devs[num];
654                 if (obd == NULL)
655                         return NULL;
656
657                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
658                          "%p obd_magic %08x != %08x\n",
659                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
660                 LASSERTF(obd->obd_minor == num,
661                          "%p obd_minor %0d != %0d\n",
662                          obd, obd->obd_minor, num);
663         }
664
665         return obd;
666 }
667 EXPORT_SYMBOL(class_num2obd);
668
669 /**
670  * Find obd in obd_dev[] by name or uuid.
671  *
672  * Increment obd's refcount if found.
673  *
674  * \param[in] str obd name or uuid
675  *
676  * \retval NULL    if not found
677  * \retval target  pointer to found obd_device
678  */
679 struct obd_device *class_dev_by_str(const char *str)
680 {
681         struct obd_device *target = NULL;
682         struct obd_uuid tgtuuid;
683         int rc;
684
685         obd_str2uuid(&tgtuuid, str);
686
687         read_lock(&obd_dev_lock);
688         rc = class_uuid2dev_nolock(&tgtuuid);
689         if (rc < 0)
690                 rc = class_name2dev_nolock(str);
691
692         if (rc >= 0)
693                 target = class_num2obd(rc);
694
695         if (target != NULL)
696                 class_incref(target, "find", current);
697         read_unlock(&obd_dev_lock);
698
699         RETURN(target);
700 }
701 EXPORT_SYMBOL(class_dev_by_str);
702
703 /**
704  * Get obd devices count. Device in any
705  *    state are counted
706  * \retval obd device count
707  */
708 int get_devices_count(void)
709 {
710         int index, max_index = class_devno_max(), dev_count = 0;
711
712         read_lock(&obd_dev_lock);
713         for (index = 0; index <= max_index; index++) {
714                 struct obd_device *obd = class_num2obd(index);
715                 if (obd != NULL)
716                         dev_count++;
717         }
718         read_unlock(&obd_dev_lock);
719
720         return dev_count;
721 }
722 EXPORT_SYMBOL(get_devices_count);
723
724 void class_obd_list(void)
725 {
726         char *status;
727         int i;
728
729         read_lock(&obd_dev_lock);
730         for (i = 0; i < class_devno_max(); i++) {
731                 struct obd_device *obd = class_num2obd(i);
732
733                 if (obd == NULL)
734                         continue;
735                 if (obd->obd_stopping)
736                         status = "ST";
737                 else if (obd->obd_set_up)
738                         status = "UP";
739                 else if (obd->obd_attached)
740                         status = "AT";
741                 else
742                         status = "--";
743                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
744                          i, status, obd->obd_type->typ_name,
745                          obd->obd_name, obd->obd_uuid.uuid,
746                          atomic_read(&obd->obd_refcount));
747         }
748         read_unlock(&obd_dev_lock);
749 }
750
751 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
752  * specified, then only the client with that uuid is returned,
753  * otherwise any client connected to the tgt is returned.
754  */
755 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
756                                          const char *type_name,
757                                          struct obd_uuid *grp_uuid)
758 {
759         int i;
760
761         read_lock(&obd_dev_lock);
762         for (i = 0; i < class_devno_max(); i++) {
763                 struct obd_device *obd = class_num2obd(i);
764
765                 if (obd == NULL)
766                         continue;
767                 if ((strncmp(obd->obd_type->typ_name, type_name,
768                              strlen(type_name)) == 0)) {
769                         if (obd_uuid_equals(tgt_uuid,
770                                             &obd->u.cli.cl_target_uuid) &&
771                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
772                                                          &obd->obd_uuid) : 1)) {
773                                 read_unlock(&obd_dev_lock);
774                                 return obd;
775                         }
776                 }
777         }
778         read_unlock(&obd_dev_lock);
779
780         return NULL;
781 }
782 EXPORT_SYMBOL(class_find_client_obd);
783
784 /* Iterate the obd_device list looking devices have grp_uuid. Start
785  * searching at *next, and if a device is found, the next index to look
786  * at is saved in *next. If next is NULL, then the first matching device
787  * will always be returned.
788  */
789 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
790 {
791         int i;
792
793         if (next == NULL)
794                 i = 0;
795         else if (*next >= 0 && *next < class_devno_max())
796                 i = *next;
797         else
798                 return NULL;
799
800         read_lock(&obd_dev_lock);
801         for (; i < class_devno_max(); i++) {
802                 struct obd_device *obd = class_num2obd(i);
803
804                 if (obd == NULL)
805                         continue;
806                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
807                         if (next != NULL)
808                                 *next = i+1;
809                         read_unlock(&obd_dev_lock);
810                         return obd;
811                 }
812         }
813         read_unlock(&obd_dev_lock);
814
815         return NULL;
816 }
817 EXPORT_SYMBOL(class_devices_in_group);
818
819 /**
820  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
821  * adjust sptlrpc settings accordingly.
822  */
823 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
824 {
825         struct obd_device  *obd;
826         const char         *type;
827         int                 i, rc = 0, rc2;
828
829         LASSERT(namelen > 0);
830
831         read_lock(&obd_dev_lock);
832         for (i = 0; i < class_devno_max(); i++) {
833                 obd = class_num2obd(i);
834
835                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
836                         continue;
837
838                 /* only notify mdc, osc, osp, lwp, mdt, ost
839                  * because only these have a -sptlrpc llog */
840                 type = obd->obd_type->typ_name;
841                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
842                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
843                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
844                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
845                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
846                     strcmp(type, LUSTRE_OST_NAME) != 0)
847                         continue;
848
849                 if (strncmp(obd->obd_name, fsname, namelen))
850                         continue;
851
852                 class_incref(obd, __FUNCTION__, obd);
853                 read_unlock(&obd_dev_lock);
854                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
855                                          sizeof(KEY_SPTLRPC_CONF),
856                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
857                 rc = rc ? rc : rc2;
858                 class_decref(obd, __FUNCTION__, obd);
859                 read_lock(&obd_dev_lock);
860         }
861         read_unlock(&obd_dev_lock);
862         return rc;
863 }
864 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
865
866 void obd_cleanup_caches(void)
867 {
868         ENTRY;
869         if (obd_device_cachep) {
870                 kmem_cache_destroy(obd_device_cachep);
871                 obd_device_cachep = NULL;
872         }
873
874         EXIT;
875 }
876
877 int obd_init_caches(void)
878 {
879         int rc;
880         ENTRY;
881
882         LASSERT(obd_device_cachep == NULL);
883         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
884                                 sizeof(struct obd_device),
885                                 0, 0, 0, sizeof(struct obd_device), NULL);
886         if (!obd_device_cachep)
887                 GOTO(out, rc = -ENOMEM);
888
889         RETURN(0);
890 out:
891         obd_cleanup_caches();
892         RETURN(rc);
893 }
894
895 static const char export_handle_owner[] = "export";
896
897 /* map connection to client */
898 struct obd_export *class_conn2export(struct lustre_handle *conn)
899 {
900         struct obd_export *export;
901         ENTRY;
902
903         if (!conn) {
904                 CDEBUG(D_CACHE, "looking for null handle\n");
905                 RETURN(NULL);
906         }
907
908         if (conn->cookie == -1) {  /* this means assign a new connection */
909                 CDEBUG(D_CACHE, "want a new connection\n");
910                 RETURN(NULL);
911         }
912
913         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
914         export = class_handle2object(conn->cookie, export_handle_owner);
915         RETURN(export);
916 }
917 EXPORT_SYMBOL(class_conn2export);
918
919 struct obd_device *class_exp2obd(struct obd_export *exp)
920 {
921         if (exp)
922                 return exp->exp_obd;
923         return NULL;
924 }
925 EXPORT_SYMBOL(class_exp2obd);
926
927 struct obd_import *class_exp2cliimp(struct obd_export *exp)
928 {
929         struct obd_device *obd = exp->exp_obd;
930         if (obd == NULL)
931                 return NULL;
932         return obd->u.cli.cl_import;
933 }
934 EXPORT_SYMBOL(class_exp2cliimp);
935
936 /* Export management functions */
937 static void class_export_destroy(struct obd_export *exp)
938 {
939         struct obd_device *obd = exp->exp_obd;
940         ENTRY;
941
942         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
943         LASSERT(obd != NULL);
944
945         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
946                exp->exp_client_uuid.uuid, obd->obd_name);
947
948         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
949         ptlrpc_connection_put(exp->exp_connection);
950
951         LASSERT(list_empty(&exp->exp_outstanding_replies));
952         LASSERT(list_empty(&exp->exp_uncommitted_replies));
953         LASSERT(list_empty(&exp->exp_req_replay_queue));
954         LASSERT(list_empty(&exp->exp_hp_rpcs));
955         obd_destroy_export(exp);
956         /* self export doesn't hold a reference to an obd, although it
957          * exists until freeing of the obd */
958         if (exp != obd->obd_self_export)
959                 class_decref(obd, "export", exp);
960
961         OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
962         kfree_rcu(exp, exp_handle.h_rcu);
963         EXIT;
964 }
965
966 struct obd_export *class_export_get(struct obd_export *exp)
967 {
968         refcount_inc(&exp->exp_handle.h_ref);
969         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
970                refcount_read(&exp->exp_handle.h_ref));
971         return exp;
972 }
973 EXPORT_SYMBOL(class_export_get);
974
975 void class_export_put(struct obd_export *exp)
976 {
977         LASSERT(exp != NULL);
978         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
979         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
980         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
981                refcount_read(&exp->exp_handle.h_ref) - 1);
982
983         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
984                 struct obd_device *obd = exp->exp_obd;
985
986                 CDEBUG(D_IOCTL, "final put %p/%s\n",
987                        exp, exp->exp_client_uuid.uuid);
988
989                 /* release nid stat refererence */
990                 lprocfs_exp_cleanup(exp);
991
992                 if (exp == obd->obd_self_export) {
993                         /* self export should be destroyed without
994                          * zombie thread as it doesn't hold a
995                          * reference to obd and doesn't hold any
996                          * resources */
997                         class_export_destroy(exp);
998                         /* self export is destroyed, no class
999                          * references exist and it is safe to free
1000                          * obd */
1001                         class_free_dev(obd);
1002                 } else {
1003                         LASSERT(!list_empty(&exp->exp_obd_chain));
1004                         obd_zombie_export_add(exp);
1005                 }
1006
1007         }
1008 }
1009 EXPORT_SYMBOL(class_export_put);
1010
1011 static void obd_zombie_exp_cull(struct work_struct *ws)
1012 {
1013         struct obd_export *export;
1014
1015         export = container_of(ws, struct obd_export, exp_zombie_work);
1016         class_export_destroy(export);
1017         LASSERT(atomic_read(&obd_stale_export_num) > 0);
1018         if (atomic_dec_and_test(&obd_stale_export_num))
1019                 wake_up_var(&obd_stale_export_num);
1020 }
1021
1022 /* Creates a new export, adds it to the hash table, and returns a
1023  * pointer to it. The refcount is 2: one for the hash reference, and
1024  * one for the pointer returned by this function. */
1025 static struct obd_export *__class_new_export(struct obd_device *obd,
1026                                              struct obd_uuid *cluuid,
1027                                              bool is_self)
1028 {
1029         struct obd_export *export;
1030         int rc = 0;
1031         ENTRY;
1032
1033         OBD_ALLOC_PTR(export);
1034         if (!export)
1035                 return ERR_PTR(-ENOMEM);
1036
1037         export->exp_conn_cnt = 0;
1038         export->exp_lock_hash = NULL;
1039         export->exp_flock_hash = NULL;
1040         /* 2 = class_handle_hash + last */
1041         refcount_set(&export->exp_handle.h_ref, 2);
1042         atomic_set(&export->exp_rpc_count, 0);
1043         atomic_set(&export->exp_cb_count, 0);
1044         atomic_set(&export->exp_locks_count, 0);
1045 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1046         INIT_LIST_HEAD(&export->exp_locks_list);
1047         spin_lock_init(&export->exp_locks_list_guard);
1048 #endif
1049         atomic_set(&export->exp_replay_count, 0);
1050         export->exp_obd = obd;
1051         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1052         spin_lock_init(&export->exp_uncommitted_replies_lock);
1053         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1054         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1055         INIT_HLIST_NODE(&export->exp_handle.h_link);
1056         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1057         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1058         class_handle_hash(&export->exp_handle, export_handle_owner);
1059         export->exp_last_request_time = ktime_get_real_seconds();
1060         spin_lock_init(&export->exp_lock);
1061         spin_lock_init(&export->exp_rpc_lock);
1062         INIT_HLIST_NODE(&export->exp_gen_hash);
1063         spin_lock_init(&export->exp_bl_list_lock);
1064         INIT_LIST_HEAD(&export->exp_bl_list);
1065         INIT_LIST_HEAD(&export->exp_stale_list);
1066         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1067
1068         export->exp_sp_peer = LUSTRE_SP_ANY;
1069         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1070         export->exp_client_uuid = *cluuid;
1071         obd_init_export(export);
1072
1073         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1074         export->exp_root_fid.f_seq = 0;
1075         export->exp_root_fid.f_oid = 0;
1076         export->exp_root_fid.f_ver = 0;
1077
1078         spin_lock(&obd->obd_dev_lock);
1079         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1080                 /* shouldn't happen, but might race */
1081                 if (obd->obd_stopping)
1082                         GOTO(exit_unlock, rc = -ENODEV);
1083
1084                 rc = obd_uuid_add(obd, export);
1085                 if (rc != 0) {
1086                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1087                                       obd->obd_name, cluuid->uuid, rc);
1088                         GOTO(exit_unlock, rc = -EALREADY);
1089                 }
1090         }
1091
1092         if (!is_self) {
1093                 class_incref(obd, "export", export);
1094                 list_add_tail(&export->exp_obd_chain_timed,
1095                               &obd->obd_exports_timed);
1096                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1097                 obd->obd_num_exports++;
1098         } else {
1099                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1100                 INIT_LIST_HEAD(&export->exp_obd_chain);
1101         }
1102         spin_unlock(&obd->obd_dev_lock);
1103         RETURN(export);
1104
1105 exit_unlock:
1106         spin_unlock(&obd->obd_dev_lock);
1107         class_handle_unhash(&export->exp_handle);
1108         obd_destroy_export(export);
1109         OBD_FREE_PTR(export);
1110         return ERR_PTR(rc);
1111 }
1112
1113 struct obd_export *class_new_export(struct obd_device *obd,
1114                                     struct obd_uuid *uuid)
1115 {
1116         return __class_new_export(obd, uuid, false);
1117 }
1118 EXPORT_SYMBOL(class_new_export);
1119
1120 struct obd_export *class_new_export_self(struct obd_device *obd,
1121                                          struct obd_uuid *uuid)
1122 {
1123         return __class_new_export(obd, uuid, true);
1124 }
1125
1126 void class_unlink_export(struct obd_export *exp)
1127 {
1128         class_handle_unhash(&exp->exp_handle);
1129
1130         if (exp->exp_obd->obd_self_export == exp) {
1131                 class_export_put(exp);
1132                 return;
1133         }
1134
1135         spin_lock(&exp->exp_obd->obd_dev_lock);
1136         /* delete an uuid-export hashitem from hashtables */
1137         if (exp != exp->exp_obd->obd_self_export)
1138                 obd_uuid_del(exp->exp_obd, exp);
1139
1140 #ifdef HAVE_SERVER_SUPPORT
1141         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1142                 struct tg_export_data   *ted = &exp->exp_target_data;
1143                 struct cfs_hash         *hash;
1144
1145                 /* Because obd_gen_hash will not be released until
1146                  * class_cleanup(), so hash should never be NULL here */
1147                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1148                 LASSERT(hash != NULL);
1149                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1150                              &exp->exp_gen_hash);
1151                 cfs_hash_putref(hash);
1152         }
1153 #endif /* HAVE_SERVER_SUPPORT */
1154
1155         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1156         list_del_init(&exp->exp_obd_chain_timed);
1157         exp->exp_obd->obd_num_exports--;
1158         spin_unlock(&exp->exp_obd->obd_dev_lock);
1159
1160         /* A reference is kept by obd_stale_exports list */
1161         obd_stale_export_put(exp);
1162 }
1163 EXPORT_SYMBOL(class_unlink_export);
1164
1165 /* Import management functions */
1166 static void obd_zombie_import_free(struct obd_import *imp)
1167 {
1168         struct obd_import_conn *imp_conn;
1169
1170         ENTRY;
1171         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1172                imp->imp_obd->obd_name);
1173
1174         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1175
1176         ptlrpc_connection_put(imp->imp_connection);
1177
1178         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1179                                                     struct obd_import_conn,
1180                                                     oic_item)) != NULL) {
1181                 list_del_init(&imp_conn->oic_item);
1182                 ptlrpc_connection_put(imp_conn->oic_conn);
1183                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1184         }
1185
1186         LASSERT(imp->imp_sec == NULL);
1187         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1188                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1189         class_decref(imp->imp_obd, "import", imp);
1190         OBD_FREE_PTR(imp);
1191         EXIT;
1192 }
1193
1194 struct obd_import *class_import_get(struct obd_import *import)
1195 {
1196         refcount_inc(&import->imp_refcount);
1197         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1198                refcount_read(&import->imp_refcount),
1199                import->imp_obd->obd_name);
1200         return import;
1201 }
1202 EXPORT_SYMBOL(class_import_get);
1203
1204 void class_import_put(struct obd_import *imp)
1205 {
1206         ENTRY;
1207
1208         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1209
1210         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1211                refcount_read(&imp->imp_refcount) - 1,
1212                imp->imp_obd->obd_name);
1213
1214         if (refcount_dec_and_test(&imp->imp_refcount)) {
1215                 CDEBUG(D_INFO, "final put import %p\n", imp);
1216                 obd_zombie_import_add(imp);
1217         }
1218
1219         EXIT;
1220 }
1221 EXPORT_SYMBOL(class_import_put);
1222
1223 static void init_imp_at(struct imp_at *at) {
1224         int i;
1225         at_init(&at->iat_net_latency, 0, 0);
1226         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1227                 /* max service estimates are tracked on the server side, so
1228                    don't use the AT history here, just use the last reported
1229                    val. (But keep hist for proc histogram, worst_ever) */
1230                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1231                         AT_FLG_NOHIST);
1232         }
1233 }
1234
1235 static void obd_zombie_imp_cull(struct work_struct *ws)
1236 {
1237         struct obd_import *import;
1238
1239         import = container_of(ws, struct obd_import, imp_zombie_work);
1240         obd_zombie_import_free(import);
1241 }
1242
1243 struct obd_import *class_new_import(struct obd_device *obd)
1244 {
1245         struct obd_import *imp;
1246         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1247
1248         OBD_ALLOC(imp, sizeof(*imp));
1249         if (imp == NULL)
1250                 return NULL;
1251
1252         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1253         INIT_LIST_HEAD(&imp->imp_replay_list);
1254         INIT_LIST_HEAD(&imp->imp_sending_list);
1255         INIT_LIST_HEAD(&imp->imp_delayed_list);
1256         INIT_LIST_HEAD(&imp->imp_committed_list);
1257         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1258         imp->imp_known_replied_xid = 0;
1259         imp->imp_replay_cursor = &imp->imp_committed_list;
1260         spin_lock_init(&imp->imp_lock);
1261         imp->imp_last_success_conn = 0;
1262         imp->imp_state = LUSTRE_IMP_NEW;
1263         imp->imp_obd = class_incref(obd, "import", imp);
1264         rwlock_init(&imp->imp_sec_lock);
1265         init_waitqueue_head(&imp->imp_recovery_waitq);
1266         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1267
1268         if (curr_pid_ns && curr_pid_ns->child_reaper)
1269                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1270         else
1271                 imp->imp_sec_refpid = 1;
1272
1273         refcount_set(&imp->imp_refcount, 2);
1274         atomic_set(&imp->imp_unregistering, 0);
1275         atomic_set(&imp->imp_reqs, 0);
1276         atomic_set(&imp->imp_inflight, 0);
1277         atomic_set(&imp->imp_replay_inflight, 0);
1278         init_waitqueue_head(&imp->imp_replay_waitq);
1279         atomic_set(&imp->imp_inval_count, 0);
1280         atomic_set(&imp->imp_waiting, 0);
1281         INIT_LIST_HEAD(&imp->imp_conn_list);
1282         init_imp_at(&imp->imp_at);
1283
1284         /* the default magic is V2, will be used in connect RPC, and
1285          * then adjusted according to the flags in request/reply. */
1286         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1287
1288         return imp;
1289 }
1290 EXPORT_SYMBOL(class_new_import);
1291
1292 void class_destroy_import(struct obd_import *import)
1293 {
1294         LASSERT(import != NULL);
1295         LASSERT(import != LP_POISON);
1296
1297         spin_lock(&import->imp_lock);
1298         import->imp_generation++;
1299         spin_unlock(&import->imp_lock);
1300         class_import_put(import);
1301 }
1302 EXPORT_SYMBOL(class_destroy_import);
1303
1304 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1305
1306 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1307 {
1308         spin_lock(&exp->exp_locks_list_guard);
1309
1310         LASSERT(lock->l_exp_refs_nr >= 0);
1311
1312         if (lock->l_exp_refs_target != NULL &&
1313             lock->l_exp_refs_target != exp) {
1314                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1315                               exp, lock, lock->l_exp_refs_target);
1316         }
1317         if ((lock->l_exp_refs_nr ++) == 0) {
1318                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1319                 lock->l_exp_refs_target = exp;
1320         }
1321         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1322                lock, exp, lock->l_exp_refs_nr);
1323         spin_unlock(&exp->exp_locks_list_guard);
1324 }
1325 EXPORT_SYMBOL(__class_export_add_lock_ref);
1326
1327 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1328 {
1329         spin_lock(&exp->exp_locks_list_guard);
1330         LASSERT(lock->l_exp_refs_nr > 0);
1331         if (lock->l_exp_refs_target != exp) {
1332                 LCONSOLE_WARN("lock %p, "
1333                               "mismatching export pointers: %p, %p\n",
1334                               lock, lock->l_exp_refs_target, exp);
1335         }
1336         if (-- lock->l_exp_refs_nr == 0) {
1337                 list_del_init(&lock->l_exp_refs_link);
1338                 lock->l_exp_refs_target = NULL;
1339         }
1340         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1341                lock, exp, lock->l_exp_refs_nr);
1342         spin_unlock(&exp->exp_locks_list_guard);
1343 }
1344 EXPORT_SYMBOL(__class_export_del_lock_ref);
1345 #endif
1346
1347 /* A connection defines an export context in which preallocation can
1348    be managed. This releases the export pointer reference, and returns
1349    the export handle, so the export refcount is 1 when this function
1350    returns. */
1351 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1352                   struct obd_uuid *cluuid)
1353 {
1354         struct obd_export *export;
1355         LASSERT(conn != NULL);
1356         LASSERT(obd != NULL);
1357         LASSERT(cluuid != NULL);
1358         ENTRY;
1359
1360         export = class_new_export(obd, cluuid);
1361         if (IS_ERR(export))
1362                 RETURN(PTR_ERR(export));
1363
1364         conn->cookie = export->exp_handle.h_cookie;
1365         class_export_put(export);
1366
1367         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1368                cluuid->uuid, conn->cookie);
1369         RETURN(0);
1370 }
1371 EXPORT_SYMBOL(class_connect);
1372
1373 /* if export is involved in recovery then clean up related things */
1374 static void class_export_recovery_cleanup(struct obd_export *exp)
1375 {
1376         struct obd_device *obd = exp->exp_obd;
1377
1378         spin_lock(&obd->obd_recovery_task_lock);
1379         if (obd->obd_recovering) {
1380                 if (exp->exp_in_recovery) {
1381                         spin_lock(&exp->exp_lock);
1382                         exp->exp_in_recovery = 0;
1383                         spin_unlock(&exp->exp_lock);
1384                         LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1385                         atomic_dec(&obd->obd_connected_clients);
1386                 }
1387
1388                 /* if called during recovery then should update
1389                  * obd_stale_clients counter,
1390                  * lightweight exports are not counted */
1391                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1392                         exp->exp_obd->obd_stale_clients++;
1393         }
1394         spin_unlock(&obd->obd_recovery_task_lock);
1395
1396         spin_lock(&exp->exp_lock);
1397         /** Cleanup req replay fields */
1398         if (exp->exp_req_replay_needed) {
1399                 exp->exp_req_replay_needed = 0;
1400
1401                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1402                 atomic_dec(&obd->obd_req_replay_clients);
1403         }
1404
1405         /** Cleanup lock replay data */
1406         if (exp->exp_lock_replay_needed) {
1407                 exp->exp_lock_replay_needed = 0;
1408
1409                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1410                 atomic_dec(&obd->obd_lock_replay_clients);
1411         }
1412         spin_unlock(&exp->exp_lock);
1413 }
1414
1415 /* This function removes 1-3 references from the export:
1416  * 1 - for export pointer passed
1417  * and if disconnect really need
1418  * 2 - removing from hash
1419  * 3 - in client_unlink_export
1420  * The export pointer passed to this function can destroyed */
1421 int class_disconnect(struct obd_export *export)
1422 {
1423         int already_disconnected;
1424         ENTRY;
1425
1426         if (export == NULL) {
1427                 CWARN("attempting to free NULL export %p\n", export);
1428                 RETURN(-EINVAL);
1429         }
1430
1431         spin_lock(&export->exp_lock);
1432         already_disconnected = export->exp_disconnected;
1433         export->exp_disconnected = 1;
1434 #ifdef HAVE_SERVER_SUPPORT
1435         /*  We hold references of export for uuid hash
1436          *  and nid_hash and export link at least. So
1437          *  it is safe to call rh*table_remove_fast in
1438          *  there.
1439          */
1440         obd_nid_del(export->exp_obd, export);
1441 #endif /* HAVE_SERVER_SUPPORT */
1442         spin_unlock(&export->exp_lock);
1443
1444         /* class_cleanup(), abort_recovery(), and class_fail_export()
1445          * all end up in here, and if any of them race we shouldn't
1446          * call extra class_export_puts(). */
1447         if (already_disconnected)
1448                 GOTO(no_disconn, already_disconnected);
1449
1450         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1451                export->exp_handle.h_cookie);
1452
1453         class_export_recovery_cleanup(export);
1454         class_unlink_export(export);
1455 no_disconn:
1456         class_export_put(export);
1457         RETURN(0);
1458 }
1459 EXPORT_SYMBOL(class_disconnect);
1460
1461 /* Return non-zero for a fully connected export */
1462 int class_connected_export(struct obd_export *exp)
1463 {
1464         int connected = 0;
1465
1466         if (exp) {
1467                 spin_lock(&exp->exp_lock);
1468                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1469                 spin_unlock(&exp->exp_lock);
1470         }
1471         return connected;
1472 }
1473 EXPORT_SYMBOL(class_connected_export);
1474
1475 static void class_disconnect_export_list(struct list_head *list,
1476                                          enum obd_option flags)
1477 {
1478         int rc;
1479         struct obd_export *exp;
1480         ENTRY;
1481
1482         /* It's possible that an export may disconnect itself, but
1483          * nothing else will be added to this list.
1484          */
1485         while ((exp = list_first_entry_or_null(list, struct obd_export,
1486                                                exp_obd_chain)) != NULL) {
1487                 /* need for safe call CDEBUG after obd_disconnect */
1488                 class_export_get(exp);
1489
1490                 spin_lock(&exp->exp_lock);
1491                 exp->exp_flags = flags;
1492                 spin_unlock(&exp->exp_lock);
1493
1494                 if (obd_uuid_equals(&exp->exp_client_uuid,
1495                                     &exp->exp_obd->obd_uuid)) {
1496                         CDEBUG(D_HA,
1497                                "exp %p export uuid == obd uuid, don't discon\n",
1498                                exp);
1499                         /* Need to delete this now so we don't end up pointing
1500                          * to work_list later when this export is cleaned up. */
1501                         list_del_init(&exp->exp_obd_chain);
1502                         class_export_put(exp);
1503                         continue;
1504                 }
1505
1506                 class_export_get(exp);
1507                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1508                        "last request at %lld\n",
1509                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1510                        exp, exp->exp_last_request_time);
1511                 /* release one export reference anyway */
1512                 rc = obd_disconnect(exp);
1513
1514                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1515                        obd_export_nid2str(exp), exp, rc);
1516                 class_export_put(exp);
1517         }
1518         EXIT;
1519 }
1520
1521 void class_disconnect_exports(struct obd_device *obd)
1522 {
1523         LIST_HEAD(work_list);
1524         ENTRY;
1525
1526         /* Move all of the exports from obd_exports to a work list, en masse. */
1527         spin_lock(&obd->obd_dev_lock);
1528         list_splice_init(&obd->obd_exports, &work_list);
1529         list_splice_init(&obd->obd_delayed_exports, &work_list);
1530         spin_unlock(&obd->obd_dev_lock);
1531
1532         if (!list_empty(&work_list)) {
1533                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1534                        "disconnecting them\n", obd->obd_minor, obd);
1535                 class_disconnect_export_list(&work_list,
1536                                              exp_flags_from_obd(obd));
1537         } else
1538                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1539                        obd->obd_minor, obd);
1540         EXIT;
1541 }
1542 EXPORT_SYMBOL(class_disconnect_exports);
1543
1544 /* Remove exports that have not completed recovery.
1545  */
1546 void class_disconnect_stale_exports(struct obd_device *obd,
1547                                     int (*test_export)(struct obd_export *))
1548 {
1549         LIST_HEAD(work_list);
1550         struct obd_export *exp, *n;
1551         int evicted = 0;
1552         ENTRY;
1553
1554         spin_lock(&obd->obd_dev_lock);
1555         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1556                                  exp_obd_chain) {
1557                 /* don't count self-export as client */
1558                 if (obd_uuid_equals(&exp->exp_client_uuid,
1559                                     &exp->exp_obd->obd_uuid))
1560                         continue;
1561
1562                 /* don't evict clients which have no slot in last_rcvd
1563                  * (e.g. lightweight connection) */
1564                 if (exp->exp_target_data.ted_lr_idx == -1)
1565                         continue;
1566
1567                 spin_lock(&exp->exp_lock);
1568                 if (exp->exp_failed || test_export(exp)) {
1569                         spin_unlock(&exp->exp_lock);
1570                         continue;
1571                 }
1572                 exp->exp_failed = 1;
1573                 atomic_inc(&exp->exp_obd->obd_eviction_count);
1574                 spin_unlock(&exp->exp_lock);
1575
1576                 list_move(&exp->exp_obd_chain, &work_list);
1577                 evicted++;
1578                 CWARN("%s: disconnect stale client %s@%s\n",
1579                       obd->obd_name, exp->exp_client_uuid.uuid,
1580                       obd_export_nid2str(exp));
1581                 print_export_data(exp, "EVICTING", 0, D_HA);
1582         }
1583         spin_unlock(&obd->obd_dev_lock);
1584
1585         if (evicted)
1586                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1587                               obd->obd_name, evicted);
1588
1589         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1590                                                  OBD_OPT_ABORT_RECOV);
1591         EXIT;
1592 }
1593 EXPORT_SYMBOL(class_disconnect_stale_exports);
1594
1595 void class_fail_export(struct obd_export *exp)
1596 {
1597         int rc, already_failed;
1598
1599         spin_lock(&exp->exp_lock);
1600         already_failed = exp->exp_failed;
1601         exp->exp_failed = 1;
1602         spin_unlock(&exp->exp_lock);
1603
1604         if (already_failed) {
1605                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1606                        exp, exp->exp_client_uuid.uuid);
1607                 return;
1608         }
1609
1610         atomic_inc(&exp->exp_obd->obd_eviction_count);
1611
1612         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1613                exp, exp->exp_client_uuid.uuid);
1614
1615         if (obd_dump_on_timeout)
1616                 libcfs_debug_dumplog();
1617
1618         /* need for safe call CDEBUG after obd_disconnect */
1619         class_export_get(exp);
1620
1621         /* Most callers into obd_disconnect are removing their own reference
1622          * (request, for example) in addition to the one from the hash table.
1623          * We don't have such a reference here, so make one. */
1624         class_export_get(exp);
1625         rc = obd_disconnect(exp);
1626         if (rc)
1627                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1628         else
1629                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1630                        exp, exp->exp_client_uuid.uuid);
1631         class_export_put(exp);
1632 }
1633 EXPORT_SYMBOL(class_fail_export);
1634
1635 #ifdef HAVE_SERVER_SUPPORT
1636
1637 static int take_first(struct obd_export *exp, void *data)
1638 {
1639         struct obd_export **expp = data;
1640
1641         if (*expp)
1642                 /* already have one */
1643                 return 0;
1644         if (exp->exp_failed)
1645                 /* Don't want this one */
1646                 return 0;
1647         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1648                 /* Cannot get a ref on this one */
1649                 return 0;
1650         *expp = exp;
1651         return 1;
1652 }
1653
1654 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1655 {
1656         struct lnet_nid nid_key;
1657         struct obd_export *doomed_exp;
1658         int exports_evicted = 0;
1659
1660         libcfs_strnid(&nid_key, nid);
1661
1662         spin_lock(&obd->obd_dev_lock);
1663         /* umount has run already, so evict thread should leave
1664          * its task to umount thread now */
1665         if (obd->obd_stopping) {
1666                 spin_unlock(&obd->obd_dev_lock);
1667                 return exports_evicted;
1668         }
1669         spin_unlock(&obd->obd_dev_lock);
1670
1671         doomed_exp = NULL;
1672         while (obd_nid_export_for_each(obd, &nid_key,
1673                                        take_first, &doomed_exp) > 0) {
1674
1675                 LASSERTF(doomed_exp != obd->obd_self_export,
1676                          "self-export is hashed by NID?\n");
1677
1678                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1679                               obd->obd_name,
1680                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1681                               obd_export_nid2str(doomed_exp));
1682
1683                 class_fail_export(doomed_exp);
1684                 class_export_put(doomed_exp);
1685                 exports_evicted++;
1686                 doomed_exp = NULL;
1687         }
1688
1689         if (!exports_evicted)
1690                 CDEBUG(D_HA,
1691                        "%s: can't disconnect NID '%s': no exports found\n",
1692                        obd->obd_name, nid);
1693         return exports_evicted;
1694 }
1695 EXPORT_SYMBOL(obd_export_evict_by_nid);
1696
1697 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1698 {
1699         struct obd_export *doomed_exp = NULL;
1700         struct obd_uuid doomed_uuid;
1701         int exports_evicted = 0;
1702
1703         spin_lock(&obd->obd_dev_lock);
1704         if (obd->obd_stopping) {
1705                 spin_unlock(&obd->obd_dev_lock);
1706                 return exports_evicted;
1707         }
1708         spin_unlock(&obd->obd_dev_lock);
1709
1710         obd_str2uuid(&doomed_uuid, uuid);
1711         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1712                 CERROR("%s: can't evict myself\n", obd->obd_name);
1713                 return exports_evicted;
1714         }
1715
1716         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1717         if (doomed_exp == NULL) {
1718                 CERROR("%s: can't disconnect %s: no exports found\n",
1719                        obd->obd_name, uuid);
1720         } else {
1721                 CWARN("%s: evicting %s at adminstrative request\n",
1722                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1723                 class_fail_export(doomed_exp);
1724                 class_export_put(doomed_exp);
1725                 obd_uuid_del(obd, doomed_exp);
1726                 exports_evicted++;
1727         }
1728
1729         return exports_evicted;
1730 }
1731 #endif /* HAVE_SERVER_SUPPORT */
1732
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; NULL until something assigns it.
 * print_export_data() invokes it when lock dumping is requested. */
void (*class_export_dump_hook)(struct obd_export *exp) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1737
1738 static void print_export_data(struct obd_export *exp, const char *status,
1739                               int locks, int debug_level)
1740 {
1741         struct ptlrpc_reply_state *rs;
1742         struct ptlrpc_reply_state *first_reply = NULL;
1743         int nreplies = 0;
1744
1745         spin_lock(&exp->exp_lock);
1746         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1747                             rs_exp_list) {
1748                 if (nreplies == 0)
1749                         first_reply = rs;
1750                 nreplies++;
1751         }
1752         spin_unlock(&exp->exp_lock);
1753
1754         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1755                "%p %s %llu stale:%d\n",
1756                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1757                obd_export_nid2str(exp),
1758                refcount_read(&exp->exp_handle.h_ref),
1759                atomic_read(&exp->exp_rpc_count),
1760                atomic_read(&exp->exp_cb_count),
1761                atomic_read(&exp->exp_locks_count),
1762                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1763                nreplies, first_reply, nreplies > 3 ? "..." : "",
1764                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1765 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1766         if (locks && class_export_dump_hook != NULL)
1767                 class_export_dump_hook(exp);
1768 #endif
1769 }
1770
1771 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1772 {
1773         struct obd_export *exp;
1774
1775         spin_lock(&obd->obd_dev_lock);
1776         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1777                 print_export_data(exp, "ACTIVE", locks, debug_level);
1778         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1779                 print_export_data(exp, "UNLINKED", locks, debug_level);
1780         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1781                 print_export_data(exp, "DELAYED", locks, debug_level);
1782         spin_unlock(&obd->obd_dev_lock);
1783 }
1784
1785 void obd_exports_barrier(struct obd_device *obd)
1786 {
1787         int waited = 2;
1788         LASSERT(list_empty(&obd->obd_exports));
1789         spin_lock(&obd->obd_dev_lock);
1790         while (!list_empty(&obd->obd_unlinked_exports)) {
1791                 spin_unlock(&obd->obd_dev_lock);
1792                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1793                 if (waited > 5 && is_power_of_2(waited)) {
1794                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1795                                       "more than %d seconds. "
1796                                       "The obd refcount = %d. Is it stuck?\n",
1797                                       obd->obd_name, waited,
1798                                       atomic_read(&obd->obd_refcount));
1799                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1800                 }
1801                 waited *= 2;
1802                 spin_lock(&obd->obd_dev_lock);
1803         }
1804         spin_unlock(&obd->obd_dev_lock);
1805 }
1806 EXPORT_SYMBOL(obd_exports_barrier);
1807
1808 /**
1809  * Add export to the obd_zombe thread and notify it.
1810  */
1811 static void obd_zombie_export_add(struct obd_export *exp) {
1812         atomic_inc(&obd_stale_export_num);
1813         spin_lock(&exp->exp_obd->obd_dev_lock);
1814         LASSERT(!list_empty(&exp->exp_obd_chain));
1815         list_del_init(&exp->exp_obd_chain);
1816         spin_unlock(&exp->exp_obd->obd_dev_lock);
1817         queue_work(zombie_wq, &exp->exp_zombie_work);
1818 }
1819
1820 /**
1821  * Add import to the obd_zombe thread and notify it.
1822  */
1823 static void obd_zombie_import_add(struct obd_import *imp) {
1824         LASSERT(imp->imp_sec == NULL);
1825
1826         queue_work(zombie_wq, &imp->imp_zombie_work);
1827 }
1828
1829 /**
1830  * wait when obd_zombie import/export queues become empty
1831  */
1832 void obd_zombie_barrier(void)
1833 {
1834         wait_var_event(&obd_stale_export_num,
1835                         atomic_read(&obd_stale_export_num) == 0);
1836         flush_workqueue(zombie_wq);
1837 }
1838 EXPORT_SYMBOL(obd_zombie_barrier);
1839
1840
1841 struct obd_export *obd_stale_export_get(void)
1842 {
1843         struct obd_export *exp = NULL;
1844         ENTRY;
1845
1846         spin_lock(&obd_stale_export_lock);
1847         if (!list_empty(&obd_stale_exports)) {
1848                 exp = list_first_entry(&obd_stale_exports,
1849                                        struct obd_export, exp_stale_list);
1850                 list_del_init(&exp->exp_stale_list);
1851         }
1852         spin_unlock(&obd_stale_export_lock);
1853
1854         if (exp) {
1855                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1856                        atomic_read(&obd_stale_export_num));
1857         }
1858         RETURN(exp);
1859 }
1860 EXPORT_SYMBOL(obd_stale_export_get);
1861
1862 void obd_stale_export_put(struct obd_export *exp)
1863 {
1864         ENTRY;
1865
1866         LASSERT(list_empty(&exp->exp_stale_list));
1867         if (exp->exp_lock_hash &&
1868             atomic_read(&exp->exp_lock_hash->hs_count)) {
1869                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1870                        atomic_read(&obd_stale_export_num));
1871
1872                 spin_lock_bh(&exp->exp_bl_list_lock);
1873                 spin_lock(&obd_stale_export_lock);
1874                 /* Add to the tail if there is no blocked locks,
1875                  * to the head otherwise. */
1876                 if (list_empty(&exp->exp_bl_list))
1877                         list_add_tail(&exp->exp_stale_list,
1878                                       &obd_stale_exports);
1879                 else
1880                         list_add(&exp->exp_stale_list,
1881                                  &obd_stale_exports);
1882
1883                 spin_unlock(&obd_stale_export_lock);
1884                 spin_unlock_bh(&exp->exp_bl_list_lock);
1885         } else {
1886                 class_export_put(exp);
1887         }
1888         EXIT;
1889 }
1890 EXPORT_SYMBOL(obd_stale_export_put);
1891
1892 /**
1893  * Adjust the position of the export in the stale list,
1894  * i.e. move to the head of the list if is needed.
1895  **/
1896 void obd_stale_export_adjust(struct obd_export *exp)
1897 {
1898         LASSERT(exp != NULL);
1899         spin_lock_bh(&exp->exp_bl_list_lock);
1900         spin_lock(&obd_stale_export_lock);
1901
1902         if (!list_empty(&exp->exp_stale_list) &&
1903             !list_empty(&exp->exp_bl_list))
1904                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1905
1906         spin_unlock(&obd_stale_export_lock);
1907         spin_unlock_bh(&exp->exp_bl_list_lock);
1908 }
1909 EXPORT_SYMBOL(obd_stale_export_adjust);
1910
1911 /**
1912  * start destroy zombie import/export thread
1913  */
1914 int obd_zombie_impexp_init(void)
1915 {
1916         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1917                                            0, CFS_CPT_ANY,
1918                                            cfs_cpt_number(cfs_cpt_tab));
1919
1920         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1921 }
1922
1923 /**
1924  * stop destroy zombie import/export thread
1925  */
1926 void obd_zombie_impexp_stop(void)
1927 {
1928         destroy_workqueue(zombie_wq);
1929         LASSERT(list_empty(&obd_stale_exports));
1930 }
1931
1932 /***** Kernel-userspace comm helpers *******/
1933
1934 /* Get length of entire message, including header */
1935 int kuc_len(int payload_len)
1936 {
1937         return sizeof(struct kuc_hdr) + payload_len;
1938 }
1939 EXPORT_SYMBOL(kuc_len);
1940
1941 /* Get a pointer to kuc header, given a ptr to the payload
1942  * @param p Pointer to payload area
1943  * @returns Pointer to kuc header
1944  */
1945 struct kuc_hdr * kuc_ptr(void *p)
1946 {
1947         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1948         LASSERT(lh->kuc_magic == KUC_MAGIC);
1949         return lh;
1950 }
1951 EXPORT_SYMBOL(kuc_ptr);
1952
1953 /* Alloc space for a message, and fill in header
1954  * @return Pointer to payload area
1955  */
1956 void *kuc_alloc(int payload_len, int transport, int type)
1957 {
1958         struct kuc_hdr *lh;
1959         int len = kuc_len(payload_len);
1960
1961         OBD_ALLOC(lh, len);
1962         if (lh == NULL)
1963                 return ERR_PTR(-ENOMEM);
1964
1965         lh->kuc_magic = KUC_MAGIC;
1966         lh->kuc_transport = transport;
1967         lh->kuc_msgtype = type;
1968         lh->kuc_msglen = len;
1969
1970         return (void *)(lh + 1);
1971 }
1972 EXPORT_SYMBOL(kuc_alloc);
1973
/* Free a message given a pointer to its payload area.  The header is
 * located with kuc_ptr() and kuc_len(@payload_len) is the size handed to
 * OBD_FREE(), so @payload_len should be the value used at allocation.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);
        int len = kuc_len(payload_len);

        OBD_FREE(lh, len);
}
EXPORT_SYMBOL(kuc_free);
1981
1982 struct obd_request_slot_waiter {
1983         struct list_head        orsw_entry;
1984         wait_queue_head_t       orsw_waitq;
1985         bool                    orsw_signaled;
1986 };
1987
1988 static bool obd_request_slot_avail(struct client_obd *cli,
1989                                    struct obd_request_slot_waiter *orsw)
1990 {
1991         bool avail;
1992
1993         spin_lock(&cli->cl_loi_list_lock);
1994         avail = !!list_empty(&orsw->orsw_entry);
1995         spin_unlock(&cli->cl_loi_list_lock);
1996
1997         return avail;
1998 };
1999
2000 /*
2001  * For network flow control, the RPC sponsor needs to acquire a credit
2002  * before sending the RPC. The credits count for a connection is defined
2003  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2004  * the subsequent RPC sponsors need to wait until others released their
2005  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2006  */
2007 int obd_get_request_slot(struct client_obd *cli)
2008 {
2009         struct obd_request_slot_waiter   orsw;
2010         int                              rc;
2011
2012         spin_lock(&cli->cl_loi_list_lock);
2013         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2014                 cli->cl_rpcs_in_flight++;
2015                 spin_unlock(&cli->cl_loi_list_lock);
2016                 return 0;
2017         }
2018
2019         init_waitqueue_head(&orsw.orsw_waitq);
2020         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2021         orsw.orsw_signaled = false;
2022         spin_unlock(&cli->cl_loi_list_lock);
2023
2024         rc = l_wait_event_abortable(orsw.orsw_waitq,
2025                                     obd_request_slot_avail(cli, &orsw) ||
2026                                     orsw.orsw_signaled);
2027
2028         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2029          * freed but other (such as obd_put_request_slot) is using it. */
2030         spin_lock(&cli->cl_loi_list_lock);
2031         if (rc != 0) {
2032                 if (!orsw.orsw_signaled) {
2033                         if (list_empty(&orsw.orsw_entry))
2034                                 cli->cl_rpcs_in_flight--;
2035                         else
2036                                 list_del(&orsw.orsw_entry);
2037                 }
2038                 rc = -EINTR;
2039         }
2040
2041         if (orsw.orsw_signaled) {
2042                 LASSERT(list_empty(&orsw.orsw_entry));
2043
2044                 rc = -EINTR;
2045         }
2046         spin_unlock(&cli->cl_loi_list_lock);
2047
2048         return rc;
2049 }
2050 EXPORT_SYMBOL(obd_get_request_slot);
2051
2052 void obd_put_request_slot(struct client_obd *cli)
2053 {
2054         struct obd_request_slot_waiter *orsw;
2055
2056         spin_lock(&cli->cl_loi_list_lock);
2057         cli->cl_rpcs_in_flight--;
2058
2059         /* If there is free slot, wakeup the first waiter. */
2060         if (!list_empty(&cli->cl_flight_waiters) &&
2061             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2062                 orsw = list_first_entry(&cli->cl_flight_waiters,
2063                                         struct obd_request_slot_waiter,
2064                                         orsw_entry);
2065                 list_del_init(&orsw->orsw_entry);
2066                 cli->cl_rpcs_in_flight++;
2067                 wake_up(&orsw->orsw_waitq);
2068         }
2069         spin_unlock(&cli->cl_loi_list_lock);
2070 }
2071 EXPORT_SYMBOL(obd_put_request_slot);
2072
2073 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2074 {
2075         return cli->cl_max_rpcs_in_flight;
2076 }
2077 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2078
2079 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2080 {
2081         struct obd_request_slot_waiter *orsw;
2082         __u32                           old;
2083         int                             diff;
2084         int                             i;
2085         int                             rc;
2086
2087         if (max > OBD_MAX_RIF_MAX || max < 1)
2088                 return -ERANGE;
2089
2090         CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
2091                cli->cl_import->imp_obd->obd_name, max,
2092                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2093
2094         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2095                    LUSTRE_MDC_NAME) == 0) {
2096                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2097                  * strictly lower that max_rpcs_in_flight */
2098                 if (max < 2) {
2099                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2100                                cli->cl_import->imp_obd->obd_name);
2101                         return -ERANGE;
2102                 }
2103                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2104                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2105                         if (rc != 0)
2106                                 return rc;
2107                 }
2108         }
2109
2110         spin_lock(&cli->cl_loi_list_lock);
2111         old = cli->cl_max_rpcs_in_flight;
2112         cli->cl_max_rpcs_in_flight = max;
2113         client_adjust_max_dirty(cli);
2114
2115         diff = max - old;
2116
2117         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2118         for (i = 0; i < diff; i++) {
2119                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2120                                                 struct obd_request_slot_waiter,
2121                                                 orsw_entry);
2122                 if (!orsw)
2123                         break;
2124
2125                 list_del_init(&orsw->orsw_entry);
2126                 cli->cl_rpcs_in_flight++;
2127                 wake_up(&orsw->orsw_waitq);
2128         }
2129         spin_unlock(&cli->cl_loi_list_lock);
2130
2131         return 0;
2132 }
2133 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2134
2135 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2136 {
2137         return cli->cl_max_mod_rpcs_in_flight;
2138 }
2139 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2140
2141 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2142 {
2143         struct obd_connect_data *ocd;
2144         __u16 maxmodrpcs;
2145         __u16 prev;
2146
2147         if (max > OBD_MAX_RIF_MAX || max < 1)
2148                 return -ERANGE;
2149
2150         ocd = &cli->cl_import->imp_connect_data;
2151         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2152                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2153                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2154
2155         if (max == OBD_MAX_RIF_MAX)
2156                 max = OBD_MAX_RIF_MAX - 1;
2157
2158         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2159          * increase this value, also bump up max_rpcs_in_flight to match.
2160          */
2161         if (max >= cli->cl_max_rpcs_in_flight) {
2162                 CDEBUG(D_INFO,
2163                        "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
2164                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2165                 obd_set_max_rpcs_in_flight(cli, max + 1);
2166         }
2167
2168         /* cannot exceed max modify RPCs in flight supported by the server,
2169          * but verify ocd_connect_flags is at least initialized first.  If
2170          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2171          */
2172         if (!ocd->ocd_connect_flags) {
2173                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2174         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2175                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2176                 if (maxmodrpcs == 0) { /* connection not finished yet */
2177                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2178                         CDEBUG(D_INFO,
2179                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2180                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2181                 }
2182         } else {
2183                 maxmodrpcs = 1;
2184         }
2185         if (max > maxmodrpcs) {
2186                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2187                        cli->cl_import->imp_obd->obd_name,
2188                        max, maxmodrpcs);
2189                 return -ERANGE;
2190         }
2191
2192         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2193
2194         prev = cli->cl_max_mod_rpcs_in_flight;
2195         cli->cl_max_mod_rpcs_in_flight = max;
2196
2197         /* wakeup waiters if limit has been increased */
2198         if (cli->cl_max_mod_rpcs_in_flight > prev)
2199                 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2200
2201         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2202
2203         return 0;
2204 }
2205 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2206
2207 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2208                                struct seq_file *seq)
2209 {
2210         unsigned long mod_tot = 0, mod_cum;
2211         int i;
2212
2213         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2214         lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2215                              ":", true, "");
2216         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2217                    cli->cl_mod_rpcs_in_flight);
2218
2219         seq_printf(seq, "\n\t\t\tmodify\n");
2220         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2221
2222         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2223
2224         mod_cum = 0;
2225         for (i = 0; i < OBD_HIST_MAX; i++) {
2226                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2227
2228                 mod_cum += mod;
2229                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2230                            i, mod, pct(mod, mod_tot),
2231                            pct(mod_cum, mod_tot));
2232                 if (mod_cum == mod_tot)
2233                         break;
2234         }
2235
2236         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2237
2238         return 0;
2239 }
2240 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2241
2242 /* The number of modify RPCs sent in parallel is limited
2243  * because the server has a finite number of slots per client to
2244  * store request result and ensure reply reconstruction when needed.
2245  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2246  * that takes into account server limit and cl_max_rpcs_in_flight
2247  * value.
2248  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2249  * one close request is allowed above the maximum.
2250  */
2251 struct mod_waiter {
2252         struct client_obd *cli;
2253         bool close_req;
2254         bool woken;
2255         wait_queue_entry_t wqe;
2256 };
2257 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2258                                   unsigned int mode, int flags, void *key)
2259 {
2260         struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2261         struct client_obd *cli = w->cli;
2262         bool close_req = w->close_req;
2263         bool avail;
2264         int ret;
2265
2266         /* As woken_wake_function() doesn't remove us from the wait_queue,
2267          * we use own flag to ensure we're called just once.
2268          */
2269         if (w->woken)
2270                 return 0;
2271
2272         /* A slot is available if
2273          * - number of modify RPCs in flight is less than the max
2274          * - it's a close RPC and no other close request is in flight
2275          */
2276         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2277                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2278         if (avail) {
2279                 cli->cl_mod_rpcs_in_flight++;
2280                 if (w->close_req)
2281                         cli->cl_close_rpcs_in_flight++;
2282                 ret = woken_wake_function(wq_entry, mode, flags, key);
2283                 w->woken = true;
2284         } else if (cli->cl_close_rpcs_in_flight)
2285                 /* No other waiter could be woken */
2286                 ret = -1;
2287         else if (key == NULL)
2288                 /* This was not a wakeup from a close completion, so there is no
2289                  * point seeing if there are close waiters to be woken
2290                  */
2291                 ret = -1;
2292         else
2293                 /* There might be be a close we could wake, keep looking */
2294                 ret = 0;
2295         return ret;
2296 }
2297
2298 /* Get a modify RPC slot from the obd client @cli according
2299  * to the kind of operation @opc that is going to be sent
2300  * and the intent @it of the operation if it applies.
2301  * If the maximum number of modify RPCs in flight is reached
2302  * the thread is put to sleep.
2303  * Returns the tag to be set in the request message. Tag 0
2304  * is reserved for non-modifying requests.
2305  */
2306 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2307 {
2308         struct mod_waiter wait = {
2309                 .cli = cli,
2310                 .close_req = (opc == MDS_CLOSE),
2311                 .woken = false,
2312         };
2313         __u16                   i, max;
2314
2315         init_wait(&wait.wqe);
2316         wait.wqe.func = claim_mod_rpc_function;
2317
2318         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2319         __add_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2320         /* This wakeup will only succeed if the maximums haven't
2321          * been reached.  If that happens, WQ_FLAG_WOKEN will be cleared
2322          * and there will be no need to wait.
2323          */
2324         wake_up_locked(&cli->cl_mod_rpcs_waitq);
2325         /* XXX: handle spurious wakeups (from unknown yet source */
2326         while (wait.woken == false) {
2327                 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2328                 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2329                            MAX_SCHEDULE_TIMEOUT);
2330                 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2331         }
2332         __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2333
2334         max = cli->cl_max_mod_rpcs_in_flight;
2335         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2336                          cli->cl_mod_rpcs_in_flight);
2337         /* find a free tag */
2338         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2339                                 max + 1);
2340         LASSERT(i < OBD_MAX_RIF_MAX);
2341         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2342         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2343         /* tag 0 is reserved for non-modify RPCs */
2344
2345         CDEBUG(D_RPCTRACE,
2346                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2347                cli->cl_import->imp_obd->obd_name,
2348                i + 1, opc, max);
2349
2350         return i + 1;
2351 }
2352 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2353
2354 /* Put a modify RPC slot from the obd client @cli according
2355  * to the kind of operation @opc that has been sent.
2356  */
2357 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2358 {
2359         bool                    close_req = false;
2360
2361         if (tag == 0)
2362                 return;
2363
2364         if (opc == MDS_CLOSE)
2365                 close_req = true;
2366
2367         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2368         cli->cl_mod_rpcs_in_flight--;
2369         if (close_req)
2370                 cli->cl_close_rpcs_in_flight--;
2371         /* release the tag in the bitmap */
2372         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2373         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2374         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2375                              (void *)close_req);
2376         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2377 }
2378 EXPORT_SYMBOL(obd_put_mod_rpc_slot);